2012-08-27 13:56:02 +00:00
require 'pry'
2012-08-17 03:22:31 +00:00
require 'set'
require 'hot_bunnies'
2012-08-26 18:42:22 +00:00
require 'thread'
2012-09-03 02:45:18 +00:00
require 'json'
2012-09-04 01:22:46 +00:00
require 'eventmachine'
2012-08-26 18:42:22 +00:00
import java . util . concurrent . Executors
2012-08-17 03:22:31 +00:00
include Jampb
2012-09-04 01:22:46 +00:00
# add new field to client connection
# Reopens em-websocket's Connection class so that every websocket connection
# carries the two extra per-client fields the router reads and writes.
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # encode_json - when truthy, messages to this client are sent as JSON text;
      #               otherwise they are sent as binary frames.
      # client_id   - uuid we give to each client to track them as we like.
      attr_accessor :encode_json, :client_id
    end
  end
end
2012-08-17 03:22:31 +00:00
module JamWebsockets
2012-09-04 01:22:46 +00:00
2012-08-17 03:22:31 +00:00
class Router
2012-08-26 18:42:22 +00:00
attr_accessor :user_context_lookup , :session_context_lookup
2012-08-17 03:22:31 +00:00
# Builds a router with empty client bookkeeping and no broker resources.
# The connection, channel, exchanges, queues, subscriptions and thread pool
# all start as nil; they are assigned later by #start and #register_topics.
#
# options - accepted for interface compatibility; not read by this method.
def initialize(options = {})
  @log = Logging.logger[self]
  @message_factory = JamRuby::MessageFactory.new

  # guards the client bookkeeping below
  @semaphore = Mutex.new

  @pending_clients = Set.new # clients that have connected to server, but not logged in.
  @clients = {} # clients that have logged in
  @user_context_lookup = {} # lookup a set of client_contexts by user_id
  @session_context_lookup = {} # lookup a set of client_contexts by session_id

  # message-queue handles
  @connection = nil
  @channel = nil
  @users_exchange = nil
  @sessions_exchange = nil
  @user_topic = nil
  @user_subscription = nil
  @session_topic = nil
  @session_subscription = nil
  @thread_pool = nil
end
# Brings the router online: creates an 8-thread executor, connects to the
# broker through HotBunnies, opens a channel (prefetch 10) and registers the
# user/session topic consumers.
#
# If any step fails, #cleanup is run and the ORIGINAL failure is re-raised;
# an error raised by cleanup itself is only logged so that it cannot replace
# the error that actually caused startup to fail.
#
# options - Hash; :host and :port are forwarded to HotBunnies.connect.
#
# Raises whatever the failing startup step raised.
def start(options = {})
  @log.info " startup "

  begin
    @thread_pool = Executors.new_fixed_thread_pool(8)
    @connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
    @channel = @connection.create_channel
    @channel.prefetch = 10
    register_topics
  rescue => e
    begin
      cleanup
    rescue => cleanup_error
      # keep the startup failure as the error the caller sees
      @log.error cleanup_error
    end
    raise e
  end
end
2012-08-26 18:42:22 +00:00
# Records a client context under its user's id, creating that user's set of
# contexts on first use. Returns the set the context was added to.
def add_user(context)
  owner_id = context.user.id
  bucket = @user_context_lookup[owner_id]
  bucket = @user_context_lookup[owner_id] = Set.new if bucket.nil?
  bucket << context
end
# Forgets a client context for its user. Logs a warning when the user has no
# recorded contexts at all; otherwise removes the context and, once the
# user's set is empty, drops the set itself so the lookup does not grow
# without bound.
def remove_user(context)
  owner_id = context.user.id
  bucket = @user_context_lookup[owner_id]
  return @log.warn(" user can not be removed #{context} ") if bucket.nil?

  # delete the context from set of user contexts
  bucket.delete(context)

  # if last user context, delete entire set (memory leak concern)
  @user_context_lookup.delete(owner_id) if bucket.length == 0
end
# Records a client context under its session's id, creating that session's
# set of contexts on first use. Returns the set the context was added to.
def add_session(context)
  session_key = context.session.id
  members = @session_context_lookup[session_key]
  members = @session_context_lookup[session_key] = Set.new if members.nil?
  members << context
end
# Detaches a client context from its session. Logs a warning (and leaves the
# context untouched) when the session has no recorded contexts; otherwise
# removes the context, drops the session's set once it is empty, and clears
# context.session.
def remove_session(context)
  session_key = context.session.id
  members = @session_context_lookup[session_key]
  return @log.warn(" session can not be removed #{context} ") if members.nil?

  # delete the context from set of session contexts
  members.delete(context)

  # if last session context, delete entire set (memory leak concern)
  @session_context_lookup.delete(session_key) if members.length == 0

  context.session = nil
end
# register topic for user messages and session messages
# Declares the 'users' and 'sessions' topic exchanges, binds one auto-delete
# queue to each, purges both queues and starts a non-blocking consumer on
# each queue that runs on @thread_pool.
#
# Each consumed message is handed to deliver_user_message or
# deliver_session_message. Any error raised while handling a message is
# logged (including the exception itself) and does not escape the consumer
# block.
#
# NOTE(review): the queue name and routing-key literals below are padded with
# spaces exactly as they appear in this file - confirm they match what the
# publishers actually use.
def register_topics
  @users_exchange = @channel.exchange('users', :type => :topic)
  @sessions_exchange = @channel.exchange('sessions', :type => :topic)

  # create user messaging topic
  @user_topic = @channel.queue(" ", :auto_delete => true)
  @user_topic.bind(@users_exchange, :routing_key => " user. # ")
  @user_topic.purge

  # TODO: alert friends

  # subscribe for any messages to users
  @user_subscription = @user_topic.subscribe(:ack => false)
  @user_subscription.each(:blocking => false, :executor => @thread_pool) do |headers, msg|
    begin
      deliver_user_message(headers, msg)
    rescue => e
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end

  @session_topic = @channel.queue(" ", :auto_delete => true)
  @session_topic.bind(@sessions_exchange, :routing_key => " session. # ")
  @session_topic.purge

  # subscribe for any messages to session
  @session_subscription = @session_topic.subscribe(:ack => false)
  @session_subscription.each(:blocking => false, :executor => @thread_pool) do |headers, msg|
    begin
      deliver_session_message(headers, msg)
    rescue => e
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end
end

# Fans one user-directed broker message out to every client context recorded
# for the user id taken from the routing key. Does nothing when that user has
# no recorded contexts.
def deliver_user_message(headers, msg)
  user_id = headers.envelope.routing_key[" user. ".length..-1]

  @semaphore.synchronize do
    contexts = @user_context_lookup[user_id]
    next if contexts.nil?

    @log.debug " received user-directed message for session: #{user_id} "
    parsed = Jampb::ClientMessage.parse(msg)

    contexts.each do |context|
      # the actual socket write is handed to EM.schedule
      EM.schedule do
        @log.debug " sending user message to #{context} "
        send_to_client(context.client, parsed)
      end
    end
  end
end
private :deliver_user_message

# Fans one session-directed broker message out to every client context
# recorded for the session id taken from the routing key, except the client
# whose client_id matches the message's " client_id " header.
def deliver_session_message(headers, msg)
  session_id = headers.envelope.routing_key[" session. ".length..-1]

  @semaphore.synchronize do
    contexts = @session_context_lookup[session_id]
    next if contexts.nil?

    @log.debug " received session-directed message for session: #{session_id} "
    parsed = Jampb::ClientMessage.parse(msg)

    # ok, its very odd to have your own message that you sent bounce back to you.
    # In one small favor to the client, we purposefully disallow messages a client
    # sent from bouncing back to itself.
    properties = headers.properties
    inner_headers = properties.headers unless properties.nil?
    # a message published without headers simply has no origin to filter on
    origin_client_id = inner_headers[" client_id "] unless inner_headers.nil?
    # counter-intuitively, even though a string is passed in when you send the
    # header, an (apparently) auto-generated class is sent back which, if you
    # to_s, returns the original value
    origin_client_id = origin_client_id.to_s unless origin_client_id.nil?
    @log.debug " message received from client #{origin_client_id} "

    contexts.each do |context|
      next if context.client.client_id == origin_client_id

      EM.schedule do
        @log.debug " sending session message to #{context} "
        send_to_client(context.client, parsed)
      end
    end
  end
end
private :deliver_session_message
2012-08-17 03:22:31 +00:00
2012-08-27 03:00:03 +00:00
# Writes one message to a websocket client in the encoding that client uses:
# JSON text when client.encode_json is truthy, otherwise a binary frame
# carrying msg.to_s.
def send_to_client(client, msg)
  return client.send(msg.to_json.to_s) if client.encode_json

  # this is so odd that this is necessary from an API perspective. but searching
  # through the source code... it's all I could find in em-websocket for allowing
  # a binary message to be sent
  client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
