diff --git a/lib/jam_websockets/client_context.rb b/lib/jam_websockets/client_context.rb index 46cb87a92..316057c56 100644 --- a/lib/jam_websockets/client_context.rb +++ b/lib/jam_websockets/client_context.rb @@ -1,13 +1,14 @@ module JamWebsockets class ClientContext - attr_accessor :user, :client, :msg_count, :session + attr_accessor :user, :client, :msg_count, :session, :sent_bad_state_previously def initialize(user, client) @user = user @client = client @msg_count = 0 @session = nil + @sent_bad_state_previously = false end def to_s diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 5f3e07913..209baff4b 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -27,8 +27,7 @@ module JamWebsockets @clients = {} # clients that have logged in @user_context_lookup = {} # lookup a set of client_contexts by user_id @client_lookup = {} # lookup a client by client_id - @connection = nil - @channel = nil + @amqp_connection_manager = nil @users_exchange = nil @message_factory = JamRuby::MessageFactory.new @semaphore = Mutex.new @@ -46,11 +45,13 @@ module JamWebsockets @heartbeat_interval = connect_time_stale / 2 begin - @connection = AMQP.connect(:host => options[:host], :port => options[:port]) - @channel = AMQP::Channel.new(@connection) - #@channel.prefetch = 10 - register_topics + @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) + @amqp_connection_manager.connect do |channel| + register_topics(channel) + end + rescue => e + @log.error "unable to initialize #{e.to_s}" cleanup raise e end @@ -110,14 +111,14 @@ module JamWebsockets end # register topic for user messages and session messages - def register_topics + def register_topics(channel) ######################## USER MESSAGING ########################### # create user exchange - @users_exchange = @channel.topic('users') + @users_exchange = channel.topic('users') # create user messaging topic - @user_topic = @channel.queue("", :auto_delete => true) + @user_topic = channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge @@ -155,9 +156,9 @@ module JamWebsockets ############## CLIENT MESSAGING ################### - @clients_exchange = @channel.topic('clients') + @clients_exchange = channel.topic('clients') - @client_topic = @channel.queue("", :auto_delete => true) + @client_topic = channel.queue("", :auto_delete => true) @client_topic.bind(@clients_exchange, :routing_key => "client.#") @client_topic.purge @@ -294,12 +295,8 @@ module JamWebsockets def cleanup() # shutdown topic listeners and mq connection - if !@channel.nil? - @channel.close - end - - if !@connection.nil? - @connection.close + unless @amqp_connection_manager.nil? + @amqp_connection_manager.disconnect end # tear down each individual client @@ -423,7 +420,7 @@ module JamWebsockets elsif client_msg.type == ClientMessage::Type::HEARTBEAT - handle_heartbeat(client_msg.heartbeat, client) + handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) else raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" @@ -516,7 +513,7 @@ module JamWebsockets end end - def handle_heartbeat(heartbeat, client) + def handle_heartbeat(heartbeat, heartbeat_message_id, client) unless context = @clients[client] @log.warn "*** WARNING: unable to find context due to heartbeat from client: #{client.client_id}; calling cleanup" cleanup_client(client) @@ -530,6 +527,18 @@ module JamWebsockets connection_manager.reconnect(connection) end if connection.stale? end + + # send errors to clients in response to heartbeats if + if !@amqp_connection_manager.connected? + error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down") + context.sent_bad_state_previously = true + send_to_client(client, error_msg) + return + elsif context.sent_bad_state_previously + context.sent_bad_state_previously = false + recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id) + send_to_client(client, recovery_msg) + end end end diff --git a/spec/jam_websockets/router_spec.rb b/spec/jam_websockets/router_spec.rb index 5fc26c8d7..070fbbeaf 100644 --- a/spec/jam_websockets/router_spec.rb +++ b/spec/jam_websockets/router_spec.rb @@ -88,13 +88,12 @@ describe Router do em_before do @router = Router.new() - @router.start(30) end subject { @router } em_after do - @router.stop + end