2012-08-17 03:22:31 +00:00
require 'set'
2012-10-23 00:58:57 +00:00
require 'amqp'
2012-08-26 18:42:22 +00:00
require 'thread'
2012-09-03 02:45:18 +00:00
require 'json'
2012-09-04 01:22:46 +00:00
require 'eventmachine'
2012-08-26 18:42:22 +00:00
2012-08-17 03:22:31 +00:00
include Jampb
2012-09-04 01:22:46 +00:00
# add new fields to the client connection
module EventMachine
  module WebSocket
    # Reopens the websocket connection class to add per-connection state
    # that the gateway reads and writes.
    class Connection < EventMachine::Connection
      # wire format and identity
      # (client_id is the uuid we give to each client to track them as we like)
      attr_accessor :encode_json, :channel_id, :client_id, :user_id

      # gateway bookkeeping attached to the connection
      attr_accessor :context, :trusted, :subscriptions

      # details captured from the handshake / login
      attr_accessor :x_forwarded_for, :query, :is_jamblaster
    end
  end
end
2012-08-17 03:22:31 +00:00
module JamWebsockets
2012-09-04 01:22:46 +00:00
2012-08-17 03:22:31 +00:00
class Router
2014-05-19 13:46:03 +00:00
# Public accessors for the router's lookup tables, timing configuration and statistics.
attr_accessor :user_context_lookup,
              :amqp_connection_manager,
              :heartbeat_interval_client,
              :connect_time_expire_client,
              :connect_time_stale_client,
              :heartbeat_interval_browser,
              :connect_time_expire_browser,
              :connect_time_stale_browser,
              :maximum_minutely_heartbeat_rate_browser,
              :maximum_minutely_heartbeat_rate_client,
              :max_connections_per_user,
              :gateway_name,
              :client_lookup,
              :time_it_sums,
              :profile_it_sums,
              :highest_drift,
              :heartbeat_tracker, # this comma was missing, which left :temp_ban as a stray symbol with no accessor
              :temp_ban
2015-09-23 14:18:00 +00:00
2012-10-11 03:04:43 +00:00
2012-10-23 00:58:57 +00:00
# Creates a router with empty lookup tables, zeroed counters and unset (nil)
# connection/heartbeat configuration.
def initialize
  @log = Logging.logger[self]

  # lookup tables
  @clients = {}             # clients that have logged in
  @user_context_lookup = {} # lookup a set of client_contexts by user_id
  @client_lookup = {}       # lookup a client by client_id
  @subscription_lookup = {}

  # messaging plumbing
  @amqp_connection_manager = nil
  @users_exchange = nil
  @message_factory = JamRuby::MessageFactory.new
  @semaphore = Mutex.new
  @user_topic = @client_topic = @subscription_topic = nil
  @thread_pool = nil

  # timing configuration
  @heartbeat_interval_client = @connect_time_expire_client = @connect_time_stale_client = nil
  @heartbeat_interval_browser = @connect_time_expire_browser = @connect_time_stale_browser = nil
  @maximum_minutely_heartbeat_rate_browser = nil
  @maximum_minutely_heartbeat_rate_client = nil
  @gateway_name = nil

  @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]

  # statistics; each table gets its own hash
  @message_stats = {}
  @time_it_sums = {}
  @profile_it_sums = {}
  @heartbeat_tracker = {}
  @temp_ban = {}

  # chat switches default to on
  @chat_enabled = true
  @chat_blast = true

  # counters
  @login_success_count = 0
  @login_fail_count = 0
  @connected_count = 0
  @disconnected_count = 0
  @user_message_counts = {}
  @largest_message = nil
  @largest_message_user = nil
  @highest_drift = 0
end
2016-02-08 12:56:54 +00:00
# Stores the timing/limit configuration, then connects to AMQP and registers the
# messaging topics on the resulting channel.
#
# connect_time_stale_* / connect_time_expire_* - stale and expire times for native
#   clients and browsers; each heartbeat interval is half of the matching stale time.
# options - :host, :port, :max_connections_per_user, :gateway,
#   :allow_dynamic_registration, :chat_enabled, :chat_blast.
#   NOTE(review): the defaults apply only when the argument is omitted entirely;
#   a partial hash leaves the missing keys nil (e.g. chat off) -- confirm callers.
# block - optional; called with no arguments once the topics are registered.
#
# A StandardError raised while connecting is logged, #cleanup is run, and the
# error is re-raised.
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options = {:host => " localhost ", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block)
  @log.info " startup "

  @heartbeat_interval_client = connect_time_stale_client / 2
  @connect_time_stale_client = connect_time_stale_client
  @connect_time_expire_client = connect_time_expire_client
  @heartbeat_interval_browser = connect_time_stale_browser / 2
  @connect_time_stale_browser = connect_time_stale_browser
  @connect_time_expire_browser = connect_time_expire_browser
  @max_connections_per_user = options[:max_connections_per_user]
  @gateway_name = options[:gateway]
  @allow_dynamic_registration = options[:allow_dynamic_registration]
  @chat_enabled = options[:chat_enabled]
  @chat_blast = options[:chat_blast]

  # determine the maximum amount of heartbeats we should get per user
  # NOTE(review): this scales with interval/60 rather than 60/interval -- confirm the formula
  @maximum_minutely_heartbeat_rate_client = ((@heartbeat_interval_client / 60.0) * 2).ceil + 3
  @maximum_minutely_heartbeat_rate_browser = ((@heartbeat_interval_browser / 60.0) * 2).ceil + 3
  @log.info(" maxmium minutely timer #{ @maximum_minutely_heartbeat_rate_client } ")

  begin
    @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
    @amqp_connection_manager.connect do |channel|
      register_topics(channel)
      # the block is optional; calling nil here used to abort startup with NoMethodError
      block.call if block
    end
  rescue => e
    @log.error " unable to initialize #{ e . to_s } "
    cleanup
    raise
  end

  @log.info " started "
end
2013-08-07 15:38:35 +00:00
# Stores client_context in @client_lookup under client_id, replacing any
# entry already held for that id. Returns client_context.
def add_client(client_id, client_context)
  @log.debug(" adding client #{ client_id } to @client_lookup ")
  @client_lookup.store(client_id, client_context)
end
2014-04-28 19:47:24 +00:00
# Drops client_id from @client_lookup, warning when there was no entry to remove.
def remove_client(client_id)
  removed = @client_lookup.delete(client_id)
  return @log.warn(" unable to delete #{ client_id } from client_lookup because it's already gone ") if removed.nil?

  @log.debug(" cleaned up @client_lookup for #{ client_id } ")
end
# Files context under its user's id in @user_context_lookup, keyed by the
# context's client. Creates the per-user hash on first use. Returns context.
def add_user(context)
  user_id = context.user.id
  contexts_for_user = @user_context_lookup[user_id]
  contexts_for_user = @user_context_lookup[user_id] = {} if contexts_for_user.nil?
  contexts_for_user[context.client] = context
end
2012-10-17 03:50:28 +00:00
# Removes client_context from its user's entry in @user_context_lookup.
# Does nothing when the context has no user; warns when the user has no entry.
def remove_user(client_context)
  return unless client_context.user

  user_id = client_context.user.id
  contexts_for_user = @user_context_lookup[user_id]

  if contexts_for_user.nil?
    @log.warn " user can not be removed #{ client_context } "
  else
    # delete the context from the user's set of contexts
    contexts_for_user.delete(client_context.client)
    # once the last context is gone, drop the whole entry (memory leak concern)
    @user_context_lookup.delete(user_id) if contexts_for_user.empty?
  end
end
# Declares the 'users', 'clients' and 'subscriptions' topic exchanges on channel,
# creates a queue for each and subscribes handlers that forward incoming messages
# to connected websocket clients. Each exchange is also handed to MQRouter.
# The subscriptions queue is not bound here; register_subscription binds it per routing key.
def register_topics(channel)
  ######################## USER MESSAGING ###########################
  @users_exchange = channel.topic('users')

  # queue receiving every user-addressed message
  @user_topic = channel.queue(" ", :auto_delete => true)
  @user_topic.bind(@users_exchange, :routing_key => " user. # ")
  @user_topic.purge

  @user_topic.subscribe(:ack => false) do |headers, msg|
    time_it('user_topic') do
      begin
        # everything after the prefix is the user id
        user_id = headers.routing_key[" user. ".length..-1]

        @semaphore.synchronize do
          contexts = @user_context_lookup[user_id]
          # no user connected with this id; logging it was too chatty
          next if contexts.nil?

          @log.debug " received user-directed message for user: #{ user_id } "
          msg = Jampb::ClientMessage.parse(msg)

          contexts.each do |client_id, context|
            EM.schedule do
              @log.debug " sending user message to #{ context } "
              send_to_client(context.client, msg)
            end
          end
        end
      rescue => e
        @log.error " unhandled error in messaging to client "
        @log.error e
      end
    end
  end

  MQRouter.user_exchange = @users_exchange

  ############## CLIENT MESSAGING ###################
  @clients_exchange = channel.topic('clients')

  @client_topic = channel.queue(" ", :auto_delete => true)
  @client_topic.bind(@clients_exchange, :routing_key => " client. # ")
  @client_topic.purge

  # handles p2p messages to one client, plus the two broadcast targets
  @client_topic.subscribe(:ack => false) do |headers, msg|
    time_it('p2p_topic') do
      begin
        client_id = headers.routing_key[" client. ".length..-1]

        @semaphore.synchronize do
          if client_id == MessageFactory::ALL_NATIVE_CLIENTS
            msg = Jampb::ClientMessage.parse(msg)
            @log.debug " client-directed message received from #{ msg . from } to all clients "

            @client_lookup.each do |client_id, client_context|
              next unless client_context.client_type == JamRuby::Connection::TYPE_CLIENT

              client = client_context.client
              next unless client

              EM.schedule do
                @log.debug " sending client-directed down websocket to #{ client_id } "
                send_to_client(client, msg)
              end
            end
          elsif client_id == MessageFactory::ALL_ACTIVE_CLIENTS
            if @chat_enabled
              msg = Jampb::ClientMessage.parse(msg)
              @log.debug " client-directed message received from #{ msg . from } to all chat clients "

              @client_lookup.each do |client_id, client_context|
                next unless @chat_blast || client_context.active

                client = client_context.client
                next unless client

                EM.schedule do
                  send_to_client(client, msg)
                end
              end
            end
          else
            client_context = @client_lookup[client_id]

            # a missing context happens all the time in multi-websocket scenarios, so it is not logged
            if client_context
              client = client_context.client
              msg = Jampb::ClientMessage.parse(msg)

              @log.debug " client-directed message received from #{ msg . from } to client #{ client_id } " unless msg.type == ClientMessage::Type::PEER_MESSAGE

              if client.nil?
                @log.debug " client-directed message unroutable to disconnected client #{ client_id } "
              else
                EM.schedule do
                  @log.debug " sending client-directed down websocket to #{ client_id } " unless msg.type == ClientMessage::Type::PEER_MESSAGE
                  send_to_client(client, msg)
                end
              end
            end
          end
        end
      rescue => e
        @log.error " unhandled error in messaging to client "
        @log.error e
      end
    end
  end

  MQRouter.client_exchange = @clients_exchange

  ############## DYNAMIC SUBSCRIPTION MESSAGING ###################
  @subscriptions_exchange = channel.topic('subscriptions')
  @subscription_topic = channel.queue(" subscriptions- #{ @gateway_name } ", :auto_delete => true)
  @subscription_topic.purge

  # fan each subscription message out to the clients registered for its type and id
  @subscription_topic.subscribe(:ack => false) do |headers, msg|
    time_it('subscribe_topic') do
      begin
        type_and_id = headers.routing_key[" subscription. ".length..-1]

        @semaphore.synchronize do
          clients = @subscription_lookup[type_and_id]
          msg = Jampb::ClientMessage.parse(msg)
          next unless clients

          EM.schedule do
            clients.each do |client|
              @log.debug " subscription msg to client #{ client . client_id } "
              send_to_client(client, msg)
            end
          end
        end
      rescue => e
        @log.error " unhandled error in messaging to client for mount "
        @log.error e
      end
    end
  end

  MQRouter.subscription_exchange = @subscriptions_exchange
end
# Starts listening on a subscription topic on behalf of a client.
# The queue is bound to the topic only when the first client registers for a given type and id.
def register_subscription(client, type, id)
  # remember the subscription on the client itself, for disconnect scenarios
  client.subscriptions.add({type: type, id: id})

  # clients listening to a given type:id are tracked in @subscription_lookup
  key = " #{ type } . #{ id } "
  listeners = @subscription_lookup[key]
  listeners = @subscription_lookup[key] = Set.new if listeners.nil?

  first_listener = listeners.empty?
  listeners.add(client)
  @log.debug(" register subscription handled #{ type } . #{ id } ")

  return unless first_listener

  # 1st client to listen for this type:id, so bind the queue to its topic
  routing_key = " subscription. #{ type } . #{ id } "
  @log.debug(" register topic bound #{ routing_key } ")
  @subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key)
end
# Stops listening on a subscription topic on behalf of a client.
# Called automatically when a client disconnects, to keep state clean.
#
# client   - connection dropping the subscription (responds to subscriptions and client_id)
# type, id - identify the subscription; combined into the lookup key and the routing key
#
# Logs an error (without raising) when nothing is tracked for the key or when
# the client was not among the listeners. Unbinds the queue from the topic once
# no listeners remain.
def unregister_subscription(client, type, id)
  # remove subscription from this client's list of subscriptions
  client.subscriptions.delete({type: type, id: id})

  key = " #{ type } . #{ id } "

  # for a given type:id in @subscription_lookup, remove from list of clients listening
  clients = @subscription_lookup[key]
  if clients.nil?
    @log.error " unregister_subscription: unable to locate any clients for id #{ id } "
    # nothing is tracked for this key; falling through used to call nil.length
    return
  end

  # Set#delete always returns the set, so it cannot signal a miss; delete? returns nil for one
  unless clients.delete?(client)
    @log.error " unregister_subscription: unable to locate any client #{ client . client_id } for id #{ id } "
  end

  if clients.length == 0
    # if there are no more clients listening, then unbind from the topic for this type:id
    routing_key = " subscription. #{ type } . #{ id } "
    @log.debug(" unregister dynamic topic #{ routing_key } ")
    @subscription_topic.unbind(@subscriptions_exchange, :routing_key => routing_key)
  end
end
2014-06-19 19:05:33 +00:00
# Runs the given block and translates anything it raises into websocket behavior:
#   SessionError       - rejection error is sent to the client, then the client is cleaned up
#   JamPermissionError - permission error (tagged with original_message_id) is sent; the client stays
#   any other StandardError - generic error is sent, then the client is cleaned up
# Cleanup runs even if sending the error message itself fails.
def websocket_comm(client, original_message_id, &blk)
  blk.call
rescue SessionError => e
  @log.info " ending client session deliberately due to malformed client behavior. reason= #{ e } "
  begin
    # wrap the message up and send it down
    send_to_client(client, @message_factory.server_rejection_error(e.to_s, e.error_code))
  ensure
    cleanup_client(client)
  end
rescue JamPermissionError => e
  @log.info " permission error. reason= #{ e . to_s } "
  @log.info e
  # wrap the message up and send it down
  send_to_client(client, @message_factory.server_permission_error(original_message_id, e.to_s))
rescue => e
  @log.error " ending client session due to server programming or runtime error. reason= #{ e . to_s } "
  @log.error e
  begin
    # wrap the message up and send it down
    send_to_client(client, @message_factory.server_generic_error(e.to_s))
  ensure
    cleanup_client(client)
  end
end
2012-10-11 03:04:43 +00:00
2014-05-19 13:46:03 +00:00
# Prepares a newly accepted websocket connection: sets its defaults and installs
# the open / close / error / message callbacks.
#
# client     - the websocket connection (responds to onopen, onclose, onerror, onmessage)
# is_trusted - stored on the connection as client.trusted
def new_client(client, is_trusted)
  # default to using json instead of pb
  client.encode_json = true
  client.trusted = is_trusted

  client.onopen { |handshake|
    time_it('onopen') {
      stats_connected

      # a unique ID for this TCP connection, to aid in debugging
      client.channel_id = handshake.query[" channel_id "]

      @log.debug " client connected #{ client } with channel_id: #{ client . channel_id } "

      # check for '?pb' or '?pb=true' in url query parameters
      query_pb = handshake.query[" pb "]

      if !query_pb.nil? && (query_pb == " " || query_pb == " true ")
        client.encode_json = false
      end

      websocket_comm(client, nil) do
        client.x_forwarded_for = handshake.headers[" X-Forwarded-For "]
        client.query = handshake.query
        handle_login(client, client.query, client.x_forwarded_for)
      end
    }
  }

  client.onclose {
    time_it('onclose') {
      @log.debug " connection closed. marking stale: #{ client . context } "
      cleanup_client(client)
    }
  }

  client.onerror { |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error " websockets error: #{ error } "
    else
      @log.error " generic error: #{ error } #{ error . backtrace } "
    end
  }

  client.onmessage { |data|
    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?
    msg = nil

    # remember the largest payload seen so far and which user sent it
    if @largest_message.nil? || data.length > @largest_message.length
      @largest_message = data
      @largest_message_user = client.user_id
    end

    # extract the message safely
    websocket_comm(client, nil) do
      if client.encode_json
        json = JSON.parse(data)
        msg = Jampb::ClientMessage.json_create(json)
      else
        msg = Jampb::ClientMessage.parse(data.to_s)
      end
    end

    # websocket_comm has already reported a failed parse to the client; msg is
    # still nil in that case, and reading msg.message_id would raise outside any handler
    next if msg.nil?

    # then route it internally
    websocket_comm(client, msg.message_id) do
      route(msg, client)
    end
  }
end
2012-10-07 03:38:38 +00:00
# Writes msg to the client's websocket: JSON text when client.encode_json is set,
# otherwise a binary frame. HEARTBEAT_ACK and PEER_MESSAGE sends are not logged.
def send_to_client(client, msg)
  quiet = msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE
  @log.debug " SEND TO CLIENT ( #{ @message_factory . get_message_type ( msg ) } ) " unless quiet

  return client.send(msg.to_json.to_s) if client.encode_json

  # odd, but reaching into the handler was the only route found in em-websocket
  # for sending a binary message
  client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
end
# Disconnects from the message queue (if a connection manager exists) and
# then cleans up every client held in @clients.
def cleanup
  # shutdown topic listeners and mq connection
  @amqp_connection_manager.disconnect unless @amqp_connection_manager.nil?

  # tear down each individual client
  @clients.each_key { |client| cleanup_client(client) }
end
# Logs the shutdown and releases everything via #cleanup.
def stop
  @log.info(" shutdown ")
  cleanup
end
2013-02-21 17:49:32 +00:00
# Causes a client connection to be marked stale. Connections that have no
# client_id are ignored. When flagging the connection yields a music session id
# and that session is found, a session-stale notification is sent.
def stale_client(client)
  return unless client.client_id

  @log.info " marking client stale: #{ client . context } "
  ConnectionManager.active_record_transaction do |connection_manager|
    music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)
    # update the session members, letting them know this client went stale
    context = @client_lookup[client.client_id]
    next if music_session_id.nil?

    music_session = ActiveMusicSession.find_by_id(music_session_id)
    if music_session
      Notification.send_musician_session_stale(music_session, client.client_id, context.user)
    end
  end
end
2012-08-17 03:22:31 +00:00
# Validates an incoming client message and dispatches it by its target:
# server-, client-, session- or user-directed.
#
# client_msg - the parsed message (responds to type, route_to, target)
# client     - the connection that sent it
#
# Raises SessionError for an unknown message type, a nil route_to, an unknown
# route_to, or a non-login message from a client that is not logged in. A
# jamblaster that is not logged in gets a diagnostic message instead, and the
# message is dropped.
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)
  if message_type.nil?
    Diagnostic.unknown_message_type(client.user_id, client_msg)
    raise SessionError, " unknown message type received: #{ client_msg . type } "
  end

  @message_stats[message_type] = @message_stats[message_type].to_i + 1
  @log.debug(" msg received #{ message_type } ") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE

  if client_msg.route_to.nil?
    Diagnostic.missing_route_to(client.user_id, client_msg)
    raise SessionError, 'client_msg.route_to is null'
  end

  if !client.user_id && (client_msg.type != ClientMessage::Type::LOGIN && client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::LOGOUT)
    # this client has not logged in and is trying to send a non-login message
    if client.is_jamblaster
      @log.debug(" jamblaster sent message #{ message_type } at wrong time ")
      send_to_client(client, @message_factory.diagnostic(" message type #{ message_type } ignored because no log in "))
      # the client was just told the message is ignored, so stop here; this used
      # to fall through and route the message anyway
      return
    else
      raise SessionError, " must 'Login' first "
    end
  end

  # NOTE(review): the client-directed branch reads route_to while the session- and
  # user-directed branches read target -- confirm both are intended
  if @message_factory.server_directed? client_msg
    handle_server_directed(client_msg, client)
  elsif @message_factory.client_directed? client_msg
    to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
    time_it('client_directed') { handle_client_directed(to_client_id, client_msg, client) }
  elsif @message_factory.session_directed? client_msg
    session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    time_it('session_directed') { handle_session_directed(session_id, client_msg, client) }
  elsif @message_factory.user_directed? client_msg
    user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
    time_it('user_directed') { handle_user_directed(user_id, client_msg, client) }
  else
    raise SessionError, " client_msg.route_to is unknown type: #{ client_msg . route_to } "
  end
end
# Dispatches a server-directed message to the handler for its type, timing each
# handler under its own label.
#
# Raises SessionError when the type has no server-side handler.
def handle_server_directed(client_msg, client)
  type = client_msg.type

  if type == ClientMessage::Type::LOGIN
    # this is currently only a jamblaster path
    client.query[" token "] = client_msg.login.token
    client.query[" username "] = client_msg.login.username
    client.query[" password "] = client_msg.login.password
    time_it('login') { handle_login(client, client.query, client.x_forwarded_for, false) }
  elsif type == ClientMessage::Type::LOGOUT
    # this is currently only a jamblaster path
    # timed under its own label; it used to be counted as 'login'
    time_it('logout') { handle_logout(client) }
  elsif type == ClientMessage::Type::HEARTBEAT
    time_it('heartbeat') { sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } }
  elsif type == ClientMessage::Type::USER_STATUS
    time_it('user_status') { sane_logging { handle_user_status(client_msg.user_status, client) } }
  elsif type == ClientMessage::Type::SUBSCRIBE_BULK
    time_it('subscribe_bulk') { sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) } }
  elsif type == ClientMessage::Type::SUBSCRIBE
    time_it('subscribe') { sane_logging { handle_subscribe(client_msg.subscribe, client) } }
  elsif type == ClientMessage::Type::UNSUBSCRIBE
    time_it('unsubscribe') { sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) } }
  else
    raise SessionError, " unknown message type ' #{ client_msg . type } ' for #{ client_msg . route_to } -directed message "
  end
end
2012-08-24 02:46:58 +00:00
2014-04-30 20:29:10 +00:00
# Returns [heartbeat_interval, connection_stale_time, connection_expire_time].
# Defaults come from the browser or client settings depending on client_type; a
# user may override the heartbeat interval and the expire time, but not the stale
# time. Raises SessionError unless heartbeat < stale < expire.
def determine_connection_times(user, client_type)
  browser = client_type == Connection::TYPE_BROWSER
  default_heartbeat = browser ? @heartbeat_interval_browser : @heartbeat_interval_client
  default_stale = browser ? @connect_time_stale_browser : @connect_time_stale_client
  default_expire = browser ? @connect_time_expire_browser : @connect_time_expire_client

  heartbeat_interval = ((user && user.heartbeat_interval_client) || default_heartbeat).to_i
  heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config

  connection_expire_time = ((user && user.connection_expire_time_client) || default_expire).to_i
  connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config

  # no user override exists for this; not a very meaningful time right now
  connection_stale_time = default_stale

  if heartbeat_interval >= connection_stale_time
    raise SessionError, " misconfiguration! heartbeat_interval ( #{ heartbeat_interval } ) should be less than stale time ( #{ connection_stale_time } ) "
  end

  if connection_stale_time >= connection_expire_time
    raise SessionError, " misconfiguration! stale time ( #{ connection_stale_time } ) should be less than expire time ( #{ connection_expire_time } ) "
  end

  [heartbeat_interval, connection_stale_time, connection_expire_time]
end
2014-05-19 13:46:03 +00:00
# Returns the ClientContext tracking this connection. An existing context is
# reused and only its user is updated; otherwise a new context is created and
# registered in @clients and the client lookup. In both cases the context is
# registered for the user when one is given.
def add_tracker(user, client, client_type, client_id)
  existing = @clients[client]
  if existing
    existing.user = user
    add_user(existing) if user
    return existing
  end

  fresh = ClientContext.new(user, client, client_type)
  @clients[client] = fresh
  add_user(fresh) if user
  add_client(client_id, fresh)
  fresh
end
2014-09-26 04:13:34 +00:00
# Logs a latency tester in: records the connection through LatencyTester.connect,
# tags the websocket with its ids, registers a tracking context and answers with
# a login ack. Raises SessionError when the connect result carries errors.
def handle_latency_tester_login(client_id, client_type, client, override_ip)
  remote_ip = extract_ip(client, override_ip)
  heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)

  connect_options = {
    client_id: client_id,
    channel_id: client.channel_id,
    ip_address: remote_ip,
    connection_stale_time: connection_stale_time,
    connection_expire_time: connection_expire_time,
    gateway: @gateway_name
  }
  latency_tester = LatencyTester.connect(connect_options)

  if latency_tester.errors.any?
    @log.warn " unable to log in latency_tester with errors: #{ latency_tester . errors . inspect } "
    stats_logged_in_failed
    raise SessionError, " invalid login: #{ latency_tester . errors . inspect } "
  end

  client.client_id = client_id
  client.user_id = latency_tester.id if latency_tester

  @semaphore.synchronize do
    context = add_tracker(latency_tester, client, client_type, client_id)
    @log.debug " logged in context created: #{ context } "

    # respond with LOGIN_ACK to let client know it was successful
    login_ack = @message_factory.login_ack(
      remote_ip,
      client_id,
      nil,
      heartbeat_interval,
      nil,
      false,
      latency_tester.id,
      " latency_tester ",
      connection_expire_time
    )
    stats_logged_in
    send_to_client(client, login_ack)
  end
end
2016-03-04 15:06:13 +00:00
# Logs the client's user out while keeping the websocket open: deletes the
# Connection row for this client_id (if one exists), detaches the user from
# the client and its context, then sends a logout ack.
def handle_logout(client)
  stored_connection = Connection.find_by_client_id(client.client_id)
  stored_connection.delete if stored_connection

  client.user_id = nil

  ctx = client.context
  if ctx
    @log.debug("will remove context with user: #{ctx.user}")
    remove_user(ctx)
    ctx.user = nil
  end

  send_to_client(client, @message_factory.logout_ack())
end
# Handles a login (or, when connecting is false, a re-login on an already
# open websocket) and answers with LOGIN_ACK (user login) or CONNECT_ACK
# (user-less jamblaster).
#
# client      - the websocket connection
# options     - Hash of login fields read below: "username", "password", "token",
#               "client_id", "music_session_id", "client_type", "os",
#               "udp_reachable", "jamblaster_serial_no"
# override_ip - IP to record instead of the socket's peer address (may be nil)
# connecting  - true for a fresh websocket connect; false reuses client.client_id
#
# Raises SessionError when the user has too many connections, or when neither
# a valid user nor a known jamblaster could be established.
def handle_login(client, options, override_ip = nil, connecting = true)
  username = options["username"]
  password = options["password"]
  token = options["token"]
  client_id = options["client_id"]
  reconnect_music_session_id = options["music_session_id"]
  client_type = options["client_type"]
  os = options["os"]
  # udp_reachable defaults to true when the client did not send it
  udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
  jamblaster_serial_no = options["jamblaster_serial_no"]

  # TESTING
  #if jamblaster_serial_no.nil?
  #  jamblaster_serial_no = 'hi'
  #end

  client.subscriptions = Set.new # list of subscriptions that this client is watching in real-time

  @log.info("handle_login: client_type=#{client_type} token=#{token} client_id=#{client_id} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable}")

  if client_type == Connection::TYPE_LATENCY_TESTER
    handle_latency_tester_login(client_id, client_type, client, override_ip)
    return
  end

  reconnected = false

  if connecting
    # you don't have to supply client_id in login--if you don't, we'll generate one
    if client_id.nil? || client_id.empty?
      # give a unique ID to this client.
      client_id = UUIDTools::UUID.random_create.to_s
    end
  else
    # client_id's don't change per websocket connection; so use the one from memory
    client_id = client.client_id
  end

  # we have to deal with jamblaster before login
  if jamblaster_serial_no && jamblaster_serial_no != ''
    jamblaster = Jamblaster.find_by_serial_no(jamblaster_serial_no)
    if jamblaster
      client.is_jamblaster = true
    end
    if jamblaster && connecting
      jamblaster.client_id = client_id
      jamblaster.save
    end
  end

  user = valid_login(username, password, token, client_id, jamblaster)

  # protect against this user swamping the server
  if user && Connection.where(user_id: user.id).count >= @max_connections_per_user
    @log.warn "user #{user.id}/#{user.email} unable to connect due to max_connections_per_user #{@max_connections_per_user}"
    stats_logged_in_failed
    # the second value is the error code; with `raise Class, msg, x` it would be
    # taken as the backtrace instead, so build the exception explicitly
    raise SessionError.new('max_user_connections', 'max_user_connections')
  end

  if connecting
    # XXX This logic needs to instead be handled by a broadcast out to all websockets indicating dup
    # kill any websocket connections that have this same client_id, which can happen in race conditions
    # this code must happen here, before we go any further, so that there is only one websocket connection per client_id
    existing_context = @client_lookup[client_id]
    if existing_context
      # in some reconnect scenarios, we may have in memory a websocket client still.
      # let's whack it, and tell the other client, if still connected, that this is a duplicate login attempt
      @log.info "duplicate client: #{existing_context}"
      Diagnostic.duplicate_client(existing_context.user, existing_context)
      error_msg = @message_factory.server_duplicate_client_error
      send_to_client(existing_context.client, error_msg)
      cleanup_client(existing_context.client)
    end
  end

  connection = Connection.find_by_client_id(client_id)
  # if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
  # because it will recreate a new connection lower down
  if connection && connection.user != user
    keep = false
    if user
      if connection.user.nil?
        # a user is logging into a previously user-less connection: adopt it
        keep = true
        @log.debug("user #{user.email} logged into #{client_id}")
        connection.user = user
        connection.save
      else
        @log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
      end
    else
      @log.debug("user-less connection #{client_id} took from user #{connection.user.email}")
    end
    if !keep
      connection.delete
      connection = nil
    end
  end

  client.client_id = client_id
  client.user_id = user.id if user
  remote_ip = extract_ip(client, override_ip)

  if user
    heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type)
    @log.debug "logged in #{user} with client_id: #{client_id}"

    # check if there's a connection for the client... if it's stale, reconnect it
    if !connection.nil? && connecting
      # FIXME: I think connection table needs to updated within connection_manager
      # otherwise this would be 1 line of code (connection.connect!)
      music_session_upon_reentry = connection.music_session
      send_depart = false
      recording_id = nil

      ConnectionManager.active_record_transaction do |connection_manager|
        music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)

        if music_session_id.nil?
          # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
          # if so, then we need to tell the others in the session that this user is now departed
          unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
            if music_session_upon_reentry.backing_track_initiator == user
              music_session_upon_reentry.close_backing_track
            end

            # if a jamtrack is open and this user is no longer in the session, close it
            if music_session_upon_reentry.jam_track_initiator == user
              music_session_upon_reentry.close_jam_track
            end

            # if a recording is open and this user is no longer in the session, close it
            if music_session_upon_reentry.claimed_recording_initiator == user
              music_session_upon_reentry.claimed_recording_stop
            end

            # handle case that a recording was ongoing - any one leaves, we stop it
            recording = music_session_upon_reentry.stop_recording
            unless recording.nil?
              @log.debug "stopped recording: #{recording.id} because user #{user} reconnected"
              recording.discard_if_no_action(user) # throw away this user's vote for the recording
              recording_id = recording.id
            end

            # if the user was in a recording during the finalizing phase (after stopped, but keep/discard still required in recordingFinishedDialog)
            # then throw away the user's vote
            most_recent_recording = music_session_upon_reentry.most_recent_recording
            if most_recent_recording && most_recent_recording.users.exists?(user)
              @log.debug "disarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected"
              # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had
              most_recent_recording.discard_if_no_action(user) # throw away this user's vote for the recording
            end

            music_session_upon_reentry.with_lock do # VRFS-1297
              music_session_upon_reentry.tick_track_changes
            end
            send_depart = true
          end
        else
          music_session = ActiveMusicSession.find_by_id(music_session_id)
          Notification.send_musician_session_fresh(music_session, client.client_id, user)
        end
      end

      # the depart notification is sent only after the transaction block has finished
      if send_depart
        Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id)
      end
    end

    # respond with LOGIN_ACK to let client know it was successful

    # add a tracker for this user
    context = add_tracker(user, client, client_type, client_id)

    @log.debug "logged in context created: #{context}"

    if !connection
      # log this connection in the database
      ConnectionManager.active_record_transaction do |connection_manager|
        connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
          user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN)
          if count == 1
            Notification.send_friend_update(user.id, true, conn)
          end
        end
      end
    end

    # if we have OS data, try to grab client update data and let the client have it
    update = ArtifactUpdate.find_client_by_os(os) if client_type == Connection::TYPE_CLIENT && os
    client_update = update.update_data if update

    login_ack = @message_factory.login_ack(remote_ip,
                                           client_id,
                                           user.remember_token,
                                           heartbeat_interval,
                                           connection.try(:music_session_id),
                                           reconnected,
                                           user.id,
                                           connection_expire_time,
                                           user.name,
                                           client_update)
    stats_logged_in
    send_to_client(client, login_ack)
  elsif jamblaster
    # if no user, but we have a jamblaster, we can allow this session to go through
    heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)
    @log.debug "logged in jb::#{jamblaster.serial_no} with client_id: #{client_id}"

    # check if there's a connection for the client... if it's stale, reconnect it
    if !connection.nil?
      ConnectionManager.active_record_transaction do |connection_manager|
        music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
      end
    end

    # add a tracker for this client (user is nil on this branch)
    context = add_tracker(user, client, client_type, client_id)

    @log.debug "logged in context created: #{context}"

    if !connection
      # log this connection in the database
      ConnectionManager.active_record_transaction do |connection_manager|
        connection_manager.create_connection(nil, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
          # this blk is not call
        end
      end
    end

    # if we have OS data, try to grab client update data and let the client have it
    update = ArtifactUpdate.find_client_by_os(os) if client_type == Connection::TYPE_CLIENT && os
    client_update = update.update_data if update

    connect_ack = @message_factory.connect_ack(remote_ip,
                                               client_id,
                                               heartbeat_interval,
                                               connection_expire_time,
                                               client_update)
    stats_logged_in
    send_to_client(client, connect_ack)
  else
    stats_logged_in_failed
    raise SessionError.new('invalid login', 'invalid_login')
  end
end
2014-12-18 21:13:55 +00:00
# Subscribes the client to several items at once. subscriptions.types and
# subscriptions.ids are read as parallel lists: the entry at each index of
# types is paired with the same index of ids and passed to handle_subscribe.
def handle_bulk_subscribe(subscriptions, client)
  subscriptions.types.each_with_index do |type, index|
    request = OpenStruct.new({ type: type, id: subscriptions.ids[index] })
    handle_subscribe(request, client)
  end
end
# Registers a subscription for the client when both the id and the type are
# present and non-empty; otherwise logs an error. Registration only happens
# when @allow_dynamic_registration is set.
def handle_subscribe(subscribe, client)
  target_id = subscribe.id
  target_type = subscribe.type
  has_data = target_id && target_id.length > 0 && target_type && target_type.length > 0

  return @log.error("handle_subscribe: empty data #{subscribe}") unless has_data

  register_subscription(client, target_type, target_id) if @allow_dynamic_registration
end
# Removes a subscription for the client when both the id and the type are
# present and non-empty; otherwise logs an error. Unregistration only happens
# when @allow_dynamic_registration is set.
def handle_unsubscribe(unsubscribe, client)
  id = unsubscribe.id
  type = unsubscribe.type

  if id && id.length > 0 && type && type.length > 0
    unregister_subscription(client, type, id) if @allow_dynamic_registration
  else
    # label the message with this method's name (it used to say handle_subscribe)
    @log.error("handle_unsubscribe: empty data #{unsubscribe}")
  end
end
2015-09-23 14:18:00 +00:00
# Puts the user on the temporary ban list for 10 minutes (600 seconds).
#
# user   - object responding to #id; may be nil for user-less contexts, in
#          which case nothing is banned
# reason - text included in the log line
def add_to_ban(user, reason)
  if user.nil?
    # a context without a user has no id to key the ban on; without this guard
    # the caller would see a NoMethodError instead of its own SessionError
    @log.info("no user to ban. reason #{reason}")
    return
  end

  user_ban = (@temp_ban[user.id] ||= {})

  # allow user back in, after 10 minutes
  user_ban[:allow] = Time.now + 600

  @log.info("user #{user} banned for 10 minutes. reason #{reason}")
end
# Counts one heartbeat for the context's client_id in @heartbeat_tracker and
# compares the count with the browser or client limit.
#
# Returns false while the count is within the limit. Once it is exceeded, the
# user is added to the temp ban list and a SessionError is raised (a warning
# is logged on every 100th heartbeat). The heartbeat argument itself is not read.
def runaway_heartbeat(heartbeat, context)
  tracked_id = context.client.client_id
  seen = (@heartbeat_tracker[tracked_id] || 0) + 1
  @heartbeat_tracker[tracked_id] = seen

  limit = if context.client_type == 'browser'
            @maximum_minutely_heartbeat_rate_browser
          else
            @maximum_minutely_heartbeat_rate_client
          end

  return false unless seen > limit

  @log.warn("user #{context.user} sending too many heartbeats: #{seen}") if seen % 100 == 0
  add_to_ban(context.user, 'too many heartbeats')
  raise SessionError.new('too many heartbeats', 'empty_login')
end
2016-01-19 00:41:53 +00:00
# Copies the active flag from the user_status message onto the client's context.
def handle_user_status(user_status, client)
  is_active = user_status.active
  client.context.active = is_active
end
2013-07-15 02:50:34 +00:00
# Processes one heartbeat from a client and answers with a heartbeat ack.
#
# heartbeat            - heartbeat message; `active` is read here, and the message is
#                        passed on to runaway_heartbeat and update_notification_seen_at
# heartbeat_message_id - id echoed back in bad-state / recovered messages
# client               - the websocket connection that sent the heartbeat
#
# Raises SessionError when the in-memory context or the Connection row for this
# client is missing. runaway_heartbeat may also raise.
def handle_heartbeat ( heartbeat , heartbeat_message_id , client )
2013-02-22 01:41:09 +00:00
# look the context up by the websocket connection object itself
unless context = @clients [ client ]
2015-09-23 01:57:01 +00:00
# NOTE(review): profile_it presumably times the block under the given label -- confirm
profile_it ( 'heartbeat_context_gone' ) {
@log . warn " *** WARNING: unable to find context when handling heartbeat. client_id= #{ client . client_id } ; killing session "
#Diagnostic.missing_client_state(client.user_id, client.context)
raise SessionError , 'context state is gone. please reconnect.'
}
2013-01-06 05:43:23 +00:00
else
2015-09-23 14:18:00 +00:00
# as written in this file, runaway_heartbeat either returns false or raises,
# so this early return only triggers if that ever changes
if runaway_heartbeat ( heartbeat , context )
return
end
2015-09-23 01:57:01 +00:00
# declared outside the block so the value assigned inside it is visible afterwards
connection = nil
profile_it ( 'heartbeat_find_conn' ) {
connection = Connection . find_by_client_id ( context . client . client_id )
}
2014-03-03 22:13:23 +00:00
# stays nil unless the connection is in a music session that can still be found
track_changes_counter = nil
2013-02-22 01:41:09 +00:00
if connection . nil?
2015-09-23 01:57:01 +00:00
profile_it ( 'heartbeat_diag_missing' ) {
@log . warn " *** WARNING: unable to find connection when handling heartbeat. context= #{ context } ; killing session "
Diagnostic . missing_connection ( client . user_id , client . context )
raise SessionError , 'connection state is gone. please reconnect.'
}
2013-02-21 17:49:32 +00:00
else
2015-09-23 01:57:01 +00:00
#profile_it('heartbeat_transaction') {
2016-03-04 15:06:13 +00:00
#Connection.transaction do
# send back track_changes_counter if in a session
profile_it ( 'heartbeat_session' ) {
if connection . music_session_id
music_session = ActiveMusicSession . select ( :track_changes_counter ) . find_by_id ( connection . music_session_id )
track_changes_counter = music_session . track_changes_counter if music_session
end
}
profile_it ( 'heartbeat_touch' ) {
# update connection updated_at and if the user is active
Connection . where ( id : connection . id ) . update_all ( user_active : heartbeat . active , updated_at : Time . now )
}
profile_it ( 'heartbeat_notification' ) {
# update user's notification_seen_at field if the heartbeat indicates it saw one
# first we try to use the notification id, which should usually exist.
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
update_notification_seen_at ( connection , context , heartbeat ) if client . context . client_type != Connection :: TYPE_LATENCY_TESTER
}
#end
2015-09-23 01:57:01 +00:00
#}
# a connection that reports itself stale is reconnected in place; the ip and
# udp_reachable arguments are passed as nil here
profile_it ( 'heartbeat_stale' ) {
if connection . stale?
ConnectionManager . active_record_transaction do | connection_manager |
heartbeat_interval , connection_stale_time , connection_expire_time = determine_connection_times ( context . user , context . client_type )
connection_manager . reconnect ( connection , client . channel_id , connection . music_session_id , nil , connection_stale_time , connection_expire_time , nil , @gateway_name )
end
2014-05-19 13:46:03 +00:00
end
2015-09-23 01:57:01 +00:00
}
2013-02-21 17:49:32 +00:00
end
2013-07-15 02:50:34 +00:00
2014-03-03 22:13:23 +00:00
# the ack is always sent first; a bad-state or recovery message may follow it
heartbeat_ack = @message_factory . heartbeat_ack ( track_changes_counter )
2013-09-01 02:17:38 +00:00
send_to_client ( client , heartbeat_ack )
2013-08-07 15:38:35 +00:00
# send errors to clients in response to heartbeats if rabbitmq is down
2013-07-15 02:50:34 +00:00
if ! @amqp_connection_manager . connected?
error_msg = @message_factory . server_bad_state_error ( heartbeat_message_id , " messaging system down " )
# remembered on the context so the recovery message below is sent once messaging is back
context . sent_bad_state_previously = true
send_to_client ( client , error_msg )
return
elsif context . sent_bad_state_previously
context . sent_bad_state_previously = false
recovery_msg = @message_factory . server_bad_state_recovered ( heartbeat_message_id )
send_to_client ( client , recovery_msg )
end
2012-10-07 18:01:48 +00:00
end
2012-10-11 03:04:43 +00:00
end
2012-08-24 02:46:58 +00:00
2014-03-27 18:43:15 +00:00
# Updates the connection user's notification_seen_at from a heartbeat.
#
# When the heartbeat carries a notification id (tag 1) and that notification
# exists, its created_at is stored. When the id is present but no such
# notification is found, the heartbeat's notification_seen_at string (tag 2)
# is stored instead, provided Time.parse accepts it. Heartbeats without a
# notification id change nothing. The user is saved without validations;
# failures are logged, never raised.
#
# connection - Connection record whose user is updated (nothing happens if it has no user)
# context    - client context, used only in log messages
# heartbeat  - heartbeat message responding to value_for_tag, notification_seen
#              and notification_seen_at
def update_notification_seen_at(connection, context, heartbeat)
  notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1)
  return unless notification_id_field && notification_id_field != ''

  # user-less connections have nothing to update
  user = connection.user
  return if user.nil?

  notification = Notification.find_by_id(notification_id_field)
  if notification
    user.notification_seen_at = notification.created_at
    unless user.save(validate: false)
      @log.error "unable to update notification_seen_at via id field for client #{context}. errors: #{user.errors.inspect}"
    end
    return
  end

  notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)
  notification_seen_at_parsed = nil
  begin
    notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0
  rescue StandardError
    # StandardError rather than Exception, so signals and exits are not swallowed
    @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
  end
  return unless notification_seen_at_parsed

  # the parse above only validates; the raw string is what gets assigned
  user.notification_seen_at = notification_seen_at
  unless user.save(validate: false)
    @log.error "unable to update notification_seen_at via time field for client #{context}. errors: #{user.errors.inspect}"
  end
end
2016-03-04 15:06:13 +00:00
# Resolves the user for a login attempt.
#
# Credentials are tried in this order: remember token, then username and
# password. If neither is supplied but a jamblaster is present, the login is
# allowed without a user.
#
# username   - email address used for the lookup
# password   - plaintext password, checked with User#valid_password?
# token      - remember token
# client_id  - not used by this method
# jamblaster - when truthy and no credentials were supplied, nil is returned
#              instead of raising
#
# Returns the User, or nil when the credentials do not match (or for a
# credential-less jamblaster login).
# Raises SessionError when the user is on the temp ban list, or when no login
# data was supplied at all.
def valid_login(username, password, token, client_id, jamblaster)
  if !token.nil? && token != ''
    @log.debug "logging in via token"
    # attempt login with token
    user = User.find_by_remember_token(token)

    if user.nil?
      @log.debug "no user found with token #{token}"
      return nil
    else
      # check against temp ban list
      if @temp_ban[user.id]
        @log.debug("user #{user} is still banned; rejecting login")
        raise SessionError.new('login rejected temporarily', 'empty_login')
      end

      @log.debug "#{user} login via token"
      return user
    end

  elsif !username.nil? && !password.nil?
    # never write the plaintext password to the log
    @log.debug "logging in via user/pass '#{username}'"
    # attempt login with username and password
    user = User.find_by_email(username)

    # check against temp ban list
    if !user.nil? && @temp_ban[user.id]
      @log.debug("user #{user} is still banned; rejecting login")
      raise SessionError.new('login rejected temporarily', 'empty_login')
    end

    if !user.nil? && user.valid_password?(password)
      @log.debug "#{user} login via password"
      return user
    else
      @log.debug "#{username} login failure"
      return nil
    end
  elsif jamblaster
    # if there is a jamblaster in context, then we will allow no login.
    return nil
  else
    raise SessionError.new('no login data was found in Login message', 'empty_login')
  end
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# Loads the active music session with the given id and checks that the user
# may access it.
#
# Returns the session.
# Raises SessionError when the session does not exist or access? refuses the user.
def access_music_session(music_session_id, user)
  session = ActiveMusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if session.nil?
  raise SessionError, 'not allowed to join the specified session' unless session.access?(user)

  session
end
2012-08-17 03:22:31 +00:00
2012-10-11 03:04:43 +00:00
# client_id = the id of the client being accessed
# client = the current client
# Peer-to-peer permission check -- currently disabled: always returns nil and
# never raises, whatever the arguments.
#
# The statements that used to follow the unconditional return were unreachable
# and have been removed. For reference, they let PING_REQUEST / PING_ACK
# messages through unchecked, raised JamPermissionError when no Connection
# existed for client_id, and raised SessionError when the target connection's
# access_p2p?(user) refused the user. Restore them from version control if the
# check is to be re-enabled.
def access_p2p(client_id, user, msg)
  nil
end
# Forwards a client-to-client message by publishing it on the clients topic
# exchange with routing key "client.<to_client_id>".
#
# to_client_id - client id of the recipient
# client_msg   - message to forward; its `from` field is set to the sender's client_id
# client       - the websocket connection of the sender
#
# Raises SessionError when the message type is one of the operational types a
# client may not send, or when to_client_id is nil, empty or 'undefined'.
# No per-recipient permission check is made (access_p2p is not called here).
def handle_client_directed(to_client_id, client_msg, client)
  # quick and dirty safeguards against the most dangerous operational messages from being sent by malicious clients
  forbidden_types = [
    ClientMessage::Type::RELOAD,
    ClientMessage::Type::CLIENT_UPDATE,
    ClientMessage::Type::GENERIC_MESSAGE,
    ClientMessage::Type::RESTART_APPLICATION,
    ClientMessage::Type::STOP_APPLICATION
  ]
  if forbidden_types.include?(client_msg.type)
    # instance logger, as used everywhere else in this method
    @log.error("malicious activity")
    raise SessionError, "not allowed"
  end

  # javascript translates to 'undefined' in many cases; an empty id is just as unusable
  if to_client_id.nil? || to_client_id == '' || to_client_id == 'undefined'
    raise SessionError, "empty client_id specified in peer-to-peer message"
  end

  # populate routing data
  client_msg.from = client.client_id

  @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE

  # put it on the topic exchange for clients
  @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => { :headers => { "client_id" => client.client_id } })
end
2012-10-07 20:51:43 +00:00
# Publishes a message addressed to a user on the users topic exchange, using
# the routing key "user.<user_id>".
def handle_user_directed(user_id, client_msg, client)
  @log.debug "publishing to user #{user_id} from client_id #{client.client_id}"

  # put it on the topic exchange for users
  payload = client_msg.to_s
  @users_exchange.publish(payload, :routing_key => "user.#{user_id}")
end
2012-10-17 03:50:28 +00:00
# Sends a message to a music session on behalf of the user attached to the
# sending client's context, identifying the sender by its client_id.
def handle_session_directed(session_id, client_msg, client)
  acting_user = @clients[client].user
  sender = { :client_id => client.client_id }
  user_publish_to_session(session_id, acting_user, client_msg, sender)
end
# sends a message to a session on behalf of a user
# if this is originating in the context of a client, it should be specified as :client_id => "value"
# client_msg should be a well-structured message (jam-pb message)
# Publishes client_msg to every client in the music session except the sender,
# after access_music_session has confirmed the user may access the session.
#
# sender - Hash whose :client_id names the originating client; that client is
#          left out of the recipients. The default "" excludes nobody.
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
  music_session = access_music_session(music_session_id, user)

  # gather up client_ids in the session
  member_ids = music_session.music_session_clients.map { |member| member.client_id }
  recipients = member_ids.reject { |member_id| member_id == sender[:client_id] }

  publish_to_session(music_session.id, recipients, client_msg.to_s, sender)
end
# sends a message to a session with no checking of permissions
# this method deliberately has no database interactivity/active_record objects
# Publishes an already-serialized message to each listed client of a session,
# with no permission checks. The work is handed to EM.schedule and uses the
# class-level logger and client exchange.
#
# music_session_id - used only in the log line
# client_ids       - recipients; one publish per id with routing key "client.<id>"
# client_msg       - payload to publish as-is
# sender           - Hash whose :client_id is used only in the log line
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
  EM.schedule do
    sender_client_id = sender[:client_id]

    # iterate over each person in the session, and send a p2p message
    client_ids.each do |recipient_id|
      @@log.debug "publishing to session:#{music_session_id} client:#{recipient_id} from client:#{sender_client_id}"

      # put it on the topic exchange for clients
      routing_key = "client.#{recipient_id}"
      self.class.client_exchange.publish(client_msg, :routing_key => routing_key)
    end
  end
end
2012-10-23 11:42:28 +00:00
2014-09-26 04:13:34 +00:00
# Returns override_ip when one is given; otherwise the IP address unpacked
# from the client's socket peer name.
def extract_ip(client, override_ip)
  return override_ip if override_ip

  _port, ip_address = Socket.unpack_sockaddr_in(client.get_peername)
  ip_address
end
2014-04-29 01:45:06 +00:00
2014-09-24 19:27:56 +00:00
# Periodic task: asks the connection manager to flag this gateway's stale
# connections, inside an active-record transaction.
def periodical_flag_connections
  # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
  ConnectionManager.active_record_transaction { |manager| manager.flag_stale_connections(@gateway_name) }
end
# Periodic task: makes sure every client held in memory still has a row in the
# connections table for this gateway, and cleans up those that do not.
#
# it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario
# usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory
def periodical_check_clients
  return if @client_lookup.empty?

  ConnectionManager.active_record_transaction do |connection_manager|
    conn = connection_manager.pg_conn

    # client_ids can come straight from a login message, so every value is
    # escaped before it is placed in the SQL text
    client_ids = @client_lookup.keys.map { |client_id| "('#{conn.escape_string(client_id.to_s)}')" }.join(',')
    gateway = conn.escape_string(@gateway_name.to_s)

    # find all client_id's that do not have a row in the db, and whack them
    # this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584
    sql = "WITH app_client_ids(client_id) AS (VALUES #{client_ids})
      SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{gateway}');
    "

    conn.exec(sql) do |result|
      result.each { |row|
        client_id = row['client_id']
        context = @client_lookup[client_id]
        if context
          @log.debug("cleaning up missing client #{client_id}, #{context.user}")
          cleanup_client(context.client)
        else
          @log.error("could not clean up missing client #{client_id}")
        end
      }
    end
  end
end
# Periodic task: fetches this gateway's stale connection client ids and hands
# them to cleanup_clients_with_ids.
#
# this method is designed to be called periodically (every few seconds)
# in which this gateway instance will check only its own clients for their health
# since each gateway checks only the clients it knows about, this allows us to deploy
# n gateways that don't know much about each other.
# each gateway marks each connection row with its gateway ID (so each gateway needs its own ID or bad things happen)
# we also have a global resque job that checks for connections that appear to be not controlled by any gateway
# to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy
def periodical_check_connections
  stale_clients = []

  ConnectionManager.active_record_transaction do |manager|
    stale_clients = manager.stale_connection_client_ids(@gateway_name)
  end

  cleanup_clients_with_ids(stale_clients)
