2012-08-17 03:22:31 +00:00
require 'set'
2012-10-23 00:58:57 +00:00
require 'amqp'
2012-08-26 18:42:22 +00:00
require 'thread'
2012-09-03 02:45:18 +00:00
require 'json'
2012-09-04 01:22:46 +00:00
require 'eventmachine'
2012-08-26 18:42:22 +00:00
2012-08-17 03:22:31 +00:00
include Jampb
2012-09-04 01:22:46 +00:00
# Reopens em-websocket's connection class to add the per-connection fields the
# router tracks, plus a simple "is this socket still open" flag.
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # client_id is uuid we give to each client to track them as we like
      attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted

      # http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open
      attr_accessor :connected

      # Marks the connection open, then defers to the stock callback.
      # `self.` is required: a bare `connected = true` only creates a local
      # variable and never reaches the accessor.
      # NOTE(review): EventMachine documents connection_completed as firing for
      # outbound connection attempts; confirm it also runs for accepted
      # websocket connections, otherwise connected? stays false for them.
      def connection_completed
        self.connected = true
        super
      end

      # @return [Boolean] true only after connection_completed and before unbind
      def connected?
        !!connected
      end

      # Marks the connection closed, then defers to the stock callback.
      def unbind
        self.connected = false
        super
      end
    end
  end
end
2012-08-17 03:22:31 +00:00
module JamWebsockets
2012-09-04 01:22:46 +00:00
2012-08-17 03:22:31 +00:00
class Router
2014-05-19 13:46:03 +00:00
# Router-wide lookup table and AMQP connection manager.
attr_accessor :user_context_lookup, :amqp_connection_manager
# Heartbeat / stale / expire settings for native clients.
attr_accessor :heartbeat_interval_client, :connect_time_expire_client, :connect_time_stale_client
# Heartbeat / stale / expire settings for browser clients.
attr_accessor :heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser
2012-10-11 03:04:43 +00:00
2012-10-23 00:58:57 +00:00
# Builds an idle router: empty lookup tables, no AMQP resources, and no timing
# configuration. The timing values and AMQP objects are filled in later by
# #start and #register_topics.
def initialize
  @log = Logging.logger[self]

  # in-memory connection bookkeeping
  @clients = {}             # clients that have logged in
  @user_context_lookup = {} # lookup a set of client_contexts by user_id
  @client_lookup = {}       # lookup a client by client_id

  # messaging plumbing, created lazily
  @amqp_connection_manager = nil
  @users_exchange = nil
  @message_factory = JamRuby::MessageFactory.new
  @semaphore = Mutex.new
  @user_topic = @client_topic = @thread_pool = nil

  # timing configuration, assigned in #start
  @heartbeat_interval_client = @connect_time_expire_client = @connect_time_stale_client = nil
  @heartbeat_interval_browser = @connect_time_expire_browser = @connect_time_stale_browser = nil

  @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
end
2014-04-30 03:01:28 +00:00
# Stores the connection timing configuration and opens the AMQP connection.
#
# Heartbeat intervals are derived as half of the matching stale time.
#
# @param connect_time_stale_client [Integer] stale threshold for native clients
# @param connect_time_expire_client [Integer] expiry threshold for native clients
# @param connect_time_stale_browser [Integer] stale threshold for browser clients
# @param connect_time_expire_browser [Integer] expiry threshold for browser clients
# @param options [Hash] AMQP :host / :port; missing keys fall back to localhost:5672
# @yield optional callback run once the AMQP channel is up and topics are registered
# @raise re-raises whatever the AMQP setup raised, after running #cleanup
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options = {:host => "localhost", :port => 5672}, &block)
  @log.info " startup "

  # a caller passing only :host (or only :port) still gets the other default
  options = {:host => "localhost", :port => 5672}.merge(options || {})

  @heartbeat_interval_client = connect_time_stale_client / 2
  @connect_time_stale_client = connect_time_stale_client
  @connect_time_expire_client = connect_time_expire_client
  @heartbeat_interval_browser = connect_time_stale_browser / 2
  @connect_time_stale_browser = connect_time_stale_browser
  @connect_time_expire_browser = connect_time_expire_browser

  begin
    @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
    @amqp_connection_manager.connect do |channel|
      register_topics(channel)
      # the ready-callback is optional; calling a nil block would raise NoMethodError
      block.call if block
    end
  rescue => e
    @log.error " unable to initialize #{e.to_s} "
    cleanup
    raise # bare raise re-raises the current exception
  end

  @log.info " started "
end
2013-08-07 15:38:35 +00:00
# Indexes a client context under its client_id so client-directed messages
# can find it.
#
# @param client_id [String] id the context is registered under
# @param client_context [Object] context stored in @client_lookup
# @return [Object] the stored client_context
def add_client(client_id, client_context)
  @log.debug(" adding client #{client_id} to @client_lookup ")
  @client_lookup.store(client_id, client_context)
end
2014-04-28 19:47:24 +00:00
# Drops the client_id entry from @client_lookup, warning when nothing was
# there to remove.
#
# @param client_id [String] id to forget
def remove_client(client_id)
  removed = @client_lookup.delete(client_id)
  return @log.warn(" unable to delete #{client_id} from client_lookup because it's already gone ") if removed.nil?

  @log.debug(" cleaned up @client_lookup for #{client_id} ")
end
# Registers a context under its user's id. Each user maps to a hash of
# contexts keyed by the client connection, created on first use.
#
# @param context [Object] must respond to #user (with #id) and #client
# @return [Object] the context that was stored
def add_user(context)
  contexts_for_user = @user_context_lookup[context.user.id]
  contexts_for_user = @user_context_lookup[context.user.id] = Hash.new if contexts_for_user.nil?
  contexts_for_user[context.client] = context
end
2012-10-17 03:50:28 +00:00
# Unregisters a context from its user's entry in @user_context_lookup.
# When the user's last context goes away the whole per-user entry is removed
# (memory leak concern).
#
# @param client_context [Object] must respond to #user (with #id) and #client
def remove_user(client_context)
  remaining = @user_context_lookup[client_context.user.id]
  return @log.warn(" user can not be removed #{client_context} ") if remaining.nil?

  # delete the context from set of user contexts
  remaining.delete(client_context.client)

  @user_context_lookup.delete(client_context.user.id) if remaining.length == 0
end
# register topic for user messages and session messages
#
# Declares the 'users' and 'clients' topic exchanges on the given channel,
# binds one server-named auto-delete queue to each, and subscribes handlers
# that forward every inbound message to the matching websocket(s). Also
# publishes both exchanges to MQRouter.
#
# @param channel [Object] AMQP channel yielded by the connection manager
def register_topics(channel)
  ######################## USER MESSAGING ###########################

  # create user exchange
  @users_exchange = channel.topic('users')
  # create user messaging topic; the empty name asks the broker to generate one
  @user_topic = channel.queue("", :auto_delete => true)
  # '#' must be a whole dot-separated word to act as a wildcard, so the key
  # cannot carry any padding
  @user_topic.bind(@users_exchange, :routing_key => "user.#")
  @user_topic.purge

  # subscribe for any messages to users
  @user_topic.subscribe(:ack => false) do |headers, msg|
    begin
      routing_key = headers.routing_key
      user_id = routing_key["user.".length..-1]

      @semaphore.synchronize do
        contexts = @user_context_lookup[user_id]

        if contexts.nil?
          @log.debug " Can't route message: no user connected with id #{user_id} "
        else
          @log.debug " received user-directed message for user: #{user_id} "

          parsed = Jampb::ClientMessage.parse(msg)

          # fan out to every connection this user currently has
          contexts.each_value do |context|
            EM.schedule do
              @log.debug " sending user message to #{context} "
              send_to_client(context.client, parsed)
            end
          end
        end
      end
    rescue => e
      # keep the subscription alive no matter what one message does
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end

  MQRouter.user_exchange = @users_exchange

  ############## CLIENT MESSAGING ###################

  @clients_exchange = channel.topic('clients')

  @client_topic = channel.queue("", :auto_delete => true)
  @client_topic.bind(@clients_exchange, :routing_key => "client.#")
  @client_topic.purge

  # subscribe for any p2p messages to a client
  @client_topic.subscribe(:ack => false) do |headers, msg|
    begin
      routing_key = headers.routing_key
      client_id = routing_key["client.".length..-1]

      @semaphore.synchronize do
        client_context = @client_lookup[client_id]

        if client_context.nil?
          @log.debug " Can't route message: no client connected with id #{client_id} "
        else
          client = client_context.client

          parsed = Jampb::ClientMessage.parse(msg)
          @log.debug " client-directed message received from #{parsed.from} to client #{client_id} "

          if client.nil?
            @log.debug " client-directed message unroutable to disconnected client #{client_id} "
          else
            EM.schedule do
              @log.debug " sending client-directed down websocket to #{client_id} "
              send_to_client(client, parsed)
            end
          end
        end
      end
    rescue => e
      @log.error " unhandled error in messaging to client "
      @log.error e
    end
  end

  MQRouter.client_exchange = @clients_exchange
end
2014-05-19 13:46:03 +00:00
# Wires the websocket callbacks (open / close / error / message) for a freshly
# accepted connection.
#
# @param client [EventMachine::WebSocket::Connection] the new connection
# @param is_trusted [Boolean] stored on the connection as #trusted
def new_client(client, is_trusted)
  # default to using json instead of pb
  client.encode_json = true
  client.trusted = is_trusted

  client.onopen do |handshake|
    # a unique ID for this TCP connection, to aid in debugging
    client.channel_id = handshake.query["channel_id"]
    @log.debug " client connected #{client} with channel_id: #{client.channel_id} "

    # check for '?pb' or '?pb=true' in url query parameters
    query_pb = handshake.query["pb"]
    client.encode_json = false if query_pb == "" || query_pb == "true"
  end

  client.onclose do
    @log.debug " connection closed. marking stale: #{client.context} "
    cleanup_client(client)
  end

  client.onerror do |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error " websockets error: #{error} "
    else
      @log.error " generic error: #{error} #{error.backtrace} "
    end
  end

  client.onmessage do |msg|
    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?

    pb_msg = nil

    begin
      pb_msg =
        if client.encode_json
          # example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
          Jampb::ClientMessage.json_create(JSON.parse(msg))
        else
          Jampb::ClientMessage.parse(msg.to_s)
        end
      route(pb_msg, client)
    rescue SessionError => e
      @log.info " ending client session deliberately due to malformed client behavior. reason= #{e} "
      begin
        # wrap the message up and send it down
        error_msg = @message_factory.server_rejection_error(e.to_s)
        send_to_client(client, error_msg)
      ensure
        cleanup_client(client)
      end
    rescue PermissionError => e
      # a permission failure is reported but does not end the session
      @log.info " permission error. reason= #{e.to_s} "
      @log.info e

      # wrap the message up and send it down
      error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s)
      send_to_client(client, error_msg)
    rescue => e
      @log.error " ending client session due to server programming or runtime error. reason= #{e.to_s} "
      @log.error e

      begin
        # wrap the message up and send it down
        error_msg = @message_factory.server_generic_error(e.to_s)
        send_to_client(client, error_msg)
      ensure
        cleanup_client(client)
      end
    end
  end
end
2012-10-07 03:38:38 +00:00
# Serialises a message and writes it to one websocket: JSON text when the
# connection negotiated JSON, a binary protobuf frame otherwise.
# Every message type except HEARTBEAT_ACK is logged at debug level.
#
# @param client [EventMachine::WebSocket::Connection] destination connection
# @param msg [Object] message responding to #type, #to_json and #to_s
def send_to_client(client, msg)
  unless msg.type == ClientMessage::Type::HEARTBEAT_ACK
    @log.debug " SEND TO CLIENT ( #{@message_factory.get_message_type(msg)} ) "
  end

  return client.send(msg.to_json.to_s) if client.encode_json

  # this is so odd that this is necessary from an API perspective. but searching through the source code...
  # it's all I could find in em-websocket for allowing a binary message to be sent
  handler = client.instance_variable_get(:@handler)
  handler.send_frame(:binary, msg.to_s)
end
# Releases everything the router holds: the AMQP connection first, then every
# logged-in client.
#
# @return [Hash] the @clients table, as before
def cleanup
  # shutdown topic listeners and mq connection
  @amqp_connection_manager.disconnect unless @amqp_connection_manager.nil?

  # tear down each individual client. cleanup_client deletes from @clients,
  # so walk a snapshot of the keys rather than the live hash.
  @clients.keys.each do |client|
    cleanup_client(client)
  end

  @clients
end
# Logs the shutdown and releases all router resources via #cleanup.
def stop
  @log.info(" shutdown ")
  cleanup
end
2013-02-21 17:49:32 +00:00
# Marks a client's connection stale in the database and, when that connection
# was part of a music session, notifies the session.
# Connections that never logged in (no client_id) are ignored.
#
# @param client [EventMachine::WebSocket::Connection] the connection that went quiet
def stale_client(client)
  return unless client.client_id

  @log.info " marking client stale: #{client.context} "
  ConnectionManager.active_record_transaction do |connection_manager|
    music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)
    next if music_session_id.nil?

    # update the session members, letting them know this client went stale
    context = @client_lookup[client.client_id]
    if context.nil?
      # the in-memory context can already be gone (cleaned up concurrently);
      # without it there is no user to attribute the notification to, and
      # dereferencing nil here would abort the transaction
      @log.warn "unable to notify session #{music_session_id} of stale client #{client.client_id}: no context in client_lookup"
      next
    end

    music_session = ActiveMusicSession.find_by_id(music_session_id)
    Notification.send_musician_session_stale(music_session, client.client_id, context.user) if music_session
  end
end
2014-04-30 03:01:28 +00:00
# Expires a batch of stale connections. For each one: tears down any websocket
# still held in memory, deletes the connection row, stops an in-progress
# recording and ticks track changes for its music session, and finally tells
# the session the user departed.
#
# @param expired_connections [Enumerable<Hash>] each entry is read via [:client_id]
def cleanup_clients_with_ids(expired_connections)
  expired_connections.each do |expired_connection|
    cid = expired_connection[:client_id]

    lingering_context = @client_lookup[cid]
    if lingering_context
      Diagnostic.expired_stale_connection(lingering_context.user.id, lingering_context)
      cleanup_client(lingering_context.client)
    end

    # filled in by the delete_connection callback below
    music_session = nil
    recording_id = nil
    user = nil

    # remove this connection from the database
    ConnectionManager.active_record_transaction do |mgr|
      mgr.delete_connection(cid) do |conn, count, music_session_id, user_id|
        @log.info " expiring stale connection client_id: #{cid} , user_id: #{user_id} "
        Notification.send_friend_update(user_id, false, conn) if count == 0

        unless music_session_id.nil?
          music_session = ActiveMusicSession.find_by_id(music_session_id)
        end
        unless user_id.nil?
          user = User.find_by_id(user_id)
        end

        # stop any ongoing recording, if there is one
        recording = nil
        unless music_session.nil?
          recording = music_session.stop_recording
        end
        unless recording.nil?
          recording_id = recording.id
        end

        if music_session
          music_session.with_lock do # VRFS-1297
            music_session.tick_track_changes
          end
        end
      end
    end

    Notification.send_session_depart(music_session, cid, user, recording_id) if user && music_session
  end
end
2012-10-11 03:04:43 +00:00
# removes all resources associated with a client
#
# Closes the socket if it still reports connected, then (for logged-in
# clients) drops the client from @clients and from the client_id and user
# lookups. Runs under @semaphore.
#
# @param client [EventMachine::WebSocket::Connection] connection to tear down
def cleanup_client(client)
  @semaphore.synchronize do
    client.close if client.connected?

    # presence of context implies this connection has been logged into
    if client.context.nil?
      next @log.debug(" cleaned up not-logged-in client #{client} ")
    end

    @log.debug " cleanup up logged-in client #{client} "

    context = @clients.delete(client)
    unless context
      # someone else already removed it from @clients
      next @log.warn(" skipping duplicate cleanup attempt of logged-in client ")
    end

    remove_client(client.client_id)
    remove_user(context)
  end
end
# Validates an inbound message and dispatches it by destination
# (server, client, session or user).
#
# @param client_msg [Object] parsed client message
# @param client [EventMachine::WebSocket::Connection] the sending connection
# @raise [SessionError] on an unknown message type, a missing route_to,
#   a non-login message from a connection that has not logged in, or an
#   unrecognised route_to
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)
  if message_type.nil?
    Diagnostic.unknown_message_type(client.user_id, client_msg)
    raise SessionError, " unknown message type received: #{client_msg.type} "
  end

  # heartbeats are too frequent to be worth logging
  @log.debug(" msg received #{message_type} ") if client_msg.type != ClientMessage::Type::HEARTBEAT

  if client_msg.route_to.nil?
    Diagnostic.missing_route_to(client.user_id, client_msg)
    raise SessionError, 'client_msg.route_to is null'
  end

  if !client.user_id && client_msg.type != ClientMessage::Type::LOGIN
    # this client has not logged in and is trying to send a non-login message
    raise SessionError, " must 'Login' first "
  end

  if @message_factory.server_directed? client_msg
    handle_server_directed(client_msg, client)
  elsif @message_factory.client_directed? client_msg
    to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
    handle_client_directed(to_client_id, client_msg, client)
  elsif @message_factory.session_directed? client_msg
    # NOTE(review): this branch and the user-directed one read `target` while
    # every other check in this method reads `route_to` — confirm the message
    # still exposes `target`.
    session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    handle_session_directed(session_id, client_msg, client)
  elsif @message_factory.user_directed? client_msg
    user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
    handle_user_directed(user_id, client_msg, client)
  else
    raise SessionError, " client_msg.route_to is unknown type: #{client_msg.route_to} "
  end
end
# Handles a message addressed to the server itself. Only LOGIN and HEARTBEAT
# are understood; the heartbeat handler runs inside sane_logging.
#
# @param client_msg [Object] parsed client message
# @param client [EventMachine::WebSocket::Connection] the sending connection
# @raise [SessionError] for any other message type
def handle_server_directed(client_msg, client)
  return handle_login(client_msg.login, client) if client_msg.type == ClientMessage::Type::LOGIN

  if client_msg.type == ClientMessage::Type::HEARTBEAT
    return sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
  end

  raise SessionError, " unknown message type ' #{client_msg.type} ' for #{client_msg.route_to} -directed message "
end
2012-08-24 02:46:58 +00:00
2014-04-30 20:29:10 +00:00
# returns heartbeat_interval, connection stale time, and connection expire time
#
# Starts from the router defaults for the client type (browser vs everything
# else), then applies the user's own heartbeat / expire overrides when they
# are present and non-zero.
#
# @param user [Object, nil] may carry heartbeat_interval_client and
#   connection_expire_time_client overrides; nil means defaults only
# @param client_type [Object] compared against Connection::TYPE_BROWSER
# @return [Array] [heartbeat_interval, connection_stale_time, connection_expire_time]
# @raise [SessionError] unless heartbeat < stale < expire
def determine_connection_times(user, client_type)
  default_heartbeat, default_stale, default_expire =
    if client_type == Connection::TYPE_BROWSER
      [@heartbeat_interval_browser, @connect_time_stale_browser, @connect_time_expire_browser]
    else
      [@heartbeat_interval_client, @connect_time_stale_client, @connect_time_expire_client]
    end

  heartbeat_interval = (user.try(:heartbeat_interval_client) || default_heartbeat).to_i
  heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config

  connection_expire_time = (user.try(:connection_expire_time_client) || default_expire).to_i
  connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config

  # no user override exists for this; not a very meaningful time right now
  connection_stale_time = default_stale

  if heartbeat_interval >= connection_stale_time
    raise SessionError, " misconfiguration! heartbeat_interval ( #{heartbeat_interval} ) should be less than stale time ( #{connection_stale_time} ) "
  end

  if connection_stale_time >= connection_expire_time
    raise SessionError, " misconfiguration! stale time ( #{connection_stale_time} ) should be less than expire time ( #{connection_expire_time} ) "
  end

  [heartbeat_interval, connection_stale_time, connection_expire_time]
end
2014-05-19 13:46:03 +00:00
# Creates the ClientContext for a newly logged-in connection and registers it
# in all three lookups (@clients, per-user, per-client_id).
#
# @return [ClientContext] the context that was created
def add_tracker(user, client, client_type, client_id)
  ClientContext.new(user, client, client_type).tap do |context|
    @clients[client] = context
    add_user(context)
    add_client(client_id, context)
  end
end
# Logs in a latency-tester connection: records it through
# LatencyTester.connect, registers the in-memory tracker and answers with a
# LOGIN_ACK.
#
# @param client_id [String] id supplied in the login message
# @param client_type [Object] client type from the login message
# @param client [EventMachine::WebSocket::Connection] the connection logging in
# @raise [SessionError] when LatencyTester.connect reports errors
def handle_latency_tester_login(client_id, client_type, client)
  remote_ip = extract_ip(client)
  heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)

  tester = LatencyTester.connect({
    client_id: client_id,
    ip_address: remote_ip,
    connection_stale_time: connection_stale_time,
    connection_expire_time: connection_expire_time
  })

  if tester.errors.any?
    @log.warn " unable to log in latency_tester with errors: #{tester.errors.inspect} "
    raise SessionError, " invalid login: #{tester.errors.inspect} "
  end

  client.client_id = client_id
  client.user_id = tester.id if tester

  @semaphore.synchronize do
    context = add_tracker(tester, client, client_type, client_id)
    @log.debug " logged in context created: #{context} "

    # respond with LOGIN_ACK to let client know it was successful
    ack = @message_factory.login_ack(remote_ip, client_id, nil, heartbeat_interval, nil, false, tester.id, connection_expire_time)
    send_to_client(client, ack)
  end
