diff --git a/lib/jam_websockets.rb b/lib/jam_websockets.rb index a2e24ee9e..f57f55206 100644 --- a/lib/jam_websockets.rb +++ b/lib/jam_websockets.rb @@ -2,6 +2,7 @@ require "logging" require "jam_ruby" require "jam_websockets/version" require "jam_websockets/session_error" +require "jam_websockets/permission_error" require "jam_websockets/client_context" require "jam_websockets/message" require "jam_websockets/router" diff --git a/lib/jam_websockets/permission_error.rb b/lib/jam_websockets/permission_error.rb new file mode 100644 index 000000000..83a0af45a --- /dev/null +++ b/lib/jam_websockets/permission_error.rb @@ -0,0 +1,4 @@ +class PermissionError < Exception + +end + diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 875078498..92eb0d49d 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -11,37 +11,40 @@ include Jampb # add new field to client connection module EventMachine -module WebSocket - class Connection < EventMachine::Connection + module WebSocket + class Connection < EventMachine::Connection attr_accessor :encode_json, :client_id # client_id is uuid we give to each client to track them as we like + end end end -end module JamWebsockets class Router - attr_accessor :user_context_lookup, :session_context_lookup - + attr_accessor :user_context_lookup, :session_context_lookup + def initialize(options={}) @log = Logging.logger[self] @pending_clients = Set.new # clients that have connected to server, but not logged in. @clients = {} # clients that have logged in - @user_context_lookup = {} # lookup a set of client_contexts by user_id - @session_context_lookup = {} # lookup a set of client_contexts by session_id + @user_context_lookup = {} # lookup a set of client_contexts by user_id + @session_context_lookup = {} # lookup a set of client_contexts by session_id + @client_lookup = {} # lookup a client by client_id @sessions_exchange = nil @connection = nil @channel = nil @users_exchange = nil @message_factory = JamRuby::MessageFactory.new - @semaphore = Mutex.new - @user_topic = nil - @user_subscription = nil - @session_topic = nil - @session_subscription = nil - @thread_pool = nil + @semaphore = Mutex.new + @user_topic = nil + @user_subscription = nil + @session_topic = nil + @session_subscription = nil + @client_topic = nil + @client_subscription = nil + @thread_pool = nil end def start(options = {}) @@ -49,7 +52,7 @@ module JamWebsockets @log.info "startup" begin - @thread_pool = Executors.new_fixed_thread_pool(8) + @thread_pool = Executors.new_fixed_thread_pool(8) @connection = HotBunnies.connect(:host => options[:host], :port => options[:port]) @channel = @connection.create_channel @channel.prefetch = 10 @@ -62,119 +65,143 @@ module JamWebsockets end - def add_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - user_contexts = Set.new - @user_context_lookup[context.user.id] = user_contexts - end + def add_client(client_id, client) - user_contexts.add(context) - end + # should never occur + if @client_lookup.has_key?(client_id) + @log.warn "client_id #{client_id} connected while the old connection has not yet been terminated" + end - def remove_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - @log.warn "user can not be removed #{context}" - else - # delete the context from set of user contexts - user_contexts.delete(context) + @client_lookup[client_id] = client + end - # if last user context, delete entire set (memory leak concern) - if user_contexts.length == 0 - @user_context_lookup.delete(context.user.id) - end - end - end + def remove_client(client_id, client) + deleted = @client_lookup.delete(client_id) - def add_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - session_contexts = Set.new - @session_context_lookup[context.session.id] = session_contexts - end + if deleted.nil? + @log.warn "unable to delete #{client_id} from client_lookup" + elsif deleted == client + # put it back--this is only possible if add_client hit the 'old connection' path + # so in other words if this happens: + # add_client(1, clientX) + # add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time + # remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash + @client_lookup[client_id] = client + end + end - session_contexts.add(context) - end + def add_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + user_contexts = Set.new + @user_context_lookup[context.user.id] = user_contexts + end - def remove_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - @log.warn "session can not be removed #{context}" - else - # delete the context from set of session contexts - session_contexts.delete(context) + user_contexts.add(context) + end - # if last session context, delete entire set (memory leak concern) - if session_contexts.length == 0 - @session_context_lookup.delete(context.session.id) - end + def remove_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + @log.warn "user can not be removed #{context}" + else + # delete the context from set of user contexts + user_contexts.delete(context) - context.session = nil - end - end + # if last user context, delete entire set (memory leak concern) + if user_contexts.length == 0 + @user_context_lookup.delete(context.user.id) + end + end + end + def add_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + session_contexts = Set.new + @session_context_lookup[context.session.id] = session_contexts + end - # register topic for user messages and session messages - def register_topics - @users_exchange = @channel.exchange('users', :type => :topic) + session_contexts.add(context) + end + + def remove_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + @log.warn "session can not be removed #{context}" + else + # delete the context from set of session contexts + session_contexts.delete(context) + + # if last session context, delete entire set (memory leak concern) + if session_contexts.length == 0 + @session_context_lookup.delete(context.session.id) + end + + context.session = nil + end + end + + # register topic for user messages and session messages + def register_topics + @users_exchange = @channel.exchange('users', :type => :topic) @sessions_exchange = @channel.exchange('sessions', :type => :topic) - - # create user messaging topic + @clients_exchange = @channel.exchange('clients', :type => :topic) + + # create user messaging topic @user_topic = @channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge # TODO: alert friends - # subscribe for any messages to users - - #@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg| - @user_subscription = @user_topic.subscribe(:ack => false) - @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| - begin - routing_key = headers.envelope.routing_key - user_id = routing_key["user.".length..-1] - @sempahore.synchronize do - contexts = @user_context_lookup[user_id] + # subscribe for any messages to users + + @user_subscription = @user_topic.subscribe(:ack => false) + @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + user_id = routing_key["user.".length..-1] + @sempahore.synchronize do + contexts = @user_context_lookup[user_id] + + unless contexts.nil? + + @log.debug "received user-directed message for session: #{user_id}" - unless contexts.nil? - - @log.debug "received user-directed message for session: #{user_id}" - msg = Jampb::ClientMessage.parse(msg) - contexts.each do |context| - EM.schedule do - @log.debug "sending user message to #{context}" - send_to_client(context.client, msg) - end - end - end - end + contexts.each do |context| + EM.schedule do + @log.debug "sending user message to #{context}" + send_to_client(context.client, msg) + end + end + end + end - rescue => e - @log.error "unhandled error in messaging to client" - end - end + rescue => e + @log.error "unhandled error in messaging to client" + @log.error e + end + end - @session_topic = @channel.queue("", :auto_delete => true) + @session_topic = @channel.queue("", :auto_delete => true) @session_topic.bind(@sessions_exchange, :routing_key => "session.#") @session_topic.purge # subscribe for any messages to session - #@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg| @session_subscription = @session_topic.subscribe(:ack => false) - @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| - begin - routing_key = headers.envelope.routing_key - session_id = routing_key["session.".length..-1] - @semaphore.synchronize do - contexts = @session_context_lookup[session_id] + @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + session_id = routing_key["session.".length..-1] + @semaphore.synchronize do + contexts = @session_context_lookup[session_id] + + unless contexts.nil? + + @log.debug "received session-directed message for session: #{session_id}" - unless contexts.nil? - - @log.debug "received session-directed message for session: #{session_id}" - msg = Jampb::ClientMessage.parse(msg) # ok, its very odd to have your own message that you sent bounce back to you. @@ -183,94 +210,132 @@ module JamWebsockets properties = headers.properties unless headers.nil? inner_headers = properties.headers unless properties.nil? origin_client_id = inner_headers["client_id"] - + # counter-intuitively, even though a string is passed in when you send the header, an (apparently) auto-generated class is sent back which, if you to_s, returns the original value origin_client_id = origin_client_id.to_s unless origin_client_id.nil? - + @log.debug "message received from client #{origin_client_id}" - contexts.each do |context| + contexts.each do |context| if context.client.client_id != origin_client_id EM.schedule do @log.debug "sending session message to #{context}" send_to_client(context.client, msg) end end - end - end - end + end + end + end - rescue => e - @log.error "unhandled error in messaging to client" - end + rescue => e + @log.error "unhandled error in messaging to client" + @log.error e + end end - end - def send_to_client(client, msg) + @client_topic = @channel.queue("", :auto_delete => true) + @client_topic.bind(@clients_exchange, :routing_key => "client.#") + @client_topic.purge + + # subscribe for any p2p messages to a client + @client_subscription = @client_topic.subscribe(:ack => false) + @client_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + client_id = routing_key["client.".length..-1] + @semaphore.synchronize do + client = @client_lookup[client_id] + + unless client.nil? + msg = Jampb::ClientMessage.parse(msg) + + properties = headers.properties unless headers.nil? + inner_headers = properties.headers unless properties.nil? + origin_client_id = inner_headers["client_id"] + + @log.debug "p2p message received from client #{origin_client_id} to client #{client_id}" + EM.schedule do + @log.debug "sending p2p message to #{client_id}" + send_to_client(client, msg) + end + end + end + + + rescue => e + @log.error "unhandled error in messaging to client" + @log.error e + end + end + end + + def send_to_client(client, msg) if client.encode_json client.send(msg.to_json.to_s) else - # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent - client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) + # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent + client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) end - end + end def cleanup() - # shutdown topic listeners and mq connection - begin - if !@user_subscription.nil? && @user_subscription.active? - @log.debug "cleaning up user subscription" - @user_subscription.cancel - @user_subscription.shutdown! - end + # shutdown topic listeners and mq connection + begin + if !@user_subscription.nil? && @user_subscription.active? + @log.debug "cleaning up user subscription" + @user_subscription.cancel + @user_subscription.shutdown! + end - if !@session_subscription.nil? && @session_subscription.active? - @log.debug "cleaning up session subscription" - @session_subscription.cancel - @session_subscription.shutdown! - end + if !@session_subscription.nil? && @session_subscription.active? + @log.debug "cleaning up session subscription" + @session_subscription.cancel + @session_subscription.shutdown! + end - rescue => e - @log.debug "unable to cancel subscription on cleanup: #{e}" - end + if !@client_subscription.nil? && @client_subscription.active? + @log.debug "cleaning up client subscription" + @client_subscription.cancel + @client_subscription.shutdown! + end - @thread_pool.shutdown + rescue => e + @log.debug "unable to cancel subscription on cleanup: #{e}" + end - if !@channel.nil? - @channel.close - end + @thread_pool.shutdown - if !@connection.nil? - @connection.close - end + if !@channel.nil? + @channel.close + end - # tear down each individual client - @clients.each do |client, context| - cleanup_client(client) - end - end + if !@connection.nil? + @connection.close + end - def stop - @log.info "shutdown" - cleanup - end + # tear down each individual client + @clients.each do |client, context| + cleanup_client(client) + end + end - def new_client(client) + def stop + @log.info "shutdown" + cleanup + end - # give a unique ID to this client. This is used to prevent session messages - # from echoing back to the sender, for instance. - client.client_id = UUIDTools::UUID.random_create.to_s + def new_client(client) - @semaphore.synchronize do - @pending_clients.add(client) - end + @semaphore.synchronize do + @pending_clients.add(client) + end # default to using json instead of pb - client.encode_json = true + client.encode_json = true + + client.onopen { + #binding.pry + @log.debug "client connected #{client}" - client.onopen { - #binding.pry - @log.debug "client connected #{client}" - # check for '?pb' or '?pb=true' in url query parameters query_pb = client.request["query"]["pb"] @@ -280,10 +345,10 @@ module JamWebsockets } - client.onclose { - @log.debug "Connection closed" + client.onclose { + @log.debug "Connection closed" - cleanup_client(client) + cleanup_client(client) } client.onerror { |error| @@ -294,7 +359,7 @@ module JamWebsockets end cleanup_client(client) - client.close_websocket + client.close_websocket } client.onmessage { |msg| @@ -303,79 +368,90 @@ module JamWebsockets # TODO: set a max message size before we put it through PB? # TODO: rate limit? - + begin - if client.encode_json - #example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}} - parse = JSON.parse(msg) - pb_msg = Jampb::ClientMessage.json_create(parse) + if client.encode_json + #example: {"type":"LOGIN", "route_to":"server", "login" : {"username":"hi"}} + parse = JSON.parse(msg) + pb_msg = Jampb::ClientMessage.json_create(parse) self.route(pb_msg, client) - else - pb_msg = Jampb::ClientMessage.parse(msg.to_s) - self.route(pb_msg, client) - end - rescue SessionError => e - @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" - begin + else + pb_msg = Jampb::ClientMessage.parse(msg.to_s) + self.route(pb_msg, client) + end + rescue SessionError => e + @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" + begin # wrap the message up and send it down error_msg = @message_factory.server_rejection_error(e.to_s) - send_to_client(client, error_msg) + send_to_client(client, error_msg) ensure - client.close_websocket + client.close_websocket cleanup_client(client) end + rescue PermissionError => e + @log.info "permission error. reason=#{e.to_s}" + @log.info e + + # wrap the message up and send it down + error_msg = @message_factory.server_permission_error(msg.message_id, e.to_s) + send_to_client(client, error_msg) rescue => e @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}" - @log.error e - - begin + @log.error e + + begin # wrap the message up and send it down error_msg = @message_factory.server_generic_error(e.to_s) - send_to_client(client, error_msg) + send_to_client(client, error_msg) ensure - client.close_websocket + client.close_websocket cleanup_client(client) end end + } end - # removes all resources associated with a client + # removes all resources associated with a client def cleanup_client(client) - @semaphore.synchronize do - pending = @pending_clients.delete?(client) + @semaphore.synchronize do + pending = @pending_clients.delete?(client) - if !pending.nil? - @log.debug "cleaning up pending client #{client}" - else - context = @clients.delete(client) + if !pending.nil? + @log.debug "cleaning up pending client #{client}" + else - if !context.nil? - - remove_user(context) + remove_client(client.client_id, client) - if !context.session.nil? - remove_session(context) - end - else - @log.debug "skipping duplicate cleanup attempt of authorized client" - end + context = @clients.delete(client) - end - end + if !context.nil? + + remove_user(context) + + if !context.session.nil? + remove_session(context) + end + else + @log.debug "skipping duplicate cleanup attempt of authorized client" + end + + end + end end def route(client_msg, client) - message_type = @message_factory.get_message_type(client_msg) - - raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? - - @log.debug("msg received #{message_type}") + message_type = @message_factory.get_message_type(client_msg) - raise SessionError, 'client_msg.target is null' if client_msg.target.nil? + raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? + + @log.debug("msg received #{message_type}") + + raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil? if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN # this client has not logged in and is trying to send a non-login message @@ -386,18 +462,23 @@ module JamWebsockets handle_server_directed(client_msg, client) + elsif @message_factory.client_directed? client_msg + + to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1] + handle_client_directed(to_client_id, client_msg, client) + elsif @message_factory.session_directed? client_msg - session = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1] + session = client_msg.route_to[MessageFactory::SESSION_TARGET_PREFIX.length..-1] handle_session_directed(session, client_msg, client) elsif @message_factory.user_directed? client_msg - user = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1] + user = client_msg.route_to[MessageFactory::USER_PREFIX_TARGET.length..-1] handle_user_directed(user, client_msg, client) else - raise SessionError, "client_msg.target is unknown type: #{client_msg.target}" + raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}" end end @@ -408,9 +489,9 @@ module JamWebsockets handle_login(client_msg.login, client) - elsif client_msg.type == ClientMessage::Type::HEARTBEAT + elsif client_msg.type == ClientMessage::Type::HEARTBEAT - handle_heartbeat(client_msg.heartbeat, client) + handle_heartbeat(client_msg.heartbeat, client) elsif client_msg.type == ClientMessage::Type::LOGIN_MUSIC_SESSION @@ -421,7 +502,7 @@ module JamWebsockets handle_leave_music_session(client_msg.leave_music_session, client) else - raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message" + raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" end end @@ -430,36 +511,47 @@ module JamWebsockets username = login.username if login.value_for_tag(1) password = login.password if login.value_for_tag(2) token = login.token if login.value_for_tag(3) + client_id = login.client_id if login.value_for_tag(4) - user = valid_login(username, password, token) + # you don't have to supply client_id in login--if you don't, we'll generate one + if client_id.nil? + # give a unique ID to this client. This is used to prevent session messages + # from echoing back to the sender, for instance. + client_id = UUIDTools::UUID.random_create.to_s + end + + client.client_id = client_id + + user = valid_login(username, password, token, client_id) if !user.nil? @log.debug "user #{user.email} logged in" # respond with LOGIN_ACK to let client know it was successful - #binding.pry - remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername) - login_ack = @message_factory.login_ack(remote_ip) - send_to_client(client, login_ack) + #binding.pry + remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername) + login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token) + send_to_client(client, login_ack) # remove from pending_queue - @semaphore.synchronize do - @pending_clients.delete(client) + @semaphore.synchronize do + @pending_clients.delete(client) - # add a tracker for this user - context = ClientContext.new(user, client) - @clients[client] = context - add_user(context) - end + # add a tracker for this user + context = ClientContext.new(user, client) + @clients[client] = context + add_user(context) + add_client(client_id, client) + end else raise SessionError, 'invalid login' end end - def handle_heartbeat(heartbeat, client) - # todo: manage staleness - end + def handle_heartbeat(heartbeat, client) + # todo: manage staleness + end def handle_join_music_session(join_music_session, client) # verify that the current user has the rights to actually join the music session @@ -468,107 +560,146 @@ module JamWebsockets session_id = join_music_session.music_session begin - session = access_music_session?(session_id, context.user) - @log.debug "user #{context} joining new session #{session}" - @semaphore.synchronize do - old_session = context.session - if !old_session.nil? - @log.debug "#{context} is already in session. auto-logging out to join new session." - remove_session(context) - end - context.session = session - add_session(context) - end + session = access_music_session(session_id, context.user) + @log.debug "user #{context} joining new session #{session}" + @semaphore.synchronize do + old_session = context.session + if !old_session.nil? + @log.debug "#{context} is already in session. auto-logging out to join new session." + remove_session(context) + end + context.session = session + add_session(context) + end rescue => e # send back a failure ack and bail - @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" + @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" login_music_session = @message_factory.login_music_session_ack(true, e.to_s) - send_to_client(client, login_music_session) + send_to_client(client, login_music_session) return end # respond with LOGIN_MUSIC_SESSION_ACK to let client know it was successful login_music_session = @message_factory.login_music_session_ack(false, nil) - send_to_client(client, login_music_session) - + send_to_client(client, login_music_session) + # send 'new client' message to other members in the session - handle_session_directed(session_id, - @message_factory.user_joined_music_session(context.user.id, context.user.name), - client) + handle_session_directed(session_id, + @message_factory.user_joined_music_session(context.user.id, context.user.name), + client) end - def handle_leave_music_session(leave_music_session, client) + def handle_leave_music_session(leave_music_session, client) - context = @clients[client] + context = @clients[client] - raise SessionError, "unsupported" - end + raise SessionError, "unsupported" + end - def valid_login(username, password, token) + def valid_login(username, password, token, client_id) if !token.nil? && token != '' @log.debug "logging in via token" - # attempt login with token - user = User.find_by_remember_token(token) + # attempt login with token + user = User.find_by_remember_token(token) - if user.nil? - @log.debug "no user found with token" - return false - else - @log.debug "#{user} login via token" - return user - end + if user.nil? + @log.debug "no user found with token" + return false + else + @log.debug "#{user} login via token" + return user + end - elsif !username.nil? and !password.nil? + elsif !username.nil? and !password.nil? @log.debug "logging in via user/pass '#{username}' '#{password}'" - # attempt login with username and password - user = User.find_by_email(username) + # attempt login with username and password + user = User.find_by_email(username) - if !user.nil? && user.authenticate(password) - @log.debug "#{user} login via password" - return user - else - @log.debug "#{username} login failure" - return nil - end - else - raise SessionError, 'no login data was found in Login message' - end + if !user.nil? && user.authenticate(password) + @log.debug "#{user} login via password" + return user + else + @log.debug "#{username} login failure" + return nil + end + else + raise SessionError, 'no login data was found in Login message' + end - end + end - def access_music_session?(music_session_id, user) - music_session = MusicSession.find_by_id(music_session_id) + def access_music_session(music_session_id, user) + music_session = MusicSession.find_by_id(music_session_id) - if music_session.nil? - raise SessionError, 'specified session not found' - end + if music_session.nil? + raise SessionError, 'specified session not found' + end - if !music_session.access? user - raise SessionError, 'not allowed to join the specified session' - end + if !music_session.access? user + raise SessionError, 'not allowed to join the specified session' + end - return music_session - end + return music_session + end - def handle_session_directed(session_id, client_msg, client) + # client_id = the id of the client being accessed + # client = the current client + def access_p2p(client_id, user, msg) - context = @clients[client] + # ping_request and ping_ack messages are special in that they are simply allowed + if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK + return nil + end - # by not catching any exception here, this will kill the connection - # if for some reason the client is trying to send to a session that it doesn't - # belong to - session = access_music_session?(session_id, context.user) + music_session_client = MusicSessionClient.find_by_client_id(client_id) - @log.debug "publishing to session #{session} from client_id #{client.client_id}" - # put it on the topic exchange for sessions - @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => { :headers => { "client_id" => client.client_id } } ) - end + if music_session_client.nil? + raise PermissionError, 'specified client not found' + end - def handle_user_directed(user, client_msg, client) + if !music_session_client.access_p2p? user - raise SessionError, 'not implemented' - end - end + raise SessionError, 'not allowed to message this client' + + end + end + + def handle_session_directed(session_id, client_msg, client) + + context = @clients[client] + + # by not catching any exception here, this will kill the connection + # if for some reason the client is trying to send to a session that it doesn't + # belong to + session = access_music_session(session_id, context.user) + + @log.debug "publishing to session #{session} from client_id #{client.client_id}" + # put it on the topic exchange for sessions + @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => {:headers => {"client_id" => client.client_id}}) + end + + def handle_client_directed(to_client_id, client_msg, client) + context = @clients[client] + + # by not catching any exception here, a PermissionError will be thrown if this isn't valid + # if for some reason the client is trying to send to a client that it doesn't + # belong to + access_p2p(to_client_id, context.user, client_msg) + + # populate routing data + client_msg.from = client.client_id + + @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" + + # put it on the topic exchange for clients + @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}}) + end + + def handle_user_directed(user, client_msg, client) + + raise SessionError, 'not implemented' + end + end end diff --git a/spec/jam_websockets/router_spec.rb b/spec/jam_websockets/router_spec.rb index 6736ca0e7..6ec66fe5f 100644 --- a/spec/jam_websockets/router_spec.rb +++ b/spec/jam_websockets/router_spec.rb @@ -2,78 +2,80 @@ require 'spec_helper' require 'thread' LoginClient = Class.new do - attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id + attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id - def initiaize() + def initiaize() - end + end - def onopen(&block) - @onopenblock = block - end + def onopen(&block) + @onopenblock = block + end - def onmessage(&block) - @onmsgblock = block - end + def onmessage(&block) + @onmsgblock = block + end - def close(&block) - @oncloseblock = block - end + def close(&block) + @oncloseblock = block + end - def close_websocket() - - end + def close_websocket() - def send(msg) - puts msg - end + end - def get_peername - return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost" - end + def send(msg) + puts msg + end + + def get_peername + return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost" + end end # does a login and returns client -def login(router, user, password) +def login(router, user, password, client_id) - message_factory = MessageFactory.new - client = LoginClient.new + message_factory = MessageFactory.new + client = LoginClient.new - login_ack = message_factory.login_ack("127.0.0.1") + login_ack = message_factory.login_ack("127.0.0.1", client_id) - router.should_receive(:send_to_client).with(client, login_ack) - client.should_receive(:onclose) - client.should_receive(:onerror) - client.should_receive(:request).and_return({ "query" => { "pb" => "true" } }) - client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") + router.should_receive(:send_to_client).with(client, login_ack) + client.should_receive(:onclose) + client.should_receive(:onerror) + client.should_receive(:request).and_return({"query" => {"pb" => "true"}}) + client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") - @router.new_client(client) - client.onopenblock.call + @router.new_client(client) + client.onopenblock.call - # create a login message, and pass it into the router via onmsgblock.call - login = message_factory.login_with_user_pass(user.email, password) + # create a login message, and pass it into the router via onmsgblock.call + login = message_factory.login_with_user_pass(user.email, password, :client_id => client_id) - # first log in - client.onmsgblock.call login.to_s + # first log in + client.onmsgblock.call login.to_s - # then join music session - return client + # then join music session + return client end +# currently commented out; we have deprecated logging in for jam sessions via websocket-gateway; +# use rest API instead (or direct db access with factory-girl) def login_music_session(router, client, music_session) - message_factory = MessageFactory.new - login_music_session = message_factory.login_music_session(music_session.id) - login_ack = message_factory.login_music_session_ack(false, nil); - router.should_receive(:send_to_client).with(client, login_ack) - client.onmsgblock.call login_music_session.to_s + #message_factory = MessageFactory.new + #login_music_session = message_factory.login_music_session(music_session.id) + #login_ack = message_factory.login_music_session_ack(false, nil); + #router.should_receive(:send_to_client).with(client, login_ack) + #client.onmsgblock.call login_music_session.to_s end describe Router do - message_factory = MessageFactory.new + message_factory = MessageFactory.new before do @@ -100,31 +102,30 @@ describe Router do client.should_receive(:onerror) client.should_receive(:onmessage) client.should_receive(:encode_json=) - client.should_receive(:client_id=) @router.new_client(client) end end - describe "topic routing helpers" do - it "create and delete user lookup set" do - user = double(User) - user.should_receive(:id).any_number_of_times.and_return("1") - client = double("client") - context = ClientContext.new(user, client) + describe "topic routing helpers" do + it "create and delete user lookup set" do + user = double(User) + user.should_receive(:id).any_number_of_times.and_return("1") + client = double("client") + context = ClientContext.new(user, client) - @router.user_context_lookup.length.should == 0 + @router.user_context_lookup.length.should == 0 - @router.add_user(context) - - @router.user_context_lookup.length.should == 1 + @router.add_user(context) - @router.remove_user(context) + @router.user_context_lookup.length.should == 1 - @router.user_context_lookup.length.should == 0 - end - end + @router.remove_user(context) + + @router.user_context_lookup.length.should == 0 + end + end describe "login" do @@ -145,8 +146,8 @@ describe Router do @onmsgblock = block end - def close_websocket() - end + def close_websocket() + end def close() end @@ -160,7 +161,7 @@ describe Router do client.should_receive(:close_websocket) client.should_receive(:onclose) client.should_receive(:onerror) - client.should_receive(:request).and_return({ "query" => { "pb" => "true" } }) + client.should_receive(:request).and_return({"query" => {"pb" => "true"}}) @router.new_client(client) @@ -178,7 +179,7 @@ describe Router do :password => "foobar", :password_confirmation => "foobar") @user.save - client1 = login(@router, @user, "foobar") + client1 = login(@router, @user, "foobar", "1") end @@ -190,40 +191,79 @@ describe Router do music_session = FactoryGirl.create(:music_session, :creator => user1) - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session) - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session) + music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") + music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") # make a music_session and define two members - # create client 1, log him in, and log him in to music session - client1 = login(@router, user1, "foobar") - login_music_session(@router, client1, music_session) + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + login_music_session(@router, client1, music_session) \ + end - it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do + it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do - EventMachine.run do - user1 = FactoryGirl.create(:user) # in the music session - user2 = FactoryGirl.create(:user) # in the music session + EventMachine.run do + user1 = FactoryGirl.create(:user) # in the music session + user2 = FactoryGirl.create(:user) # in the music session - music_session = FactoryGirl.create(:music_session, :creator => user1) - - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session) - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session) - - # make a music_session and define two members + music_session = FactoryGirl.create(:music_session, :creator => user1) - # create client 1, log him in, and log him in to music session - client1 = login(@router, user1, "foobar") - login_music_session(@router, client1, music_session) + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + login_music_session(@router, client1, music_session) - client2 = login(@router, user2, "foobar") - login_music_session(@router, client2, music_session) - EM.stop - end + client2 = login(@router, user2, "foobar", "2") + login_music_session(@router, client2, music_session) + + # make a music_session and define two members + + music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") + music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") + + + EM.stop + end end + it "should allow two valid subscribers to communicate with p2p messages", :mq => true do + + EventMachine.run do + user1 = FactoryGirl.create(:user) # in the music session + user2 = FactoryGirl.create(:user) # in the music session + + music_session = FactoryGirl.create(:music_session, :creator => user1) + + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + #login_music_session(@router, client1, music_session) + + client2 = login(@router, user2, "foobar", "2") + #login_music_session(@router, client2, music_session) + + # by creating + music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") + + # now attempt to message p2p! + + # first test: user 2 should be able to send a ping message to user 1, even though he isn't in the same session yet + + # create a login message, and pass it into the router via onmsgblock.call + ping = message_factory.ping_request("1", "2") + + #@router.should_receive(:send_to_client) #.with(client1, ping) + + ## send ping to client 2 + #client2.onmsgblock.call ping.to_s + + #music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") + + + EM.stop + end + end end end