end
2012-08-26 18:42:22 +00:00
# Releases everything #start and the client handlers acquired: cancels the
# two topic subscriptions, shuts down the thread pool, closes the channel and
# connection, and runs cleanup_client for every logged-in client.
#
# Every resource is nil-checked, so this is safe to call when startup failed
# part-way (or never ran). Errors while cancelling subscriptions are logged
# at debug level and do not stop the rest of the teardown.
def cleanup
  # shutdown topic listeners and mq connection
  begin
    if !@user_subscription.nil? && @user_subscription.active?
      @log.debug " cleaning up user subscription "
      @user_subscription.cancel
      @user_subscription.shutdown!
    end

    if !@session_subscription.nil? && @session_subscription.active?
      @log.debug " cleaning up session subscription "
      @session_subscription.cancel
      @session_subscription.shutdown!
    end
  rescue => e
    @log.debug " unable to cancel subscription on cleanup: #{e} "
  end

  @thread_pool.shutdown unless @thread_pool.nil?
  @channel.close unless @channel.nil?
  @connection.close unless @connection.nil?

  # tear down each individual client; iterate over a snapshot of the keys
  # because cleanup_client removes entries from @clients as it goes
  @clients.keys.each do |client|
    cleanup_client(client)
  end
end
2012-08-17 03:22:31 +00:00
2012-08-24 02:46:58 +00:00
# Logs the shutdown and releases all router resources via #cleanup.
def stop
  @log.info " shutdown "
  cleanup
end
2012-08-17 03:22:31 +00:00
2012-08-24 02:46:58 +00:00
# Registers a freshly accepted websocket connection with the router and
# installs its event callbacks.
#
# The client gets a random UUID as client_id, is added to the pending
# (not-yet-logged-in) set and defaults to JSON encoding. Callbacks:
#   onopen    - switches the client to protobuf encoding based on the 'pb'
#               query parameter
#   onclose   - releases the client's router state
#   onerror   - logs, releases the client's router state, closes the socket
#   onmessage - decodes the message (JSON or protobuf, per encode_json) and
#               routes it; on any error the client is sent an error message
#               and disconnected
def new_client(client)
  # give a unique ID to this client. This is used to prevent session messages
  # from echoing back to the sender, for instance.
  client.client_id = UUIDTools::UUID.random_create.to_s

  @semaphore.synchronize do
    @pending_clients.add(client)
  end

  # default to using json instead of pb
  client.encode_json = true

  client.onopen do
    @log.debug " client connected #{client} "

    # check for '?pb' or '?pb=true' in url query parameters
    # NOTE(review): the key and value literals here are padded with spaces
    # exactly as they appear in this file - confirm against real requests.
    query_pb = client.request[" query "][" pb "]
    client.encode_json = false if [" ", " true "].include?(query_pb)
  end

  client.onclose do
    @log.debug " Connection closed "
    cleanup_client(client)
  end

  client.onerror do |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error " websockets error: #{error} "
    else
      @log.error " generic error: #{error} #{error.backtrace} "
    end

    cleanup_client(client)
    client.close_websocket
  end

  client.onmessage do |msg|
    @log.debug(" msg received ")

    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?

    begin
      pb_msg =
        if client.encode_json
          # example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
          Jampb::ClientMessage.json_create(JSON.parse(msg))
        else
          Jampb::ClientMessage.parse(msg.to_s)
        end
      route(pb_msg, client)
    rescue SessionError => e
      @log.info " ending client session deliberately due to malformed client behavior. reason= #{e} "
      disconnect_with_error(client) { @message_factory.server_rejection_error(e.to_s) }
    rescue => e
      @log.error " ending client session due to server programming or runtime error. reason= #{e.to_s} "
      @log.error e
      disconnect_with_error(client) { @message_factory.server_generic_error(e.to_s) }
    end
  end
end

# Sends the error message built by the given block down to the client, then
# closes the websocket and releases the client's router state. The close and
# release happen even if building or sending the message raises.
def disconnect_with_error(client)
  # wrap the message up and send it down
  send_to_client(client, yield)
ensure
  client.close_websocket
  cleanup_client(client)
end
private :disconnect_with_error
2012-08-26 18:42:22 +00:00
# removes all resources associated with a client
2012-08-17 03:22:31 +00:00
# Releases the router state held for one client, under the router's mutex.
#
# A client that never logged in is simply dropped from the pending set.
# A logged-in client is removed from @clients and from the user lookup, and
# from the session lookup when it is in a session. Calling this again for a
# client that is already gone only logs a debug line.
def cleanup_client(client)
  @semaphore.synchronize do
    if !@pending_clients.delete?(client).nil?
      @log.debug " cleaning up pending client #{client} "
    else
      context = @clients.delete(client)

      if context.nil?
        @log.debug " skipping duplicate cleanup attempt of authorized client "
      else
        remove_user(context)
        remove_session(context) unless context.session.nil?
      end
    end
  end
end
# Validates an incoming client message and dispatches it by target: to the
# server handlers, to a session, or to a user.
#
# Raises SessionError when the message type is unknown, the target is
# missing, a not-yet-logged-in client sends anything other than LOGIN, or the
# target matches none of the known kinds.
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)
  raise SessionError, " unknown message type received: #{client_msg.type} " if message_type.nil?

  @log.debug(" msg received #{message_type} ")

  raise SessionError, 'client_msg.target is null' if client_msg.target.nil?

  if @pending_clients.include?(client) && client_msg.type != ClientMessage::Type::LOGIN
    # this client has not logged in and is trying to send a non-login message
    raise SessionError, " must 'Login' first "
  end

  if @message_factory.server_directed?(client_msg)
    handle_server_directed(client_msg, client)
  elsif @message_factory.session_directed?(client_msg)
    # the session id is whatever follows the session prefix in the target
    session = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    handle_session_directed(session, client_msg, client)
  elsif @message_factory.user_directed?(client_msg)
    # the user id is whatever follows the user prefix in the target
    user = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
    handle_user_directed(user, client_msg, client)
  else
    raise SessionError, " client_msg.target is unknown type: #{client_msg.target} "
  end
end
# Dispatches a server-directed message to the handler for its type (login,
# heartbeat, join music session, leave music session).
#
# Raises SessionError for any other message type.
def handle_server_directed(client_msg, client)
  type = client_msg.type

  if type == ClientMessage::Type::LOGIN
    handle_login(client_msg.login, client)
  elsif type == ClientMessage::Type::HEARTBEAT
    handle_heartbeat(client_msg.heartbeat, client)
  elsif type == ClientMessage::Type::LOGIN_MUSIC_SESSION
    handle_join_music_session(client_msg.login_music_session, client)
  elsif type == ClientMessage::Type::LEAVE_MUSIC_SESSION
    handle_leave_music_session(client_msg.leave_music_session, client)
  else
    raise SessionError, " unknown message type ' #{client_msg.type} ' for #{client_msg.target} -directed message "
  end
end
# Handles a LOGIN message: authenticates the credentials, acknowledges the
# login to the client (the ack carries the client's remote IP) and promotes
# the client from the pending set to a tracked, logged-in client.
#
# Raises SessionError ('invalid login') when authentication is rejected.
# Any falsy result from valid_login (nil or false) counts as a rejection, so
# a failed login can never fall through to the success path.
def handle_login(login, client)
  # only read a field when the message actually carries it
  username = login.username if login.value_for_tag(1)
  password = login.password if login.value_for_tag(2)
  token = login.token if login.value_for_tag(3)

  user = valid_login(username, password, token)
  raise SessionError, 'invalid login' unless user

  @log.debug " user #{user.email} logged in "

  # respond with LOGIN_ACK to let client know it was successful
  _remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
  login_ack = @message_factory.login_ack(remote_ip)
  send_to_client(client, login_ack)

  @semaphore.synchronize do
    # remove from pending_queue
    @pending_clients.delete(client)

    # add a tracker for this user
    context = ClientContext.new(user, client)
    @clients[client] = context
    add_user(context)
  end
end
2012-08-24 02:46:58 +00:00
# Handles a HEARTBEAT message. Currently a no-op that returns nil.
def handle_heartbeat(heartbeat, client)
  # todo: manage staleness
