EXPLAIN ANALYZE with ActiveRecord

Tagged analyze, explain  Languages ruby
module ActiveRecord
  module ConnectionAdapters
    module PostgreSQL
      module DatabaseStatements
        def explain_analyze(arel, binds = [])
          sql = "EXPLAIN ANALYZE #{to_sql(arel, binds)}"
          PostgreSQL::ExplainPrettyPrinter.new.pp(exec_query(sql, "EXPLAIN", binds))
        end
      end
    end
  end
end

puts Job.connection.explain_analyze(Job.where("slow = true").arel)
...

See: https://github.com/rails/rails/blob/b8787b92e7c94b8575118b5f259dd47b5de4772f/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb#L7-L10

Converting a string to a number with CRC

Tagged conversion, crc, number, string  Languages ruby

CRC can be used to convert a string into a number:

require 'zlib'
integer = Zlib.crc32('snippet')
=> 2518453461
Zlib.crc32('cat')
=> 2656977832

The CRC algorithm is normally used to detect changes or errors in large chunks of data.
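
One place where this string-to-number mapping is handy is assigning a key to a fixed number of buckets. The following is a minimal sketch under that assumption; the helper name bucket_for and the bucket count of 16 are made up for illustration. CRC32 is not a cryptographic hash, so this is only suitable where collisions are harmless.

```ruby
require 'zlib'

# Hypothetical helper: map a string key to one of `buckets` slots.
# Zlib.crc32 returns an unsigned 32-bit integer, so the modulo result
# is always in 0...buckets and is the same for the same key on every run.
def bucket_for(key, buckets)
  Zlib.crc32(key) % buckets
end

%w[snippet cat].each do |key|
  puts "#{key} -> bucket #{bucket_for(key, 16)}"
end
```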

Waiting for Socket (IO) to be readable / writable in Ruby

Tagged socket, io-wait, readable, ruby, writable  Languages ruby

Option 1: Use IO.select

Use IO.select with read_nonblock, write_nonblock, and connect_nonblock.
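
A rough sketch of Option 1, using a pipe so it runs without a network. The helper name read_with_timeout is hypothetical. Note that the timeout restarts on every retry, so the helper bounds each wait rather than the call as a whole.

```ruby
# Hypothetical helper: read up to max_bytes from io without blocking forever.
# Returns nil if io does not become readable within `timeout` seconds.
def read_with_timeout(io, max_bytes, timeout)
  io.read_nonblock(max_bytes)
rescue IO::WaitReadable
  # IO.select returns nil when the timeout expires before io is readable.
  return nil unless IO.select([io], nil, nil, timeout)

  retry
end

r, w = IO.pipe
p read_with_timeout(r, 16, 0.1) # nothing has been written yet
w.write('ping')
p read_with_timeout(r, 16, 0.1) # data is now available
```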

Option 2: Use io-wait

require 'io/wait'
r, w = IO.pipe
w.wait_writable(0.1)

Example from https://bugs.ruby-lang.org/issues/12013?tab=history:

# Waits up to 30 seconds in total
IO.select([mysock], [mysock], nil, 30)
# as opposed to up to 60 seconds: the two 30-second waits run one after the other
require 'io/wait'
mysock.wait_readable(30) && mysock.wait_writable(30)

How to simulate TCP read, write, and connect timeouts

Tagged write, connect, read, tcp, timeout  Languages 

How to simulate TCP read, write, and connect timeouts:

  • Connect timeout

Option 1: Drop all SYN packets with firewall or iptables rules

Option 2: Try connecting to a non-routable IP, e.g., 10.0.0.0

  • Read timeout

Read from a socket to which the client or server is not writing while keeping the socket open on both ends.

  • Write timeout

You’re (almost) out of luck. A write returns as soon as the data has been copied into the OS-level send buffer, so a write timeout only occurs once that buffer is full, which might never happen.

Option 1: Use https://github.com/openresty/mockeagain and LD_PRELOAD to mock syscalls
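
The read timeout case described above can be reproduced locally in Ruby. This is a minimal sketch, not a test harness: the method name read_times_out? and the 0.2 second timeout are arbitrary choices for the demo. The server accepts the connection and then stays silent, so the client never sees any data.

```ruby
require 'socket'

# Hypothetical helper: returns true if no data arrives within read_timeout seconds.
def read_times_out?(read_timeout)
  server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
  port = server.addr[1]
  # Server side: accept the connection, keep it open, never write.
  silent_peer = Thread.new do
    _connection = server.accept
    sleep
  end
  # Client side: wait for data that never arrives.
  client = TCPSocket.new('127.0.0.1', port)
  IO.select([client], nil, nil, read_timeout).nil?
ensure
  silent_peer&.kill
  client&.close
  server&.close
end

puts read_times_out?(0.2) ? 'read timed out' : 'data arrived'
```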

Reference:

https://github.com/openresty/programming-openresty/blob/master/testing/testing-erroneous-cases.adoc

https://github.com/openresty/mockeagain

Advisory locks in Linux

Tagged lock, advisory, flock  Languages bash

Use Linux advisory locks to prevent concurrent execution of scripts:

flock -nx /var/lock/scriptx scriptx.sh
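
The same kind of lock can be taken from inside a Ruby script with File#flock, where LOCK_EX and LOCK_NB correspond to the -x and -n flags. A minimal sketch, assuming a lock file in the temp directory; the helper name with_exclusive_lock and the lock path are hypothetical.

```ruby
require 'tmpdir'

# Hypothetical helper: run the block only if the lock can be taken right away.
# Returns true if the block ran, false if another holder has the lock.
def with_exclusive_lock(path)
  File.open(path, File::RDWR | File::CREAT, 0o644) do |file|
    # With LOCK_NB, flock returns false instead of blocking when the lock is held.
    return false unless file.flock(File::LOCK_EX | File::LOCK_NB)

    yield
    true
  end
end

lock_path = File.join(Dir.tmpdir, 'scriptx.lock')
ran = with_exclusive_lock(lock_path) { puts 'running scriptx' }
puts 'another instance holds the lock' unless ran
```

The lock is released when the file is closed at the end of the File.open block.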

REST API thread-safety feature

Tagged request id, thread-safety, safe, rest  Languages 

REST API safety feature:

  1. Include a request ID in the request and response.
  2. Check that the response contains the request ID.

Benefits:

  • You can detect concurrency bugs in your own code or in third-party code that cause the wrong response to be returned
  • Easier to debug requests and responses
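
The two steps above can be sketched as follows. This is an illustration only: the X-Request-Id header name, the RequestIdMismatch error, and the transport callable (anything that takes request headers and returns response headers) are assumptions, not part of any particular HTTP client.

```ruby
require 'securerandom'

class RequestIdMismatch < StandardError; end

# Hypothetical wrapper: attach a fresh request ID and verify the response echoes it.
def call_with_request_id(transport)
  request_id = SecureRandom.uuid
  response_headers = transport.call({ 'X-Request-Id' => request_id })
  returned_id = response_headers['X-Request-Id']
  unless returned_id == request_id
    raise RequestIdMismatch, "sent #{request_id}, received #{returned_id.inspect}"
  end

  response_headers
end

# Stand-in for a well-behaved server that echoes the request ID back.
echo_server = ->(headers) { { 'X-Request-Id' => headers['X-Request-Id'] } }
call_with_request_id(echo_server)
puts 'request ID matched'
```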

Ruby TCP socket with read, write, and connect timeout

Tagged write, connect, read, ruby, socket, tcp, timeout  Languages ruby

See Ruby’s Net::Protocol implementation for an example of how to implement connect, read, and write timeouts.

Or, write your own custom TCP socket implementation that supports read, write, and connect timeouts (warning, not tested):

require 'socket'
require 'openssl'

class TCPSocketWithTimeout
  class Error < StandardError; end
  # NOTE: inside this class, Timeout refers to this error class, not Ruby's ::Timeout module
  class Timeout < StandardError; end
  class ConnectTimeout < Timeout; end
  class ReadTimeout < Timeout; end
  class WriteTimeout < Timeout; end
  attr_reader :host, :port, :tls, :ca_file, :read_timeout, :write_timeout, :connect_timeout, :socket

  def initialize(host:, port:, tls: false, ca_file: nil, connect_timeout: 5, read_timeout: 5, write_timeout: 5)
    @tls = tls
    @ca_file = ca_file
    @host = host
    @port = port
    @write_timeout = write_timeout
    @read_timeout = read_timeout
    @connect_timeout = connect_timeout
  end
  
  def ssl_context
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.ssl_version = :TLSv1_2
    ctx.ca_file = ca_file if ca_file
    ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
    ctx
  end

  # NOTE: Written against Ruby 2.7; the socket API might change in later Ruby versions
  def init_socket
    if tls
      sock = OpenSSL::SSL::SSLSocket.new(
        TCPSocket.open(host, port), # opens connection to server
        ssl_context
      )
      # Close both socket and encrypted layer
      sock.sync_close = true
    else
      sock = Socket.new(:INET, :STREAM, 0)
    end
    sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    sock
  end

  def connect(timeout: connect_timeout)
    @socket = init_socket
    deadline = Time.now.utc + timeout
    non_blocking(socket, deadline) do
      # NOTE: different method arity for non-SSL
      if tls
        socket.connect_nonblock
      else
        socket_address = Socket.pack_sockaddr_in(port, host)
        socket.connect_nonblock(socket_address)
      end
    end
  rescue Errno::EISCONN
    # Connection established
  rescue Timeout, Errno::ETIMEDOUT => e
    raise ConnectTimeout, "Connection timeout after #{timeout} seconds trying to connect to '#{host}:#{port}': #{e.class}: #{e.message}"
  rescue SystemCallError, IOError => e
    raise Error, "Connection failure while connecting to '#{host}:#{port}': #{e.class}: #{e.message}"
  end

  def read(timeout: read_timeout, length: 1024)
    deadline = Time.now.utc + timeout
    non_blocking(socket, deadline) do
      socket.read_nonblock(length)
    end
  rescue Timeout
    raise ReadTimeout, "Timeout after #{timeout}s while reading data from #{host}:#{port}"
  rescue SystemCallError, IOError => e
    raise Error, "Connection error while reading data from #{host}:#{port} #{e.class}: #{e.message}"
  end

  def write(data, timeout: write_timeout)
    deadline = Time.now.utc + timeout
    length = data.bytesize
    total_count = 0
    non_blocking(socket, deadline) do
      loop do
        count = socket.write_nonblock(data)
        total_count += count
        return total_count if total_count >= length

        data = data.byteslice(count..-1)
      end
    end
  rescue Timeout
    raise WriteTimeout, "Timeout after #{timeout}s while writing data to #{host}:#{port}"
  rescue SystemCallError, IOError => e
    raise Error, "Connection error while writing data to #{host}:#{port} #{e.class}: #{e.message}"
  end

  def disconnect
    socket&.close
  end

  def non_blocking(socket, deadline)
    raise Error, "Socket #{host}:#{port} is closed" if socket.nil? || socket.closed?

    yield
  rescue IO::WaitReadable => e
    time_remaining = calculate_remaining_time(deadline)
    raise Timeout, e unless IO.select([socket], nil, nil, time_remaining)

    retry
  rescue IO::WaitWritable => e
    time_remaining = calculate_remaining_time(deadline)
    raise Timeout, e unless IO.select(nil, [socket], nil, time_remaining)

    retry
  end

  def calculate_remaining_time(deadline)
    time_remaining = deadline - Time.now.utc
    raise Timeout if time_remaining.negative?

    time_remaining
  end
end

sock = TCPSocketWithTimeout.new(host: 'localhost', port: 8888)
sock.connect
sock.write "HELLO"
puts "Writing done"
sock.disconnect

Reference:

https://ruby-doc.org/core-2.6.2/IO.html#method-c-select

https://workingwithruby.com/wwtcps/nonblocking/

A basic TCP server that can be used to test the client:

require 'socket'

server = TCPServer.open(8888)

while client = server.accept
  puts "Accepted"
  puts "Received #{client.read}"
  puts "Wrote #{client.write('Hello back')}"
  client.close
end

Notes

Use non-blocking methods together with IO.select to implement timeouts.

Use TCPSocket (wrapped in OpenSSL::SSL::SSLSocket) with TLS connections.

Use Socket with plain (non-TLS) connections.

TCPSocket in Ruby version 3 and greater includes a connect_timeout parameter in the constructor: https://ruby-doc.org/stdlib-3.0.0/libdoc/socket/rdoc/TCPSocket.html
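
A minimal sketch of that parameter, assuming Ruby 3.0 or later. It connects to a throwaway local server so it runs without network access; the helper name connect_with_timeout and the 2 second value are arbitrary.

```ruby
require 'socket'

# Hypothetical helper: open a TCP connection, giving up on the connect
# phase after `seconds`. Relies on the connect_timeout: keyword (Ruby 3.0+).
def connect_with_timeout(host, port, seconds)
  TCPSocket.new(host, port, connect_timeout: seconds)
end

server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
socket = connect_with_timeout('127.0.0.1', server.addr[1], 2)
puts "connected to port #{socket.remote_address.ip_port}"
socket.close
server.close
```

This only covers the connect phase; read and write timeouts still need the non-blocking approach shown earlier.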

Ruby’s socket API is a work in progress…

References

Socket connection timeout in Ruby: https://spin.atomicobject.com/2013/09/30/socket-connection-timeout-ruby/

Working with TCP sockets: https://workingwithruby.com/downloads/Working%20With%20TCP%20Sockets.pdf

Ruby’s socket API and related documentation are not that great, so you might need to read the source:

https://github.com/ruby/ruby/blob/v2_7_6/ext/socket/tcpsocket.c

https://github.com/ruby/ruby/blob/v2_7_6/ext/socket/socket.c

https://github.com/ruby/ruby/blob/v2_7_6/test/openssl/test_ssl.rb

Celery tips

Tagged celery, tips  Languages 
  1. Set a global task timeout

The default is no timeout.

CELERYD_TASK_SOFT_TIME_LIMIT = 30
CELERYD_TASK_TIME_LIMIT = 60

  2. Use autoretry_for

By default, no exceptions will be retried.

  3. Set max_retries at task or global level

The default is 3 retries. None means unlimited retries.

  4. Use queues

Otherwise, low priority tasks might prevent higher priority tasks from being executed.

@app.task(base=SqlAlchemyTask, queue='medium')

  5. Use Flower for monitoring

https://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor

  6. Separate slow tasks from fast tasks

Slow tasks will monopolize workers, even if you have separate queues.

  7. Use acks_late=True

Setting acks_late to True means the message is acknowledged after the task has run, not when it is received. This increases reliability, because a task interrupted by a worker crash can be redelivered instead of being lost.

  8. Make tasks idempotent

  9. There’s no exactly-once

Exactly-once is a lie.

Reference: