2012-08-17 03:22:31 +00:00
require 'set'
2012-10-23 00:58:57 +00:00
require 'amqp'
2012-08-26 18:42:22 +00:00
require 'thread'
2012-09-03 02:45:18 +00:00
require 'json'
2012-09-04 01:22:46 +00:00
require 'eventmachine'
2012-08-26 18:42:22 +00:00
2012-08-17 03:22:31 +00:00
include Jampb
2012-09-04 01:22:46 +00:00
# add new fields to the em-websocket client connection
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # encode_json - when true, Router#send_to_client sends messages as JSON text;
      #               otherwise they go out as binary frames
      # client_id   - uuid we give to each client to track them as we like
      attr_accessor :encode_json, :client_id
    end
  end
end
2012-08-17 03:22:31 +00:00
module JamWebsockets
2012-09-04 01:22:46 +00:00
2012-08-17 03:22:31 +00:00
class Router
2012-10-17 03:50:28 +00:00
attr_accessor :user_context_lookup
2012-10-11 03:04:43 +00:00
2012-10-23 00:58:57 +00:00
# Sets up empty bookkeeping. Nothing is connected until #start is called.
def initialize
  @log = Logging.logger[self]
  @pending_clients = Set.new # clients that have connected to server, but not logged in.
  @clients = {}              # clients that have logged in (client => ClientContext)
  @user_context_lookup = {}  # lookup a set of client_contexts by user_id
  @client_lookup = {}        # lookup a client context by client_id
  @amqp_connection_manager = nil
  @users_exchange = nil
  @clients_exchange = nil    # assigned in register_topics, read by handle_client_directed
  @message_factory = JamRuby::MessageFactory.new
  @semaphore = Mutex.new
  @user_topic = nil
  @client_topic = nil
  @thread_pool = nil
  @heartbeat_interval = nil
end
2013-07-24 03:01:51 +00:00
# Connects to the AMQP broker and registers the user/client topics.
#
# connect_time_stale - halved to produce @heartbeat_interval, which is later
#                      passed to the login ack
# options            - :host and :port handed to AmqpConnectionManager
# block              - optional; called from the connect callback after the
#                      topics have been registered
#
# A StandardError raised during setup is logged, #cleanup is run, and the
# same exception is re-raised.
def start(connect_time_stale, options = { :host => "localhost", :port => 5672 }, &block)
  @log.info " startup "

  @heartbeat_interval = connect_time_stale / 2

  begin
    @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
    @amqp_connection_manager.connect do |channel|
      register_topics(channel)
      # callers are allowed to start the router without a completion callback
      block.call if block
    end
  rescue => e
    @log.error " unable to initialize #{e.to_s} "
    cleanup
    raise
  end

  @log.info " started "
end
2013-08-07 15:38:35 +00:00
# Registers client_context under client_id, replacing any earlier entry.
def add_client(client_id, client_context)
  @client_lookup[client_id] = client_context
end
# Drops the @client_lookup entry for client_id, unless that entry belongs to a
# different connection than +client+.
#
# client_id - key into @client_lookup
# client    - the connection being torn down; compared with the stored
#             context's #client
def remove_client(client_id, client)
  deleted = @client_lookup.delete(client_id)

  if deleted.nil?
    @log.warn " unable to delete #{client_id} from client_lookup "
  elsif deleted.client != client
    # put it back--this is only possible if add_client hit the 'old connection' path
    # so in other words if this happens:
    #   add_client(1, clientX)
    #   add_client(1, clientY)    # but clientX is essentially defunct - this could happen due to
    #                             # a bug in client, or EM doesn't notify always of connection close in time
    #   remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash
    @client_lookup[client_id] = deleted
    @log.debug " putting back client into @client_lookup for #{client_id} #{client.inspect} "
  else
    @log.debug " cleaned up @client_lookup for #{client_id} "
  end
end
# Indexes context under its user's id in @user_context_lookup. Each user maps
# to a hash of contexts keyed by the context's client.
def add_user(context)
  user_contexts = (@user_context_lookup[context.user.id] ||= {})
  user_contexts[context.client] = context
end
2012-10-17 03:50:28 +00:00
# Removes client_context from its user's context table, drops the table once
# it is empty, and clears client_context.user. Logs a warning and leaves the
# context untouched when the user has no table.
def remove_user(client_context)
  user_id = client_context.user.id
  user_contexts = @user_context_lookup[user_id]

  if user_contexts.nil?
    @log.warn " user can not be removed #{client_context} "
  else
    # delete the context from set of user contexts
    user_contexts.delete(client_context.client)

    # if last user context, delete entire set (memory leak concern)
    @user_context_lookup.delete(user_id) if user_contexts.empty?

    client_context.user = nil
  end
end
# register topic for user messages and session messages
#
# Declares the 'users' and 'clients' topic exchanges on channel, binds one
# auto-delete queue to each, and subscribes so broker messages are forwarded
# to the matching websocket clients through send_to_client. Both exchanges
# are also handed to MQRouter through its user_exchange= / client_exchange=
# setters.
#
# Errors raised inside a subscription callback are logged and swallowed.
def register_topics(channel)
  ######################## USER MESSAGING ###########################

  # create user exchange
  @users_exchange = channel.topic('users')
  # create user messaging topic
  @user_topic = channel.queue("", :auto_delete => true)
  @user_topic.bind(@users_exchange, :routing_key => "user.#")
  @user_topic.purge

  # subscribe for any messages to users
  @user_topic.subscribe(:ack => false) do |headers, msg|
    begin
      # the routing key is "user.<user_id>" (see handle_user_directed)
      user_id = headers.routing_key["user.".length..-1]

      @semaphore.synchronize do
        contexts = @user_context_lookup[user_id]

        if contexts.nil?
          @log.debug " Can't route message: no user connected with id #{user_id} "
        else
          @log.debug " received user-directed message for user: #{user_id} "

          parsed = Jampb::ClientMessage.parse(msg)

          contexts.each_value do |context|
            EM.schedule do
              @log.debug " sending user message to #{context} "
              send_to_client(context.client, parsed)
            end
          end
        end
      end
    rescue => e
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end

  MQRouter.user_exchange = @users_exchange

  ############## CLIENT MESSAGING ###################

  @clients_exchange = channel.topic('clients')

  @client_topic = channel.queue("", :auto_delete => true)
  @client_topic.bind(@clients_exchange, :routing_key => "client.#")
  @client_topic.purge

  # subscribe for any p2p messages to a client
  @client_topic.subscribe(:ack => false) do |headers, msg|
    begin
      # the routing key is "client.<client_id>" (see handle_client_directed)
      client_id = headers.routing_key["client.".length..-1]

      @semaphore.synchronize do
        client_context = @client_lookup[client_id]
        # an unknown client_id is expected here (the client disconnected);
        # treat it as unroutable instead of raising
        client = client_context.nil? ? nil : client_context.client

        parsed = Jampb::ClientMessage.parse(msg)

        @log.debug " client-directed message received from #{parsed.from} to client #{client_id} "

        if client.nil?
          @log.debug " client-directed message unroutable to disconnected client #{client_id} "
        else
          EM.schedule do
            @log.debug " sending client-directed down websocket to #{client_id} "
            send_to_client(client, parsed)
          end
        end
      end
    rescue => e
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end

  MQRouter.client_exchange = @clients_exchange
end
# Tracks a freshly accepted websocket connection as pending (not logged in)
# and installs its onopen / onclose / onerror / onmessage handlers.
#
# client - the websocket connection; must respond to encode_json=, client_id
#          and the four on* registration methods
def new_client(client)
  @semaphore.synchronize do
    @pending_clients.add(client)
  end

  # default to using json instead of pb
  client.encode_json = true

  client.onopen { |handshake|
    @log.debug " client connected #{client} "

    # check for '?pb' or '?pb=true' in url query parameters
    query_pb = handshake.query["pb"]
    client.encode_json = false if query_pb == "" || query_pb == "true"
  }

  client.onclose {
    @log.debug " Connection closed "
    stale_client(client)
  }

  client.onerror { |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error " websockets error: #{error} "
    else
      @log.error " generic error: #{error} #{error.backtrace} "
    end

    cleanup_client(client)
    client.close_websocket
  }

  client.onmessage { |msg|
    @log.debug(" msg received ")

    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?

    # declared outside the begin so the PermissionError handler can read message_id
    pb_msg = nil

    begin
      pb_msg =
        if client.encode_json
          # example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
          Jampb::ClientMessage.json_create(JSON.parse(msg))
        else
          Jampb::ClientMessage.parse(msg.to_s)
        end
      route(pb_msg, client)
    rescue SessionError => e
      @log.info " ending client session deliberately due to malformed client behavior. reason= #{e} "
      close_with_error(client) { @message_factory.server_rejection_error(e.to_s) }
    rescue PermissionError => e
      # a permission error is reported to the client but the session stays open
      @log.info " permission error. reason= #{e.to_s} "
      @log.info e
      error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s)
      send_to_client(client, error_msg)
    rescue => e
      @log.error " ending client session due to server programming or runtime error. reason= #{e.to_s} "
      @log.error e
      close_with_error(client) { @message_factory.server_generic_error(e.to_s) }
    end
  }
end

# Sends the error message produced by the given block to client, then closes
# the websocket and cleans the client up even if building or sending failed.
def close_with_error(client)
  send_to_client(client, yield)
ensure
  client.close_websocket
  cleanup_client(client)
end
private :close_with_error
2012-10-07 03:38:38 +00:00
# Sends msg down the websocket, as JSON text when client.encode_json is set
# and as a binary frame otherwise.
def send_to_client(client, msg)
  @log.debug " SEND TO CLIENT ( #{@message_factory.get_message_type(msg)} ) "

  if client.encode_json
    client.send(msg.to_json.to_s)
  else
    # this is so odd that this is necessary from an API perspective. but searching through the
    # source code... it's all I could find in em-websocket for allowing a binary message to be sent
    client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
  end
end
# Disconnects from the message queue (when a connection manager exists) and
# tears down every logged-in client.
def cleanup
  # shutdown topic listeners and mq connection
  @amqp_connection_manager.disconnect unless @amqp_connection_manager.nil?

  # tear down each individual client; iterate over a snapshot of the keys
  # because cleanup_client deletes entries from @clients as it goes
  @clients.keys.each do |client|
    cleanup_client(client)
  end
end
# Logs the shutdown and releases everything via #cleanup.
def stop
  @log.info " shutdown "
  cleanup
end
2013-02-21 17:49:32 +00:00
# Marks a client's connection stale and, when that connection was in a music
# session, notifies the session that the client went stale.
#
# Does nothing for a client without a client_id. The notification is skipped
# when no music session comes back, the session cannot be found, or there is
# no in-memory context for the client.
def stale_client(client)
  cid = client.client_id
  return unless cid

  ConnectionManager.active_record_transaction do |connection_manager|
    music_session_id = connection_manager.flag_connection_stale_with_client_id(cid)
    next if music_session_id.nil?

    # update the session members, letting them know this client went stale
    context = @client_lookup[cid]
    music_session = MusicSession.find_by_id(music_session_id)
    # the context can already be gone (e.g. after cleanup_client); the stale
    # flag above must still be committed, so skip only the notification
    next if music_session.nil? || context.nil?

    Notification.send_musician_session_stale(music_session, cid, context.user)
  end
end
# For each client id: cleans up the in-memory client (when one is known) and
# deletes the connection through ConnectionManager. The delete callback sends
# a friend update when count is 0, and a session-depart notification when
# both the music session and the user can be found.
def cleanup_clients_with_ids(client_ids)
  # @log.debug("*** cleanup_clients_with_ids: client_ids = #{client_ids.inspect}")
  client_ids.each do |cid|
    client_context = @client_lookup[cid]
    cleanup_client(client_context.client) unless client_context.nil?

    # remove this connection from the database
    ConnectionManager.active_record_transaction do |mgr|
      mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
        Notification.send_friend_update(user_id, false, conn) if count == 0
        music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
        user = User.find_by_id(user_id) unless user_id.nil?
        Notification.send_musician_session_depart(music_session, cid, user) unless music_session.nil? || user.nil?
      }
    end
  end
end
2012-10-11 03:04:43 +00:00
# removes all resources associated with a client
#
# A client still in @pending_clients is simply dropped from that set. A
# logged-in client is removed from @client_lookup, @clients and the per-user
# context table. Runs while holding @semaphore.
def cleanup_client(client)
  @semaphore.synchronize do
    # @log.debug("*** cleanup_clients: client = #{client}")
    # Set#delete? returns nil when the client was not pending
    if @pending_clients.delete?(client)
      @log.debug " cleaning up not-logged-in client #{client} "
    else
      @log.debug " cleanup up logged-in client #{client} "
      remove_client(client.client_id, client)
      context = @clients.delete(client)

      if context.nil?
        @log.debug " skipping duplicate cleanup attempt of logged-in client "
      else
        remove_user(context)
      end
    end
  end
end
# Dispatches a parsed client message to the handler matching its route_to.
#
# Raises SessionError when the message type is unknown, route_to is missing
# or unrecognised, or a not-yet-logged-in client sends anything but LOGIN.
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)

  raise SessionError, " unknown message type received: #{client_msg.type} " if message_type.nil?

  @log.debug(" msg received #{message_type} ")

  raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil?

  if @pending_clients.include?(client) && client_msg.type != ClientMessage::Type::LOGIN
    # this client has not logged in and is trying to send a non-login message
    raise SessionError, " must 'Login' first "
  end

  # NOTE(review): the session and user branches used to read client_msg.target
  # while every other line here reads route_to; they now use route_to too --
  # confirm the message schema has no separate target field.
  route_to = client_msg.route_to

  if @message_factory.server_directed? client_msg
    handle_server_directed(client_msg, client)
  elsif @message_factory.client_directed? client_msg
    to_client_id = route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
    handle_client_directed(to_client_id, client_msg, client)
  elsif @message_factory.session_directed? client_msg
    session_id = route_to[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    handle_session_directed(session_id, client_msg, client)
  elsif @message_factory.user_directed? client_msg
    user_id = route_to[MessageFactory::USER_PREFIX_TARGET.length..-1]
    handle_user_directed(user_id, client_msg, client)
  else
    raise SessionError, " client_msg.route_to is unknown type: #{client_msg.route_to} "
  end
end
# Handles messages addressed to the server itself: LOGIN and HEARTBEAT.
# Any other type raises SessionError.
def handle_server_directed(client_msg, client)
  # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")
  if client_msg.type == ClientMessage::Type::LOGIN
    handle_login(client_msg.login, client)
  elsif client_msg.type == ClientMessage::Type::HEARTBEAT
    handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client)
  else
    raise SessionError, " unknown message type ' #{client_msg.type} ' for #{client_msg.route_to} -directed message "
  end
end
def handle_login ( login , client )
2012-09-04 01:22:46 +00:00
username = login . username if login . value_for_tag ( 1 )
password = login . password if login . value_for_tag ( 2 )
token = login . token if login . value_for_tag ( 3 )
2012-10-11 03:04:43 +00:00
client_id = login . client_id if login . value_for_tag ( 4 )
2013-08-07 15:38:35 +00:00
reconnect_music_session_id = login . client_id if login . value_for_tag ( 5 )
2012-10-11 03:04:43 +00:00
2013-02-22 01:41:09 +00:00
@log . info ( " *** handle_login: token= #{ token } ; client_id= #{ client_id } " )
connection = nil
2013-08-07 15:38:35 +00:00
reconnected = false
2013-02-22 01:41:09 +00:00
2012-10-11 03:04:43 +00:00
# you don't have to supply client_id in login--if you don't, we'll generate one
2012-10-13 22:55:01 +00:00
if client_id . nil? || client_id . empty?
2012-10-11 03:04:43 +00:00
# give a unique ID to this client. This is used to prevent session messages
# from echoing back to the sender, for instance.
client_id = UUIDTools :: UUID . random_create . to_s
2013-02-22 01:41:09 +00:00
else
# check if there's a connection for the client... if it's stale, reconnect it
if connection = JamRuby :: Connection . find_by_client_id ( client_id )
# FIXME: I think connection table needs to updated within connection_manager
# otherwise this would be 1 line of code (connection.connect!)
2013-08-07 15:38:35 +00:00
music_session_upon_reentry = connection . music_session
2013-02-22 01:41:09 +00:00
ConnectionManager . active_record_transaction do | connection_manager |
2013-08-07 15:38:35 +00:00
music_session_id , reconnected = connection_manager . reconnect ( connection , reconnect_music_session_id )
context = @client_lookup [ client_id ]
if music_session_id . nil?
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
# if so, then we need to tell the others in the session that this user is now departed
Notification . send_musician_session_depart ( music_session_upon_reentry , client . client_id , context . user ) unless context . nil? || music_session_upon_reentry . nil? || music_session_upon_reentry . destroyed?
else
music_session = MusicSession . find_by_id ( music_session_id )
Notification . send_musician_session_fresh ( music_session , client . client_id , context . user ) unless context . nil?
end
2013-02-22 01:41:09 +00:00
end if connection . stale?
end
# if there's a client_id but no connection object, create new client_id
client_id = UUIDTools :: UUID . random_create . to_s if ! connection
2012-10-11 03:04:43 +00:00
end
client . client_id = client_id
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
user = valid_login ( username , password , token , client_id )
2012-08-17 03:22:31 +00:00
if ! user . nil?
2013-02-21 17:49:32 +00:00
@log . debug " user #{ user } logged in "
2012-08-17 03:22:31 +00:00
# respond with LOGIN_ACK to let client know it was successful
2012-10-23 11:42:28 +00:00
remote_ip = extract_ip ( client )
2013-08-07 15:38:35 +00:00
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
@semaphore . synchronize do
2012-10-07 03:38:38 +00:00
# remove from pending_queue
2012-10-11 03:04:43 +00:00
@pending_clients . delete ( client )
# add a tracker for this user
context = ClientContext . new ( user , client )
@clients [ client ] = context
add_user ( context )
2013-08-07 15:38:35 +00:00
add_client ( client_id , context )
2012-10-02 05:03:08 +00:00
2013-08-07 15:38:35 +00:00
unless connection
2013-02-06 13:43:26 +00:00
# log this connection in the database
ConnectionManager . active_record_transaction do | connection_manager |
2013-08-07 15:38:35 +00:00
connection_manager . create_connection ( user . id , client . client_id , remote_ip ) do | conn , count |
if count == 1
Notification . send_friend_update ( user . id , true , conn )
end
end
2013-02-06 13:43:26 +00:00
end
2012-10-02 05:03:08 +00:00
end
2013-08-07 15:38:35 +00:00
login_ack = @message_factory . login_ack ( remote_ip ,
client_id ,
user . remember_token ,
@heartbeat_interval ,
connection . try ( :music_session_id ) ,
reconnected )
send_to_client ( client , login_ack )
2012-10-11 03:04:43 +00:00
end
2012-08-17 03:22:31 +00:00
else
2012-08-26 18:42:22 +00:00
raise SessionError , 'invalid login'
2012-08-17 03:22:31 +00:00
end
end
2012-10-23 11:42:28 +00:00
# TODO: deprecated; jam_ruby has routine inspired by this
#
# Publishes a friend_update message about user to each of user's friends,
# via handle_user_directed. Does nothing when user is nil or has no friends.
def send_friend_update(user, online, client)
  @log.debug " sending friend update for user #{user} online = #{online} "

  if !user.nil? && user.friends.exists?
    @log.debug " user has friends - sending friend updates "

    # create the friend_update message
    friend_update_msg = @message_factory.friend_update(user.id, online)

    # send the friend_update to each friend that has active connections
    user.friends.each do |friend|
      @log.debug " sending friend update message to #{friend} "
      handle_user_directed(friend.id, friend_update_msg, client)
    end
  end
end
2013-07-15 02:50:34 +00:00
# Answers a client heartbeat.
#
# heartbeat            - the heartbeat payload (not read here)
# heartbeat_message_id - echoed back in bad-state / recovered messages
# client               - the websocket connection that sent it
#
# Touches the stored connection, reconnects it when it is stale, and sends a
# heartbeat ack. While the message queue is down it also sends a bad-state
# error; the first heartbeat after it comes back sends a recovered message.
#
# Raises SessionError (after cleaning the client up) when the in-memory
# context or the stored connection cannot be found.
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
  context = @clients[client]
  unless context
    @log.warn " *** WARNING: unable to find context due to heartbeat from client: #{client.client_id} ; calling cleanup "
    cleanup_client(client)
    raise SessionError, 'context state is gone. please reconnect.'
  end

  connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
  if connection.nil?
    @log.warn " *** WARNING: unable to find connection due to heartbeat from client: #{context} ; calling cleanup_client "
    cleanup_client(client)
    raise SessionError, 'connection state is gone. please reconnect.'
  end

  connection.touch
  if connection.stale?
    ConnectionManager.active_record_transaction do |connection_manager|
      connection_manager.reconnect(connection, connection.music_session_id)
    end
  end

  heartbeat_ack = @message_factory.heartbeat_ack
  send_to_client(client, heartbeat_ack)

  # send errors to clients in response to heartbeats if rabbitmq is down
  if !@amqp_connection_manager.connected?
    error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, " messaging system down ")
    context.sent_bad_state_previously = true
    send_to_client(client, error_msg)
    return
  elsif context.sent_bad_state_previously
    context.sent_bad_state_previously = false
    recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
    send_to_client(client, recovery_msg)
  end
end
2012-08-24 02:46:58 +00:00
2012-10-11 03:04:43 +00:00
# Resolves the user for a login attempt.
#
# A non-empty token is tried first (looked up as a remember token);
# otherwise username + password are checked, with username looked up as an
# email address. client_id is accepted but not read here.
#
# Returns the user, or nil when the credentials do not match.
# Raises SessionError when neither a token nor a username/password pair was
# supplied.
def valid_login(username, password, token, client_id)
  if !token.nil? && token != ''
    @log.debug " logging in via token "
    # attempt login with token
    user = JamRuby::User.find_by_remember_token(token)

    if user.nil?
      @log.debug " no user found with token #{token} "
      return nil
    else
      @log.debug " #{user} login via token "
      return user
    end

  elsif !username.nil? && !password.nil?
    # the password is deliberately kept out of the log
    @log.debug " logging in via user/pass ' #{username} ' "
    # attempt login with username and password
    user = User.find_by_email(username)

    if !user.nil? && user.valid_password?(password)
      @log.debug " #{user} login via password "
      return user
    else
      @log.debug " #{username} login failure "
      return nil
    end
  else
    raise SessionError, 'no login data was found in Login message'
  end
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# Looks up a music session and checks that user may access it.
#
# Returns the music session.
# Raises SessionError when the session does not exist or access? is false.
def access_music_session(music_session_id, user)
  music_session = MusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if music_session.nil?
  raise SessionError, 'not allowed to join the specified session' unless music_session.access?(user)

  music_session
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# client_id = the id of the client being accessed
# user      = the user sending the message
# msg       = the message being sent
#
# Permission check for client-to-client messages. The check is currently
# switched off: the method returns nil immediately and everything below the
# first return is unreachable. The disabled code is kept as-is.
def access_p2p(client_id, user, msg)
  return nil

  # ping_request and ping_ack messages are special in that they are simply allowed
  if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
    return nil
  end

  client_connection = Connection.find_by_client_id(client_id)

  if client_connection.nil?
    raise PermissionError, 'specified client not found'
  end

  if !client_connection.access_p2p? user
    raise SessionError, 'not allowed to message this client'
  end
end
# Forwards a client-to-client message: stamps the sender's client_id on it
# and publishes it on the clients exchange under "client.<to_client_id>".
def handle_client_directed(to_client_id, client_msg, client)
  context = @clients[client]

  # by not catching any exception here, a PermissionError will be thrown if this isn't valid
  # if for some reason the client is trying to send to a client that it doesn't
  # belong to
  access_p2p(to_client_id, context.user, client_msg)

  # populate routing data
  client_msg.from = client.client_id

  @log.debug " publishing to client #{to_client_id} from client_id #{client.client_id} "

  # put it on the topic exchange for clients
  @clients_exchange.publish(client_msg.to_s,
                            :routing_key => "client.#{to_client_id}",
                            :properties => { :headers => { "client_id" => client.client_id } })
end
2012-10-07 20:51:43 +00:00
# Publishes client_msg on the users exchange under "user.<user_id>".
def handle_user_directed(user_id, client_msg, client)
  @log.debug " publishing to user #{user_id} from client_id #{client.client_id} "

  # put it on the topic exchange for users
  @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
end
2012-10-17 03:50:28 +00:00
# Forwards a session-directed message on behalf of the logged-in user behind
# client, naming client as the sender.
def handle_session_directed(session_id, client_msg, client)
  context = @clients[client]

  user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id)
end
# sends a message to a session on behalf of a user
# if this is originating in the context of a client, it should be specified as :client_id => "value"
# client_msg should be a well-structure message (jam-pb message)
#
# Raises SessionError (from access_music_session) when the session does not
# exist or the user may not access it.
def user_publish_to_session(music_session_id, user, client_msg, sender = { :client_id => "" })
  music_session = access_music_session(music_session_id, user)

  # gather up client_ids in the session, leaving out the sender itself
  client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] }

  publish_to_session(music_session.id, client_ids, client_msg.to_s, sender)
end
# sends a message to a session with no checking of permissions
# this method deliberately has no database interactivity/active_record objects
#
# music_session_id - only used for logging
# client_ids       - the clients to deliver to; one publish per id
# client_msg       - the already-serialized message
# sender           - :client_id of the originating client, for logging
def publish_to_session(music_session_id, client_ids, client_msg, sender = { :client_id => "" })
  EM.schedule do
    sender_client_id = sender[:client_id]

    # iterate over each person in the session, and send a p2p message
    client_ids.each do |client_id|
      @log.debug " publishing to session: #{music_session_id} client: #{client_id} from client: #{sender_client_id} "

      # put it on the topic exchange for clients. The key must carry the
      # client id: the subscriber in register_topics strips "client." and
      # looks the remainder up in @client_lookup.
      @clients_exchange.publish(client_msg, :routing_key => "client.#{client_id}")
    end
  end
end
2012-10-23 11:42:28 +00:00
# Returns the remote IP address of client's socket. The index is applied to
# the [port, ip] pair returned by unpack_sockaddr_in, not to its argument.
def extract_ip(client)
  _port, ip = Socket.unpack_sockaddr_in(client.get_peername)
  ip
end
2012-10-11 03:04:43 +00:00
end
2012-08-17 03:22:31 +00:00
end