* login and login_jam_session working with tests. logging cleaned up
This commit is contained in:
parent
89a4be2321
commit
b7fd331c2b
2
Gemfile
2
Gemfile
|
|
@ -11,7 +11,7 @@ gem 'pg_migrate','0.1.5' #:path => '~/workspace/pg_migrate_ruby' #'0.1.4'
|
||||||
gem 'jam_db', :path => '~/workspace/jam-db/target/ruby_package'
|
gem 'jam_db', :path => '~/workspace/jam-db/target/ruby_package'
|
||||||
gem 'jam_ruby', :path => '~/workspace/jam-ruby'
|
gem 'jam_ruby', :path => '~/workspace/jam-ruby'
|
||||||
gem 'jampb', :path => '~/workspace/jam-pb/target/ruby/jampb'
|
gem 'jampb', :path => '~/workspace/jam-pb/target/ruby/jampb'
|
||||||
gem 'em-websocket' # :path=> '~/workspace/em-websocket'
|
gem 'em-websocket', :path=> '~/workspace/em-websocket'
|
||||||
gem 'hot_bunnies', '1.3.8'
|
gem 'hot_bunnies', '1.3.8'
|
||||||
gem 'activerecord', '3.2.7'
|
gem 'activerecord', '3.2.7'
|
||||||
gem 'logging'
|
gem 'logging'
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ TODO & DESIGN LIMITATIONS
|
||||||
|
|
||||||
* The rabbitmq connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
* The rabbitmq connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
||||||
* The database connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
* The database connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
||||||
|
* We make just one user topic registration and session registration for all users/sessions. If ever we had 10 of servers, it could be wasteful. It just depends on how fast the bogus messaging can be ignored
|
||||||
|
* The database connection is pooled.
|
||||||
* The user model is stored in memory, meaning periodically it should be reloaded from the database (in case a user was marked inactive and you want them knocked out of the system)
|
* The user model is stored in memory, meaning periodically it should be reloaded from the database (in case a user was marked inactive and you want them knocked out of the system)
|
||||||
* The user could easily join to multiple sessions. Currently, though, the ClientContext object only tracks one jam session topic subscription. This is minimial to change.
|
* The user could easily join to multiple sessions. Currently, though, the ClientContext object only tracks one jam session topic subscription. This is minimial to change.
|
||||||
* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages
|
* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages
|
||||||
|
|
|
||||||
|
|
@ -22,4 +22,4 @@ end
|
||||||
Logging.logger.root.appenders = Logging.appenders.stdout
|
Logging.logger.root.appenders = Logging.appenders.stdout
|
||||||
|
|
||||||
ActiveRecord::Base.establish_connection(db_config)
|
ActiveRecord::Base.establish_connection(db_config)
|
||||||
Server.new.run :port => config["port"], :debug => false#=> config["verbose"]
|
Server.new.run :port => config["port"], :debug => true# config["debug"]
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
development:
|
development:
|
||||||
port: 6767
|
port: 6767
|
||||||
verbose: true
|
verbose: true
|
||||||
|
emwebsocket_debug: false
|
||||||
|
|
||||||
test:
|
test:
|
||||||
port: 6769
|
port: 6769
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
require "logging"
|
require "logging"
|
||||||
require "jam_ruby"
|
require "jam_ruby"
|
||||||
require "jam_websockets/version"
|
require "jam_websockets/version"
|
||||||
|
require "jam_websockets/session_error"
|
||||||
require "jam_websockets/client_context"
|
require "jam_websockets/client_context"
|
||||||
require "jam_websockets/router"
|
require "jam_websockets/router"
|
||||||
require "jam_websockets/server"
|
require "jam_websockets/server"
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,18 @@
|
||||||
module JamWebsockets
|
module JamWebsockets
|
||||||
class ClientContext
|
class ClientContext
|
||||||
|
|
||||||
attr_accessor :user, :user_queue, :session_topic, :subscription
|
attr_accessor :user, :client, :msg_count, :session
|
||||||
|
|
||||||
def initialize(user, user_queue, subscription)
|
def initialize(user, client)
|
||||||
@user = user
|
@user = user
|
||||||
@user_queue = user_queue
|
@client = client
|
||||||
@subscription = subscription
|
@msg_count = 0
|
||||||
@session_topic = nil
|
@session = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def to_s
|
||||||
|
return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]"
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,33 +1,46 @@
|
||||||
require 'set'
|
require 'set'
|
||||||
require 'hot_bunnies'
|
require 'hot_bunnies'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
include Jampb
|
include Jampb
|
||||||
|
|
||||||
module JamWebsockets
|
module JamWebsockets
|
||||||
class Router
|
class Router
|
||||||
|
|
||||||
|
attr_accessor :user_context_lookup, :session_context_lookup
|
||||||
|
|
||||||
def initialize(options={})
|
def initialize(options={})
|
||||||
@log = Logging.logger[self]
|
@log = Logging.logger[self]
|
||||||
@pending_clients = Set.new # clients that have connected to server, but not logged in.
|
@pending_clients = Set.new # clients that have connected to server, but not logged in.
|
||||||
@clients = {} # clients that have logged in
|
@clients = {} # clients that have logged in
|
||||||
@sessions = nil
|
@user_context_lookup = {} # lookup a set of client_contexts by user_id
|
||||||
|
@session_context_lookup = {} # lookup a set of client_contexts by session_id
|
||||||
|
@sessions_exchange = nil
|
||||||
@connection = nil
|
@connection = nil
|
||||||
@channel = nil
|
@channel = nil
|
||||||
@endpoints = nil
|
@users_exchange = nil
|
||||||
@message_factory = JamRuby::MessageFactory.new
|
@message_factory = JamRuby::MessageFactory.new
|
||||||
|
@semaphore = Mutex.new
|
||||||
|
@user_topic = nil
|
||||||
|
@user_subscription = nil
|
||||||
|
@session_topic = nil
|
||||||
|
@session_subscription = nil
|
||||||
|
@thread_pool = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def start(options = {})
|
def start(options = {})
|
||||||
|
|
||||||
@log.debug "startup"
|
@log.info "startup"
|
||||||
|
|
||||||
begin
|
begin
|
||||||
|
@thread_pool = Executors.new_fixed_thread_pool(8)
|
||||||
@connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
|
@connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
|
||||||
@channel = @connection.create_channel
|
@channel = @connection.create_channel
|
||||||
@channel.prefetch = 10
|
@channel.prefetch = 10
|
||||||
|
|
||||||
@endpoints = @channel.exchange('client_endpoints', :type => :direct)
|
register_topics
|
||||||
@sessions = @channel.exchange('client_sessions', :type => :topic)
|
|
||||||
rescue => e
|
rescue => e
|
||||||
cleanup
|
cleanup
|
||||||
raise e
|
raise e
|
||||||
|
|
@ -35,29 +48,180 @@ module JamWebsockets
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def cleanup()
|
def add_user(context)
|
||||||
@clients.each do |client, context|
|
user_contexts = @user_context_lookup[context.user.id]
|
||||||
cleanup_client(client)
|
if user_contexts.nil?
|
||||||
#context.user_queue.unbind(@endpoints)
|
user_contexts = Set.new
|
||||||
end
|
@user_context_lookup[context.user.id] = user_contexts
|
||||||
|
end
|
||||||
|
|
||||||
if !@channel.nil?
|
user_contexts.add(context)
|
||||||
@channel.close
|
end
|
||||||
end
|
|
||||||
|
|
||||||
if !@connection.nil?
|
def remove_user(context)
|
||||||
@connection.close
|
user_contexts = @user_context_lookup[context.user.id]
|
||||||
|
if user_contexts.nil?
|
||||||
|
@log.warn "user can not be removed #{context}"
|
||||||
|
else
|
||||||
|
# delete the context from set of user contexts
|
||||||
|
user_contexts.delete(context)
|
||||||
|
|
||||||
|
# if last user context, delete entire set (memory leak concern)
|
||||||
|
if user_contexts.length == 0
|
||||||
|
@user_context_lookup.delete(context.user.id)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def add_session(context)
|
||||||
|
session_contexts = @session_context_lookup[context.session.id]
|
||||||
|
if session_contexts.nil?
|
||||||
|
session_contexts = Set.new
|
||||||
|
@session_context_lookup[context.session.id] = session_contexts
|
||||||
|
end
|
||||||
|
|
||||||
|
session_contexts.add(context)
|
||||||
|
end
|
||||||
|
|
||||||
|
def remove_session(context)
|
||||||
|
session_contexts = @session_context_lookup[context.session.id]
|
||||||
|
if session_contexts.nil?
|
||||||
|
@log.warn "session can not be removed #{context}"
|
||||||
|
else
|
||||||
|
# delete the context from set of session contexts
|
||||||
|
session_contexts.delete(context)
|
||||||
|
|
||||||
|
# if last session context, delete entire set (memory leak concern)
|
||||||
|
if session_contexts.length == 0
|
||||||
|
@session_context_lookup.delete(context.session.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
context.session = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
# register topic for user messages and session messages
|
||||||
|
def register_topics
|
||||||
|
@users_exchange = @channel.exchange('users', :type => :topic)
|
||||||
|
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
|
||||||
|
|
||||||
|
# create user messaging topic
|
||||||
|
@user_topic = @channel.queue("", :auto_delete => true)
|
||||||
|
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
||||||
|
@user_topic.purge
|
||||||
|
|
||||||
|
# TODO: alert friends
|
||||||
|
|
||||||
|
# subscribe for any messages to users
|
||||||
|
|
||||||
|
#@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg|
|
||||||
|
@user_subscription = @user_topic.subscribe(:ack => false)
|
||||||
|
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||||
|
begin
|
||||||
|
routing_key = headers.envelope.routing_key
|
||||||
|
user_id = routing_key["user.".length..-1]
|
||||||
|
@sempahore.synchronize do
|
||||||
|
contexts = @user_context_lookup[user_id]
|
||||||
|
|
||||||
|
unless contexts.nil?
|
||||||
|
|
||||||
|
@log.debug "received user-directed message for session: #{user_id}"
|
||||||
|
|
||||||
|
contexts.each do |context|
|
||||||
|
@log.debug "sending user message to #{context}"
|
||||||
|
EM.schedule do
|
||||||
|
context.client.instance_variable_get(:@handler).send_frame(:binary, msg)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
rescue => e
|
||||||
|
@log.error "unhandled error in messaging to client"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@session_topic = @channel.queue("", :auto_delete => true)
|
||||||
|
@session_topic.bind(@sessions_exchange, :routing_key => "session.#")
|
||||||
|
@session_topic.purge
|
||||||
|
|
||||||
|
# subscribe for any messages to session
|
||||||
|
#@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg|
|
||||||
|
@session_subscription = @session_topic.subscribe(:ack => false)
|
||||||
|
@session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||||
|
begin
|
||||||
|
routing_key = headers.envelope.routing_key
|
||||||
|
session_id = routing_key["session.".length..-1]
|
||||||
|
@semaphore.synchronize do
|
||||||
|
contexts = @session_context_lookup[session_id]
|
||||||
|
|
||||||
|
unless contexts.nil?
|
||||||
|
|
||||||
|
@log.debug "received session-directed message for session: #{session_id}"
|
||||||
|
|
||||||
|
contexts.each do |context|
|
||||||
|
@log.debug "sending session message to #{context}"
|
||||||
|
EM.schedule do
|
||||||
|
@log.debug "ONTUHNOTEHU"
|
||||||
|
context.client.instance_variable_get(:@handler).send_frame(:binary, msg)
|
||||||
|
@log.debug "gross"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
rescue => e
|
||||||
|
@log.error "unhandled error in messaging to client"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def cleanup()
|
||||||
|
# shutdown topic listeners and mq connection
|
||||||
|
begin
|
||||||
|
if !@user_subscription.nil? && @user_subscription.active?
|
||||||
|
@log.debug "cleaning up user subscription"
|
||||||
|
@user_subscription.cancel
|
||||||
|
@user_subscription.shutdown!
|
||||||
|
end
|
||||||
|
|
||||||
|
if !@session_subscription.nil? && @session_subscription.active?
|
||||||
|
@log.debug "cleaning up session subscription"
|
||||||
|
@session_subscription.cancel
|
||||||
|
@session_subscription.shutdown!
|
||||||
|
end
|
||||||
|
|
||||||
|
rescue => e
|
||||||
|
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
||||||
|
end
|
||||||
|
|
||||||
|
@thread_pool.shutdown
|
||||||
|
|
||||||
|
if !@channel.nil?
|
||||||
|
@channel.close
|
||||||
|
end
|
||||||
|
|
||||||
|
if !@connection.nil?
|
||||||
|
@connection.close
|
||||||
|
end
|
||||||
|
|
||||||
|
# tear down each individual client
|
||||||
|
@clients.each do |client, context|
|
||||||
|
cleanup_client(client)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
@log.debug "shutdown"
|
@log.info "shutdown"
|
||||||
cleanup
|
cleanup
|
||||||
end
|
end
|
||||||
|
|
||||||
def new_client(client)
|
def new_client(client)
|
||||||
|
|
||||||
@pending_clients.add(client)
|
@semaphore.synchronize do
|
||||||
|
@pending_clients.add(client)
|
||||||
|
end
|
||||||
|
|
||||||
client.onopen {
|
client.onopen {
|
||||||
@log.debug "client connected #{client}"
|
@log.debug "client connected #{client}"
|
||||||
|
|
@ -77,6 +241,7 @@ module JamWebsockets
|
||||||
end
|
end
|
||||||
|
|
||||||
cleanup_client(client)
|
cleanup_client(client)
|
||||||
|
client.close_websocket
|
||||||
}
|
}
|
||||||
|
|
||||||
client.onmessage { |msg|
|
client.onmessage { |msg|
|
||||||
|
|
@ -88,79 +253,70 @@ module JamWebsockets
|
||||||
begin
|
begin
|
||||||
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
|
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
|
||||||
self.route(pb_msg, client)
|
self.route(pb_msg, client)
|
||||||
|
rescue SessionError => e
|
||||||
|
@log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
|
||||||
|
begin
|
||||||
|
# wrap the message up and send it down
|
||||||
|
client.send(@message_factory.server_rejection_error(e.to_s).to_s)
|
||||||
|
ensure
|
||||||
|
client.close_websocket
|
||||||
|
cleanup_client(client)
|
||||||
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
@log.debug "ending client session due to error: #{e.to_s}"
|
@log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
|
||||||
@log.debug e
|
@log.error e
|
||||||
|
|
||||||
begin
|
begin
|
||||||
# wrap the message up and send it down
|
# wrap the message up and send it down
|
||||||
client.send(@message_factory.server_generic_error(e.to_s).to_s)
|
client.send(@message_factory.server_generic_error(e.to_s).to_s)
|
||||||
ensure
|
ensure
|
||||||
|
client.close_websocket
|
||||||
cleanup_client(client)
|
cleanup_client(client)
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# removes all resources associated with a client
|
||||||
def cleanup_client(client)
|
def cleanup_client(client)
|
||||||
@log.debug "cleaning up client #{client}"
|
|
||||||
begin
|
|
||||||
@pending_clients.delete(client)
|
|
||||||
|
|
||||||
context = @clients.delete(client)
|
@semaphore.synchronize do
|
||||||
|
pending = @pending_clients.delete?(client)
|
||||||
if !context.nil?
|
|
||||||
begin
|
|
||||||
if context.subscription.active?
|
|
||||||
@log.debug "cleaning up user subscription"
|
|
||||||
context.subscription.shutdown!
|
|
||||||
end
|
|
||||||
|
|
||||||
if !context.session_topic.nil? && context.session_topic.active?
|
if !pending.nil?
|
||||||
@log.debug "cleaning up session subscription"
|
@log.debug "cleaning up pending client #{client}"
|
||||||
context.session_topic.shutdown!
|
else
|
||||||
end
|
context = @clients.delete(client)
|
||||||
|
|
||||||
rescue => e
|
if !context.nil?
|
||||||
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
|
||||||
|
remove_user(context)
|
||||||
|
|
||||||
|
if !context.session.nil?
|
||||||
|
remove_session(context)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@log.debug "skipping duplicate cleanup attempt of authorized client"
|
||||||
end
|
end
|
||||||
|
|
||||||
mark_context_for_deletion(context)
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
ensure
|
|
||||||
client.close_websocket
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# we want to eventually not do all the work here until a grace reconnect period has elasped
|
|
||||||
def mark_context_for_deletion(context)
|
|
||||||
# TODO handle notifies to sessions and friends about disconnect
|
|
||||||
end
|
|
||||||
|
|
||||||
def extract_inner_message(client_msg)
|
|
||||||
msg = client_msg.value_for_tag(client_msg.type)
|
|
||||||
|
|
||||||
if msg.nil?
|
|
||||||
raise "inner message is null. type: #{client_msg.type}, target: #{client_msg.target}"
|
|
||||||
end
|
|
||||||
|
|
||||||
return msg
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def route(client_msg, client)
|
def route(client_msg, client)
|
||||||
@log.debug("msg #{client_msg.type} #{client_msg.target}")
|
message_type = @message_factory.get_message_type(client_msg)
|
||||||
|
|
||||||
|
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
|
||||||
|
|
||||||
|
@log.debug("msg received #{message_type}")
|
||||||
|
|
||||||
if client_msg.target.nil?
|
raise SessionError, 'client_msg.target is null' if client_msg.target.nil?
|
||||||
raise 'client_msg.target is null'
|
|
||||||
end
|
|
||||||
|
|
||||||
if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN
|
if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN
|
||||||
# this client has not logged in and is trying to send a non-login message
|
# this client has not logged in and is trying to send a non-login message
|
||||||
raise "must 'Login' first"
|
raise SessionError, "must 'Login' first"
|
||||||
end
|
end
|
||||||
|
|
||||||
if @message_factory.server_directed? client_msg
|
if @message_factory.server_directed? client_msg
|
||||||
|
|
@ -178,13 +334,12 @@ module JamWebsockets
|
||||||
handle_user_directed(user, client_msg, client)
|
handle_user_directed(user, client_msg, client)
|
||||||
|
|
||||||
else
|
else
|
||||||
raise "client_msg.target is unknown type: #{client_msg.target}"
|
raise SessionError, "client_msg.target is unknown type: #{client_msg.target}"
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_server_directed(client_msg, client)
|
def handle_server_directed(client_msg, client)
|
||||||
type, inner_msg = extract_inner_message(client_msg)
|
|
||||||
|
|
||||||
if client_msg.type == ClientMessage::Type::LOGIN
|
if client_msg.type == ClientMessage::Type::LOGIN
|
||||||
|
|
||||||
|
|
@ -203,7 +358,7 @@ module JamWebsockets
|
||||||
handle_leave_jam_session(client_msg.leave_jam_session, client)
|
handle_leave_jam_session(client_msg.leave_jam_session, client)
|
||||||
|
|
||||||
else
|
else
|
||||||
raise "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message"
|
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -218,45 +373,45 @@ module JamWebsockets
|
||||||
|
|
||||||
@log.debug "user #{user.email} logged in"
|
@log.debug "user #{user.email} logged in"
|
||||||
|
|
||||||
# create a queue for just this user
|
|
||||||
queue = @channel.queue(user.id)
|
|
||||||
queue.bind(@endpoints, :routing_key => user.id)
|
|
||||||
queue.purge
|
|
||||||
|
|
||||||
# TODO: alert friends
|
|
||||||
|
|
||||||
# subscribe for any messages to self
|
|
||||||
subscription = queue.subscribe(:ack => true, :blocking => false) do |headers, msg|
|
|
||||||
client.send(msg)
|
|
||||||
headers.ack
|
|
||||||
end
|
|
||||||
|
|
||||||
# respond with LOGIN_ACK to let client know it was successful
|
# respond with LOGIN_ACK to let client know it was successful
|
||||||
client.send(@message_factory.login_ack(client.request["origin"]).to_s)
|
client.send(@message_factory.login_ack(client.request["origin"]).to_s)
|
||||||
|
|
||||||
# remove from pending_queue
|
# remove from pending_queue
|
||||||
@pending_clients.delete(client)
|
@semaphore.synchronize do
|
||||||
|
@pending_clients.delete(client)
|
||||||
|
|
||||||
# add a tracker for this user
|
# add a tracker for this user
|
||||||
context = ClientContext.new(user, queue, subscription)
|
context = ClientContext.new(user, client)
|
||||||
@clients[client] = context
|
@clients[client] = context
|
||||||
|
add_user(context)
|
||||||
|
end
|
||||||
else
|
else
|
||||||
raise 'invalid login'
|
raise SessionError, 'invalid login'
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_heartbeat(heartbeat, client)
|
def handle_heartbeat(heartbeat, client)
|
||||||
# todo
|
# todo: manage staleness
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_join_jam_session(join_jam_session, client)
|
def handle_join_jam_session(join_jam_session, client)
|
||||||
# verify that the current user has the rights to actually join the jam session
|
# verify that the current user has the rights to actually join the jam session
|
||||||
context = @clients[client]
|
context = @clients[client]
|
||||||
|
|
||||||
session_id = join_jam_session.jam_session;
|
session_id = join_jam_session.jam_session
|
||||||
|
|
||||||
begin
|
begin
|
||||||
access_jam_session?(session_id, context.user)
|
session = access_jam_session?(session_id, context.user)
|
||||||
|
@log.debug "user #{context} joining new session #{session}"
|
||||||
|
@semaphore.synchronize do
|
||||||
|
old_session = context.session
|
||||||
|
if !old_session.nil?
|
||||||
|
@log.debug "#{context} is already in session. auto-logging out to join new session."
|
||||||
|
remove_session(context)
|
||||||
|
end
|
||||||
|
context.session = session
|
||||||
|
add_session(context)
|
||||||
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
# send back a failure ack and bail
|
# send back a failure ack and bail
|
||||||
@log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
|
@log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
|
||||||
|
|
@ -264,101 +419,83 @@ module JamWebsockets
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
topic = @channel.queue(session_id)
|
|
||||||
topic.bind(@sessions, :routing_key => session_id)
|
|
||||||
topic.purge
|
|
||||||
|
|
||||||
# subscribe for any messages to session
|
|
||||||
subscription = topic.subscribe(:ack => false, :blocking => false) do |headers, msg|
|
|
||||||
client.send(msg)
|
|
||||||
#headers.ack
|
|
||||||
end
|
|
||||||
|
|
||||||
old_session = context.session_topic
|
|
||||||
if !old_session.nil? && old_session.active?
|
|
||||||
# remove subscription to previous session
|
|
||||||
@log.debug "auto-removing user from previous session"
|
|
||||||
old_session.shutdown!
|
|
||||||
end
|
|
||||||
|
|
||||||
context.session_topic = subscription
|
|
||||||
|
|
||||||
# respond with LOGIN_JAM_SESSION_ACK to let client know it was successful
|
# respond with LOGIN_JAM_SESSION_ACK to let client know it was successful
|
||||||
client.send(@message_factory.login_jam_session_ack(false, nil))
|
client.send(@message_factory.login_jam_session_ack(false, nil).to_s)
|
||||||
|
|
||||||
# send 'new client' message
|
# send 'new client' message to other members in the session
|
||||||
|
handle_session_directed(session_id,
|
||||||
|
@message_factory.user_joined_jam_session(context.user.id, context.user.name),
|
||||||
|
client)
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
def handle_leave_jam_session(leave_jam_session, client)
|
||||||
|
|
||||||
def handle_leave_jam_session(leave_jam_session, client)
|
context = @clients[client]
|
||||||
|
|
||||||
context = @clients[client]
|
raise SessionError, "unsupported"
|
||||||
|
end
|
||||||
|
|
||||||
raise 'unsupported'
|
def valid_login(username, password, token)
|
||||||
end
|
|
||||||
|
|
||||||
def valid_login(username, password, token)
|
if !username.nil? and !password.nil?
|
||||||
|
# attempt login with username and password
|
||||||
|
user = User.find_by_email(username)
|
||||||
|
|
||||||
if !username.nil? and !password.nil?
|
if !user.nil? && user.authenticate(password)
|
||||||
# attempt login with username and password
|
@log.debug "#{username} login via password"
|
||||||
user = User.find_by_email(username)
|
return user
|
||||||
|
else
|
||||||
|
@log.debug "#{username} login failure"
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
elsif !token.nil?
|
||||||
|
# attempt login with token
|
||||||
|
user = User.find_by_remember_token(token)
|
||||||
|
|
||||||
if !user.nil? && user.authenticate(password)
|
if user.nil?
|
||||||
@log.debug "#{username} login via password"
|
@log.debug "no user found with token"
|
||||||
return user
|
return false
|
||||||
else
|
else
|
||||||
@log.debug "#{username} login failure"
|
@log.debug "#{username} login via token"
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
elsif !token.nil?
|
else
|
||||||
# attempt login with token
|
raise SessionError, 'no login data was found in Login message'
|
||||||
user = User.find_by_remember_token(token)
|
end
|
||||||
|
|
||||||
if user.nil?
|
end
|
||||||
@log.debug "no user found with token"
|
|
||||||
return false
|
|
||||||
else
|
|
||||||
@log.debug "#{username} login via token"
|
|
||||||
return nil
|
|
||||||
end
|
|
||||||
else
|
|
||||||
raise 'no login data was found in Login message'
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
def access_jam_session?(jam_session_id, user)
|
||||||
|
jam_session = JamSession.find_by_id(jam_session_id)
|
||||||
|
|
||||||
def access_jam_session?(jam_session_id, user)
|
if jam_session.nil?
|
||||||
jam_session = JamSession.find_by_id(jam_session_id)
|
raise SessionError, 'specified session not found'
|
||||||
|
end
|
||||||
|
|
||||||
if jam_session.nil?
|
if !jam_session.access? user
|
||||||
raise 'specified session not found'
|
raise SessionError, 'not allowed to join the specified session'
|
||||||
end
|
end
|
||||||
|
|
||||||
if !jam_session.access? user
|
return jam_session
|
||||||
raise 'not allowed to join the specified session'
|
end
|
||||||
end
|
|
||||||
|
|
||||||
return jam_session
|
def handle_session_directed(session_id, client_msg, client)
|
||||||
end
|
|
||||||
|
|
||||||
def handle_session_directed(session, client_msg, client)
|
context = @clients[client]
|
||||||
type, inner_msg = extract_inner_message(client_msg)
|
|
||||||
|
|
||||||
context = @clients[client]
|
# by not catching any exception here, this will kill the connection
|
||||||
|
# if for some reason the client is trying to send to a session that it doesn't
|
||||||
|
# belong to
|
||||||
|
session = access_jam_session?(session_id, context.user)
|
||||||
|
|
||||||
# by not catching any exception here, this will kill the connection
|
@log.debug "publishing to session #{session}"
|
||||||
# if for some reason the client is trying to send to a session that it doesn't
|
# put it on the topic exchange for sessions
|
||||||
# belong to
|
@sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}")
|
||||||
access_jam_session?(session, context.user)
|
end
|
||||||
|
|
||||||
# put it on the topic exchange for sessions
|
def handle_user_directed(user, client_msg, client)
|
||||||
@sessions.publish(client_msg.to_s, :routing_key => session)
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_user_directed(user, client_msg, client)
|
raise SessionError, 'not implemented'
|
||||||
type, inner_msg = extract_inner_message(client_msg)
|
end
|
||||||
|
end
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -14,19 +14,19 @@ module JamWebsockets
|
||||||
host = "0.0.0.0"
|
host = "0.0.0.0"
|
||||||
port = options[:port]
|
port = options[:port]
|
||||||
|
|
||||||
@log.debug "starting server #{host}:#{port}"
|
@log.info "starting server #{host}:#{port}"
|
||||||
|
|
||||||
@router.start
|
@router.start
|
||||||
|
|
||||||
# if you don't do this, the app won't exit unless you kill -9
|
# if you don't do this, the app won't exit unless you kill -9
|
||||||
at_exit do
|
at_exit do
|
||||||
@log.debug "cleaning up server"
|
@log.info "cleaning up server"
|
||||||
@router.cleanup
|
@router.cleanup
|
||||||
end
|
end
|
||||||
|
|
||||||
EventMachine.run {
|
EventMachine.run {
|
||||||
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:debug]) do |ws|
|
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:emwebsocket_debug]) do |ws|
|
||||||
@log.debug "new client #{ws}"
|
@log.info "new client #{ws}"
|
||||||
@router.new_client(ws)
|
@router.new_client(ws)
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
class SessionError < Exception
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
|
@ -1,9 +1,79 @@
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
require 'thread'
|
require 'thread'
|
||||||
|
|
||||||
|
|
||||||
|
LoginClient = Class.new do
|
||||||
|
attr_accessor :onmsgblock, :onopenblock
|
||||||
|
|
||||||
|
def initiaize()
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
def onopen(&block)
|
||||||
|
@onopenblock = block
|
||||||
|
end
|
||||||
|
|
||||||
|
def onmessage(&block)
|
||||||
|
@onmsgblock = block
|
||||||
|
end
|
||||||
|
|
||||||
|
def close(&block)
|
||||||
|
@oncloseblock = block
|
||||||
|
end
|
||||||
|
|
||||||
|
def close_websocket()
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
def send(msg)
|
||||||
|
puts msg
|
||||||
|
end
|
||||||
|
|
||||||
|
def request()
|
||||||
|
return { "origin" => "1.1.1.1"}
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
# does a login and returns client
|
||||||
|
def login(router, user, password)
|
||||||
|
|
||||||
|
message_factory = MessageFactory.new
|
||||||
|
client = LoginClient.new
|
||||||
|
|
||||||
|
login_ack = message_factory.login_ack("1.1.1.1")
|
||||||
|
|
||||||
|
client.should_receive(:send).with(login_ack.to_s)
|
||||||
|
client.should_receive(:onclose)
|
||||||
|
client.should_receive(:onerror)
|
||||||
|
client.should_receive(:request).and_return({"origin" => "1.1.1.1"})
|
||||||
|
|
||||||
|
@router.new_client(client)
|
||||||
|
client.onopenblock.call
|
||||||
|
|
||||||
|
# create a login message, and pass it into the router via onmsgblock.call
|
||||||
|
login = message_factory.login_with_user_pass(user.email, password)
|
||||||
|
|
||||||
|
# first log in
|
||||||
|
client.onmsgblock.call login.to_s
|
||||||
|
|
||||||
|
# then join jam session
|
||||||
|
return client
|
||||||
|
end
|
||||||
|
|
||||||
|
def login_jam_session(router, client, jam_session)
|
||||||
|
message_factory = MessageFactory.new
|
||||||
|
login_jam_session = message_factory.login_jam_session(jam_session.id)
|
||||||
|
login_ack = message_factory.login_jam_session_ack(false, nil);
|
||||||
|
client.should_receive(:send).with(login_ack.to_s)
|
||||||
|
client.onmsgblock.call login_jam_session.to_s
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
describe Router do
|
describe Router do
|
||||||
|
|
||||||
message_factory = MessageFactory.new
|
message_factory = MessageFactory.new
|
||||||
|
|
||||||
before do
|
before do
|
||||||
|
|
||||||
|
|
@ -35,6 +105,25 @@ describe Router do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "topic routing helpers" do
|
||||||
|
it "create and delete user lookup set" do
|
||||||
|
user = double(User)
|
||||||
|
user.should_receive(:id).any_number_of_times.and_return("1")
|
||||||
|
client = double("client")
|
||||||
|
context = ClientContext.new(user, client)
|
||||||
|
|
||||||
|
@router.user_context_lookup.length.should == 0
|
||||||
|
|
||||||
|
@router.add_user(context)
|
||||||
|
|
||||||
|
@router.user_context_lookup.length.should == 1
|
||||||
|
|
||||||
|
@router.remove_user(context)
|
||||||
|
|
||||||
|
@router.user_context_lookup.length.should == 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
describe "login" do
|
describe "login" do
|
||||||
it "should not allow login of bogus user", :mq => true do
|
it "should not allow login of bogus user", :mq => true do
|
||||||
|
|
@ -54,17 +143,19 @@ describe Router do
|
||||||
@onmsgblock = block
|
@onmsgblock = block
|
||||||
end
|
end
|
||||||
|
|
||||||
def close()
|
def close_websocket()
|
||||||
|
end
|
||||||
|
|
||||||
|
def close()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
client = TestClient.new
|
client = TestClient.new
|
||||||
|
|
||||||
error_msg = message_factory.server_generic_error("invalid login")
|
error_msg = message_factory.server_rejection_error("invalid login")
|
||||||
|
|
||||||
client.should_receive(:send).with(error_msg.to_s)
|
client.should_receive(:send).with(error_msg.to_s)
|
||||||
client.should_receive(:close)
|
client.should_receive(:close_websocket)
|
||||||
client.should_receive(:onclose)
|
client.should_receive(:onclose)
|
||||||
client.should_receive(:onerror)
|
client.should_receive(:onerror)
|
||||||
|
|
||||||
|
|
@ -83,47 +174,7 @@ describe Router do
|
||||||
:password => "foobar", :password_confirmation => "foobar")
|
:password => "foobar", :password_confirmation => "foobar")
|
||||||
@user.save
|
@user.save
|
||||||
|
|
||||||
|
client1 = login(@router, @user, "foobar")
|
||||||
TestClient = Class.new do
|
|
||||||
|
|
||||||
attr_accessor :onmsgblock, :onopenblock, :oncloseblock
|
|
||||||
|
|
||||||
def initiaize()
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
def onopen(&block)
|
|
||||||
@onopenblock = block
|
|
||||||
end
|
|
||||||
|
|
||||||
def onmessage(&block)
|
|
||||||
@onmsgblock = block
|
|
||||||
end
|
|
||||||
|
|
||||||
def close(&block)
|
|
||||||
@oncloseblock = block
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
client = TestClient.new
|
|
||||||
|
|
||||||
login_ack = message_factory.login_ack("1.1.1.1")
|
|
||||||
|
|
||||||
client.should_receive(:send).with(login_ack.to_s)
|
|
||||||
client.should_receive(:close).exactly(0).times # close would occur on error, but this is good path
|
|
||||||
client.should_receive(:onclose)
|
|
||||||
client.should_receive(:onerror)
|
|
||||||
client.should_receive(:ip).and_return("1.1.1.1")
|
|
||||||
|
|
||||||
@router.new_client(client)
|
|
||||||
client.onopenblock.call
|
|
||||||
|
|
||||||
# create a login message, and pass it into the router via onmsgblock.call
|
|
||||||
login = message_factory.login_with_user_pass("user@example.com", "foobar")
|
|
||||||
|
|
||||||
# attempt to log in, causing chain of events
|
|
||||||
client.onmsgblock.call login.to_s
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -140,52 +191,35 @@ describe Router do
|
||||||
|
|
||||||
# make a jam_session and define two members
|
# make a jam_session and define two members
|
||||||
|
|
||||||
|
# create client 1, log him in, and log him in to jam session
|
||||||
TestClient = Class.new do
|
client1 = login(@router, user1, "foobar")
|
||||||
|
login_jam_session(@router, client1, jam_session)
|
||||||
attr_accessor :onmsgblock, :onopenblock
|
|
||||||
|
|
||||||
def initiaize()
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
def onopen(&block)
|
|
||||||
@onopenblock = block
|
|
||||||
end
|
|
||||||
|
|
||||||
def onmessage(&block)
|
|
||||||
@onmsgblock = block
|
|
||||||
end
|
|
||||||
|
|
||||||
def close(&block)
|
|
||||||
@oncloseblock = block
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
client = TestClient.new
|
|
||||||
|
|
||||||
login_ack = message_factory.login_ack("1.1.1.1")
|
|
||||||
|
|
||||||
client.should_receive(:send).with(login_ack.to_s)
|
|
||||||
#client.should_receive(:close)
|
|
||||||
client.should_receive(:onclose)
|
|
||||||
client.should_receive(:onerror)
|
|
||||||
client.should_receive(:ip).and_return("1.1.1.1")
|
|
||||||
|
|
||||||
@router.new_client(client)
|
|
||||||
client.onopenblock.call
|
|
||||||
|
|
||||||
# create a login message, and pass it into the router via onmsgblock.call
|
|
||||||
login = message_factory.login_with_user_pass(user1.email, "foobar")
|
|
||||||
|
|
||||||
# first log in
|
|
||||||
client.onmsgblock.call login.to_s
|
|
||||||
|
|
||||||
# then join jam session
|
|
||||||
login_jam_session = message_factory.login_jam_session(jam_session.id)
|
|
||||||
client.onmsgblock.call login_jam_session.to_s
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do
|
||||||
|
|
||||||
|
EventMachine.run do
|
||||||
|
user1 = FactoryGirl.create(:user) # in the jam session
|
||||||
|
user2 = FactoryGirl.create(:user) # in the jam session
|
||||||
|
|
||||||
|
jam_session = FactoryGirl.create(:jam_session, :user => user1)
|
||||||
|
|
||||||
|
jam_session_member1 = FactoryGirl.create(:jam_session_member, :user => user1, :jam_session => jam_session)
|
||||||
|
jam_session_member2 = FactoryGirl.create(:jam_session_member, :user => user2, :jam_session => jam_session)
|
||||||
|
|
||||||
|
# make a jam_session and define two members
|
||||||
|
|
||||||
|
|
||||||
|
# create client 1, log him in, and log him in to jam session
|
||||||
|
client1 = login(@router, user1, "foobar")
|
||||||
|
login_jam_session(@router, client1, jam_session)
|
||||||
|
|
||||||
|
client2 = login(@router, user2, "foobar")
|
||||||
|
login_jam_session(@router, client2, jam_session)
|
||||||
|
EM.stop
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue