amqp snippets

RabbitMQ consumer and publisher (Bunny/Ruby)

Tagged amqp, bunny, rabbitmq, ruby  Languages bash, ruby

Create consumer.rb:

require 'bunny'
# Consumer script: prints every message delivered to QUEUE and acknowledges it.
# Required environment variables (ENV.fetch raises KeyError when one is missing):
#   AMQP_URL - broker URL, EXCHANGE - topic exchange name, QUEUE - queue name.
bunny = Bunny.new(ENV.fetch('AMQP_URL'))
bunny.start
# Close the connection when the process exits (e.g. on Ctrl-C).
at_exit { bunny.stop }
channel = bunny.create_channel
# With manual acks, hand this consumer one unacknowledged message at a time.
channel.prefetch(1)
# 'passive: true' means the exchange and the queue must already exist.
exchange = channel.topic(ENV.fetch('EXCHANGE'), durable: true, passive: true)
queue = channel.queue(ENV.fetch('QUEUE'), durable: true, passive: true)
# A topic exchange only routes messages whose routing key matches the binding
# key. publisher.rb publishes with the queue name as routing key, so bind with
# that key; a binding without a key would only match an empty routing key.
queue.bind(exchange, routing_key: queue.name)
# block: true keeps the main thread here, processing deliveries until exit.
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _metadata, payload|
  puts "==============="
  puts payload
  puts "==============="
  # Acknowledge only this delivery (multiple = false).
  channel.acknowledge(delivery_info.delivery_tag, false)
end

Start consumer:

$ EXCHANGE=cnn QUEUE=news AMQP_URL=amqp://username:password@hostname:5672 ruby consumer.rb

Create publisher.rb:

require "bunny"

# Process-wide publisher for an already existing RabbitMQ topic exchange.
#
# Call Publisher.initialize once, then Publisher.send for every message.
#
# NOTE: the class-level `send` shadows Object#send on Publisher itself; use
# `Publisher.__send__` if dynamic dispatch on the class is ever needed.
class Publisher
  # Connects to the broker and looks up the exchange to publish to.
  #
  # @param amqp_url [String] broker URL handed to Bunny.new
  # @param exchange [String] name of the topic exchange
  def self.initialize(amqp_url, exchange:)
    session = Bunny.new(amqp_url)
    session.start
    # Stop exactly the session opened by this call when the process exits.
    at_exit { session.stop }
    # Use a single channel: Bunny's Session#channel opens a *new* channel on
    # every call, so it must not be used to fetch the one created here.
    channel = session.create_channel
    # 'passive = true' means exchange already exists
    @exchange = channel.topic(exchange, passive: true)
  end

  # Publishes a message to the exchange.
  #
  # @param message [String] message payload
  # @param queue [String] used as the routing key of the message
  # @raise [RuntimeError] when Publisher.initialize has not been called yet
  def self.send(message, queue:)
    raise 'Publisher.initialize must be called before Publisher.send' unless @exchange

    @exchange.publish(message, routing_key: queue)
  end
end

# Driver: read the settings from the environment (KeyError when one is
# missing), connect, then publish two sample messages routed by queue name.
routing_key = ENV.fetch('QUEUE')
Publisher.initialize(ENV.fetch('AMQP_URL'), exchange: ENV.fetch('EXCHANGE'))
[
  'Ruby is the best programming language',
  'Top 10 silver-bullet solutions in Java'
].each do |headline|
  Publisher.send(headline, queue: routing_key)
end

Start publisher.rb:

$ EXCHANGE=cnn QUEUE=news AMQP_URL=amqp://username:password@hostname:5672 ruby publisher.rb

RabbitMQ Ruby client example (AMQP)

Tagged amqp, bunny, rabbitmq, ruby, thread-safe  Languages ruby
require 'bunny'

# See http://rubybunny.info/articles/concurrency.html
#
# Per-thread RabbitMQ access: the Bunny session and channel are cached in
# Thread.current, so each thread lazily gets its own pair.
class AMQP
  # Bunny::Session
  # Returns the current thread's started session, opening it on first use.
  def self.session
    Thread.current[:bunny_session] ||= Bunny.new(uri).start
  end
  class << self
    alias connect session
  end

  # Bunny::Channel
  # Returns the current thread's channel, creating it on first use.
  def self.channel
    Thread.current[:bunny_channel] ||= session.create_channel
  end

  # Bunny::Exchange
  # Declares (or looks up) the durable topic exchange on this thread's channel.
  # NOTE(review): EXCHANGE is not defined in this class; it is expected to be a
  # constant defined elsewhere - confirm where.
  def self.exchange
    channel.topic(EXCHANGE, auto_delete: false, durable: true)
  end

  # Publishes +message+ to the exchange, using +queue+ as the routing key.
  def self.publish(message, queue:)
    exchange.publish(message, routing_key: queue)
  end

  # Closes the current thread's session, if one was opened, and forgets the
  # cached session and channel so the next connect/channel call opens fresh
  # ones instead of handing back closed objects.
  def self.disconnect
    # Read the cache directly: calling `session` here would open a brand-new
    # connection just to close it when nothing is connected yet.
    open_session = Thread.current[:bunny_session]
    Thread.current[:bunny_channel] = nil
    Thread.current[:bunny_session] = nil
    open_session&.close
  end

  # Broker URI from the AMQP_URI environment variable (KeyError when unset),
  # e.g. amqp://user:password@hostname:5672
  def self.uri
    ENV.fetch('AMQP_URI')
  end
end
  • Not every Bunny object is thread-safe; channels, for example, must not be shared between threads
  • Call AMQP.disconnect in the before_fork callback and AMQP.connect in the after_fork callback of Puma/Unicorn/Sneakers/Sidekiq.