end
2012-08-17 03:22:31 +00:00
# Handles a LOGIN message: authenticates, evicts any older websocket using the
# same client_id, reclaims or creates the database connection row, registers
# the in-memory tracker and answers with a LOGIN_ACK.
#
# Latency-tester logins are delegated to #handle_latency_tester_login.
#
# @param login [Object] login payload (fields read via value_for_tag 1..6)
# @param client [EventMachine::WebSocket::Connection] the connection logging in
# @raise [SessionError] 'invalid login' when the credentials are rejected
def handle_login(login, client)
  username = login.username if login.value_for_tag(1)
  password = login.password if login.value_for_tag(2)
  token = login.token if login.value_for_tag(3)
  client_id = login.client_id if login.value_for_tag(4)
  reconnect_music_session_id = login.reconnect_music_session_id if login.value_for_tag(5)
  client_type = login.client_type if login.value_for_tag(6)

  @log.info(" *** handle_login: token= #{token} ; client_id= #{client_id} , client_type= #{client_type} ")

  if client_type == Connection::TYPE_LATENCY_TESTER
    handle_latency_tester_login(client_id, client_type, client)
    return
  end

  reconnected = false

  # you don't have to supply client_id in login--if you don't, we'll generate one
  if client_id.nil? || client_id.empty?
    # give a unique ID to this client.
    client_id = UUIDTools::UUID.random_create.to_s
  end

  user = valid_login(username, password, token, client_id)

  # Reject bad credentials before touching any shared state: otherwise a
  # failed login could evict the live connection that owns this client_id.
  raise SessionError, 'invalid login' unless user

  # kill any websocket connections that have this same client_id, which can happen in race conditions
  # this code must happen here, before we go any further, so that there is only one websocket connection per client_id
  existing_context = @client_lookup[client_id]
  if existing_context
    # in some reconnect scenarios, we may have in memory a websocket client still.
    @log.info " duplicate client: #{existing_context} "
    Diagnostic.duplicate_client(existing_context.user, existing_context) if existing_context.client.connected
    cleanup_client(existing_context.client)
  end

  connection = JamRuby::Connection.find_by_client_id(client_id)
  # if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
  # because it will recreate a new connection lower down
  if connection && connection.user != user
    @log.debug(" user #{user.email} took client_id #{client_id} from user #{connection.user.email} ")
    connection.delete
    connection = nil
  end

  client.client_id = client_id
  client.user_id = user.id
  remote_ip = extract_ip(client)

  heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type)

  @log.debug " logged in #{user} with client_id: #{client_id} "

  # check if there's a connection for the client... if it's stale, reconnect it
  unless connection.nil?
    # FIXME: I think connection table needs to updated within connection_manager
    # otherwise this would be 1 line of code (connection.connect!)

    music_session_upon_reentry = connection.music_session

    send_depart = false
    recording_id = nil

    ConnectionManager.active_record_transaction do |connection_manager|
      music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time)

      if music_session_id.nil?
        # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
        # if so, then we need to tell the others in the session that this user is now departed
        unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
          recording = music_session_upon_reentry.stop_recording
          recording_id = recording.id unless recording.nil?
          music_session_upon_reentry.with_lock do # VRFS-1297
            music_session_upon_reentry.tick_track_changes
          end
          send_depart = true
        end
      else
        music_session = ActiveMusicSession.find_by_id(music_session_id)
        Notification.send_musician_session_fresh(music_session, client.client_id, user)
      end
    end

    # sent after the transaction block above has finished
    if send_depart
      Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id)
    end
  end

  # respond with LOGIN_ACK to let client know it was successful
  @semaphore.synchronize do
    # add a tracker for this user
    context = add_tracker(user, client, client_type, client_id)

    @log.debug " logged in context created: #{context} "

    unless connection
      # log this connection in the database
      ConnectionManager.active_record_transaction do |connection_manager|
        connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type, connection_stale_time, connection_expire_time) do |conn, count|
          if count == 1
            Notification.send_friend_update(user.id, true, conn)
          end
        end
      end
    end

    login_ack = @message_factory.login_ack(remote_ip,
                                           client_id,
                                           user.remember_token,
                                           heartbeat_interval,
                                           connection.try(:music_session_id),
                                           reconnected,
                                           user.id,
                                           connection_expire_time)
    send_to_client(client, login_ack)
  end
end
2013-07-15 02:50:34 +00:00
# Handles a heartbeat message from a logged-in client.
#
# heartbeat            - the decoded heartbeat message
# heartbeat_message_id - id of the heartbeat message; echoed back in bad-state error/recovery messages
# client               - the websocket connection the heartbeat arrived on
#
# Steps, in order:
#   1. look up the client's context and its Connection row; raise SessionError if either is gone
#   2. inside one transaction: read the session's track_changes_counter (if in a session),
#      touch the connection, and record notification-seen info (skipped for latency testers)
#   3. if the connection is stale, reconnect it through ConnectionManager
#   4. send a heartbeat ack carrying the track_changes_counter (nil when not in a session)
#   5. tell the client when the messaging system is down, and again when it has recovered
#
# Raises SessionError when the client context or the connection record cannot be found.
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
  context = @clients[client]
  unless context
    @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
    Diagnostic.missing_client_state(client.user_id, client.context)
    raise SessionError, 'context state is gone. please reconnect.'
  end

  connection = Connection.find_by_client_id(context.client.client_id)
  if connection.nil?
    @log.warn "*** WARNING: unable to find connection when handling heartbeat. context=#{context}; killing session"
    Diagnostic.missing_connection(client.user_id, client.context)
    raise SessionError, 'connection state is gone. please reconnect.'
  end

  track_changes_counter = nil
  Connection.transaction do
    # send back track_changes_counter if in a session
    if connection.music_session_id
      music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
      track_changes_counter = music_session.track_changes_counter if music_session
    end

    # update connection updated_at
    connection.touch

    # update user's notification_seen_at field if the heartbeat indicates it saw one.
    # first we try to use the notification id, which should usually exist.
    # if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
    update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
  end

  if connection.stale?
    ConnectionManager.active_record_transaction do |connection_manager|
      # only the stale/expire times are needed here; the heartbeat interval is ignored
      _heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
      connection_manager.reconnect(connection, connection.music_session_id, nil, connection_stale_time, connection_expire_time)
    end
  end

  heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter)
  send_to_client(client, heartbeat_ack)

  # send errors to clients in response to heartbeats if rabbitmq is down
  unless @amqp_connection_manager.connected?
    error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
    context.sent_bad_state_previously = true
    send_to_client(client, error_msg)
    return
  end

  # the messaging system is back: tell the client once, then clear the flag
  if context.sent_bad_state_previously
    context.sent_bad_state_previously = false
    recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
    send_to_client(client, recovery_msg)
  end
