* VRFS-1663 (diagnostics), VRFS-1657 (configurable timer for heartbeats), VRFS-1653 (websocket connection cleanup)
This commit is contained in:
parent
9820562892
commit
ad266e5b80
|
|
@ -1,2 +1,2 @@
|
|||
ALTER TABLE connections ADD COLUMN stale_time INTEGER NOT NULL DEFAULT 20;
|
||||
ALTER TABLE connections ADD COLUMN expire_time INTEGER NOT NULL DEFAULT 30;
|
||||
ALTER TABLE connections ADD COLUMN stale_time INTEGER NOT NULL DEFAULT 40;
|
||||
ALTER TABLE connections ADD COLUMN expire_time INTEGER NOT NULL DEFAULT 60;
|
||||
|
|
@ -44,7 +44,7 @@ module JamRuby
|
|||
end
|
||||
|
||||
# reclaim the existing connection, if ip_address is not nil then perhaps a new address as well
|
||||
def reconnect(conn, reconnect_music_session_id, ip_address)
|
||||
def reconnect(conn, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time)
|
||||
music_session_id = nil
|
||||
reconnected = false
|
||||
|
||||
|
|
@ -54,7 +54,7 @@ module JamRuby
|
|||
joined_session_at_expression = 'NULL'
|
||||
unless reconnect_music_session_id.nil?
|
||||
music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)"
|
||||
joined_session_at_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN NOW() ELSE NULL END)"
|
||||
joined_session_at_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN NOW() at time zone 'utc' ELSE NULL END)"
|
||||
end
|
||||
|
||||
if ip_address and !ip_address.eql?(conn.ip_address)
|
||||
|
|
@ -101,7 +101,7 @@ module JamRuby
|
|||
end
|
||||
|
||||
sql =<<SQL
|
||||
UPDATE connections SET (aasm_state, updated_at, music_session_id, joined_session_at) = ('#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression}, #{joined_session_at_expression})
|
||||
UPDATE connections SET (aasm_state, updated_at, music_session_id, joined_session_at, stale_time, expire_time) = ('#{Connection::CONNECT_STATE.to_s}', NOW() at time zone 'utc', #{music_session_id_expression}, #{joined_session_at_expression}, #{connection_stale_time}, #{connection_expire_time})
|
||||
WHERE
|
||||
client_id = '#{conn.client_id}'
|
||||
RETURNING music_session_id
|
||||
|
|
@ -141,24 +141,22 @@ SQL
|
|||
end
|
||||
|
||||
# flag connections as stale
|
||||
def flag_stale_connections(max_seconds)
|
||||
def flag_stale_connections()
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
sql =<<SQL
|
||||
SELECT count(user_id) FROM connections
|
||||
WHERE
|
||||
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
||||
updated_at < (NOW() at time zone 'utc' - (interval '1 second' * stale_time))AND
|
||||
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql) do |result|
|
||||
count = result.getvalue(0, 0)
|
||||
# @log.info("flag_stale_connections: flagging #{count} stale connections")
|
||||
if 0 < count.to_i
|
||||
# @log.info("flag_stale_connections: flagging #{count} stale connections")
|
||||
sql =<<SQL
|
||||
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
|
||||
WHERE
|
||||
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
||||
updated_at < (NOW() at time zone 'utc' - (interval '1 second' * stale_time)) AND
|
||||
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql)
|
||||
|
|
@ -169,19 +167,19 @@ SQL
|
|||
|
||||
# NOTE this is only used for testing purposes;
|
||||
# actual deletes will be processed in the websocket context which cleans up dependencies
|
||||
def expire_stale_connections(max_seconds)
|
||||
self.stale_connection_client_ids(max_seconds).each { |client| self.delete_connection(client[:client_id]) }
|
||||
def expire_stale_connections()
|
||||
self.stale_connection_client_ids().each { |client| self.delete_connection(client[:client_id]) }
|
||||
end
|
||||
|
||||
# expiring connections in stale state, which deletes them
|
||||
def stale_connection_client_ids(max_seconds)
|
||||
def stale_connection_client_ids()
|
||||
clients = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
sql =<<SQL
|
||||
SELECT client_id, music_session_id, user_id, client_type FROM connections
|
||||
WHERE
|
||||
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
||||
updated_at < (NOW() at time zone 'utc' - (interval '1 second' * expire_time)) AND
|
||||
aasm_state = '#{Connection::STALE_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql) do |result|
|
||||
|
|
@ -205,7 +203,7 @@ SQL
|
|||
# this number is used by notification logic elsewhere to know
|
||||
# 'oh the user joined for the 1st time, so send a friend update', or
|
||||
# 'don't bother because the user has connected somewhere else already'
|
||||
def create_connection(user_id, client_id, ip_address, client_type, &blk)
|
||||
def create_connection(user_id, client_id, ip_address, client_type, connection_stale_time, connection_expire_time, &blk)
|
||||
|
||||
# validate client_type
|
||||
raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser'
|
||||
|
|
@ -247,8 +245,8 @@ SQL
|
|||
|
||||
lock_connections(conn)
|
||||
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, ip_address, client_type, addr, locidispid, latitude, longitude, countrycode, region, city, aasm_state) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
|
||||
[user_id, client_id, ip_address, client_type, addr, locidispid, latitude, longitude, countrycode, region, city, Connection::CONNECT_STATE.to_s]).clear
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, ip_address, client_type, addr, locidispid, latitude, longitude, countrycode, region, city, aasm_state, stale_time, expire_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
|
||||
[user_id, client_id, ip_address, client_type, addr, locidispid, latitude, longitude, countrycode, region, city, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time]).clear
|
||||
|
||||
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||||
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||||
|
|
|
|||
|
|
@ -3,6 +3,9 @@ require 'aasm'
|
|||
module JamRuby
|
||||
class Connection < ActiveRecord::Base
|
||||
|
||||
# client_types
|
||||
TYPE_CLIENT = 'client'
|
||||
TYPE_BROWSER = 'browser'
|
||||
|
||||
attr_accessor :joining_session
|
||||
|
||||
|
|
@ -12,9 +15,8 @@ module JamRuby
|
|||
belongs_to :music_session, :class_name => "JamRuby::MusicSession"
|
||||
has_many :tracks, :class_name => "JamRuby::Track", :inverse_of => :connection, :foreign_key => 'connection_id', :dependent => :delete_all
|
||||
|
||||
|
||||
validates :as_musician, :inclusion => {:in => [true, false]}
|
||||
validates :client_type, :inclusion => {:in => ['client', 'browser']}
|
||||
validates :client_type, :inclusion => {:in => [TYPE_CLIENT, TYPE_BROWSER]}
|
||||
validate :can_join_music_session, :if => :joining_session?
|
||||
after_save :require_at_least_one_track_when_in_session, :if => :joining_session?
|
||||
after_create :did_create
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ module JamRuby
|
|||
# handle time range
|
||||
days = TIME_RANGES[time_range]
|
||||
if days > 0
|
||||
query = query.where("feeds.created_at > NOW() - '#{days} day'::INTERVAL")
|
||||
query = query.where("feeds.created_at > NOW() at time zone 'utc' - '#{days} day'::INTERVAL")
|
||||
end
|
||||
|
||||
# handle type filters
|
||||
|
|
|
|||
|
|
@ -289,12 +289,12 @@ module JamRuby
|
|||
@mods_json ||= mods ? JSON.parse(mods, symbolize_names: true) : {}
|
||||
end
|
||||
|
||||
def heartbeat_interval
|
||||
mods_json[:heartbeat_interval]
|
||||
def heartbeat_interval_client
|
||||
mods_json[:heartbeat_interval_client]
|
||||
end
|
||||
|
||||
def connection_expire_time
|
||||
mods_json[:connection_expire_time]
|
||||
def connection_expire_time_client
|
||||
mods_json[:connection_expire_time_client]
|
||||
end
|
||||
|
||||
def recent_history
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ module JamRuby
|
|||
|
||||
def self.queue_jobs_needing_retry
|
||||
# if we haven't seen updated_at be tickled in 5 minutes, but config_changed is still set to TRUE, this record has gotten stale
|
||||
IcecastServer.find_each(:conditions => "config_changed = 1 AND updated_at < (NOW() - interval '#{APP_CONFIG.icecast_max_missing_check} second')", :batch_size => 100) do |server|
|
||||
IcecastServer.find_each(:conditions => "config_changed = 1 AND updated_at < (NOW() at time zone 'utc' - interval '#{APP_CONFIG.icecast_max_missing_check} second')", :batch_size => 100) do |server|
|
||||
IcecastConfigWriter.enqueue(server.server_id)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ module JamRuby
|
|||
|
||||
|
||||
def run # if we haven't seen updated_at be tickled in 5 minutes, but config_changed is still set to TRUE, this record has gotten stale
|
||||
IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount|
|
||||
IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() at time zone 'utc' - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount|
|
||||
if mount.music_session_id
|
||||
mount.with_lock do
|
||||
handle_notifications(mount)
|
||||
|
|
|
|||
|
|
@ -247,15 +247,15 @@ describe ConnectionManager do
|
|||
|
||||
sleep(1)
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num.should == 1
|
||||
# this should change the aasm_state to stale
|
||||
@connman.flag_stale_connections(1)
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num.should == 0
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'stale'"])
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'stale'"])
|
||||
num.should == 1
|
||||
assert_num_connections(client_id, 1)
|
||||
|
||||
|
|
|
|||
|
|
@ -464,11 +464,11 @@ describe User do
|
|||
end
|
||||
|
||||
it "should return connection_expire_time" do
|
||||
@user.connection_expire_time.should be_nil
|
||||
@user.mods = {connection_expire_time: 5}.to_json
|
||||
@user.connection_expire_time_client.should be_nil
|
||||
@user.mods = {connection_expire_time_client: 5}.to_json
|
||||
@user.save!
|
||||
@user = User.find(@user.id) # necessary because mods_json is cached in the model
|
||||
@user.connection_expire_time.should == 5
|
||||
@user.connection_expire_time_client.should == 5
|
||||
end
|
||||
end
|
||||
=begin
|
||||
|
|
|
|||
|
|
@ -14,10 +14,14 @@
|
|||
|
||||
context.JK.JamServer = function (app) {
|
||||
|
||||
// uniquely identify the websocket connection
|
||||
var channelId = null;
|
||||
var clientType = null;
|
||||
|
||||
// heartbeat
|
||||
var heartbeatInterval = null;
|
||||
var heartbeatMS = null;
|
||||
var heartbeatMissedMS = 10000; // if 10 seconds go by and we haven't seen a heartbeat ack, get upset
|
||||
var connection_expire_time = null;
|
||||
var lastHeartbeatSentTime = null;
|
||||
var lastHeartbeatAckTime = null;
|
||||
var lastHeartbeatFound = false;
|
||||
|
|
@ -55,7 +59,6 @@
|
|||
server.socketClosedListeners = [];
|
||||
server.connected = false;
|
||||
|
||||
var clientType = context.JK.clientType();
|
||||
|
||||
function heartbeatStateReset() {
|
||||
lastHeartbeatSentTime = null;
|
||||
|
|
@ -136,8 +139,8 @@
|
|||
|
||||
// check if the server is still sending heartbeat acks back down
|
||||
// this logic equates to 'if we have not received a heartbeat within heartbeatMissedMS, then get upset
|
||||
if (new Date().getTime() - lastHeartbeatAckTime.getTime() > heartbeatMissedMS) {
|
||||
logger.error("no heartbeat ack received from server after ", heartbeatMissedMS, " seconds . giving up on socket connection");
|
||||
if (new Date().getTime() - lastHeartbeatAckTime.getTime() > connection_expire_time) {
|
||||
logger.error("no heartbeat ack received from server after ", connection_expire_time, " seconds . giving up on socket connection");
|
||||
lastDisconnectedReason = 'NO_HEARTBEAT_ACK';
|
||||
context.JK.JamServer.close(true);
|
||||
}
|
||||
|
|
@ -184,11 +187,11 @@
|
|||
|
||||
|
||||
heartbeatMS = payload.heartbeat_interval * 1000;
|
||||
logger.debug("jamkazam.js.loggedIn(): clientId now " + app.clientId + "; Setting up heartbeat every " + heartbeatMS + " MS");
|
||||
connection_expire_time = payload.connection_expire_time * 1000;
|
||||
logger.debug("jamkazam.js.loggedIn(): clientId=" + app.clientId + ", heartbeat=" + heartbeatMS + "ms, expire_time=" + connection_expire_time);
|
||||
heartbeatInterval = context.setInterval(_heartbeat, heartbeatMS);
|
||||
heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000);
|
||||
lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat
|
||||
|
||||
connectDeferred.resolve();
|
||||
app.activeElementEvent('afterConnect', payload);
|
||||
|
||||
|
|
@ -250,23 +253,26 @@
|
|||
|
||||
rest.createDiagnostic({
|
||||
type: lastDisconnectedReason,
|
||||
data: {logs: logger.logCache, client_type: clientType, client_id: server.clientID}
|
||||
data: {logs: logger.logCache, client_type: clientType, client_id: server.clientID, channel_id: channelId}
|
||||
})
|
||||
.always(function() {
|
||||
if ($currentDisplay.is('.no-websocket-connection')) {
|
||||
// this path is the 'not in session path'; so there is nothing else to do
|
||||
$currentDisplay.hide();
|
||||
|
||||
// TODO: tell certain elements that we've reconnected
|
||||
}
|
||||
else {
|
||||
// this path is the 'in session' path, where we actually reload the page
|
||||
context.JK.CurrentSessionModel.leaveCurrentSession()
|
||||
.always(function () {
|
||||
window.location.reload();
|
||||
});
|
||||
}
|
||||
server.reconnecting = false;
|
||||
});
|
||||
|
||||
if ($currentDisplay.is('.no-websocket-connection')) {
|
||||
// this path is the 'not in session path'; so there is nothing else to do
|
||||
$currentDisplay.hide();
|
||||
|
||||
// TODO: tell certain elements that we've reconnected
|
||||
}
|
||||
else {
|
||||
// this path is the 'in session' path, where we actually reload the page
|
||||
context.JK.CurrentSessionModel.leaveCurrentSession()
|
||||
.always(function () {
|
||||
window.location.reload();
|
||||
});
|
||||
}
|
||||
server.reconnecting = false;
|
||||
}
|
||||
|
||||
function buildOptions() {
|
||||
|
|
@ -435,9 +441,14 @@
|
|||
};
|
||||
|
||||
server.connect = function () {
|
||||
if(!clientType) {
|
||||
clientType = context.JK.clientType();
|
||||
}
|
||||
connectDeferred = new $.Deferred();
|
||||
logger.log("server.connect");
|
||||
var uri = context.JK.websocket_gateway_uri; // Set in index.html.erb.
|
||||
channelId = context.JK.generateUUID(); // create a new channel ID for every websocket connection
|
||||
logger.log("connecting websocket, channel_id: " + channelId);
|
||||
|
||||
var uri = context.JK.websocket_gateway_uri + '?channel_id=' + channelId; // Set in index.html.erb.
|
||||
//var uri = context.gon.websocket_gateway_uri; // Leaving here for now, as we're looking for a better solution.
|
||||
|
||||
server.socket = new context.WebSocket(uri);
|
||||
|
|
@ -506,7 +517,7 @@
|
|||
|
||||
// onClose is called if either client or server closes connection
|
||||
server.onClose = function () {
|
||||
logger.log("Socket to server closed.", arguments);
|
||||
logger.log("Socket to server closed.");
|
||||
|
||||
if (connectDeferred.state() === "pending") {
|
||||
connectDeferred.reject();
|
||||
|
|
@ -611,6 +622,7 @@
|
|||
}
|
||||
|
||||
function initialize() {
|
||||
|
||||
registerLoginAck();
|
||||
registerHeartbeatAck();
|
||||
registerSocketClosed();
|
||||
|
|
|
|||
|
|
@ -298,7 +298,6 @@
|
|||
|
||||
|
||||
window.jamClient = interceptedJamClient;
|
||||
|
||||
}
|
||||
|
||||
// Let's get things rolling...
|
||||
|
|
|
|||
|
|
@ -105,11 +105,15 @@ if defined?(Bundler)
|
|||
# Websocket-gateway embedded configs
|
||||
config.websocket_gateway_enable = false
|
||||
if Rails.env=='test'
|
||||
config.websocket_gateway_connect_time_stale = 2
|
||||
config.websocket_gateway_connect_time_expire = 5
|
||||
config.websocket_gateway_connect_time_stale_client = 4
|
||||
config.websocket_gateway_connect_time_expire_client = 6
|
||||
config.websocket_gateway_connect_time_stale_browser = 4
|
||||
config.websocket_gateway_connect_time_expire_browser = 6
|
||||
else
|
||||
config.websocket_gateway_connect_time_stale = 12 # 12 matches production
|
||||
config.websocket_gateway_connect_time_expire = 20 # 20 matches production
|
||||
config.websocket_gateway_connect_time_stale_client = 40 # 40 matches production
|
||||
config.websocket_gateway_connect_time_expire_client = 60 # 60 matches production
|
||||
config.websocket_gateway_connect_time_stale_browser = 40 # 40 matches production
|
||||
config.websocket_gateway_connect_time_expire_browser = 60 # 60 matches production
|
||||
end
|
||||
config.websocket_gateway_internal_debug = false
|
||||
config.websocket_gateway_port = 6767 + ENV['JAM_INSTANCE'].to_i
|
||||
|
|
|
|||
|
|
@ -68,8 +68,10 @@ SampleApp::Application.configure do
|
|||
# it's nice to have even admin accounts (which all the default ones are) generate GA data for testing
|
||||
config.ga_suppress_admin = false
|
||||
|
||||
config.websocket_gateway_connect_time_stale = 12
|
||||
config.websocket_gateway_connect_time_expire = 20
|
||||
config.websocket_gateway_connect_time_stale_client = 40 # 40 matches production
|
||||
config.websocket_gateway_connect_time_expire_client = 60 # 60 matches production
|
||||
config.websocket_gateway_connect_time_stale_browser = 40 # 40 matches production
|
||||
config.websocket_gateway_connect_time_expire_browser = 60 # 60 matches production
|
||||
|
||||
config.audiomixer_path = ENV['AUDIOMIXER_PATH'] || audiomixer_workspace_path || "/var/lib/audiomixer/audiomixer/audiomixerapp"
|
||||
|
||||
|
|
|
|||
|
|
@ -9,8 +9,10 @@ unless $rails_rake_task
|
|||
JamWebsockets::Server.new.run(
|
||||
:port => APP_CONFIG.websocket_gateway_port,
|
||||
:emwebsocket_debug => APP_CONFIG.websocket_gateway_internal_debug,
|
||||
:connect_time_stale => APP_CONFIG.websocket_gateway_connect_time_stale,
|
||||
:connect_time_expire_client => APP_CONFIG.websocket_gateway_connect_time_expire,
|
||||
:connect_time_stale_client => APP_CONFIG.websocket_gateway_connect_time_stale_client,
|
||||
:connect_time_expire_client => APP_CONFIG.websocket_gateway_connect_time_expire_client,
|
||||
:connect_time_stale_browser => APP_CONFIG.websocket_gateway_connect_time_stale_browser,
|
||||
:connect_time_expire_browser=> APP_CONFIG.websocket_gateway_connect_time_expire_browser,
|
||||
:rabbitmq_host => APP_CONFIG.rabbitmq_host,
|
||||
:rabbitmq_port => APP_CONFIG.rabbitmq_port,
|
||||
:calling_thread => current)
|
||||
|
|
|
|||
|
|
@ -75,8 +75,10 @@ Thread.new do
|
|||
JamWebsockets::Server.new.run(
|
||||
:port => 6769,
|
||||
:emwebsocket_debug => false,
|
||||
:connect_time_stale => 2,
|
||||
:connect_time_expire_client => 5,
|
||||
:connect_time_stale_client => 4,
|
||||
:connect_time_expire_client => 6,
|
||||
:connect_time_stale_browser => 4,
|
||||
:connect_time_expire_browser => 6,
|
||||
:rabbitmq_host => 'localhost',
|
||||
:rabbitmq_port => 5672,
|
||||
:calling_thread => current)
|
||||
|
|
|
|||
|
|
@ -131,8 +131,8 @@ end
|
|||
|
||||
def leave_music_session_sleep_delay
|
||||
# add a buffer to ensure WSG has enough time to expire
|
||||
sleep_dur = (Rails.application.config.websocket_gateway_connect_time_stale +
|
||||
Rails.application.config.websocket_gateway_connect_time_expire) * 1.4
|
||||
sleep_dur = (Rails.application.config.websocket_gateway_connect_time_stale_browser +
|
||||
Rails.application.config.websocket_gateway_connect_time_expire_browser) * 1.4
|
||||
sleep sleep_dur
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
Defaults: &defaults
|
||||
connect_time_stale_client: 20
|
||||
connect_time_expire_client: 30
|
||||
connect_time_stale_client: 40
|
||||
connect_time_expire_client: 62
|
||||
connect_time_stale_browser: 40
|
||||
connect_time_expire_browser: 60
|
||||
connect_time_expire_browser: 62
|
||||
|
||||
development:
|
||||
port: 6767
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
end
|
||||
|
||||
def to_json
|
||||
{user_id: @user.id, client_id: @client.client_id, msg_count: @msg_count, client_type: @client_type}.to_json
|
||||
{user_id: @user.id, client_id: @client.client_id, msg_count: @msg_count, client_type: @client_type, socket_id: @client.socket_id}.to_json
|
||||
end
|
||||
|
||||
def hash
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ include Jampb
|
|||
module EventMachine
|
||||
module WebSocket
|
||||
class Connection < EventMachine::Connection
|
||||
attr_accessor :encode_json, :client_id, :user_id, :context # client_id is uuid we give to each client to track them as we like
|
||||
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context # client_id is uuid we give to each client to track them as we like
|
||||
|
||||
# http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open
|
||||
attr_accessor :connected
|
||||
|
|
@ -51,8 +51,10 @@ module JamWebsockets
|
|||
@thread_pool = nil
|
||||
@heartbeat_interval_client = nil
|
||||
@connect_time_expire_client = nil
|
||||
@connect_time_stale_client = nil
|
||||
@heartbeat_interval_browser= nil
|
||||
@connect_time_expire_browser= nil
|
||||
@connect_time_stale_browser= nil
|
||||
@ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
|
||||
end
|
||||
|
||||
|
|
@ -60,8 +62,12 @@ module JamWebsockets
|
|||
|
||||
@log.info "startup"
|
||||
|
||||
@heartbeat_interval_client = connect_time_stale_client / 2
|
||||
@connect_time_expire_client = connect_time_expire_client
|
||||
@heartbeat_interval_client = connect_time_stale_client / 2
|
||||
@connect_time_stale_client = connect_time_stale_client
|
||||
@connect_time_expire_client = connect_time_expire_client
|
||||
@heartbeat_interval_browser = connect_time_stale_browser / 2
|
||||
@connect_time_stale_browser = connect_time_stale_browser
|
||||
@connect_time_expire_browser = connect_time_expire_browser
|
||||
|
||||
begin
|
||||
@amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
|
||||
|
|
@ -221,8 +227,11 @@ module JamWebsockets
|
|||
client.encode_json = true
|
||||
|
||||
client.onopen { |handshake|
|
||||
#binding.pry
|
||||
@log.debug "client connected #{client}"
|
||||
# a unique ID for this TCP connection, to aid in debugging
|
||||
client.channel_id = handshake.query["channel_id"]
|
||||
|
||||
@log.debug "client connected #{client} with channel_id: #{client.channel_id}"
|
||||
|
||||
|
||||
# check for '?pb' or '?pb=true' in url query parameters
|
||||
query_pb = handshake.query["pb"]
|
||||
|
|
@ -447,6 +456,37 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
# returns heartbeat_interval, connection stale time, and connection expire time
|
||||
def determine_connection_times(user, client_type)
|
||||
|
||||
if client_type == Connection::TYPE_BROWSER
|
||||
default_heartbeat = @heartbeat_interval_browser
|
||||
default_stale = @connect_time_stale_browser
|
||||
default_expire = @connect_time_expire_browser
|
||||
else
|
||||
default_heartbeat = @heartbeat_interval_client
|
||||
default_stale = @connect_time_stale_client
|
||||
default_expire = @connect_time_expire_client
|
||||
end
|
||||
|
||||
heartbeat_interval = user.heartbeat_interval_client || default_heartbeat
|
||||
heartbeat_interval = heartbeat_interval.to_i
|
||||
heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config
|
||||
connection_expire_time = user.connection_expire_time_client || default_expire
|
||||
connection_expire_time = connection_expire_time.to_i
|
||||
connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config
|
||||
connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now
|
||||
|
||||
if heartbeat_interval >= connection_stale_time
|
||||
raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})"
|
||||
end
|
||||
if connection_stale_time >= connection_expire_time
|
||||
raise SessionError, "misconfiguration! stale time (#{connection_stale_time}) should be less than expire time (#{connection_expire_time})"
|
||||
end
|
||||
|
||||
[heartbeat_interval, connection_stale_time, connection_expire_time]
|
||||
end
|
||||
|
||||
def handle_login(login, client)
|
||||
username = login.username if login.value_for_tag(1)
|
||||
password = login.password if login.value_for_tag(2)
|
||||
|
|
@ -460,8 +500,7 @@ module JamWebsockets
|
|||
|
||||
# you don't have to supply client_id in login--if you don't, we'll generate one
|
||||
if client_id.nil? || client_id.empty?
|
||||
# give a unique ID to this client. This is used to prevent session messages
|
||||
# from echoing back to the sender, for instance.
|
||||
# give a unique ID to this client.
|
||||
client_id = UUIDTools::UUID.random_create.to_s
|
||||
end
|
||||
|
||||
|
|
@ -471,13 +510,13 @@ module JamWebsockets
|
|||
# this code must happen here, before we go any further, so that there is only one websocket connection per client_id
|
||||
existing_context = @client_lookup[client_id]
|
||||
if existing_context
|
||||
# in reconnect scenarios, we may have in memory a client still
|
||||
# in some reconnect scenarios, we may have in memory a websocket client still.
|
||||
Diagnostic.duplicate_client(existing_context.user, existing_context) if existing_context.client.connected
|
||||
cleanup_client(existing_context.client)
|
||||
end
|
||||
|
||||
connection = JamRuby::Connection.find_by_client_id(client_id)
|
||||
# if this connection is reused by a different user, then whack the connection
|
||||
# if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
|
||||
# because it will recreate a new connection lower down
|
||||
if connection && user && connection.user != user
|
||||
@log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
|
||||
|
|
@ -490,6 +529,9 @@ module JamWebsockets
|
|||
remote_ip = extract_ip(client)
|
||||
|
||||
if user
|
||||
|
||||
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type)
|
||||
|
||||
@log.debug "user #{user} logged in with client_id #{client_id}"
|
||||
|
||||
# check if there's a connection for the client... if it's stale, reconnect it
|
||||
|
|
@ -503,7 +545,7 @@ module JamWebsockets
|
|||
recording_id = nil
|
||||
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip)
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time)
|
||||
|
||||
if music_session_id.nil?
|
||||
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
|
||||
|
|
@ -540,7 +582,7 @@ module JamWebsockets
|
|||
unless connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type) do |conn, count|
|
||||
connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type, connection_stale_time, connection_expire_time) do |conn, count|
|
||||
if count == 1
|
||||
Notification.send_friend_update(user.id, true, conn)
|
||||
end
|
||||
|
|
@ -548,20 +590,14 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
heartbeat_interval = user.heartbeat_interval_client.to_i || @heartbeat_interval_client
|
||||
heartbeat_interval = @heartbeat_interval_client if heartbeat_interval == 0 # protect against bad config
|
||||
connection_expire_time = user.connection_expire_time || @connection_expire_time
|
||||
connection_expire_time = @connection_expire_time if connection_expire_time == 0 # protect against bad config
|
||||
|
||||
|
||||
login_ack = @message_factory.login_ack(remote_ip,
|
||||
client_id,
|
||||
user.remember_token,
|
||||
@heartbeat_interval_client,
|
||||
heartbeat_interval,
|
||||
connection.try(:music_session_id),
|
||||
reconnected,
|
||||
user.id,
|
||||
@connection_expire_time)
|
||||
connection_expire_time)
|
||||
send_to_client(client, login_ack)
|
||||
end
|
||||
else
|
||||
|
|
|
|||
|
|
@ -33,9 +33,8 @@ module JamWebsockets
|
|||
|
||||
EventMachine.run do
|
||||
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port) do
|
||||
expire_time = connect_time_expire_client
|
||||
start_connection_expiration(expire_time)
|
||||
start_connection_flagger(connect_time_stale_client)
|
||||
start_connection_expiration()
|
||||
start_connection_flagger()
|
||||
start_websocket_listener(host, port, options[:emwebsocket_debug])
|
||||
calling_thread.wakeup if calling_thread
|
||||
end
|
||||
|
|
@ -60,37 +59,37 @@ module JamWebsockets
|
|||
@log.debug("started websocket")
|
||||
end
|
||||
|
||||
def start_connection_expiration(stale_max_time)
|
||||
def start_connection_expiration()
|
||||
# one cleanup on startup
|
||||
expire_stale_connections(stale_max_time)
|
||||
expire_stale_connections()
|
||||
|
||||
EventMachine::PeriodicTimer.new(stale_max_time) do
|
||||
sane_logging { expire_stale_connections(stale_max_time) }
|
||||
EventMachine::PeriodicTimer.new(5) do
|
||||
sane_logging { expire_stale_connections() }
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def expire_stale_connections(stale_max_time)
|
||||
def expire_stale_connections()
|
||||
clients = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
clients = connection_manager.stale_connection_client_ids(stale_max_time)
|
||||
clients = connection_manager.stale_connection_client_ids()
|
||||
end
|
||||
@router.cleanup_clients_with_ids(clients)
|
||||
end
|
||||
|
||||
def start_connection_flagger(flag_max_time)
|
||||
def start_connection_flagger()
|
||||
# one cleanup on startup
|
||||
flag_stale_connections(flag_max_time)
|
||||
flag_stale_connections()
|
||||
|
||||
EventMachine::PeriodicTimer.new(flag_max_time/2) do
|
||||
sane_logging { flag_stale_connections(flag_max_time) }
|
||||
EventMachine::PeriodicTimer.new(5) do
|
||||
sane_logging { flag_stale_connections() }
|
||||
end
|
||||
end
|
||||
|
||||
def flag_stale_connections(flag_max_time)
|
||||
def flag_stale_connections()
|
||||
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.flag_stale_connections(flag_max_time)
|
||||
connection_manager.flag_stale_connections()
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue