require 'pry'
require 'set'
require 'hot_bunnies'
require 'thread'
require 'json'
require 'eventmachine'
import java.util.concurrent.Executors
include Jampb

# add new field to client connection
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # encode_json: when true, messages to this client are sent as JSON text,
      #            otherwise as binary frames (see Router#send_to_client)
      # client_id: uuid we give to each client to track them as we like
      attr_accessor :encode_json, :client_id
    end
  end
end

module JamWebsockets

  # Routes messages between websocket clients and the message queue.
  #
  # Clients start out "pending" (connected, not logged in). After a successful
  # LOGIN they are tracked in @clients, and indexed by user id, session id and
  # client id so that messages arriving on the 'users', 'sessions' and
  # 'clients' topic exchanges can be forwarded to the right connections.
  #
  # The lookup tables are guarded by @semaphore because they are touched both
  # from websocket callbacks and from the subscription executor threads.
  class Router

    attr_accessor :user_context_lookup, :session_context_lookup

    def initialize(options={})
      @log = Logging.logger[self]
      @pending_clients = Set.new      # clients that have connected to server, but not logged in.
      @clients = {}                   # clients that have logged in (client => context)
      @user_context_lookup = {}       # lookup a set of client_contexts by user_id
      @session_context_lookup = {}    # lookup a set of client_contexts by session_id
      @client_lookup = {}             # lookup a client by client_id
      @sessions_exchange = nil
      @clients_exchange = nil
      @connection = nil
      @channel = nil
      @users_exchange = nil
      @message_factory = JamRuby::MessageFactory.new
      @semaphore = Mutex.new
      @user_topic = nil
      @user_subscription = nil
      @session_topic = nil
      @session_subscription = nil
      @client_topic = nil
      @client_subscription = nil
      @thread_pool = nil
    end

    # Creates the worker thread pool, connects to the message broker and
    # registers the topic subscriptions.
    #
    # options[:host], options[:port] - broker address handed to HotBunnies.connect
    #
    # On any failure everything acquired so far is released via #cleanup and
    # the original error is re-raised.
    def start(options = {})
      @log.info "startup"

      @thread_pool = Executors.new_fixed_thread_pool(8)
      @connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
      @channel = @connection.create_channel
      @channel.prefetch = 10

      register_topics
    rescue
      cleanup
      raise
    end

    # Registers +client+ as the current connection for +client_id+.
    # Callers are expected to hold @semaphore.
    def add_client(client_id, client)
      # should never occur
      if @client_lookup.has_key?(client_id)
        @log.warn "client_id #{client_id} connected while the old connection has not yet been terminated"
      end

      @client_lookup[client_id] = client
    end

    # Removes +client+ from the client_id lookup, but only if it is still the
    # connection registered under +client_id+.
    # Callers are expected to hold @semaphore.
    def remove_client(client_id, client)
      deleted = @client_lookup.delete(client_id)

      if deleted.nil?
        @log.warn "unable to delete #{client_id} from client_lookup"
      elsif deleted != client
        # put it back--this is only possible if add_client hit the 'old connection' path
        # so in other words if this happens:
        #   add_client(1, clientX)
        #   add_client(1, clientY)
        # but clientX is essentially defunct - this could happen due to a bug in client,
        # or EM doesn't notify always of connection close in time
        #   remove_client(1, clientX) -- this check maintains that clientY stays as the
        #                                current client in the hash
        @client_lookup[client_id] = deleted
      end
    end

    # Indexes +context+ under its user's id.
    def add_user(context)
      user_contexts = @user_context_lookup[context.user.id]

      if user_contexts.nil?
        user_contexts = Set.new
        @user_context_lookup[context.user.id] = user_contexts
      end

      user_contexts.add(context)
    end

    # Removes +context+ from its user's set of contexts.
    def remove_user(context)
      user_contexts = @user_context_lookup[context.user.id]

      if user_contexts.nil?
        @log.warn "user can not be removed #{context}"
      else
        # delete the context from set of user contexts
        user_contexts.delete(context)

        # if last user context, delete entire set (memory leak concern)
        if user_contexts.length == 0
          @user_context_lookup.delete(context.user.id)
        end
      end
    end

    # Indexes +context+ under the id of the session it currently points at.
    def add_session(context)
      session_contexts = @session_context_lookup[context.session.id]

      if session_contexts.nil?
        session_contexts = Set.new
        @session_context_lookup[context.session.id] = session_contexts
      end

      session_contexts.add(context)
    end

    # Removes +context+ from its session's set of contexts and clears
    # context.session when the session was being tracked.
    def remove_session(context)
      session_contexts = @session_context_lookup[context.session.id]

      if session_contexts.nil?
        @log.warn "session can not be removed #{context}"
      else
        # delete the context from set of session contexts
        session_contexts.delete(context)

        # if last session context, delete entire set (memory leak concern)
        if session_contexts.length == 0
          @session_context_lookup.delete(context.session.id)
        end

        context.session = nil
      end
    end

    # register topic for user messages and session messages
    # (and for p2p client messages)
    def register_topics
      @users_exchange = @channel.exchange('users', :type => :topic)
      @sessions_exchange = @channel.exchange('sessions', :type => :topic)
      @clients_exchange = @channel.exchange('clients', :type => :topic)

      # create user messaging topic
      @user_topic = @channel.queue("", :auto_delete => true)
      @user_topic.bind(@users_exchange, :routing_key => "user.#")
      @user_topic.purge

      # TODO: alert friends

      # subscribe for any messages to users
      @user_subscription = @user_topic.subscribe(:ack => false)
      @user_subscription.each(:blocking => false, :executor => @thread_pool) do |headers, msg|
        begin
          routing_key = headers.envelope.routing_key
          user_id = routing_key["user.".length..-1]

          @semaphore.synchronize do
            contexts = @user_context_lookup[user_id]

            unless contexts.nil?
              @log.debug "received user-directed message for user: #{user_id}"

              client_msg = Jampb::ClientMessage.parse(msg)

              contexts.each do |context|
                # hand the actual socket write over to the EM reactor
                EM.schedule do
                  @log.debug "sending user message to #{context}"
                  send_to_client(context.client, client_msg)
                end
              end
            end
          end
        rescue => e
          @log.error "unhandled error in messaging to client"
          @log.error e
        end
      end

      @session_topic = @channel.queue("", :auto_delete => true)
      @session_topic.bind(@sessions_exchange, :routing_key => "session.#")
      @session_topic.purge

      # subscribe for any messages to session
      @session_subscription = @session_topic.subscribe(:ack => false)
      @session_subscription.each(:blocking => false, :executor => @thread_pool) do |headers, msg|
        begin
          routing_key = headers.envelope.routing_key
          session_id = routing_key["session.".length..-1]

          @semaphore.synchronize do
            contexts = @session_context_lookup[session_id]

            unless contexts.nil?
              @log.debug "received session-directed message for session: #{session_id}"

              client_msg = Jampb::ClientMessage.parse(msg)

              # ok, its very odd to have your own message that you sent bounce back to you.
              # In one small favor to the client, we purposefully disallow messages a client
              # sent from bouncing back to itself.
              origin_client_id = extract_origin_client_id(headers)

              @log.debug "message received from client #{origin_client_id}"

              contexts.each do |context|
                if context.client.client_id != origin_client_id
                  EM.schedule do
                    @log.debug "sending session message to #{context}"
                    send_to_client(context.client, client_msg)
                  end
                end
              end
            end
          end
        rescue => e
          @log.error "unhandled error in messaging to client"
          @log.error e
        end
      end

      @client_topic = @channel.queue("", :auto_delete => true)
      @client_topic.bind(@clients_exchange, :routing_key => "client.#")
      @client_topic.purge

      # subscribe for any p2p messages to a client
      @client_subscription = @client_topic.subscribe(:ack => false)
      @client_subscription.each(:blocking => false, :executor => @thread_pool) do |headers, msg|
        begin
          routing_key = headers.envelope.routing_key
          client_id = routing_key["client.".length..-1]

          @semaphore.synchronize do
            client = @client_lookup[client_id]

            unless client.nil?
              client_msg = Jampb::ClientMessage.parse(msg)

              origin_client_id = extract_origin_client_id(headers)

              @log.debug "p2p message received from client #{origin_client_id} to client #{client_id}"

              EM.schedule do
                @log.debug "sending p2p message to #{client_id}"
                send_to_client(client, client_msg)
              end
            end
          end
        rescue => e
          @log.error "unhandled error in messaging to client"
          @log.error e
        end
      end
    end

    # Sends +msg+ to +client+, as JSON text when client.encode_json is set and
    # as a binary frame otherwise.
    def send_to_client(client, msg)
      if client.encode_json
        client.send(msg.to_json.to_s)
      else
        # this is so odd that this is necessary from an API perspective. but searching
        # through the source code... it's all I could find in em-websocket for allowing
        # a binary message to be sent
        client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
      end
    end

    # Releases everything #start acquired: subscriptions, thread pool, channel,
    # connection, and finally every logged-in client's bookkeeping.
    # Safe to call when #start failed part-way (unset resources are skipped).
    def cleanup()
      # shutdown topic listeners and mq connection
      shutdown_subscription(@user_subscription, "user")
      shutdown_subscription(@session_subscription, "session")
      shutdown_subscription(@client_subscription, "client")

      @thread_pool.shutdown unless @thread_pool.nil?

      if !@channel.nil?
        @channel.close
      end

      if !@connection.nil?
        @connection.close
      end

      # tear down each individual client; iterate over a snapshot of the keys
      # because cleanup_client removes entries from @clients
      @clients.keys.each do |client|
        cleanup_client(client)
      end
    end

    # Stops the router and releases all resources.
    def stop
      @log.info "shutdown"
      cleanup
    end

    # Starts tracking a freshly accepted websocket connection and installs its
    # onopen / onclose / onerror / onmessage callbacks.
    def new_client(client)
      @semaphore.synchronize do
        @pending_clients.add(client)
      end

      # default to using json instead of pb
      client.encode_json = true

      client.onopen do
        @log.debug "client connected #{client}"

        # check for '?pb' or '?pb=true' in url query parameters
        query_pb = client.request["query"]["pb"]

        if !query_pb.nil? && (query_pb == "" || query_pb == "true")
          client.encode_json = false
        end
      end

      client.onclose do
        @log.debug "Connection closed"

        cleanup_client(client)
      end

      client.onerror do |error|
        if error.kind_of?(EM::WebSocket::WebSocketError)
          @log.error "websockets error: #{error}"
        else
          @log.error "generic error: #{error} #{error.backtrace}"
        end

        cleanup_client(client)
        client.close_websocket
      end

      client.onmessage do |msg|
        @log.debug("msg received")

        # TODO: set a max message size before we put it through PB?
        # TODO: rate limit?

        pb_msg = nil

        begin
          if client.encode_json
            # example: {"type":"LOGIN", "route_to":"server", "login" : {"username":"hi"}}
            parse = JSON.parse(msg)
            pb_msg = Jampb::ClientMessage.json_create(parse)
          else
            pb_msg = Jampb::ClientMessage.parse(msg.to_s)
          end

          self.route(pb_msg, client)
        rescue SessionError => e
          @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
          begin
            # wrap the message up and send it down
            error_msg = @message_factory.server_rejection_error(e.to_s)
            send_to_client(client, error_msg)
          ensure
            client.close_websocket
            cleanup_client(client)
          end
        rescue PermissionError => e
          @log.info "permission error. reason=#{e.to_s}"
          @log.info e

          # wrap the message up and send it down. The id comes from the parsed
          # message; the raw websocket payload (+msg+) has no message_id.
          message_id = pb_msg.nil? ? nil : pb_msg.message_id
          error_msg = @message_factory.server_permission_error(message_id, e.to_s)
          send_to_client(client, error_msg)
        rescue => e
          @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
          @log.error e

          begin
            # wrap the message up and send it down
            error_msg = @message_factory.server_generic_error(e.to_s)
            send_to_client(client, error_msg)
          ensure
            client.close_websocket
            cleanup_client(client)
          end
        end
      end
    end

    # removes all resources associated with a client
    def cleanup_client(client)
      @semaphore.synchronize do
        pending = @pending_clients.delete?(client)

        if !pending.nil?
          @log.debug "cleaning up pending client #{client}"
        else
          remove_client(client.client_id, client)

          context = @clients.delete(client)

          if !context.nil?
            remove_user(context)

            if !context.session.nil?
              remove_session(context)
            end
          else
            @log.debug "skipping duplicate cleanup attempt of authorized client"
          end
        end
      end
    end

    # Dispatches a parsed client message according to its route_to target
    # (server, client, session or user).
    #
    # Raises SessionError for unknown message types, a missing or unknown
    # route_to, or a non-LOGIN message from a client that has not logged in.
    def route(client_msg, client)
      message_type = @message_factory.get_message_type(client_msg)

      raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?

      @log.debug("msg received #{message_type}")

      raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil?

      if @pending_clients.include?(client) && client_msg.type != ClientMessage::Type::LOGIN
        # this client has not logged in and is trying to send a non-login message
        raise SessionError, "must 'Login' first"
      end

      if @message_factory.server_directed? client_msg
        handle_server_directed(client_msg, client)
      elsif @message_factory.client_directed? client_msg
        to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
        handle_client_directed(to_client_id, client_msg, client)
      elsif @message_factory.session_directed? client_msg
        session = client_msg.route_to[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
        handle_session_directed(session, client_msg, client)
      elsif @message_factory.user_directed? client_msg
        user = client_msg.route_to[MessageFactory::USER_PREFIX_TARGET.length..-1]
        handle_user_directed(user, client_msg, client)
      else
        raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
      end
    end

    # Handles a message addressed to the server itself, by message type.
    # Raises SessionError for a type the server does not handle.
    def handle_server_directed(client_msg, client)
      if client_msg.type == ClientMessage::Type::LOGIN
        handle_login(client_msg.login, client)
      elsif client_msg.type == ClientMessage::Type::HEARTBEAT
        handle_heartbeat(client_msg.heartbeat, client)
      elsif client_msg.type == ClientMessage::Type::LOGIN_MUSIC_SESSION
        handle_join_music_session(client_msg.login_music_session, client)
      elsif client_msg.type == ClientMessage::Type::LEAVE_MUSIC_SESSION
        handle_leave_music_session(client_msg.leave_music_session, client)
      else
        raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
      end
    end

    # Authenticates the client, acknowledges the login and moves the client
    # from the pending set into the logged-in lookups.
    # Raises SessionError, 'invalid login' when authentication fails.
    def handle_login(login, client)
      # only read fields that were actually set on the message
      username = login.username if login.value_for_tag(1)
      password = login.password if login.value_for_tag(2)
      token = login.token if login.value_for_tag(3)
      client_id = login.client_id if login.value_for_tag(4)

      # you don't have to supply client_id in login--if you don't, we'll generate one
      if client_id.nil?
        # give a unique ID to this client. This is used to prevent session messages
        # from echoing back to the sender, for instance.
        client_id = UUIDTools::UUID.random_create.to_s
      end

      client.client_id = client_id

      user = valid_login(username, password, token, client_id)

      # treat any falsy result (nil or false) as a failed login
      raise SessionError, 'invalid login' unless user

      @log.debug "user #{user.email} logged in"

      # respond with LOGIN_ACK to let client know it was successful
      remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
      login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token)
      send_to_client(client, login_ack)

      @semaphore.synchronize do
        # remove from pending_queue
        @pending_clients.delete(client)

        # add a tracker for this user
        context = ClientContext.new(user, client)
        @clients[client] = context
        add_user(context)
        add_client(client_id, client)
      end
    end

    def handle_heartbeat(heartbeat, client)
      # todo: manage staleness
    end

    # Joins the client's user to the requested music session (leaving any
    # session the client is already in), acknowledges the result to the client
    # and announces the join to the session.
    def handle_join_music_session(join_music_session, client)
      # verify that the current user has the rights to actually join the music session
      context = @clients[client]

      session_id = join_music_session.music_session

      begin
        session = access_music_session(session_id, context.user)
        @log.debug "user #{context} joining new session #{session}"

        @semaphore.synchronize do
          old_session = context.session

          if !old_session.nil?
            @log.debug "#{context} is already in session. auto-logging out to join new session."
            remove_session(context)
          end

          context.session = session
          add_session(context)
        end
      rescue => e
        # send back a failure ack and bail
        @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
        login_music_session = @message_factory.login_music_session_ack(true, e.to_s)
        send_to_client(client, login_music_session)
        return
      end

      # respond with LOGIN_MUSIC_SESSION_ACK to let client know it was successful
      login_music_session = @message_factory.login_music_session_ack(false, nil)
      send_to_client(client, login_music_session)

      # send 'new client' message to other members in the session
      handle_session_directed(session_id,
                              @message_factory.user_joined_music_session(context.user.id, context.user.name),
                              client)
    end

    # Not supported yet; always raises SessionError.
    def handle_leave_music_session(leave_music_session, client)
      raise SessionError, "unsupported"
    end

    # Looks up the user for a login attempt, by remember token when one is
    # supplied, otherwise by username (email) and password.
    #
    # Returns the user, or nil when the credentials do not match.
    # Raises SessionError when neither a token nor username+password is given.
    def valid_login(username, password, token, client_id)
      if !token.nil? && token != ''
        @log.debug "logging in via token"

        # attempt login with token
        user = User.find_by_remember_token(token)

        if user.nil?
          @log.debug "no user found with token"
          return nil
        end

        @log.debug "#{user} login via token"
        user
      elsif !username.nil? && !password.nil?
        # never write the password itself to the log
        @log.debug "logging in via user/pass '#{username}'"

        # attempt login with username and password
        user = User.find_by_email(username)

        if !user.nil? && user.authenticate(password)
          @log.debug "#{user} login via password"
          user
        else
          @log.debug "#{username} login failure"
          nil
        end
      else
        raise SessionError, 'no login data was found in Login message'
      end
    end

    # Returns the music session with the given id if +user+ may access it.
    # Raises SessionError when the session does not exist or access is denied.
    def access_music_session(music_session_id, user)
      music_session = MusicSession.find_by_id(music_session_id)

      if music_session.nil?
        raise SessionError, 'specified session not found'
      end

      if !music_session.access? user
        raise SessionError, 'not allowed to join the specified session'
      end

      music_session
    end

    # Checks that +user+ may send the p2p message +msg+ to a client.
    #
    # client_id = the id of the client being accessed
    # user      = the user sending the message
    # msg       = the message being sent
    #
    # Raises PermissionError when the target client is unknown and
    # SessionError when the user may not message it.
    # NOTE(review): the denied case raises SessionError (which closes the
    # sender's connection) while the not-found case raises PermissionError --
    # confirm that asymmetry is intended.
    def access_p2p(client_id, user, msg)
      # ping_request and ping_ack messages are special in that they are simply allowed
      if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
        return nil
      end

      music_session_client = MusicSessionClient.find_by_client_id(client_id)

      if music_session_client.nil?
        raise PermissionError, 'specified client not found'
      end

      if !music_session_client.access_p2p? user
        raise SessionError, 'not allowed to message this client'
      end
    end

    # Publishes +client_msg+ to the given session's topic, tagged with the
    # sender's client_id so it is not echoed back to the sender.
    def handle_session_directed(session_id, client_msg, client)
      context = @clients[client]

      # by not catching any exception here, this will kill the connection
      # if for some reason the client is trying to send to a session that it doesn't
      # belong to
      session = access_music_session(session_id, context.user)

      @log.debug "publishing to session #{session} from client_id #{client.client_id}"

      # put it on the topic exchange for sessions
      @sessions_exchange.publish(client_msg.to_s,
                                 :routing_key => "session.#{session_id}",
                                 :properties => {:headers => {"client_id" => client.client_id}})
    end

    # Publishes a p2p +client_msg+ to the target client's topic after stamping
    # it with the sender's client_id.
    def handle_client_directed(to_client_id, client_msg, client)
      context = @clients[client]

      # by not catching any exception here, a PermissionError will be thrown if this isn't valid
      # if for some reason the client is trying to send to a client that it doesn't
      # belong to
      access_p2p(to_client_id, context.user, client_msg)

      # populate routing data
      client_msg.from = client.client_id

      @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"

      # put it on the topic exchange for clients
      @clients_exchange.publish(client_msg.to_s,
                                :routing_key => "client.#{to_client_id}",
                                :properties => {:headers => {"client_id" => client.client_id}})
    end

    # Not implemented yet; always raises SessionError.
    def handle_user_directed(user, client_msg, client)
      raise SessionError, 'not implemented'
    end

    private

    # Reads the sender's client_id from the "client_id" message header.
    # Returns it as a String, or nil when the headers or the value are absent.
    def extract_origin_client_id(headers)
      properties = headers.properties unless headers.nil?
      inner_headers = properties.headers unless properties.nil?
      return nil if inner_headers.nil?

      origin_client_id = inner_headers["client_id"]

      # counter-intuitively, even though a string is passed in when you send the header,
      # an (apparently) auto-generated class is sent back which, if you to_s, returns
      # the original value
      origin_client_id.nil? ? nil : origin_client_id.to_s
    end

    # Cancels and shuts down one subscription if it is active. Failures are
    # logged and swallowed so the remaining cleanup steps still run.
    def shutdown_subscription(subscription, label)
      return if subscription.nil? || !subscription.active?

      @log.debug "cleaning up #{label} subscription"
      subscription.cancel
      subscription.shutdown!
    rescue => e
      @log.debug "unable to cancel subscription on cleanup: #{e}"
    end

  end
end