amqp snippets

RabbitMQ consumer and publisher (Bunny/Ruby)

Tagged amqp, bunny, rabbitmq, ruby  Languages bash, ruby

Create consumer.rb:

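require 'bunny'

bunny = Bunny.new(ENV.fetch('AMQP_URL'))
bunny.start
at_exit { bunny.stop }

channel = bunny.create_channel
# Deliver at most one unacknowledged message at a time.
channel.prefetch(1)
# 'passive: true' means the exchange and queue must already exist.
exchange = channel.topic(ENV.fetch('EXCHANGE'), durable: true, passive: true)
queue = channel.queue(ENV.fetch('QUEUE'), durable: true, passive: true)
# The publisher uses the queue name as the routing key, so bind with it.
queue.bind(exchange, routing_key: queue.name)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _metadata, payload|
  puts "==============="
  puts payload
  puts "==============="
  # Acknowledge only this delivery (multiple = false).
  channel.acknowledge(delivery_info.delivery_tag, false)
end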

Start consumer:

$ EXCHANGE=cnn QUEUE=news AMQP_URL=amqp://username:password@host:5672 ruby consumer.rb

Create publisher.rb:

require "bunny"

class Publisher
  def self.connect(amqp_url, exchange:)
    @bunny = Bunny.new(amqp_url)
    @bunny.start
    at_exit { @bunny.stop }
    @channel = @bunny.create_channel
    # 'passive: true' means the exchange must already exist
    @exchange = @channel.topic(exchange, passive: true)
  end

  # Named 'publish' rather than 'send' so Object#send is not overridden.
  def self.publish(message, queue:)
    @exchange.publish(message, routing_key: queue)
  end
end

queue = ENV.fetch('QUEUE')
Publisher.connect(ENV.fetch('AMQP_URL'), exchange: ENV.fetch('EXCHANGE'))
Publisher.publish('Ruby is the best programming language', queue: queue)
Publisher.publish('Top 10 silver-bullet solutions in Java', queue: queue)

Start publisher.rb:

$ EXCHANGE=cnn QUEUE=news AMQP_URL=amqp://username:password@host:5672 ruby publisher.rb

RabbitMQ Ruby client example (AMQP)

Tagged amqp, bunny, rabbitmq, ruby, thread-safe  Languages ruby

require 'bunny'

# See http://rubybunny.info/articles/concurrency.html
class AMQP
  # Bunny::Session
  def self.session
    Thread.current[:bunny_session] ||= Bunny.new(uri).start
  end
  class << self
    alias connect session
  end

  # Bunny::Channel
  def self.channel
    Thread.current[:bunny_channel] ||= session.create_channel
  end

  # Bunny::Exchange
  def self.exchange
    channel.topic(ENV.fetch('EXCHANGE'), auto_delete: false, durable: true)
  end

  def self.publish(message, queue:)
    exchange.publish(message, routing_key: queue)
  end

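  def self.disconnect
    # Close only an existing session; calling `session` here would open one first.
    Thread.current[:bunny_session]&.close
    Thread.current[:bunny_session] = nil
    Thread.current[:bunny_channel] = nil
  end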

  # amqp://user:password@host:5672
  def self.uri
    ENV.fetch('AMQP_URI')
  end
end

  • Not all Bunny objects are thread-safe; a channel, for example, must not be shared between threads.
  • Call AMQP.connect in the after_fork callback and AMQP.disconnect in the before_fork callback of Puma/Unicorn/Sneakers/Sidekiq.

Exactly-once delivery with RabbitMQ

Tagged amqp, exactly-once, messaging, rabbitmq, reconciliation  Languages 

Use late acknowledgment and idempotency to fake exactly-once delivery with RabbitMQ. Remember, there’s no exactly-once delivery even if your messaging software provider claims so.

Late acknowledgment = acknowledge the message after the message has been processed and the database transaction has been committed.
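Late acknowledgment can be sketched as follows. This is a minimal illustration, not a definitive implementation: `LateAck` is a hypothetical helper, and it assumes a channel that responds to `acknowledge` and `reject` the way `Bunny::Channel` does. The block stands in for the work and the database commit.

```ruby
# Hypothetical helper: run the work first, acknowledge only afterwards.
module LateAck
  # channel      - responds to acknowledge/reject like Bunny::Channel
  # delivery_tag - the value of delivery_info.delivery_tag
  # The block should do the work and commit the database transaction.
  def self.process(channel, delivery_tag)
    begin
      yield
    rescue StandardError
      # The work failed: requeue so the broker delivers the message again.
      channel.reject(delivery_tag, true)
      return false
    end
    # Reached only after the block returned, i.e. after the commit.
    channel.acknowledge(delivery_tag, false)
    true
  end
end
```

Inside a `queue.subscribe(manual_ack: true)` block this would be called as `LateAck.process(channel, delivery_info.delivery_tag) { ... }`. If the process crashes between the commit and the acknowledgment, the message is redelivered, which is why idempotency is also needed.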

Idempotency = ensure the effect is the same when the same message is processed multiple times. In other words, a redelivered message must not apply its effect twice.
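One way to get idempotency is to remember which messages have already been handled. The sketch below assumes every message carries a unique id (for example, one set by the publisher); `IdempotentHandler` is a hypothetical name, and the in-memory set is only for illustration. In practice the processed ids would be stored in the same database transaction as the work itself.

```ruby
require 'set'

# Hypothetical in-memory deduplicator keyed by message id.
class IdempotentHandler
  def initialize
    @processed = Set.new
  end

  # Runs the block once per message_id; a redelivery is skipped.
  # Returns true if the block ran, false if the message was a duplicate.
  def handle(message_id)
    return false if @processed.include?(message_id)

    yield
    @processed.add(message_id) # recorded only after the work succeeded
    true
  end
end

handler = IdempotentHandler.new
balance = 0
# Simulate the broker delivering the same message twice.
2.times { handler.handle('msg-1') { balance += 100 } }
puts balance # => 100
```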

Reconciliation step = if you need 100% reliability, create a process (manual or automatic) that checks that all work has been done and resends the message if it has not.
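An automatic reconciliation pass might look like the sketch below. `Reconciliation.run` and the id lists are hypothetical; it assumes you can list both the ids of the messages that should have been processed and the ids the consumer recorded as done.

```ruby
# Hypothetical reconciliation pass: find expected work that was never
# completed and hand each missing id to a resend callback.
module Reconciliation
  # expected_ids  - ids of all messages that should have been processed
  # completed_ids - ids the consumer recorded as done
  # Yields each missing id (e.g. to republish it) and returns the missing ids.
  def self.run(expected_ids, completed_ids)
    missing = expected_ids - completed_ids
    missing.each { |id| yield id }
    missing
  end
end

resent = []
Reconciliation.run(%w[order-1 order-2 order-3], %w[order-1 order-3]) { |id| resent << id }
puts resent.inspect # => ["order-2"]
```

Because resending can produce duplicates, this step relies on the consumer being idempotent.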

The customer is the last line of defense: they will let you know if one of the above steps has failed.