end
2012-10-05 02:34:10 +00:00
# Handles a LOGIN_MUSIC_SESSION message: checks that the client's user may
# access the requested music session, moves the client's context into that
# session (leaving any session it was already in), acknowledges the join and
# publishes a 'user joined' message to the session.
#
# On any error while checking access or switching sessions, an ack built with
# (true, <error text>) is sent to the client and the method returns nil
# without publishing anything.
def handle_join_music_session(join_music_session, client)
  # verify that the current user has the rights to actually join the music session
  context = @clients[client]
  session_id = join_music_session.music_session

  begin
    session = access_music_session?(session_id, context.user)
    @log.debug " user #{context} joining new session #{session} "

    @semaphore.synchronize do
      unless context.session.nil?
        @log.debug " #{context} is already in session. auto-logging out to join new session. "
        remove_session(context)
      end

      context.session = session
      add_session(context)
    end
  rescue => e
    # send back a failure ack and bail
    @log.debug " client requested non-existent session. client: #{client.request['origin']} user: #{context.user.email} "
    login_music_session = @message_factory.login_music_session_ack(true, e.to_s)
    send_to_client(client, login_music_session)
    return
  end

  # respond with LOGIN_MUSIC_SESSION_ACK to let client know it was successful
  login_music_session = @message_factory.login_music_session_ack(false, nil)
  send_to_client(client, login_music_session)

  # send 'new client' message to other members in the session
  handle_session_directed(session_id,
                          @message_factory.user_joined_music_session(context.user.id, context.user.name),
                          client)
end
2012-10-05 02:34:10 +00:00
# Handles a LEAVE_MUSIC_SESSION message. Not supported yet: always raises
# SessionError.
def handle_leave_music_session(leave_music_session, client)
  raise SessionError, " unsupported "
end
2012-08-17 03:22:31 +00:00
2012-08-26 18:42:22 +00:00
# Authenticates a login attempt and returns the matching user, or nil when
# the credentials are rejected.
#
# A non-empty token takes precedence and is looked up via
# User.find_by_remember_token. Otherwise, when both username and password are
# present, the user is looked up by email and checked with
# user.authenticate(password).
#
# Both rejection paths return nil (never false), so callers can rely on a
# single "no user" value. The password is deliberately kept out of the log.
#
# Raises SessionError when neither a token nor a username/password pair was
# supplied.
def valid_login(username, password, token)
  if !token.nil? && token != ''
    @log.debug " logging in via token "

    # attempt login with token
    user = User.find_by_remember_token(token)
    if user.nil?
      @log.debug " no user found with token "
      return nil
    end

    @log.debug " #{user} login via token "
    user
  elsif !username.nil? && !password.nil?
    @log.debug " logging in via user/pass ' #{username} ' "

    # attempt login with username and password
    user = User.find_by_email(username)
    if !user.nil? && user.authenticate(password)
      @log.debug " #{user} login via password "
      user
    else
      @log.debug " #{username} login failure "
      nil
    end
  else
    raise SessionError, 'no login data was found in Login message'
  end
end
2012-08-17 03:22:31 +00:00
2012-10-05 02:34:10 +00:00
# Looks up a music session by id and checks that the user may access it.
#
# Returns the music session.
# Raises SessionError when the session does not exist or when
# music_session.access?(user) is falsy.
def access_music_session?(music_session_id, user)
  music_session = MusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if music_session.nil?
  raise SessionError, 'not allowed to join the specified session' unless music_session.access?(user)

  music_session
end
2012-08-17 03:22:31 +00:00
2012-08-26 18:42:22 +00:00
# Publishes a client's message to a music session by putting it on the
# sessions topic exchange, tagged with the sending client's client_id in the
# message headers.
#
# Raises SessionError (from access_music_session?) when the client's user
# may not access the session.
def handle_session_directed(session_id, client_msg, client)
  context = @clients[client]

  # by not catching any exception here, this will kill the connection
  # if for some reason the client is trying to send to a session that it doesn't
  # belong to
  session = access_music_session?(session_id, context.user)

  @log.debug " publishing to session #{session} from client_id #{client.client_id} "

  # put it on the topic exchange for sessions
  # NOTE(review): the routing-key and header-key literals are padded with
  # spaces exactly as they appear in this file - confirm they match what the
  # consumer side expects.
  @sessions_exchange.publish(client_msg.to_s,
                             :routing_key => " session. #{session_id} ",
                             :properties => { :headers => { " client_id " => client.client_id } })
end
2012-08-17 03:22:31 +00:00
2012-08-26 18:42:22 +00:00
# Handles a user-directed message. Not implemented yet: always raises
# SessionError.
def handle_user_directed(user, client_msg, client)
  raise SessionError, 'not implemented'
end
end
2012-08-17 03:22:31 +00:00
end