* add timings for websocket-gateway health issues

This commit is contained in:
Seth Call 2015-09-22 20:57:01 -05:00
parent 1dce984247
commit e4e65f8c04
3 changed files with 131 additions and 52 deletions

View File

@ -2,4 +2,5 @@ ALTER TABLE crash_dumps ADD COLUMN email VARCHAR(255);
ALTER TABLE crash_dumps ADD COLUMN description VARCHAR(10000);
ALTER TABLE crash_dumps ADD COLUMN os VARCHAR(100);
ALTER TABLE crash_dumps ADD COLUMN os_version VARCHAR(100);
ALTER TABLE crash_dumps DROP CONSTRAINT crash_dumps_user_id_fkey;

View File

@ -29,7 +29,9 @@ module JamWebsockets
:connect_time_stale_browser,
:max_connections_per_user,
:gateway_name,
:client_lookup
:client_lookup,
:time_it_sums,
:profile_it_sums
def initialize()
@log = Logging.logger[self]
@ -54,11 +56,16 @@ module JamWebsockets
@gateway_name = nil
@ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
@message_stats = {}
@time_it_sums = {}
@profile_it_sums = {}
@login_success_count = 0
@login_fail_count = 0
@connected_count = 0
@disconnected_count = 0
@user_message_counts = {}
@largest_message = nil
@largest_message_user = nil
end
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true}, &block)
@ -382,28 +389,30 @@ module JamWebsockets
client.onopen { |handshake|
stats_connected
time_it('onopen') {
stats_connected
# a unique ID for this TCP connection, to aid in debugging
client.channel_id = handshake.query["channel_id"]
# a unique ID for this TCP connection, to aid in debugging
client.channel_id = handshake.query["channel_id"]
@log.debug "client connected #{client} with channel_id: #{client.channel_id}"
@log.debug "client connected #{client} with channel_id: #{client.channel_id}"
# check for '?pb' or '?pb=true' in url query parameters
query_pb = handshake.query["pb"]
# check for '?pb' or '?pb=true' in url query parameters
query_pb = handshake.query["pb"]
if !query_pb.nil? && (query_pb == "" || query_pb == "true")
client.encode_json = false
end
if !query_pb.nil? && (query_pb == "" || query_pb == "true")
client.encode_json = false
end
websocket_comm(client, nil) do
handle_login(client, handshake.query, handshake.headers["X-Forwarded-For"])
end
websocket_comm(client, nil) do
handle_login(client, handshake.query, handshake.headers["X-Forwarded-For"])
end
}
}
client.onclose {
time_it('ws_close') {
time_it('onclose') {
@log.debug "connection closed. marking stale: #{client.context}"
cleanup_client(client)
}
@ -424,6 +433,12 @@ module JamWebsockets
msg = nil
if @largest_message.nil? || data.length > @largest_message.length
@largest_message = data
@largest_message_user = client.user_id
end
# extract the message safely
websocket_comm(client, nil) do
if client.encode_json
@ -486,7 +501,6 @@ module JamWebsockets
end
def route(client_msg, client)
time_it('route') {
message_type = @message_factory.get_message_type(client_msg)
if message_type.nil?
Diagnostic.unknown_message_type(client.user_id, client_msg)
@ -512,20 +526,20 @@ module JamWebsockets
elsif @message_factory.client_directed? client_msg
to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
handle_client_directed(to_client_id, client_msg, client)
time_it('client_directed') { handle_client_directed(to_client_id, client_msg, client) }
elsif @message_factory.session_directed? client_msg
session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
handle_session_directed(session_id, client_msg, client)
time_it('session_directed') { handle_session_directed(session_id, client_msg, client) }
elsif @message_factory.user_directed? client_msg
user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
handle_user_directed(user_id, client_msg, client)
time_it('user_directed') { handle_user_directed(user_id, client_msg, client) }
else
raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
end
}
end
def handle_server_directed(client_msg, client)
@ -533,10 +547,10 @@ module JamWebsockets
if client_msg.type == ClientMessage::Type::LOGIN
handle_login(client_msg.login, client)
time_it('login') { handle_login(client_msg.login, client) }
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
time_it('heartbeat') { sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } }
elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK
time_it('subscribe_bulk') { sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) } }
elsif client_msg.type == ClientMessage::Type::SUBSCRIBE
@ -838,39 +852,56 @@ module JamWebsockets
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
unless context = @clients[client]
@log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
#Diagnostic.missing_client_state(client.user_id, client.context)
raise SessionError, 'context state is gone. please reconnect.'
profile_it('heartbeat_context_gone') {
@log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
#Diagnostic.missing_client_state(client.user_id, client.context)
raise SessionError, 'context state is gone. please reconnect.'
}
else
connection = Connection.find_by_client_id(context.client.client_id)
connection = nil
profile_it('heartbeat_find_conn') {
connection = Connection.find_by_client_id(context.client.client_id)
}
track_changes_counter = nil
if connection.nil?
@log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
Diagnostic.missing_connection(client.user_id, client.context)
raise SessionError, 'connection state is gone. please reconnect.'
profile_it('heartbeat_diag_missing') {
@log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
Diagnostic.missing_connection(client.user_id, client.context)
raise SessionError, 'connection state is gone. please reconnect.'
}
else
Connection.transaction do
# send back track_changes_counter if in a session
if connection.music_session_id
music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
track_changes_counter = music_session.track_changes_counter if music_session
#profile_it('heartbeat_transaction') {
#Connection.transaction do
# send back track_changes_counter if in a session
profile_it('heartbeat_session') {
if connection.music_session_id
music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
track_changes_counter = music_session.track_changes_counter if music_session
end
}
profile_it('heartbeat_touch') {
# update connection updated_at
connection.touch
}
profile_it('heartbeat_notification') {
# update user's notification_seen_at field if the heartbeat indicates it saw one
# first we try to use the notification id, which should usually exist.
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
}
#end
#}
profile_it('heartbeat_stale') {
if connection.stale?
ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
end
end
# update connection updated_at
connection.touch
# update user's notification_seen_at field if the heartbeat indicates it saw one
# first we try to use the notification id, which should usually exist.
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
end
if connection.stale?
ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
end
end
}
end
heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter)
@ -1130,7 +1161,33 @@ module JamWebsockets
stats = @message_stats.sort_by{|k,v| -v}
stats.map { |i| i[1] = (i[1] / 60.0).round(2) }
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '));
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '))
@log.info("largest msg from #{@largest_message_user}: #{@largest_message.length}b")
total_time = 0
time_sums = @time_it_sums.sort_by{|k,v| -v}
log_num = 3
count = 0
time_sums.each do | cat, cat_time |
count += 1
if count <= log_num
@log.info("timed #{cat} used time: #{cat_time}")
end
total_time += cat_time
end
@log.info("total used time: #{total_time}")
profile_sums = @profile_it_sums.sort_by{|k,v| -v}
profile_sums.each do | cat, cat_time |
@log.info("profiled #{cat} used time: #{cat_time}")
end
# stuff in extra stats into the @message_stats and send it all off
@ -1139,6 +1196,7 @@ module JamWebsockets
@message_stats['login_fail'] = @login_fail_count
@message_stats['connected'] = @connected_count
@message_stats['disconnected'] = @disconnected_count
@message_stats['largest_msg'] = @largest_message ? @largest_message.length : 0
Stats.write('gateway.stats', @message_stats)
@ -1148,6 +1206,11 @@ module JamWebsockets
@login_fail_count = 0
@connected_count = 0
@disconnected_count = 0
@user_message_counts = {}
@largest_message = nil
@largest_message_user = nil
@time_it_sums = {}
end
def cleanup_clients_with_ids(expired_connections)
@ -1207,7 +1270,6 @@ module JamWebsockets
# removes all resources associated with a client
def cleanup_client(client)
time_it('cleanup_client') {
client.close
# unregister any subscriptions
@ -1235,7 +1297,6 @@ module JamWebsockets
end
end
end
}
end
def stats_logged_in
@ -1263,6 +1324,20 @@ module JamWebsockets
time = Time.now - start
@time_it_sums[cat] = (@time_it_sums[cat] || 0 )+ time
@log.warn("LONG TIME: #{cat}: #{time}") if time > 1
end
def profile_it(cat, &blk)
start = Time.now
blk.call
time = Time.now - start
@profile_it_sums[cat] = (@profile_it_sums[cat] || 0 )+ time
@log.warn("LONG TIME: #{cat}: #{time}") if time > 1
end

View File

@ -152,6 +152,9 @@ module JamWebsockets
time = Time.now - start
@router.time_it_sums[cat] = (@router.time_it_sums[cat] || 0) + time
@log.warn("LONG TIME #{cat}: #{time}") if time > 1
end