end
2012-08-24 02:46:58 +00:00
2014-03-27 18:43:15 +00:00
# Records on the connection's user when they last saw a notification, based on a heartbeat.
#
# connection - record whose #user is updated
# context    - client context; only used in log messages
# heartbeat  - heartbeat message; tag 1 carries the id of the last seen notification,
#              tag 2 carries a notification_seen_at timestamp string
#
# The notification id is tried first: when it resolves to a Notification, that
# notification's created_at is stored. Otherwise (no id sent, or no such notification)
# the heartbeat's notification_seen_at string is used, provided it parses as a time.
# Save failures and unparseable timestamps are logged, never raised.
def update_notification_seen_at(connection, context, heartbeat)
  notification_id = heartbeat.notification_seen if heartbeat.value_for_tag(1)
  notification = Notification.find_by_id(notification_id) if notification_id

  if notification
    user = connection.user
    user.notification_seen_at = notification.created_at
    unless user.save(validate: false)
      @log.error "unable to update notification_seen_at via id field for client #{context}. errors: #{user.errors.inspect}"
    end
    return
  end

  # fallback: approximate time supplied by the client
  notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)
  return if notification_seen_at.nil? || notification_seen_at.empty?

  begin
    # parsed only to validate the value; a bad timestamp must not kill the heartbeat
    Time.parse(notification_seen_at)
  rescue StandardError
    @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
    return
  end

  # the raw string is assigned (as before) so the model layer keeps doing the conversion
  user = connection.user
  user.notification_seen_at = notification_seen_at
  unless user.save(validate: false)
    @log.error "unable to update notification_seen_at via time field for client #{context}. errors: #{user.errors.inspect}"
  end
end
2012-10-11 03:04:43 +00:00
# Authenticates a login attempt, preferring a remember token over username/password.
#
# username  - email address used for password login (may be nil)
# password  - plaintext password (may be nil); never written to the log
# token     - remember token (may be nil or empty)
# client_id - currently unused by this method
#
# Returns the matching user, or nil when the credentials do not match.
# Raises SessionError when neither a token nor a username/password pair was supplied.
def valid_login(username, password, token, client_id)
  unless token.nil? || token == ''
    @log.debug "logging in via token"
    # attempt login with token
    user = JamRuby::User.find_by_remember_token(token)

    if user.nil?
      @log.debug "no user found with token #{token}"
    else
      @log.debug "#{user} login via token"
    end
    return user
  end

  if username.nil? || password.nil?
    raise SessionError, 'no login data was found in Login message'
  end

  # the password is deliberately kept out of the log
  @log.debug "logging in via user/pass '#{username}'"

  # attempt login with username and password
  user = User.find_by_email(username)

  if !user.nil? && user.valid_password?(password)
    @log.debug "#{user} login via password"
    user
  else
    @log.debug "#{username} login failure"
    nil
  end
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# Looks up an active music session and verifies the user may access it.
#
# music_session_id - id of the session to look up
# user             - the user requesting access
#
# Returns the session.
# Raises SessionError when the session does not exist or the user is not allowed in.
def access_music_session(music_session_id, user)
  music_session = ActiveMusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if music_session.nil?
  raise SessionError, 'not allowed to join the specified session' unless music_session.access?(user)

  music_session
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# Permission hook for peer-to-peer (client-directed) messages.
#
# client_id - the id of the client being accessed (the message target)
# user      - the user on whose behalf the message is being sent
# msg       - the client message being routed
#
# NOTE: enforcement is currently switched off; this method always returns nil and never
# raises. The check that used to follow the unconditional early return was unreachable
# and has been removed from the body. For reference, it applied these rules:
#   * PING_REQUEST / PING_ACK messages were always allowed
#   * PermissionError 'specified client not found' when no Connection exists for client_id
#   * SessionError 'not allowed to message this client' when Connection#access_p2p?(user) is false
def access_p2p(client_id, user, msg)
  nil
end
# Routes a peer-to-peer message from one client to another via the clients exchange.
#
# to_client_id - id of the destination client
# client_msg   - the message to forward; its `from` field is overwritten with the sender's client_id
# client       - the sending websocket connection
#
# Raises SessionError when no usable destination id was supplied. Exceptions raised by
# access_p2p are deliberately not caught here, so they propagate to the caller.
def handle_client_directed(to_client_id, client_msg, client)
  # reject unusable targets before doing any lookups
  if to_client_id.nil? || to_client_id == 'undefined' # javascript translates to 'undefined' in many cases
    raise SessionError, "empty client_id specified in peer-to-peer message"
  end

  context = @clients[client]

  # by not catching any exception here, a PermissionError will be thrown if this isn't valid
  # if for some reason the client is trying to send to a client that it doesn't
  # belong to
  access_p2p(to_client_id, context.user, client_msg)

  # populate routing data
  client_msg.from = client.client_id

  @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"

  # put it on the topic exchange for clients
  @clients_exchange.publish(client_msg.to_s,
                            :routing_key => "client.#{to_client_id}",
                            :properties => {:headers => {"client_id" => client.client_id}})
end
2012-10-07 20:51:43 +00:00
# Routes a message to a user by publishing it on the users exchange.
#
# user_id    - id of the destination user; becomes the routing key "user.<user_id>"
# client_msg - the message to forward (serialized with to_s)
# client     - the sending websocket connection (only used for logging)
def handle_user_directed(user_id, client_msg, client)
  @log.debug "publishing to user #{user_id} from client_id #{client.client_id}"

  # put it on the topic exchange for users
  @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
end
2012-10-17 03:50:28 +00:00
# Routes a message from a client to the other members of a music session.
#
# session_id - id of the destination music session
# client_msg - the message to forward
# client     - the sending websocket connection; its client_id is passed along as the sender
#
# Delegates to user_publish_to_session, acting as the user stored in the client's context.
def handle_session_directed(session_id, client_msg, client)
  context = @clients[client]

  user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id)
end
# sends a message to a session on behalf of a user
# if this is originating in the context of a client, it should be specified as :client_id => "value"
# client_msg should be a well-structured message (jam-pb message)
#
# music_session_id - id of the destination session
# user             - user the message is sent on behalf of; must pass access_music_session
# client_msg       - message to send (serialized with to_s before publishing)
# sender           - hash whose :client_id is excluded from the recipients
#
# Raises whatever access_music_session raises when the session is missing or off limits.
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
  music_session = access_music_session(music_session_id, user)

  # gather up client_ids in the session, leaving out the sender itself
  sender_client_id = sender[:client_id]
  client_ids = music_session.music_session_clients.each_with_object([]) do |session_client, ids|
    ids << session_client.client_id unless session_client.client_id == sender_client_id
  end

  publish_to_session(music_session.id, client_ids, client_msg.to_s, sender)
end
# sends a message to a session with no checking of permissions
# this method deliberately has no database interactivity/active_record objects
#
# music_session_id - id of the session (only used for logging)
# client_ids       - ids of the clients that should receive the message
# client_msg       - the already-serialized message
# sender           - hash whose :client_id identifies the originator (only used for logging)
#
# The publishing itself is handed to EM.schedule.
# NOTE(review): @@log and self.class.client_exchange are defined outside this block;
# other routing methods in this class use @log / @clients_exchange - confirm both exist.
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
  EM.schedule do
    sender_client_id = sender[:client_id]

    # iterate over each person in the session, and send a p2p message
    client_ids.each do |client_id|
      @@log.debug "publishing to session: #{music_session_id} client: #{client_id} from client: #{sender_client_id}"

      # put it on the topic exchange for clients
      self.class.client_exchange.publish(client_msg, :routing_key => "client.#{client_id}")
    end
  end
end
2012-10-23 11:42:28 +00:00
# Returns the remote IP address of a client connection as a string.
#
# client - a connection responding to get_peername with a packed sockaddr
#
# Socket.unpack_sockaddr_in yields [port, ip_address]; only the address is returned.
def extract_ip(client)
  _port, ip_address = Socket.unpack_sockaddr_in(client.get_peername)
  ip_address
end
2014-04-29 01:45:06 +00:00
private
# Runs the given block with @ar_base_logger's level set to :info, then restores the
# level it had before.
# used around repeated transactions that cause too much ActiveRecord::Base logging
#
# When @ar_base_logger is not set the block is simply run.
# Returns the block's result; exceptions from the block propagate after the level is restored.
def sane_logging(&blk)
  logger = @ar_base_logger
  return blk.call unless logger

  # capture the level before entering the protected region, so the ensure below
  # can never write back a level that was not actually read
  original_level = logger.level
  begin
    logger.level = :info
    blk.call
  ensure
    logger.level = original_level
  end
end
2012-10-11 03:04:43 +00:00
end
2012-08-17 03:22:31 +00:00
end