SQLAlchemy's yield_for with raw SQL and models
from sqlalchemy import text
session = ...
q = session.query(Report).from_statement(text("""
SELECT
id
FROM
reports
WHERE
id = '51812'
ORDER BY
id DESC;
""")).yield_per(100)
from sqlalchemy import text
session = ...
q = session.query(Report).from_statement(text("""
SELECT
id
FROM
reports
WHERE
id = '51812'
ORDER BY
id DESC;
""")).yield_per(100)
How to parse XML with Python’s etree when XML has namespaces, even a default namespace:
from lxml import etree
doc = etree.XML(bytes(bytearray(xml, encoding='utf-8')))
ns = {}
for k in doc.nsmap.keys():
ns[k] = doc.nsmap[k]
doc.find('.//tag', ns)
doc.findtext('./periodOfReport', namespaces=ns)
LOL, someone needs to clean up the API.
To give your cron script access to the same environment variables as the Docker container, you can read and export the environment variables for the PID 1 process in your script:
crontab -l
* * * * * /app/run.sh jobs.xxx
/app/run.sh
#!/usr/bin/env bash
# Read the environment variables for the main process (PID 1) running in the Docker container:
export $(xargs -0 -a "/proc/1/environ")
python3 -m $1
from json import JSONEncoder
class MyEncoder(JSONEncoder):
def default(self, o):
if isinstance(obj, XY):
return { 'x': obj.x, 'y': obj.y }
return o.__dict__
def to_json2(value):
return MyEncoder(indent=2).encode(value)
def url_join(*parts):
# This does not work with many parts:
# import urllib.parse
# urllib.parse.urljoin(*parts)
return os.path.join(*parts)
The goal is to use fail2ban to block invalid HTTP requests sent to nginx from Iran:
"45.132.170.81 - - [04/Jan/2021:02:46:12 +0000] "\x10\xD2\xE9\xA68\x8A\x98\xB3\x00\xB9mO\xD7\xC9\xAA]"
The request is a HEX encoded string instead of the normal “GET /“.
First, configure the jail:
sudo cp /etc/fail2ban/jail.conf /etc/fail2ban/jail.local
Add the following at the end of the file:
[nginx-idiots]
enabled = true
port = http,https
filter = nginx-idiots
logpath = /var/log/nginx/access.log
# NOTE: Docker will override fail2ban's rules, so good luck with iptables:
# logpath = /var/lib/docker/containers/*/*.log
bantime = 86400
findtime = 86400
maxretry = 2
Add a filter to block the idiots:
sudo vim /etc/fail2ban/filter.d/nginx-idiots.conf
[Definition]
failregex = ^.* <HOST> .*".*\\x.*".*$
ignoreregex =
Test the regex:
sudo fail2ban-regex -v --print-all-missed /var/log/nginx/access.log /etc/fail2ban/filter.d/nginx-idiots.conf
Restart fail2ban.
View banned IPs with:
sudo fail2ban-client status nginx-idiots
Note that this could also be a real idiot that is sending SSL traffic to the non-SSL port, or someone who has configured SSL traffic to be sent to the non-SSL port.
config/application.rb:
require_relative 'boot'
require 'rails/all'
# Require the gems listed in Gemfile, including any gems
# you've limited to :test, :development, or :production.
Bundler.require(*Rails.groups)
module YourApp
class Application < Rails::Application
# Initialize configuration defaults for originally generated Rails version.
config.load_defaults 6.0
# Set to :all to dump all schemas
config.active_record.dump_schemas = :all
# Don't dump in Docker
config.active_record.dump_schema_after_migration = !File.exist?('/.dockerenv')
# Settings in config/environments/* take precedence over those specified here.
# Application configuration can go into files in config/initializers
# -- all .rb files in that directory are automatically loaded after loading
# the framework and any gems in your application.
config.active_record.schema_format = :sql
end
end
If all else fails, override the rake task in Rakefile:
Rake::Task["db:schema:dump"].clear
Rake::Task["db:structure:dump"].clear
namespace :db do
namespace :schema do
task :dump do
Kernel.system "pg_dump -s -x -O -f db/structure.sql --schema=xxx xxx_development && sed do_xyz_with_dump"
end
end
end
A tool for exploring each layer in a docker image: https://github.com/wagoodman/dive
This works on Linux if “/dev/tcp” is not disabled:
$ cat < /dev/tcp/google.com/80
If the connection is opened successfully the command will wait for input, i.e., nothing will happen. If the port is closed you will get an error.
See:
https://news.ycombinator.com/item?id=3609061
http://tmartiro.blogspot.com/2012/02/udptcp-connection-using-pure-bash.html
advisory_lock.py
from models import Session
from sqlalchemy import func, select
def execute(session, lock_fn, lock_id, scope):
"""
Executes the lock function
"""
return session.execute(select([lock_fn(lock_id, scope)])).scalar()
def obtain_lock(session, lock_id, scope):
"""
Obtains the advisory lock
"""
lock_fn = func.pg_try_advisory_lock
return execute(session, lock_fn, lock_id, scope)
def release_lock(session, lock_id, scope):
"""
Releases the advisory lock
"""
lock_fn = func.pg_advisory_unlock
return execute(session, lock_fn, lock_id, scope)
def with_lock(my_func, lock_id, scope=1):
"""
Executes my_func if the lock can be obtained.
"""
session = Session()
obtained_lock = False
try:
obtained_lock = obtain_lock(session, lock_id, scope)
if obtained_lock:
my_func()
finally:
if obtained_lock:
release_lock(session, lock_id, scope)
Usage:
from advisory_lock import with_lock
def run():
print("It runs")
if __name__ == '__main__':
with_lock(run, 300000)