end
# Periodic task: logs message rates, timing totals and drift, lifts expired
# temp bans, writes the collected figures through Stats.write('gateway.stats', ...)
# and resets the per-interval counters.
#
# @profile_it_sums is logged but not reset here.
def periodical_stats_dump
  # assume 60 seconds per status dump
  stats = @message_stats.sort_by { |_name, count| -count }
  # sort_by yields fresh [name, count] pairs, so this leaves @message_stats untouched
  stats.each { |entry| entry[1] = (entry[1] / 60.0).round(2) }

  @log.info("msg/s: " + stats.map { |entry| entry.join('=>') }.join(', '))
  @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b")

  if @highest_drift > 1
    @log.info("highest drift: #{@highest_drift - 2}")
  end

  total_time = 0
  time_sums = @time_it_sums.sort_by { |_cat, cat_time| -cat_time }

  # only the top few categories are logged, but all of them count toward the total
  log_num = 3
  count = 0
  time_sums.each do |cat, cat_time|
    count += 1
    if count <= log_num
      @log.info("timed #{cat} used time: #{cat_time}")
    end
    total_time += cat_time
  end

  @log.info("total used time: #{total_time}")

  profile_sums = @profile_it_sums.sort_by { |_cat, cat_time| -cat_time }
  profile_sums.each do |cat, cat_time|
    @log.info("profiled #{cat} used time: #{cat_time}")
  end

  # lift bans whose time is up; delete_if removes entries without mutating the
  # hash from inside a plain each
  now = Time.now
  @temp_ban.delete_if do |user_id, data|
    expired = now > data[:allow]
    @log.info("user #{user_id} allowed back in") if expired
    expired
  end

  # stuff in extra stats into the @message_stats and send it all off
  @message_stats['gateway_name'] = @gateway_name
  @message_stats['login'] = @login_success_count
  @message_stats['login_fail'] = @login_fail_count
  @message_stats['connected'] = @connected_count
  @message_stats['disconnected'] = @disconnected_count
  @message_stats['largest_msg'] = @largest_message ? @largest_message.length : 0
  @message_stats['highest_drift'] = @highest_drift - 2 # 2 comes from the server's 2 second timer for the drift check
  @message_stats['total_time'] = total_time
  @message_stats['banned_users'] = @temp_ban.length

  Stats.write('gateway.stats', @message_stats)

  # clear out stats
  @message_stats.clear
  @login_success_count = 0
  @login_fail_count = 0
  @connected_count = 0
  @disconnected_count = 0
  @user_message_counts = {}
  @largest_message = nil
  @largest_message_user = nil
  @time_it_sums = {}
  @highest_drift = 0
  @heartbeat_tracker = {}
end
# Tears down every connection listed in +expired_connections+.
#
# For each entry this closes the locally tracked client (if this gateway still
# has one in @client_lookup), deletes the connection row via ConnectionManager,
# stops/discards any recording the user was part of, bumps the session's track
# changes, and finally notifies the session that the user departed.
#
# expired_connections - enumerable of hash-like rows that respond to [:client_id]
def cleanup_clients_with_ids(expired_connections)
  expired_connections.each do |expired_connection|
    cid = expired_connection[:client_id]

    client_context = @client_lookup[cid]
    # Diagnostic.expired_stale_connection(client_context.user.id, client_context)
    cleanup_client(client_context.client) if client_context

    music_session = nil
    recording_id = nil
    user = nil

    # remove this connection from the database
    ConnectionManager.active_record_transaction do |mgr|
      mgr.delete_connection(cid) do |conn, count, music_session_id, user_id|
        user = User.find_by_id(user_id)
        # This can happen if you delete a user while their connection is up.
        # `next` leaves only this block; a `return` here would exit the whole
        # method and silently skip every remaining expired connection.
        next if user.nil?

        @log.info " expiring stale connection client_id: #{cid} , user_id: #{user} "
        Notification.send_friend_update(user_id, false, conn) if count == 0
        music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?

        if music_session
          recording = music_session.stop_recording
          unless recording.nil?
            @log.debug " stopped recording: #{recording.id} because user #{user} reconnected "
            recording.discard_if_no_action(user) # throw away this user's vote for the recording
            recording_id = recording.id
          end

          # if the user was in a recording during the finalizing phase (after stopped, but
          # keep/discard still required in recordingFinishedDialog), throw away the user's vote
          most_recent_recording = music_session.most_recent_recording
          if most_recent_recording && most_recent_recording.users.exists?(user)
            @log.debug " disarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected "
            # this user was in the most recent recording associated with the session they were just in
            most_recent_recording.discard_if_no_action(user)
          end

          music_session.with_lock do # VRFS-1297
            music_session.tick_track_changes
          end
        end
      end
    end

    if user && music_session
      Notification.send_session_depart(music_session, cid, user, recording_id)
    end
  end
end
# Releases everything this router holds for +client+: closes the connection,
# unregisters each of its subscriptions, and - under @semaphore - drops it from
# the logged-in bookkeeping and counts the disconnect.
def cleanup_client(client)
  client.close

  # unregister any subscriptions
  client.subscriptions.each do |sub|
    unregister_subscription(client, sub[:type], sub[:id])
  end

  @semaphore.synchronize do
    # presence of context implies this connection has been logged into
    logged_in = !client.context.nil?

    if logged_in
      @log.debug " cleanup up logged-in client #{client} "
      if (context = @clients.delete(client))
        remove_client(client.client_id)
        remove_user(context)
        stats_disconnected
      else
        # @clients no longer had an entry for this client
        @log.warn " skipping duplicate cleanup attempt of logged-in client "
      end
    else
      @log.debug " cleaned up not-logged-in client #{client} "
      stats_disconnected
    end
  end
end
2014-12-30 23:10:16 +00:00
# Counts one successful login for the current stats interval.
def stats_logged_in
  @login_success_count += 1
end
# Counts one failed login for the current stats interval.
def stats_logged_in_failed
  @login_fail_count += 1
end
# Counts one new connection for the current stats interval.
def stats_connected
  @connected_count += 1
end
# Counts one disconnect for the current stats interval.
def stats_disconnected
  @disconnected_count += 1
end
2014-09-24 19:27:56 +00:00
2014-04-29 01:45:06 +00:00
private
2015-09-22 20:25:48 +00:00
# Runs the block and adds its elapsed seconds to @time_it_sums[cat].
# Logs a warning when a single run takes longer than one second.
#
# cat - key under which the elapsed time is accumulated
# blk - the work to time
#
# If the block raises, nothing is recorded and the error propagates.
def time_it(cat, &blk)
  # monotonic clock: unaffected by wall-clock adjustments, so the measured
  # duration can never come out negative or inflated by a clock step
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  blk.call
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  @time_it_sums[cat] = (@time_it_sums[cat] || 0) + elapsed
  @log.warn(" LONG TIME: #{cat} : #{elapsed} ") if elapsed > 1
end
# Runs the block and adds its elapsed seconds to @profile_it_sums[cat].
# Logs a warning when a single run takes longer than one second.
#
# cat - key under which the elapsed time is accumulated
# blk - the work to profile
#
# If the block raises, nothing is recorded and the error propagates.
def profile_it(cat, &blk)
  # monotonic clock: unaffected by wall-clock adjustments, so the measured
  # duration can never come out negative or inflated by a clock step
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  blk.call
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  @profile_it_sums[cat] = (@profile_it_sums[cat] || 0) + elapsed
  @log.warn(" LONG TIME: #{cat} : #{elapsed} ") if elapsed > 1
end
2014-04-29 01:45:06 +00:00
# Runs the block with @ar_base_logger (when one is set) switched to :info,
# then puts the previous level back - also when the block raises.
# Used around repeated transactions that cause too much ActiveRecord::Base logging.
#
# Returns whatever the block returns.
def sane_logging(&blk)
  if @ar_base_logger
    original_level = @ar_base_logger.level
    @ar_base_logger.level = :info
  end

  blk.call
ensure
  @ar_base_logger.level = original_level if @ar_base_logger
end
2012-10-11 03:04:43 +00:00
end
2012-08-17 03:22:31 +00:00
end