From cbb9508b6b38989aef43277ea5b8f000de13d69e Mon Sep 17 00:00:00 2001 From: Brian Smith Date: Sat, 6 Oct 2012 23:38:38 -0400 Subject: [PATCH] more changes for user presence --- lib/jam_websockets/client_context.rb | 2 +- lib/jam_websockets/router.rb | 267 ++++++++++++++------------- lib/jam_websockets/server.rb | 1 + 3 files changed, 139 insertions(+), 131 deletions(-) diff --git a/lib/jam_websockets/client_context.rb b/lib/jam_websockets/client_context.rb index 706e578b6..46cb87a92 100644 --- a/lib/jam_websockets/client_context.rb +++ b/lib/jam_websockets/client_context.rb @@ -1,4 +1,4 @@ -module JamWebsockets + module JamWebsockets class ClientContext attr_accessor :user, :client, :msg_count, :session diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index efade4a56..0d322db1c 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -20,7 +20,6 @@ end module JamWebsockets - class Router attr_accessor :user_context_lookup, :session_context_lookup @@ -62,75 +61,24 @@ module JamWebsockets end - def add_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - user_contexts = Set.new - @user_context_lookup[context.user.id] = user_contexts - end - - user_contexts.add(context) - end - - def remove_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - @log.warn "user can not be removed #{context}" - else - # delete the context from set of user contexts - user_contexts.delete(context) - - # if last user context, delete entire set (memory leak concern) - if user_contexts.length == 0 - @user_context_lookup.delete(context.user.id) - end - end - end - - def add_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - session_contexts = Set.new - @session_context_lookup[context.session.id] = session_contexts - end - - session_contexts.add(context) - end - - def remove_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - @log.warn "session can not be removed #{context}" - else - # delete the context from set of session contexts - session_contexts.delete(context) - - # if last session context, delete entire set (memory leak concern) - if session_contexts.length == 0 - @session_context_lookup.delete(context.session.id) - end - - context.session = nil - end - end - - # register topic for user messages and session messages def register_topics + + ######################## USER MESSAGING ########################### + + # create user exchange @users_exchange = @channel.exchange('users', :type => :topic) - @sessions_exchange = @channel.exchange('sessions', :type => :topic) # create user messaging topic @user_topic = @channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge - # TODO: alert friends - - # subscribe for any messages to users - + # subscribe for any messages to users #@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg| @user_subscription = @user_topic.subscribe(:ack => false) + + # this code serves as a callback that dequeues messages and processes them @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| begin routing_key = headers.envelope.routing_key @@ -140,7 +88,7 @@ module JamWebsockets unless contexts.nil? - @log.debug "received user-directed message for session: #{user_id}" + @log.debug "received user-directed message for user: #{user_id}" msg = Jampb::ClientMessage.parse(msg) contexts.each do |context| @@ -157,6 +105,12 @@ module JamWebsockets end end + ######################## SESSION MESSAGING ########################### + + # create session exchange + @sessions_exchange = @channel.exchange('sessions', :type => :topic) + + # create session messaging topic @session_topic = @channel.queue("", :auto_delete => true) @session_topic.bind(@sessions_exchange, :routing_key => "session.#") @session_topic.purge @@ -164,6 +118,8 @@ module JamWebsockets # subscribe for any messages to session #@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg| @session_subscription = @session_topic.subscribe(:ack => false) + + # this code serves as a callback that dequeues messages and processes them @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| begin routing_key = headers.envelope.routing_key @@ -174,8 +130,6 @@ module JamWebsockets unless contexts.nil? @log.debug "received session-directed message for session: #{session_id}" - - msg = Jampb::ClientMessage.parse(msg) # ok, its very odd to have your own message that you sent bounce back to you. # In one small favor to the client, we purposefully disallow messages a client @@ -187,7 +141,9 @@ module JamWebsockets # counter-intuitively, even though a string is passed in when you send the header, an (apparently) auto-generated class is sent back which, if you to_s, returns the original value origin_client_id = origin_client_id.to_s unless origin_client_id.nil? - @log.debug "message received from client #{origin_client_id}" + @log.debug "session message received from client #{origin_client_id}" + + msg = Jampb::ClientMessage.parse(msg) contexts.each do |context| if context.client.client_id != origin_client_id EM.schedule do @@ -205,55 +161,6 @@ module JamWebsockets end end - def send_to_client(client, msg) - if client.encode_json - client.send(msg.to_json.to_s) - else - # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent - client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) - end - end - - def cleanup() - # shutdown topic listeners and mq connection - begin - if !@user_subscription.nil? && @user_subscription.active? - @log.debug "cleaning up user subscription" - @user_subscription.cancel - @user_subscription.shutdown! - end - - if !@session_subscription.nil? && @session_subscription.active? - @log.debug "cleaning up session subscription" - @session_subscription.cancel - @session_subscription.shutdown! - end - - rescue => e - @log.debug "unable to cancel subscription on cleanup: #{e}" - end - - @thread_pool.shutdown - - if !@channel.nil? - @channel.close - end - - if !@connection.nil? - @connection.close - end - - # tear down each individual client - @clients.each do |client, context| - cleanup_client(client) - end - end - - def stop - @log.info "shutdown" - cleanup - end - def new_client(client) # give a unique ID to this client. This is used to prevent session messages @@ -342,6 +249,113 @@ module JamWebsockets end + def add_user(client_context) + user_contexts = @user_context_lookup[client_context.user.id] + + if user_contexts.nil? + user_contexts = Set.new + @user_context_lookup[client_context.user.id] = user_contexts + end + + user_contexts.add(client_context) + end + + def remove_user(client_context) + user_contexts = @user_context_lookup[client_context.user.id] + + if user_contexts.nil? + @log.warn "user can not be removed #{client_context}" + else + # delete the context from set of user contexts + user_contexts.delete(client_context) + + # if last user context, delete entire set (memory leak concern) + if user_contexts.length == 0 + @user_context_lookup.delete(client_context.user.id) + end + + client_context.user = nil + end + end + + def add_session(client_context) + session_contexts = @session_context_lookup[client_context.session.id] + + if session_contexts.nil? + session_contexts = Set.new + @session_context_lookup[client_context.session.id] = session_contexts + end + + session_contexts.add(client_context) + end + + def remove_session(client_context) + session_contexts = @session_context_lookup[client_context.session.id] + + if session_contexts.nil? + @log.warn "session can not be removed #{client_context}" + else + # delete the context from set of session contexts + session_contexts.delete(client_context) + + # if last session context, delete entire set (memory leak concern) + if session_contexts.length == 0 + @session_context_lookup.delete(client_context.session.id) + end + + client_context.session = nil + end + end + + def send_to_client(client, msg) + if client.encode_json + client.send(msg.to_json.to_s) + else + # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent + client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) + end + end + + def cleanup() + # shutdown topic listeners and mq connection + begin + if !@user_subscription.nil? && @user_subscription.active? + @log.debug "cleaning up user subscription" + @user_subscription.cancel + @user_subscription.shutdown! + end + + if !@session_subscription.nil? && @session_subscription.active? + @log.debug "cleaning up session subscription" + @session_subscription.cancel + @session_subscription.shutdown! + end + + rescue => e + @log.debug "unable to cancel subscription on cleanup: #{e}" + end + + @thread_pool.shutdown + + if !@channel.nil? + @channel.close + end + + if !@connection.nil? + @connection.close + end + + # tear down each individual client + @clients.each do |client, context| + cleanup_client(client) + end + end + + def stop + @log.info "shutdown" + cleanup + end + # removes all resources associated with a client def cleanup_client(client) @@ -358,8 +372,8 @@ module JamWebsockets remove_user(context) # remove this connection from the database - connection = Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'" - send_friend_update(user, false) + connection = JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'" + send_friend_update(client, user, false) if !context.session.nil? remove_session(context) @@ -447,8 +461,8 @@ module JamWebsockets login_ack = @message_factory.login_ack(remote_ip) send_to_client(client, login_ack) - # remove from pending_queue @semaphore.synchronize do + # remove from pending_queue @pending_clients.delete(client) # add a tracker for this user @@ -457,10 +471,10 @@ module JamWebsockets add_user(context) # log this connection in the database - connection = Connection.new(user.id, client.id) + connection = JamRuby::Connection.new(user.id, client.id) if connection.save? - send_friend_update(user, true) + send_friend_update(client, user, true) end end else @@ -468,26 +482,19 @@ module JamWebsockets end end - def send_friend_update(user, online) + def send_friend_update(client, user, online) unless user.friends.nil? @log.debug "sending friend update message to friends" # create the friend_update message - friend_update = @message_factory.friend_update(user.id, online) + friend_update_msg = @message_factory.friend_update(user.id, online) # send the friend_update to each friend that has active connections user.friends.each do |friend| - # only send to friends that have active connections - active_connections = @user_context_lookup[friend.id] - unless active_connections.nil? - # send the update to each active connection of this friend - active_connections.each do |context| - EM.schedule do - @log.debug "sending friend update message to #{friend}" - send_to_client(context.client, friend_update) - end - end - end + @log.debug "sending friend update message to #{context}" + + # put it on the topic exchange for users + @users_exchange.publish(friend_update_msg.to_s, :routing_key => "user.#{friend.id}") end end end diff --git a/lib/jam_websockets/server.rb b/lib/jam_websockets/server.rb index 470eaecb9..3b7f0c9c0 100644 --- a/lib/jam_websockets/server.rb +++ b/lib/jam_websockets/server.rb @@ -1,6 +1,7 @@ require 'em-websocket' module JamWebsockets + class Server def initialize(options={})