jam-cloud/ruby/lib/jam_ruby/amqp/amqp_connection_manager.rb

101 lines
3.1 KiB
Ruby

module JamRuby
  # The purpose of this class is to handle reconnect logic and 'recover' logic
  # (which means automatically resubscribing to topics/queues).
  #
  # It's 'leaky' in that it will give you an AMQP::Channel so you can do these
  # subscriptions yourself in the block you pass to #connect.
  # Use #connected? (or the *connected* property) to check if the connection is currently up.
  class AmqpConnectionManager
    attr_accessor :should_reconnect, :reconnect_interval, :connection, :connect_options, :connect_block, :channel,
                  :connected

    # Only the writer is generated for the @connect flag: the reader name is
    # taken by #connect below, which would otherwise redefine the generated reader.
    attr_writer :connect

    # @param should_reconnect [Boolean] whether to retry after a network failure
    # @param reconnect_interval [Numeric] delay handed to the retry timer and to
    #   the connection's own reconnect call
    # @param connect_options [Hash] passed through untouched to AMQP.connect
    def initialize(should_reconnect, reconnect_interval, connect_options = {})
      @should_reconnect = should_reconnect
      @reconnect_interval = reconnect_interval
      @connect_options = connect_options
      @connected = false
      @log = Logging.logger[self]
    end

    # Marks the manager as "should be connected" and starts a connection attempt.
    #
    # The block you pass in is stored and will be passed a channel upon every
    # successful connect. You need to set up your subscriptions on that channel
    # inside the block.
    def connect(&block)
      @connect = true # indicate that we should be connected
      @connect_block = block
      try_connect
    end

    # Opens a new AMQP connection and registers this object's handlers for
    # connection failure, connection loss, recovery and connection-level errors.
    def try_connect
      @connection = AMQP.connect(@connect_options, &method(:successful_connect))
      @connection.on_tcp_connection_failure(&method(:on_tcp_connection_failure))
      @connection.on_tcp_connection_loss(&method(:on_tcp_connection_loss))
      @connection.on_recovery(&method(:on_recovery))
      @connection.on_error(&method(:on_error))
    end

    # Callback for a successful connect: opens a channel with auto recovery
    # enabled and hands it to the block given to #connect (if any).
    def successful_connect(connection)
      @log.debug "connected to #{@connect_options}"
      @connected = true
      @channel = AMQP::Channel.new(connection)
      @channel.auto_recovery = true
      @connect_block.call(@channel) if @connect_block
    end

    # Callback for a failed connection attempt. Schedules another attempt after
    # reconnect_interval, provided a connection is still wanted.
    def on_tcp_connection_failure(settings)
      @connected = false
      return unless @connect && @should_reconnect

      @log.warn "[network failure] Trying to connect in #{@reconnect_interval} seconds to #{@connect_options}"
      # Re-check @connect when the timer fires: #disconnect may have been called
      # while we were waiting, in which case we must not connect again.
      EventMachine.add_timer(@reconnect_interval) { try_connect if @connect }
    end

    # Callback for losing an established connection. Asks the connection to
    # reconnect itself, provided a connection is still wanted.
    def on_tcp_connection_loss(conn, settings)
      @connected = false
      return unless @connect && @should_reconnect

      @log.warn "[network failure] Trying to reconnect..."
      conn.reconnect(false, @reconnect_interval)
    end

    # Callback for a recovered connection.
    def on_recovery(conn, settings)
      @connected = true
      @log.debug "reconnected #{conn} #{settings}"
      #puts "#channel before #{@channel}"
      #puts "recovered channel: #{@channel.reuse}"
    end

    # Marks the manager as "should not be connected" and closes the connection
    # if it is currently up. The connected flag is cleared in the disconnect callback.
    def disconnect
      @connect = false # indicate that we should no longer be connected
      return if @connection.nil? || !@connection.connected?

      @connection.disconnect do
        @connected = false
        @log.debug "disconnected"
      end
    end

    # Callback for connection-level exceptions; logs the details of the close frame.
    def on_error(connection, connection_close)
      @log.error "Handling a connection-level exception."
      @log.error "AMQP class id : #{connection_close.class_id}"
      @log.error "AMQP method id: #{connection_close.method_id}"
      @log.error "Status code : #{connection_close.reply_code}"
      @log.error "Error message : #{connection_close.reply_text}"
    end

    # @return [Boolean] the manager's current view of whether the connection is up
    def connected?
      @connected
    end
  end
end