From 0e4d5a13f8ed4dec54211d68cb77ed73f330f0fa Mon Sep 17 00:00:00 2001 From: Seth Call Date: Tue, 16 Oct 2012 22:50:28 -0500 Subject: [PATCH] * removing session topic concept; instead sending to a session causes a message to be created for each client --- lib/jam_websockets/router.rb | 282 ++++++----------------------------- 1 file changed, 46 insertions(+), 236 deletions(-) diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 7114d5e3a..82fa6f2b2 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -22,16 +22,14 @@ module JamWebsockets class Router - attr_accessor :user_context_lookup, :session_context_lookup + attr_accessor :user_context_lookup def initialize(options={}) @log = Logging.logger[self] @pending_clients = Set.new # clients that have connected to server, but not logged in. @clients = {} # clients that have logged in @user_context_lookup = {} # lookup a set of client_contexts by user_id - @session_context_lookup = {} # lookup a set of client_contexts by session_id @client_lookup = {} # lookup a client by client_id - @sessions_exchange = nil @connection = nil @channel = nil @users_exchange = nil @@ -39,8 +37,6 @@ module JamWebsockets @semaphore = Mutex.new @user_topic = nil @user_subscription = nil - @session_topic = nil - @session_subscription = nil @client_topic = nil @client_subscription = nil @thread_pool = nil @@ -103,45 +99,21 @@ module JamWebsockets user_contexts.add(context) end - def remove_user(context) - user_contexts = @user_context_lookup[context.user.id] + def remove_user(client_context) + user_contexts = @user_context_lookup[client_context.user.id] + if user_contexts.nil? - @log.warn "user can not be removed #{context}" + @log.warn "user can not be removed #{client_context}" else # delete the context from set of user contexts - user_contexts.delete(context) + user_contexts.delete(client_context) # if last user context, delete entire set (memory leak concern) if user_contexts.length == 0 - @user_context_lookup.delete(context.user.id) - end - end - end - - def add_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - session_contexts = Set.new - @session_context_lookup[context.session.id] = session_contexts - end - - session_contexts.add(context) - end - - def remove_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - @log.warn "session can not be removed #{context}" - else - # delete the context from set of session contexts - session_contexts.delete(context) - - # if last session context, delete entire set (memory leak concern) - if session_contexts.length == 0 - @session_context_lookup.delete(context.session.id) + @user_context_lookup.delete(client_context.user.id) end - context.session = nil + client_context.user = nil end end @@ -192,63 +164,6 @@ module JamWebsockets end end - ######################## SESSION MESSAGING ########################### - - # create session exchange - @sessions_exchange = @channel.exchange('sessions', :type => :topic) - - # create session messaging topic - @session_topic = @channel.queue("", :auto_delete => true) - @session_topic.bind(@sessions_exchange, :routing_key => "session.#") - @session_topic.purge - - # subscribe for any messages to session - @session_subscription = @session_topic.subscribe(:ack => false) - - # this code serves as a callback that dequeues messages and processes them - @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| - begin - routing_key = headers.envelope.routing_key - session_id = routing_key["session.".length..-1] - @semaphore.synchronize do - contexts = @session_context_lookup[session_id] - - unless contexts.nil? - - @log.debug "received session-directed message for session: #{session_id}" - - - # ok, its very odd to have your own message that you sent bounce back to you. - # In one small favor to the client, we purposefully disallow messages a client - # sent from bouncing back to itself. - properties = headers.properties unless headers.nil? - inner_headers = properties.headers unless properties.nil? - origin_client_id = inner_headers["client_id"] - - # counter-intuitively, even though a string is passed in when you send the header, an (apparently) auto-generated class is sent back which, if you to_s, returns the original value - origin_client_id = origin_client_id.to_s unless origin_client_id.nil? - - - @log.debug "session message received from client #{origin_client_id}" - - msg = Jampb::ClientMessage.parse(msg) - contexts.each do |context| - if context.client.client_id != origin_client_id - EM.schedule do - @log.debug "sending session message to #{context}" - send_to_client(context.client, msg) - end - end - end - end - end - - rescue => e - @log.error "unhandled error in messaging to client" - @log.error e - end - end - ############## CLIENT MESSAGING ################### @clients_exchange = @channel.exchange('clients', :type => :topic) @@ -266,14 +181,11 @@ module JamWebsockets @semaphore.synchronize do client = @client_lookup[client_id] - properties = headers.properties unless headers.nil? - inner_headers = properties.headers unless properties.nil? - origin_client_id = inner_headers["client_id"] + msg = Jampb::ClientMessage.parse(msg) - @log.debug "p2p message received from client #{origin_client_id} to client #{client_id}" + @log.debug "p2p message received from #{msg.from} to client #{client_id}" unless client.nil? - msg = Jampb::ClientMessage.parse(msg) EM.schedule do @log.debug "sending p2p message to #{client_id}" @@ -282,10 +194,7 @@ module JamWebsockets else @log.debug "p2p message unroutable to disconnected client #{client_id}" end - end - - rescue => e @log.error "unhandled error in messaging to client" @log.error e @@ -294,7 +203,6 @@ module JamWebsockets end - def new_client(client) @semaphore.synchronize do @@ -385,63 +293,6 @@ module JamWebsockets } end - def add_user(client_context) - user_contexts = @user_context_lookup[client_context.user.id] - - if user_contexts.nil? - user_contexts = Set.new - @user_context_lookup[client_context.user.id] = user_contexts - end - - user_contexts.add(client_context) - end - - def remove_user(client_context) - user_contexts = @user_context_lookup[client_context.user.id] - - if user_contexts.nil? - @log.warn "user can not be removed #{client_context}" - else - # delete the context from set of user contexts - user_contexts.delete(client_context) - - # if last user context, delete entire set (memory leak concern) - if user_contexts.length == 0 - @user_context_lookup.delete(client_context.user.id) - end - - client_context.user = nil - end - end - - def add_session(client_context) - session_contexts = @session_context_lookup[client_context.session.id] - - if session_contexts.nil? - session_contexts = Set.new - @session_context_lookup[client_context.session.id] = session_contexts - end - - session_contexts.add(client_context) - end - - def remove_session(client_context) - session_contexts = @session_context_lookup[client_context.session.id] - - if session_contexts.nil? - @log.warn "session can not be removed #{client_context}" - else - # delete the context from set of session contexts - session_contexts.delete(client_context) - - # if last session context, delete entire set (memory leak concern) - if session_contexts.length == 0 - @session_context_lookup.delete(client_context.session.id) - end - - client_context.session = nil - end - end def send_to_client(client, msg) @log.debug "SEND TO CLIENT START" @@ -463,12 +314,6 @@ module JamWebsockets @user_subscription.shutdown! end - if !@session_subscription.nil? && @session_subscription.active? - @log.debug "cleaning up session subscription" - @session_subscription.cancel - @session_subscription.shutdown! - end - if !@client_subscription.nil? && @client_subscription.active? @log.debug "cleaning up client subscription" @client_subscription.cancel @@ -526,10 +371,6 @@ module JamWebsockets send_friend_update(context.user, false, context.client) remove_user(context) - - if !context.session.nil? - remove_session(context) - end else @log.debug "skipping duplicate cleanup attempt of logged-in client" end @@ -587,14 +428,6 @@ module JamWebsockets handle_heartbeat(client_msg.heartbeat, client) - elsif client_msg.type == ClientMessage::Type::LOGIN_MUSIC_SESSION - - handle_join_music_session(client_msg.login_music_session, client) - - elsif client_msg.type == ClientMessage::Type::LEAVE_MUSIC_SESSION - - handle_leave_music_session(client_msg.leave_music_session, client) - else raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" end @@ -680,50 +513,6 @@ module JamWebsockets end end - def handle_join_music_session(join_music_session, client) - # verify that the current user has the rights to actually join the music session - context = @clients[client] - - session_id = join_music_session.music_session - - begin - session = access_music_session(session_id, context.user) - - @log.debug "user #{context} joining new session #{session}" - @semaphore.synchronize do - old_session = context.session - if !old_session.nil? - @log.debug "#{context} is already in session. auto-logging out to join new session." - remove_session(context) - end - context.session = session - add_session(context) - end - rescue => e - # send back a failure ack and bail - @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" - login_music_session = @message_factory.login_music_session_ack(true, e.to_s) - send_to_client(client, login_music_session) - return - end - - # respond with LOGIN_MUSIC_SESSION_ACK to let client know it was successful - login_music_session = @message_factory.login_music_session_ack(false, nil) - send_to_client(client, login_music_session) - - # send 'new client' message to other members in the session - handle_session_directed(session_id, - @message_factory.user_joined_music_session(context.user.id, context.user.name), - client) - end - - def handle_leave_music_session(leave_music_session, client) - - context = @clients[client] - - raise SessionError, "unsupported" - end - def valid_login(username, password, token, client_id) if !token.nil? && token != '' @@ -787,25 +576,10 @@ module JamWebsockets end if !music_session_client.access_p2p? user - raise SessionError, 'not allowed to message this client' - end end - def handle_session_directed(session_id, client_msg, client) - - context = @clients[client] - - # by not catching any exception here, this will kill the connection - # if for some reason the client is trying to send to a session that it doesn't - # belong to - session = access_music_session(session_id, context.user) - - @log.debug "publishing to session #{session} from client_id #{client.client_id}" - # put it on the topic exchange for sessions - @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => {:headers => {"client_id" => client.client_id}}) - end def handle_client_directed(to_client_id, client_msg, client) context = @clients[client] @@ -831,5 +605,41 @@ module JamWebsockets # put it on the topic exchange for users @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}") end + + def handle_session_directed(session_id, client_msg, client) + context = @clients[client] + + user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id) + end + + # sends a message to a session on behalf of a user + # if this is originating in the context of a client, it should be specified as :client_id => "value" + # client_msg should be a well-structure message (jam-pb message) + def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""}) + music_session = access_music_session(music_session_id, user) + + # gather up client_ids in the session + client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] } + + publish_to_session(music_session.id, client_ids, client_msg.to_s, sender) + end + + + # sends a message to a session with no checking of permissions + # this method deliberately has no database interactivity/active_record objects + def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""}) + + EM.schedule do + sender_client_id = sender[:client_id] + + # iterate over each person in the session, and send a p2p message + client_ids.each do |client_id| + + @@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}" + # put it on the topic exchange3 for clients + self.class.client_exchange.publish(client_msg, :routing_key => "client.#{music_session_id}") + end + end + end end end