diff --git a/Gemfile b/Gemfile index 1b800a230..987c4a2b0 100644 --- a/Gemfile +++ b/Gemfile @@ -11,7 +11,7 @@ gem 'pg_migrate','0.1.5' #:path => '~/workspace/pg_migrate_ruby' #'0.1.4' gem 'jam_db', :path => '~/workspace/jam-db/target/ruby_package' gem 'jam_ruby', :path => '~/workspace/jam-ruby' gem 'jampb', :path => '~/workspace/jam-pb/target/ruby/jampb' -gem 'em-websocket' # :path=> '~/workspace/em-websocket' +gem 'em-websocket', :path=> '~/workspace/em-websocket' gem 'hot_bunnies', '1.3.8' gem 'activerecord', '3.2.7' gem 'logging' diff --git a/README.md b/README.md index dd14c7de9..cd61f2a75 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ TODO & DESIGN LIMITATIONS * The rabbitmq connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy) * The database connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy) +* We make just one user topic registration and session registration for all users/sessions. If ever we had 10 of servers, it could be wasteful. It just depends on how fast the bogus messaging can be ignored +* The database connection is pooled. * The user model is stored in memory, meaning periodically it should be reloaded from the database (in case a user was marked inactive and you want them knocked out of the system) * The user could easily join to multiple sessions. Currently, though, the ClientContext object only tracks one jam session topic subscription. This is minimial to change. -* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages \ No newline at end of file +* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages diff --git a/bin/websocket_gateway b/bin/websocket_gateway index 7cb03c5af..8087fdb46 100755 --- a/bin/websocket_gateway +++ b/bin/websocket_gateway @@ -22,4 +22,4 @@ end Logging.logger.root.appenders = Logging.appenders.stdout ActiveRecord::Base.establish_connection(db_config) -Server.new.run :port => config["port"], :debug => false#=> config["verbose"] +Server.new.run :port => config["port"], :debug => true# config["debug"] diff --git a/config/application.yml b/config/application.yml index cc37caeb7..34bf0c45d 100644 --- a/config/application.yml +++ b/config/application.yml @@ -1,6 +1,7 @@ development: port: 6767 verbose: true + emwebsocket_debug: false test: port: 6769 diff --git a/lib/jam_websockets.rb b/lib/jam_websockets.rb index 6c249661a..b09be5a42 100644 --- a/lib/jam_websockets.rb +++ b/lib/jam_websockets.rb @@ -1,6 +1,7 @@ require "logging" require "jam_ruby" require "jam_websockets/version" +require "jam_websockets/session_error" require "jam_websockets/client_context" require "jam_websockets/router" require "jam_websockets/server" diff --git a/lib/jam_websockets/client_context.rb b/lib/jam_websockets/client_context.rb index ddcf6e5cf..eb40fb215 100644 --- a/lib/jam_websockets/client_context.rb +++ b/lib/jam_websockets/client_context.rb @@ -1,13 +1,18 @@ module JamWebsockets class ClientContext - attr_accessor :user, :user_queue, :session_topic, :subscription + attr_accessor :user, :client, :msg_count, :session - def initialize(user, user_queue, subscription) + def initialize(user, client) @user = user - @user_queue = user_queue - @subscription = subscription - @session_topic = nil + @client = client + @msg_count = 0 + @session = nil end + + def to_s + return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]" + end + end end diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 20cac0e87..5150fa918 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -1,33 +1,46 @@ require 'set' require 'hot_bunnies' +require 'thread' + +import java.util.concurrent.Executors include Jampb module JamWebsockets class Router + attr_accessor :user_context_lookup, :session_context_lookup + def initialize(options={}) @log = Logging.logger[self] @pending_clients = Set.new # clients that have connected to server, but not logged in. @clients = {} # clients that have logged in - @sessions = nil + @user_context_lookup = {} # lookup a set of client_contexts by user_id + @session_context_lookup = {} # lookup a set of client_contexts by session_id + @sessions_exchange = nil @connection = nil @channel = nil - @endpoints = nil + @users_exchange = nil @message_factory = JamRuby::MessageFactory.new + @semaphore = Mutex.new + @user_topic = nil + @user_subscription = nil + @session_topic = nil + @session_subscription = nil + @thread_pool = nil end def start(options = {}) - @log.debug "startup" + @log.info "startup" begin + @thread_pool = Executors.new_fixed_thread_pool(8) @connection = HotBunnies.connect(:host => options[:host], :port => options[:port]) @channel = @connection.create_channel @channel.prefetch = 10 - @endpoints = @channel.exchange('client_endpoints', :type => :direct) - @sessions = @channel.exchange('client_sessions', :type => :topic) + register_topics rescue => e cleanup raise e @@ -35,29 +48,180 @@ module JamWebsockets end - def cleanup() - @clients.each do |client, context| - cleanup_client(client) - #context.user_queue.unbind(@endpoints) - end + def add_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + user_contexts = Set.new + @user_context_lookup[context.user.id] = user_contexts + end - if !@channel.nil? - @channel.close - end + user_contexts.add(context) + end - if !@connection.nil? - @connection.close + def remove_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + @log.warn "user can not be removed #{context}" + else + # delete the context from set of user contexts + user_contexts.delete(context) + + # if last user context, delete entire set (memory leak concern) + if user_contexts.length == 0 + @user_context_lookup.delete(context.user.id) + end end end + def add_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + session_contexts = Set.new + @session_context_lookup[context.session.id] = session_contexts + end + + session_contexts.add(context) + end + + def remove_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + @log.warn "session can not be removed #{context}" + else + # delete the context from set of session contexts + session_contexts.delete(context) + + # if last session context, delete entire set (memory leak concern) + if session_contexts.length == 0 + @session_context_lookup.delete(context.session.id) + end + + context.session = nil + end + end + + + # register topic for user messages and session messages + def register_topics + @users_exchange = @channel.exchange('users', :type => :topic) + @sessions_exchange = @channel.exchange('sessions', :type => :topic) + + # create user messaging topic + @user_topic = @channel.queue("", :auto_delete => true) + @user_topic.bind(@users_exchange, :routing_key => "user.#") + @user_topic.purge + + # TODO: alert friends + + # subscribe for any messages to users + + #@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg| + @user_subscription = @user_topic.subscribe(:ack => false) + @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + user_id = routing_key["user.".length..-1] + @sempahore.synchronize do + contexts = @user_context_lookup[user_id] + + unless contexts.nil? + + @log.debug "received user-directed message for session: #{user_id}" + + contexts.each do |context| + @log.debug "sending user message to #{context}" + EM.schedule do + context.client.instance_variable_get(:@handler).send_frame(:binary, msg) + end + end + end + end + + rescue => e + @log.error "unhandled error in messaging to client" + end + end + + @session_topic = @channel.queue("", :auto_delete => true) + @session_topic.bind(@sessions_exchange, :routing_key => "session.#") + @session_topic.purge + + # subscribe for any messages to session + #@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg| + @session_subscription = @session_topic.subscribe(:ack => false) + @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + session_id = routing_key["session.".length..-1] + @semaphore.synchronize do + contexts = @session_context_lookup[session_id] + + unless contexts.nil? + + @log.debug "received session-directed message for session: #{session_id}" + + contexts.each do |context| + @log.debug "sending session message to #{context}" + EM.schedule do + @log.debug "ONTUHNOTEHU" + context.client.instance_variable_get(:@handler).send_frame(:binary, msg) + @log.debug "gross" + end + end + end + end + + rescue => e + @log.error "unhandled error in messaging to client" + end + end + end + + def cleanup() + # shutdown topic listeners and mq connection + begin + if !@user_subscription.nil? && @user_subscription.active? + @log.debug "cleaning up user subscription" + @user_subscription.cancel + @user_subscription.shutdown! + end + + if !@session_subscription.nil? && @session_subscription.active? + @log.debug "cleaning up session subscription" + @session_subscription.cancel + @session_subscription.shutdown! + end + + rescue => e + @log.debug "unable to cancel subscription on cleanup: #{e}" + end + + @thread_pool.shutdown + + if !@channel.nil? + @channel.close + end + + if !@connection.nil? + @connection.close + end + + # tear down each individual client + @clients.each do |client, context| + cleanup_client(client) + end + end + def stop - @log.debug "shutdown" + @log.info "shutdown" cleanup end def new_client(client) - @pending_clients.add(client) + @semaphore.synchronize do + @pending_clients.add(client) + end client.onopen { @log.debug "client connected #{client}" @@ -77,6 +241,7 @@ module JamWebsockets end cleanup_client(client) + client.close_websocket } client.onmessage { |msg| @@ -88,79 +253,70 @@ module JamWebsockets begin pb_msg = Jampb::ClientMessage.parse(msg.to_s) self.route(pb_msg, client) + rescue SessionError => e + @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" + begin + # wrap the message up and send it down + client.send(@message_factory.server_rejection_error(e.to_s).to_s) + ensure + client.close_websocket + cleanup_client(client) + end rescue => e - @log.debug "ending client session due to error: #{e.to_s}" - @log.debug e - - begin + @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}" + @log.error e + + begin # wrap the message up and send it down client.send(@message_factory.server_generic_error(e.to_s).to_s) ensure + client.close_websocket cleanup_client(client) end - end } end + # removes all resources associated with a client def cleanup_client(client) - @log.debug "cleaning up client #{client}" - begin - @pending_clients.delete(client) - context = @clients.delete(client) - - if !context.nil? - begin - if context.subscription.active? - @log.debug "cleaning up user subscription" - context.subscription.shutdown! - end + @semaphore.synchronize do + pending = @pending_clients.delete?(client) - if !context.session_topic.nil? && context.session_topic.active? - @log.debug "cleaning up session subscription" - context.session_topic.shutdown! - end + if !pending.nil? + @log.debug "cleaning up pending client #{client}" + else + context = @clients.delete(client) - rescue => e - @log.debug "unable to cancel subscription on cleanup: #{e}" + if !context.nil? + + remove_user(context) + + if !context.session.nil? + remove_session(context) + end + else + @log.debug "skipping duplicate cleanup attempt of authorized client" end - mark_context_for_deletion(context) - end - - ensure - client.close_websocket - end - end - - # we want to eventually not do all the work here until a grace reconnect period has elasped - def mark_context_for_deletion(context) - # TODO handle notifies to sessions and friends about disconnect - end - - def extract_inner_message(client_msg) - msg = client_msg.value_for_tag(client_msg.type) - - if msg.nil? - raise "inner message is null. type: #{client_msg.type}, target: #{client_msg.target}" - end - - return msg + end + end end def route(client_msg, client) - @log.debug("msg #{client_msg.type} #{client_msg.target}") + message_type = @message_factory.get_message_type(client_msg) + + raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? + + @log.debug("msg received #{message_type}") - if client_msg.target.nil? - raise 'client_msg.target is null' - end + raise SessionError, 'client_msg.target is null' if client_msg.target.nil? if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN # this client has not logged in and is trying to send a non-login message - raise "must 'Login' first" + raise SessionError, "must 'Login' first" end if @message_factory.server_directed? client_msg @@ -178,13 +334,12 @@ module JamWebsockets handle_user_directed(user, client_msg, client) else - raise "client_msg.target is unknown type: #{client_msg.target}" + raise SessionError, "client_msg.target is unknown type: #{client_msg.target}" end end def handle_server_directed(client_msg, client) - type, inner_msg = extract_inner_message(client_msg) if client_msg.type == ClientMessage::Type::LOGIN @@ -203,7 +358,7 @@ module JamWebsockets handle_leave_jam_session(client_msg.leave_jam_session, client) else - raise "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message" + raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message" end end @@ -218,45 +373,45 @@ module JamWebsockets @log.debug "user #{user.email} logged in" - # create a queue for just this user - queue = @channel.queue(user.id) - queue.bind(@endpoints, :routing_key => user.id) - queue.purge - - # TODO: alert friends - - # subscribe for any messages to self - subscription = queue.subscribe(:ack => true, :blocking => false) do |headers, msg| - client.send(msg) - headers.ack - end - # respond with LOGIN_ACK to let client know it was successful client.send(@message_factory.login_ack(client.request["origin"]).to_s) # remove from pending_queue - @pending_clients.delete(client) + @semaphore.synchronize do + @pending_clients.delete(client) - # add a tracker for this user - context = ClientContext.new(user, queue, subscription) - @clients[client] = context + # add a tracker for this user + context = ClientContext.new(user, client) + @clients[client] = context + add_user(context) + end else - raise 'invalid login' + raise SessionError, 'invalid login' end end def handle_heartbeat(heartbeat, client) - # todo + # todo: manage staleness end def handle_join_jam_session(join_jam_session, client) # verify that the current user has the rights to actually join the jam session context = @clients[client] - session_id = join_jam_session.jam_session; + session_id = join_jam_session.jam_session begin - access_jam_session?(session_id, context.user) + session = access_jam_session?(session_id, context.user) + @log.debug "user #{context} joining new session #{session}" + @semaphore.synchronize do + old_session = context.session + if !old_session.nil? + @log.debug "#{context} is already in session. auto-logging out to join new session." + remove_session(context) + end + context.session = session + add_session(context) + end rescue => e # send back a failure ack and bail @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" @@ -264,101 +419,83 @@ module JamWebsockets return end - topic = @channel.queue(session_id) - topic.bind(@sessions, :routing_key => session_id) - topic.purge - - # subscribe for any messages to session - subscription = topic.subscribe(:ack => false, :blocking => false) do |headers, msg| - client.send(msg) - #headers.ack - end - - old_session = context.session_topic - if !old_session.nil? && old_session.active? - # remove subscription to previous session - @log.debug "auto-removing user from previous session" - old_session.shutdown! - end - - context.session_topic = subscription - # respond with LOGIN_JAM_SESSION_ACK to let client know it was successful - client.send(@message_factory.login_jam_session_ack(false, nil)) + client.send(@message_factory.login_jam_session_ack(false, nil).to_s) - # send 'new client' message - + # send 'new client' message to other members in the session + handle_session_directed(session_id, + @message_factory.user_joined_jam_session(context.user.id, context.user.name), + client) end - end + def handle_leave_jam_session(leave_jam_session, client) - def handle_leave_jam_session(leave_jam_session, client) + context = @clients[client] - context = @clients[client] + raise SessionError, "unsupported" + end - raise 'unsupported' - end + def valid_login(username, password, token) - def valid_login(username, password, token) + if !username.nil? and !password.nil? + # attempt login with username and password + user = User.find_by_email(username) - if !username.nil? and !password.nil? - # attempt login with username and password - user = User.find_by_email(username) + if !user.nil? && user.authenticate(password) + @log.debug "#{username} login via password" + return user + else + @log.debug "#{username} login failure" + return nil + end + elsif !token.nil? + # attempt login with token + user = User.find_by_remember_token(token) - if !user.nil? && user.authenticate(password) - @log.debug "#{username} login via password" - return user - else - @log.debug "#{username} login failure" - return nil - end - elsif !token.nil? - # attempt login with token - user = User.find_by_remember_token(token) + if user.nil? + @log.debug "no user found with token" + return false + else + @log.debug "#{username} login via token" + return nil + end + else + raise SessionError, 'no login data was found in Login message' + end - if user.nil? - @log.debug "no user found with token" - return false - else - @log.debug "#{username} login via token" - return nil - end - else - raise 'no login data was found in Login message' - end + end - end + def access_jam_session?(jam_session_id, user) + jam_session = JamSession.find_by_id(jam_session_id) - def access_jam_session?(jam_session_id, user) - jam_session = JamSession.find_by_id(jam_session_id) + if jam_session.nil? + raise SessionError, 'specified session not found' + end - if jam_session.nil? - raise 'specified session not found' - end + if !jam_session.access? user + raise SessionError, 'not allowed to join the specified session' + end - if !jam_session.access? user - raise 'not allowed to join the specified session' - end + return jam_session + end - return jam_session - end + def handle_session_directed(session_id, client_msg, client) - def handle_session_directed(session, client_msg, client) - type, inner_msg = extract_inner_message(client_msg) + context = @clients[client] - context = @clients[client] + # by not catching any exception here, this will kill the connection + # if for some reason the client is trying to send to a session that it doesn't + # belong to + session = access_jam_session?(session_id, context.user) - # by not catching any exception here, this will kill the connection - # if for some reason the client is trying to send to a session that it doesn't - # belong to - access_jam_session?(session, context.user) + @log.debug "publishing to session #{session}" + # put it on the topic exchange for sessions + @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}") + end - # put it on the topic exchange for sessions - @sessions.publish(client_msg.to_s, :routing_key => session) - end + def handle_user_directed(user, client_msg, client) - def handle_user_directed(user, client_msg, client) - type, inner_msg = extract_inner_message(client_msg) - - end + raise SessionError, 'not implemented' + end + end end diff --git a/lib/jam_websockets/server.rb b/lib/jam_websockets/server.rb index 968ccc1e3..470eaecb9 100644 --- a/lib/jam_websockets/server.rb +++ b/lib/jam_websockets/server.rb @@ -14,19 +14,19 @@ module JamWebsockets host = "0.0.0.0" port = options[:port] - @log.debug "starting server #{host}:#{port}" + @log.info "starting server #{host}:#{port}" @router.start # if you don't do this, the app won't exit unless you kill -9 at_exit do - @log.debug "cleaning up server" + @log.info "cleaning up server" @router.cleanup end EventMachine.run { - EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:debug]) do |ws| - @log.debug "new client #{ws}" + EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:emwebsocket_debug]) do |ws| + @log.info "new client #{ws}" @router.new_client(ws) end } diff --git a/lib/jam_websockets/session_error.rb b/lib/jam_websockets/session_error.rb new file mode 100644 index 000000000..4d8a82166 --- /dev/null +++ b/lib/jam_websockets/session_error.rb @@ -0,0 +1,4 @@ +class SessionError < Exception + +end + diff --git a/spec/jam_websockets/router_spec.rb b/spec/jam_websockets/router_spec.rb index 2d6733a80..55af3d486 100644 --- a/spec/jam_websockets/router_spec.rb +++ b/spec/jam_websockets/router_spec.rb @@ -1,9 +1,79 @@ require 'spec_helper' require 'thread' + +LoginClient = Class.new do + attr_accessor :onmsgblock, :onopenblock + + def initiaize() + + end + + def onopen(&block) + @onopenblock = block + end + + def onmessage(&block) + @onmsgblock = block + end + + def close(&block) + @oncloseblock = block + end + + def close_websocket() + + end + + def send(msg) + puts msg + end + + def request() + return { "origin" => "1.1.1.1"} + end + +end + + +# does a login and returns client +def login(router, user, password) + + message_factory = MessageFactory.new + client = LoginClient.new + + login_ack = message_factory.login_ack("1.1.1.1") + + client.should_receive(:send).with(login_ack.to_s) + client.should_receive(:onclose) + client.should_receive(:onerror) + client.should_receive(:request).and_return({"origin" => "1.1.1.1"}) + + @router.new_client(client) + client.onopenblock.call + + # create a login message, and pass it into the router via onmsgblock.call + login = message_factory.login_with_user_pass(user.email, password) + + # first log in + client.onmsgblock.call login.to_s + + # then join jam session + return client +end + +def login_jam_session(router, client, jam_session) + message_factory = MessageFactory.new + login_jam_session = message_factory.login_jam_session(jam_session.id) + login_ack = message_factory.login_jam_session_ack(false, nil); + client.should_receive(:send).with(login_ack.to_s) + client.onmsgblock.call login_jam_session.to_s +end + + describe Router do - message_factory = MessageFactory.new + message_factory = MessageFactory.new before do @@ -35,6 +105,25 @@ describe Router do end end + describe "topic routing helpers" do + it "create and delete user lookup set" do + user = double(User) + user.should_receive(:id).any_number_of_times.and_return("1") + client = double("client") + context = ClientContext.new(user, client) + + @router.user_context_lookup.length.should == 0 + + @router.add_user(context) + + @router.user_context_lookup.length.should == 1 + + @router.remove_user(context) + + @router.user_context_lookup.length.should == 0 + end + end + describe "login" do it "should not allow login of bogus user", :mq => true do @@ -54,17 +143,19 @@ describe Router do @onmsgblock = block end - def close() + def close_websocket() + end + def close() end end client = TestClient.new - error_msg = message_factory.server_generic_error("invalid login") + error_msg = message_factory.server_rejection_error("invalid login") client.should_receive(:send).with(error_msg.to_s) - client.should_receive(:close) + client.should_receive(:close_websocket) client.should_receive(:onclose) client.should_receive(:onerror) @@ -83,47 +174,7 @@ describe Router do :password => "foobar", :password_confirmation => "foobar") @user.save - - TestClient = Class.new do - - attr_accessor :onmsgblock, :onopenblock, :oncloseblock - - def initiaize() - - end - - def onopen(&block) - @onopenblock = block - end - - def onmessage(&block) - @onmsgblock = block - end - - def close(&block) - @oncloseblock = block - end - end - - client = TestClient.new - - login_ack = message_factory.login_ack("1.1.1.1") - - client.should_receive(:send).with(login_ack.to_s) - client.should_receive(:close).exactly(0).times # close would occur on error, but this is good path - client.should_receive(:onclose) - client.should_receive(:onerror) - client.should_receive(:ip).and_return("1.1.1.1") - - @router.new_client(client) - client.onopenblock.call - - # create a login message, and pass it into the router via onmsgblock.call - login = message_factory.login_with_user_pass("user@example.com", "foobar") - - # attempt to log in, causing chain of events - client.onmsgblock.call login.to_s - + client1 = login(@router, @user, "foobar") end @@ -140,52 +191,35 @@ describe Router do # make a jam_session and define two members - - TestClient = Class.new do - - attr_accessor :onmsgblock, :onopenblock - - def initiaize() - - end - - def onopen(&block) - @onopenblock = block - end - - def onmessage(&block) - @onmsgblock = block - end - - def close(&block) - @oncloseblock = block - end - end - - client = TestClient.new - - login_ack = message_factory.login_ack("1.1.1.1") - - client.should_receive(:send).with(login_ack.to_s) - #client.should_receive(:close) - client.should_receive(:onclose) - client.should_receive(:onerror) - client.should_receive(:ip).and_return("1.1.1.1") - - @router.new_client(client) - client.onopenblock.call - - # create a login message, and pass it into the router via onmsgblock.call - login = message_factory.login_with_user_pass(user1.email, "foobar") - - # first log in - client.onmsgblock.call login.to_s - - # then join jam session - login_jam_session = message_factory.login_jam_session(jam_session.id) - client.onmsgblock.call login_jam_session.to_s - + # create client 1, log him in, and log him in to jam session + client1 = login(@router, user1, "foobar") + login_jam_session(@router, client1, jam_session) end + + it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do + + EventMachine.run do + user1 = FactoryGirl.create(:user) # in the jam session + user2 = FactoryGirl.create(:user) # in the jam session + + jam_session = FactoryGirl.create(:jam_session, :user => user1) + + jam_session_member1 = FactoryGirl.create(:jam_session_member, :user => user1, :jam_session => jam_session) + jam_session_member2 = FactoryGirl.create(:jam_session_member, :user => user2, :jam_session => jam_session) + + # make a jam_session and define two members + + + # create client 1, log him in, and log him in to jam session + client1 = login(@router, user1, "foobar") + login_jam_session(@router, client1, jam_session) + + client2 = login(@router, user2, "foobar") + login_jam_session(@router, client2, jam_session) + EM.stop + end + end + end end