* VRFS-196, VRFS-195, VRFS-197 - integrate amqp_connection_manager into websocket-gateway, and send server up/down messages in response to pings
This commit is contained in:
parent
3507bf5b79
commit
404c07db1b
|
|
@ -1,13 +1,14 @@
|
|||
module JamWebsockets
|
||||
class ClientContext
|
||||
|
||||
attr_accessor :user, :client, :msg_count, :session
|
||||
attr_accessor :user, :client, :msg_count, :session, :sent_bad_state_previously
|
||||
|
||||
def initialize(user, client)
|
||||
@user = user
|
||||
@client = client
|
||||
@msg_count = 0
|
||||
@session = nil
|
||||
@sent_bad_state_previously = false
|
||||
end
|
||||
|
||||
def to_s
|
||||
|
|
|
|||
|
|
@ -27,8 +27,7 @@ module JamWebsockets
|
|||
@clients = {} # clients that have logged in
|
||||
@user_context_lookup = {} # lookup a set of client_contexts by user_id
|
||||
@client_lookup = {} # lookup a client by client_id
|
||||
@connection = nil
|
||||
@channel = nil
|
||||
@amqp_connection_manager = nil
|
||||
@users_exchange = nil
|
||||
@message_factory = JamRuby::MessageFactory.new
|
||||
@semaphore = Mutex.new
|
||||
|
|
@ -46,11 +45,13 @@ module JamWebsockets
|
|||
@heartbeat_interval = connect_time_stale / 2
|
||||
|
||||
begin
|
||||
@connection = AMQP.connect(:host => options[:host], :port => options[:port])
|
||||
@channel = AMQP::Channel.new(@connection)
|
||||
#@channel.prefetch = 10
|
||||
register_topics
|
||||
@amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
|
||||
@amqp_connection_manager.connect do |channel|
|
||||
register_topics(channel)
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.error "unable to initialize #{e.to_s}"
|
||||
cleanup
|
||||
raise e
|
||||
end
|
||||
|
|
@ -110,14 +111,14 @@ module JamWebsockets
|
|||
end
|
||||
|
||||
# register topic for user messages and session messages
|
||||
def register_topics
|
||||
def register_topics(channel)
|
||||
|
||||
######################## USER MESSAGING ###########################
|
||||
|
||||
# create user exchange
|
||||
@users_exchange = @channel.topic('users')
|
||||
@users_exchange = channel.topic('users')
|
||||
# create user messaging topic
|
||||
@user_topic = @channel.queue("", :auto_delete => true)
|
||||
@user_topic = channel.queue("", :auto_delete => true)
|
||||
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
||||
@user_topic.purge
|
||||
|
||||
|
|
@ -155,9 +156,9 @@ module JamWebsockets
|
|||
|
||||
############## CLIENT MESSAGING ###################
|
||||
|
||||
@clients_exchange = @channel.topic('clients')
|
||||
@clients_exchange = channel.topic('clients')
|
||||
|
||||
@client_topic = @channel.queue("", :auto_delete => true)
|
||||
@client_topic = channel.queue("", :auto_delete => true)
|
||||
@client_topic.bind(@clients_exchange, :routing_key => "client.#")
|
||||
@client_topic.purge
|
||||
|
||||
|
|
@ -294,12 +295,8 @@ module JamWebsockets
|
|||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
|
||||
if !@channel.nil?
|
||||
@channel.close
|
||||
end
|
||||
|
||||
if !@connection.nil?
|
||||
@connection.close
|
||||
unless @amqp_connection_manager.nil?
|
||||
@amqp_connection_manager.disconnect
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
|
|
@ -423,7 +420,7 @@ module JamWebsockets
|
|||
|
||||
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
|
||||
|
||||
handle_heartbeat(client_msg.heartbeat, client)
|
||||
handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client)
|
||||
|
||||
else
|
||||
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
|
||||
|
|
@ -516,7 +513,7 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def handle_heartbeat(heartbeat, client)
|
||||
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
|
||||
unless context = @clients[client]
|
||||
@log.warn "*** WARNING: unable to find context due to heartbeat from client: #{client.client_id}; calling cleanup"
|
||||
cleanup_client(client)
|
||||
|
|
@ -530,6 +527,18 @@ module JamWebsockets
|
|||
connection_manager.reconnect(connection)
|
||||
end if connection.stale?
|
||||
end
|
||||
|
||||
# send errors to clients in response to heartbeats if
|
||||
if !@amqp_connection_manager.connected?
|
||||
error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
|
||||
context.sent_bad_state_previously = true
|
||||
send_to_client(client, error_msg)
|
||||
return
|
||||
elsif context.sent_bad_state_previously
|
||||
context.sent_bad_state_previously = false
|
||||
recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
|
||||
send_to_client(client, recovery_msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -88,13 +88,12 @@ describe Router do
|
|||
|
||||
em_before do
|
||||
@router = Router.new()
|
||||
@router.start(30)
|
||||
end
|
||||
|
||||
subject { @router }
|
||||
|
||||
em_after do
|
||||
@router.stop
|
||||
|
||||
end
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue