* VRFS-1073 - support multiple websocket gateways
This commit is contained in:
parent
350f7ae19a
commit
b87ff571ce
|
|
@ -36,6 +36,7 @@ FactoryGirl.define do
|
|||
addr 0
|
||||
locidispid 0
|
||||
client_type 'client'
|
||||
gateway 'gateway1'
|
||||
scoring_timeout Time.now
|
||||
sequence(:channel_id) { |n| "Channel#{n}"}
|
||||
association :user, factory: :user
|
||||
|
|
|
|||
|
|
@ -208,3 +208,4 @@ undirected_scores.sql
|
|||
discard_scores.sql
|
||||
new_genres.sql
|
||||
get_work_faster.sql
|
||||
multiple_gateways.sql
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
-- allow multiple websockegateways to open
|
||||
DELETE FROM connections;
|
||||
ALTER TABLE connections ADD COLUMN gateway VARCHAR NOT NULL;
|
||||
|
|
@ -44,7 +44,7 @@ module JamRuby
|
|||
end
|
||||
|
||||
# reclaim the existing connection, if ip_address is not nil then perhaps a new address as well
|
||||
def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable)
|
||||
def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable, gateway)
|
||||
music_session_id = nil
|
||||
reconnected = false
|
||||
|
||||
|
|
@ -89,8 +89,8 @@ module JamRuby
|
|||
udp_reachable_value = udp_reachable.nil? ? 'udp_reachable' : udp_reachable
|
||||
|
||||
sql =<<SQL
|
||||
UPDATE connections SET (channel_id, aasm_state, updated_at, music_session_id, joined_session_at, stale_time, expire_time, udp_reachable) = ('#{channel_id}', '#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression}, #{joined_session_at_expression}, #{connection_stale_time}, #{connection_expire_time}, #{udp_reachable_value})
|
||||
WHERE
|
||||
UPDATE connections SET (channel_id, aasm_state, updated_at, music_session_id, joined_session_at, stale_time, expire_time, udp_reachable, gateway) = ('#{channel_id}', '#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression}, #{joined_session_at_expression}, #{connection_stale_time}, #{connection_expire_time}, #{udp_reachable_value}, '#{gateway}')
|
||||
WHERE
|
||||
client_id = '#{conn.client_id}'
|
||||
RETURNING music_session_id
|
||||
SQL
|
||||
|
|
@ -129,46 +129,26 @@ SQL
|
|||
end
|
||||
|
||||
# flag connections as stale
|
||||
def flag_stale_connections()
|
||||
def flag_stale_connections(gateway_name)
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
sql =<<SQL
|
||||
SELECT count(user_id) FROM connections
|
||||
WHERE
|
||||
updated_at < (NOW() - (interval '1 second' * stale_time))AND
|
||||
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql) do |result|
|
||||
count = result.getvalue(0, 0)
|
||||
if 0 < count.to_i
|
||||
sql =<<SQL
|
||||
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
|
||||
WHERE
|
||||
updated_at < (NOW() - (interval '1 second' * stale_time)) AND
|
||||
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql)
|
||||
end
|
||||
end
|
||||
sql = "UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}' WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = '#{Connection::CONNECT_STATE.to_s}' AND gateway = '#{gateway_name}'"
|
||||
conn.exec(sql)
|
||||
end
|
||||
end
|
||||
|
||||
# NOTE this is only used for testing purposes;
|
||||
# actual deletes will be processed in the websocket context which cleans up dependencies
|
||||
def expire_stale_connections()
|
||||
self.stale_connection_client_ids.each { |client| self.delete_connection(client[:client_id]) }
|
||||
def expire_stale_connections(gateway_name)
|
||||
self.stale_connection_client_ids(gateway_name).each { |client| self.delete_connection(client[:client_id]) }
|
||||
end
|
||||
|
||||
# expiring connections in stale state, which deletes them
|
||||
def stale_connection_client_ids
|
||||
def stale_connection_client_ids(gateway_name)
|
||||
clients = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
sql =<<SQL
|
||||
SELECT client_id, music_session_id, user_id, client_type FROM connections
|
||||
WHERE
|
||||
updated_at < (NOW() - (interval '1 second' * expire_time))
|
||||
SQL
|
||||
sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = '#{gateway_name}'"
|
||||
conn.exec(sql) do |result|
|
||||
result.each { |row|
|
||||
client_id = row['client_id']
|
||||
|
|
@ -187,7 +167,7 @@ SQL
|
|||
# this number is used by notification logic elsewhere to know
|
||||
# 'oh the user joined for the 1st time, so send a friend update', or
|
||||
# 'don't bother because the user has connected somewhere else already'
|
||||
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, &blk)
|
||||
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, gateway, &blk)
|
||||
|
||||
# validate client_type
|
||||
raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser'
|
||||
|
|
@ -218,8 +198,8 @@ SQL
|
|||
|
||||
lock_connections(conn)
|
||||
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
|
||||
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable]).clear
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable, gateway) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
|
||||
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable, gateway]).clear
|
||||
|
||||
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||||
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ module JamRuby
|
|||
ip_address = options[:ip_address]
|
||||
connection_stale_time = options[:connection_stale_time]
|
||||
connection_expire_time = options[:connection_expire_time]
|
||||
gateway = options[:gateway]
|
||||
|
||||
# first try to find a LatencyTester with that client_id
|
||||
latency_tester = LatencyTester.find_by_client_id(client_id)
|
||||
|
||||
|
|
@ -71,11 +73,16 @@ module JamRuby
|
|||
connection.as_musician = false
|
||||
connection.channel_id = channel_id
|
||||
connection.scoring_timeout = Time.now
|
||||
connection.gateway = gateway
|
||||
unless connection.save
|
||||
return connection
|
||||
end
|
||||
|
||||
return latency_tester
|
||||
end
|
||||
|
||||
def to_s
|
||||
client_id
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -184,6 +184,7 @@ FactoryGirl.define do
|
|||
addr 0
|
||||
locidispid 0
|
||||
client_type 'client'
|
||||
gateway 'gateway1'
|
||||
last_jam_audio_latency { user.last_jam_audio_latency if user }
|
||||
sequence(:channel_id) { |n| "Channel#{n}"}
|
||||
association :user, factory: :user
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
EXPIRE_TIME = 60
|
||||
STALE_BUT_NOT_EXPIRED = 50
|
||||
DEFINITELY_EXPIRED = 70
|
||||
GATEWAY = 'gateway1'
|
||||
REACHABLE = true
|
||||
|
||||
let(:channel_id) {'1'}
|
||||
|
|
@ -49,8 +50,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
user.save!
|
||||
user = nil
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) }.to raise_error(PG::Error)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) }.to raise_error(PG::Error)
|
||||
end
|
||||
|
||||
it "create connection then delete it" do
|
||||
|
|
@ -59,7 +60,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -89,7 +90,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -106,7 +107,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
cc.locidispid.should == 17192000002
|
||||
cc.udp_reachable.should == true
|
||||
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false)
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false, GATEWAY)
|
||||
|
||||
cc = Connection.find_by_client_id!(client_id)
|
||||
cc.connected?.should be_true
|
||||
|
|
@ -129,7 +130,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false, GATEWAY)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -146,7 +147,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
cc.locidispid.should == 17192000002
|
||||
cc.udp_reachable.should == false
|
||||
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil) # heartbeat passes nil in for udp_reachable
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil, GATEWAY) # heartbeat passes nil in for udp_reachable
|
||||
|
||||
cc = Connection.find_by_client_id!(client_id)
|
||||
cc.connected?.should be_true
|
||||
|
|
@ -260,12 +261,12 @@ describe ConnectionManager, no_transaction: true do
|
|||
it "flag stale connection" do
|
||||
client_id = "client_id8"
|
||||
user_id = create_user("test", "user8", "user8@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ['aasm_state = ?','connected'])
|
||||
num.should == 1
|
||||
assert_num_connections(client_id, num)
|
||||
@connman.flag_stale_connections()
|
||||
@connman.flag_stale_connections(GATEWAY)
|
||||
assert_num_connections(client_id, num)
|
||||
|
||||
conn = Connection.find_by_client_id(client_id)
|
||||
|
|
@ -274,7 +275,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num.should == 1
|
||||
# this should change the aasm_state to stale
|
||||
@connman.flag_stale_connections()
|
||||
@connman.flag_stale_connections(GATEWAY)
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
|
||||
num.should == 0
|
||||
|
|
@ -286,7 +287,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
conn = Connection.find_by_client_id(client_id)
|
||||
set_updated_at(conn, Time.now - DEFINITELY_EXPIRED)
|
||||
|
||||
cids = @connman.stale_connection_client_ids()
|
||||
cids = @connman.stale_connection_client_ids(GATEWAY)
|
||||
cids.size.should == 1
|
||||
cids[0][:client_id].should == client_id
|
||||
cids[0][:client_type].should == Connection::TYPE_CLIENT
|
||||
|
|
@ -301,21 +302,21 @@ describe ConnectionManager, no_transaction: true do
|
|||
it "expires stale connection" do
|
||||
client_id = "client_id8"
|
||||
user_id = create_user("test", "user8", "user8@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
conn = Connection.find_by_client_id(client_id)
|
||||
set_updated_at(conn, Time.now - STALE_BUT_NOT_EXPIRED)
|
||||
|
||||
@connman.flag_stale_connections
|
||||
@connman.flag_stale_connections(GATEWAY)
|
||||
assert_num_connections(client_id, 1)
|
||||
# assert_num_connections(client_id, JamRuby::Connection.count(:conditions => ['aasm_state = ?','stale']))
|
||||
|
||||
@connman.expire_stale_connections
|
||||
@connman.expire_stale_connections(GATEWAY)
|
||||
assert_num_connections(client_id, 1)
|
||||
|
||||
set_updated_at(conn, Time.now - DEFINITELY_EXPIRED)
|
||||
# this should delete the stale connection
|
||||
@connman.expire_stale_connections
|
||||
@connman.expire_stale_connections(GATEWAY)
|
||||
assert_num_connections(client_id, 0)
|
||||
end
|
||||
|
||||
|
|
@ -327,7 +328,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
|
||||
connection.errors.any?.should be_false
|
||||
|
|
@ -363,8 +364,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
client_id2 = "client_id10.12"
|
||||
user_id = create_user("test", "user10.11", "user10.11@jamkazam.com", :musician => true)
|
||||
user_id2 = create_user("test", "user10.12", "user10.12@jamkazam.com", :musician => false)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
|
||||
music_session_id = music_session.id
|
||||
|
|
@ -383,7 +384,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
client_id = "client_id10.2"
|
||||
|
||||
user_id = create_user("test", "user10.2", "user10.2@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
|
||||
user = User.find(user_id)
|
||||
|
|
@ -399,8 +400,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
fan_client_id = "client_id10.4"
|
||||
musician_id = create_user("test", "user10.3", "user10.3@jamkazam.com")
|
||||
fan_id = create_user("test", "user10.4", "user10.4@jamkazam.com", :musician => false)
|
||||
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, :fan_access => false, user_id: musician_id)
|
||||
music_session_id = music_session.id
|
||||
|
|
@ -424,7 +425,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id2)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
# specify real user id, but not associated with this session
|
||||
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
|
||||
end
|
||||
|
|
@ -436,7 +437,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
connection.errors.size.should == 1
|
||||
connection.errors.get(:music_session).should == [ValidationMessages::MUSIC_SESSION_MUST_BE_SPECIFIED]
|
||||
|
|
@ -450,7 +451,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id2)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
# specify real user id, but not associated with this session
|
||||
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
|
||||
end
|
||||
|
|
@ -464,7 +465,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
dummy_music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
|
@ -479,7 +480,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
|
||||
dummy_music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
|
@ -492,7 +493,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
|
||||
assert_session_exists(music_session_id, true)
|
||||
|
|
@ -535,7 +536,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
|
||||
client_id1 = Faker::Number.number(20)
|
||||
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
|
||||
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
music_session1 = FactoryGirl.create(:active_music_session, :user_id => user_id)
|
||||
connection1 = @connman.join_music_session(user, client_id1, music_session1, true, TRACKS, 10)
|
||||
connection1.errors.size.should == 0
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ require 'spec_helper'
|
|||
|
||||
describe LatencyTester do
|
||||
|
||||
let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1'} }
|
||||
let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1', gateway: 'gateway1'} }
|
||||
|
||||
it "success" do
|
||||
latency_tester = FactoryGirl.create(:latency_tester)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ unless $rails_rake_task
|
|||
:rabbitmq_host => APP_CONFIG.rabbitmq_host,
|
||||
:rabbitmq_port => APP_CONFIG.rabbitmq_port,
|
||||
:calling_thread => current,
|
||||
:cidr => APP_CONFIG.websocket_gateway_cidr)
|
||||
:cidr => APP_CONFIG.websocket_gateway_cidr,
|
||||
:gateway_name => "default-#{ENV["JAM_INSTANCE"] || 1}")
|
||||
end
|
||||
Thread.stop
|
||||
end
|
||||
|
|
|
|||
|
|
@ -207,6 +207,7 @@ FactoryGirl.define do
|
|||
addr {JamIsp.ip_to_num(ip_address)}
|
||||
locidispid 0
|
||||
client_type 'client'
|
||||
gateway 'gateway1'
|
||||
scoring_timeout Time.now
|
||||
sequence(:channel_id) { |n| "Channel#{n}"}
|
||||
end
|
||||
|
|
|
|||
|
|
@ -86,7 +86,8 @@ Thread.new do
|
|||
:rabbitmq_host => 'localhost',
|
||||
:rabbitmq_port => 5672,
|
||||
:calling_thread => current,
|
||||
:cidr => ['0.0.0.0/0'])
|
||||
:cidr => ['0.0.0.0/0'],
|
||||
:gateway_name => 'default')
|
||||
rescue Exception => e
|
||||
puts "websocket-gateway failed: #{e}"
|
||||
end
|
||||
|
|
|
|||
|
|
@ -14,6 +14,13 @@ db_config = YAML::load(File.open(db_config_file))[jamenv]
|
|||
|
||||
ActiveRecord::Base.establish_connection(db_config)
|
||||
|
||||
jam_instance = ENV['JAM_INSTANCE'] || 1
|
||||
jam_instance = jam_instance.to_i
|
||||
|
||||
if jam_instance == 0
|
||||
puts "JAM INSTANCE MUST BE > 0"
|
||||
exit 1
|
||||
end
|
||||
|
||||
# now bring in the Jam code
|
||||
require 'jam_websockets'
|
||||
|
|
@ -33,11 +40,11 @@ require "#{Dir.pwd}/config/application.rb"
|
|||
|
||||
|
||||
if jamenv == "production"
|
||||
ENV['NEW_RELIC_LOG'] = '/var/log/websocket-gateway/newrelic_agent.log'
|
||||
ENV['NEW_RELIC_LOG'] = "/var/log/websocket-gateway/newrelic_agent-#{jam_instance}.log"
|
||||
one_meg = 1024 * 1024
|
||||
Logging.logger.root.appenders = Logging.appenders.rolling_file("log/#{jamenv}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20)
|
||||
Logging.logger.root.appenders = Logging.appenders.rolling_file("/var/log/websocket-gateway/#{jamenv}-#{jam_instance}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20, :layout => Logging.layouts.pattern(:pattern => '[%d] %-5l: %m\n'))
|
||||
else
|
||||
ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent.log"
|
||||
ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent-#{jam_instance}.log"
|
||||
Logging.logger.root.appenders = Logging.appenders.stdout
|
||||
end
|
||||
|
||||
|
|
@ -47,7 +54,11 @@ require 'newrelic_rpm'
|
|||
Object.send(:remove_const, :Rails) # this is to 'fool' new relic into not thinking this is a Rails app.
|
||||
::NewRelic::Agent.manual_start
|
||||
|
||||
Server.new.run(:port => config["port"],
|
||||
# determine gateway_name
|
||||
gateway_name = ENV['GATEWAY_NAME'] || 'default'
|
||||
gateway_name = "#{gateway_name}-#{jam_instance}"
|
||||
|
||||
Server.new.run(:port => config["port"] + (jam_instance-1 ) * 2,
|
||||
:emwebsocket_debug => config["emwebsocket_debug"],
|
||||
:connect_time_stale_client => config["connect_time_stale_client"],
|
||||
:connect_time_expire_client => config["connect_time_expire_client"],
|
||||
|
|
@ -56,4 +67,5 @@ Server.new.run(:port => config["port"],
|
|||
:max_connections_per_user => config["max_connections_per_user"],
|
||||
:rabbitmq_host => config['rabbitmq_host'],
|
||||
:rabbitmq_port => config['rabbitmq_port'],
|
||||
:cidr => config['cidr'])
|
||||
:cidr => config['cidr'],
|
||||
:gateway_name => gateway_name)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
global
|
||||
maxconn 4096
|
||||
pidfile ~/tmp/haproxy-queue.pid
|
||||
|
||||
defaults
|
||||
log global
|
||||
log 127.0.0.1 local0
|
||||
log 127.0.0.1 local1 notice
|
||||
mode tcp
|
||||
option httplog
|
||||
option http-server-close
|
||||
#option dontlognull
|
||||
option redispatch
|
||||
option contstats
|
||||
retries 3
|
||||
backlog 10000
|
||||
timeout client 25s
|
||||
timeout connect 5s
|
||||
timeout server 25s
|
||||
# timeout tunnel available in ALOHA 5.5 or HAProxy 1.5-dev10 and higher
|
||||
timeout tunnel 3600s
|
||||
timeout http-keep-alive 1s
|
||||
timeout http-request 15s
|
||||
timeout queue 30s
|
||||
timeout tarpit 60s
|
||||
default-server inter 3s rise 2 fall 3
|
||||
option forwardfor
|
||||
|
||||
frontend gateways
|
||||
bind *:6767
|
||||
default_backend bk_ws
|
||||
|
||||
backend bk_ws
|
||||
balance leastconn
|
||||
|
||||
## websocket protocol validation
|
||||
# acl hdr_connection_upgrade hdr(Connection) -i upgrade
|
||||
# acl hdr_upgrade_websocket hdr(Upgrade) -i websocket
|
||||
# acl hdr_websocket_key hdr_cnt(Sec-WebSocket-Key) eq 1
|
||||
# acl hdr_websocket_version hdr_cnt(Sec-WebSocket-Version) eq 1
|
||||
# acl hdr_host hdr_cnt(Sec-WebSocket-Version) eq 1
|
||||
# http-request deny if ! hdr_connection_upgrade ! hdr_upgrade_websocket ! hdr_w
|
||||
#ebsocket_key ! hdr_websocket_version ! hdr_host
|
||||
|
||||
## ensure our application protocol name is valid
|
||||
## (don't forget to update the list each time you publish new applications)
|
||||
acl ws_valid_protocol hdr(Sec-WebSocket-Protocol) echo-protocol
|
||||
http-request deny if ! ws_valid_protocol
|
||||
|
||||
## websocket health checking
|
||||
#option httpchk GET / HTTP/1.1\r\nHost:\ ws.domain.com\r\nConnection:\ Upgrade
|
||||
#\r\nUpgrade:\ websocket\r\nSec-WebSocket-Key:\ haproxy\r\nSec-WebSocket-Version
|
||||
# <span class="wp-smiley emoji emoji-uneasy" title=":\">:\</span> 13\r\nSec-WebSocket-Protocol:\ echo-protocol
|
||||
# http-check expect status 101
|
||||
|
||||
server websrv1 127.0.0.1:6769 maxconn 1000 weight 10 cookie gateway1 check
|
||||
server websrv2 127.0.0.1:6771 maxconn 1000 weight 10 cookie gateway2 check
|
||||
|
|
@ -27,7 +27,9 @@ module JamWebsockets
|
|||
:heartbeat_interval_browser,
|
||||
:connect_time_expire_browser,
|
||||
:connect_time_stale_browser,
|
||||
:max_connections_per_user
|
||||
:max_connections_per_user,
|
||||
:gateway_name,
|
||||
:client_lookup
|
||||
|
||||
def initialize()
|
||||
@log = Logging.logger[self]
|
||||
|
|
@ -47,10 +49,12 @@ module JamWebsockets
|
|||
@heartbeat_interval_browser= nil
|
||||
@connect_time_expire_browser= nil
|
||||
@connect_time_stale_browser= nil
|
||||
@gateway_name = nil
|
||||
@ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
|
||||
@message_stats = {}
|
||||
end
|
||||
|
||||
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10}, &block)
|
||||
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default'}, &block)
|
||||
|
||||
@log.info "startup"
|
||||
|
||||
|
|
@ -61,6 +65,7 @@ module JamWebsockets
|
|||
@connect_time_stale_browser = connect_time_stale_browser
|
||||
@connect_time_expire_browser = connect_time_expire_browser
|
||||
@max_connections_per_user = options[:max_connections_per_user]
|
||||
@gateway_name = options[:gateway]
|
||||
|
||||
begin
|
||||
@amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
|
||||
|
|
@ -205,19 +210,19 @@ module JamWebsockets
|
|||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
@log.debug "client-directed message received from #{msg.from} to client #{client_id}"
|
||||
@log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
|
||||
unless client.nil?
|
||||
|
||||
EM.schedule do
|
||||
@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
@log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
else
|
||||
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
|
||||
end
|
||||
else
|
||||
@log.debug "Can't route message: no client connected with id #{client_id}"
|
||||
#@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -329,7 +334,7 @@ module JamWebsockets
|
|||
|
||||
|
||||
def send_to_client(client, msg)
|
||||
@log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK
|
||||
@log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
|
|
@ -371,66 +376,6 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def cleanup_clients_with_ids(expired_connections)
|
||||
expired_connections.each do |expired_connection|
|
||||
cid = expired_connection[:client_id]
|
||||
|
||||
client_context = @client_lookup[cid]
|
||||
|
||||
if client_context
|
||||
Diagnostic.expired_stale_connection(client_context.user.id, client_context)
|
||||
cleanup_client(client_context.client)
|
||||
end
|
||||
|
||||
music_session = nil
|
||||
recording_id = nil
|
||||
user = nil
|
||||
|
||||
# remove this connection from the database
|
||||
ConnectionManager.active_record_transaction do |mgr|
|
||||
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
|
||||
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}"
|
||||
Notification.send_friend_update(user_id, false, conn) if count == 0
|
||||
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
|
||||
user = User.find_by_id(user_id) unless user_id.nil?
|
||||
recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one
|
||||
recording_id = recording.id unless recording.nil?
|
||||
music_session.with_lock do # VRFS-1297
|
||||
music_session.tick_track_changes
|
||||
end if music_session
|
||||
}
|
||||
end
|
||||
|
||||
if user && music_session
|
||||
Notification.send_session_depart(music_session, cid, user, recording_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# removes all resources associated with a client
|
||||
def cleanup_client(client)
|
||||
client.close
|
||||
|
||||
@semaphore.synchronize do
|
||||
pending = client.context.nil? # presence of context implies this connection has been logged into
|
||||
|
||||
if pending
|
||||
@log.debug "cleaned up not-logged-in client #{client}"
|
||||
else
|
||||
@log.debug "cleanup up logged-in client #{client}"
|
||||
|
||||
context = @clients.delete(client)
|
||||
|
||||
if context
|
||||
remove_client(client.client_id)
|
||||
remove_user(context)
|
||||
else
|
||||
@log.warn "skipping duplicate cleanup attempt of logged-in client"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def route(client_msg, client)
|
||||
message_type = @message_factory.get_message_type(client_msg)
|
||||
if message_type.nil?
|
||||
|
|
@ -438,7 +383,9 @@ module JamWebsockets
|
|||
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
|
||||
end
|
||||
|
||||
@log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT
|
||||
@message_stats[message_type] = @message_stats[message_type].to_i + 1
|
||||
|
||||
@log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE
|
||||
|
||||
if client_msg.route_to.nil?
|
||||
Diagnostic.missing_route_to(client.user_id, client_msg)
|
||||
|
|
@ -535,7 +482,9 @@ module JamWebsockets
|
|||
channel_id: client.channel_id,
|
||||
ip_address: remote_ip,
|
||||
connection_stale_time: connection_stale_time,
|
||||
connection_expire_time: connection_expire_time})
|
||||
connection_expire_time: connection_expire_time,
|
||||
gateway: @gateway_name
|
||||
})
|
||||
if latency_tester.errors.any?
|
||||
@log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}"
|
||||
raise SessionError, "invalid login: #{latency_tester.errors.inspect}"
|
||||
|
|
@ -639,7 +588,7 @@ module JamWebsockets
|
|||
recording_id = nil
|
||||
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable)
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
|
||||
|
||||
if music_session_id.nil?
|
||||
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
|
||||
|
|
@ -675,7 +624,7 @@ module JamWebsockets
|
|||
unless connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable) do |conn, count|
|
||||
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) do |conn, count|
|
||||
user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN)
|
||||
if count == 1
|
||||
Notification.send_friend_update(user.id, true, conn)
|
||||
|
|
@ -737,7 +686,7 @@ module JamWebsockets
|
|||
if connection.stale?
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
|
||||
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil)
|
||||
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -883,7 +832,7 @@ module JamWebsockets
|
|||
# populate routing data
|
||||
client_msg.from = client.client_id
|
||||
|
||||
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"
|
||||
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
|
||||
# put it on the topic exchange for clients
|
||||
@clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
|
||||
|
|
@ -937,6 +886,135 @@ module JamWebsockets
|
|||
Socket.unpack_sockaddr_in(client.get_peername)[1]
|
||||
end
|
||||
|
||||
def periodical_flag_connections
|
||||
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.flag_stale_connections(@gateway_name)
|
||||
end
|
||||
end
|
||||
|
||||
def periodical_check_clients
|
||||
# it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario
|
||||
# usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory
|
||||
if @client_lookup.length == 0
|
||||
return
|
||||
end
|
||||
|
||||
client_ids = @client_lookup.map { | client_id, info | "('#{client_id}')" }.join(',')
|
||||
|
||||
# find all client_id's that do not have a row in the db, and whack them
|
||||
# this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584
|
||||
sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
|
||||
SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}');
|
||||
"
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
conn.exec(sql) do |result|
|
||||
result.each { |row|
|
||||
client_id = row['client_id']
|
||||
context = @client_lookup[client_id]
|
||||
if context
|
||||
@log.debug("cleaning up missing client #{client_id}, #{context.user}")
|
||||
cleanup_client(context.client)
|
||||
else
|
||||
@log.error("could not clean up missing client #{client_id}")
|
||||
end
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def periodical_check_connections
|
||||
# this method is designed to be called periodically (every few seconds)
|
||||
# in which this gateway instance will check only its own clients for their health
|
||||
|
||||
# since each gateway checks only the clients it knows about, this allows us to deploy
|
||||
# n gateways that don't know much about each other.
|
||||
|
||||
# each gateway marks each connection row with it's gateway ID (so each gateway needs it's own ID or bad things happen)
|
||||
|
||||
# we also have a global resque job that checks for connections that appear to be not controlled by any gateway
|
||||
# to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy
|
||||
|
||||
clients = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
clients = connection_manager.stale_connection_client_ids(@gateway_name)
|
||||
end
|
||||
cleanup_clients_with_ids(clients)
|
||||
end
|
||||
|
||||
def periodical_stats_dump
|
||||
# assume 60 seconds per status dump
|
||||
stats = @message_stats.sort_by{|k,v| -v}
|
||||
stats.map { |i| i[1] = (i[1] / 60.0).round(2) }
|
||||
|
||||
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '));
|
||||
@message_stats.clear
|
||||
end
|
||||
|
||||
def cleanup_clients_with_ids(expired_connections)
|
||||
expired_connections.each do |expired_connection|
|
||||
cid = expired_connection[:client_id]
|
||||
|
||||
client_context = @client_lookup[cid]
|
||||
|
||||
if client_context
|
||||
Diagnostic.expired_stale_connection(client_context.user.id, client_context)
|
||||
cleanup_client(client_context.client)
|
||||
end
|
||||
|
||||
music_session = nil
|
||||
recording_id = nil
|
||||
user = nil
|
||||
|
||||
# remove this connection from the database
|
||||
ConnectionManager.active_record_transaction do |mgr|
|
||||
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
|
||||
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}"
|
||||
Notification.send_friend_update(user_id, false, conn) if count == 0
|
||||
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
|
||||
user = User.find_by_id(user_id) unless user_id.nil?
|
||||
recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one
|
||||
recording_id = recording.id unless recording.nil?
|
||||
if music_session
|
||||
music_session.with_lock do # VRFS-1297
|
||||
music_session.tick_track_changes
|
||||
end
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
if user && music_session
|
||||
Notification.send_session_depart(music_session, cid, user, recording_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# removes all resources associated with a client
|
||||
def cleanup_client(client)
|
||||
client.close
|
||||
|
||||
@semaphore.synchronize do
|
||||
pending = client.context.nil? # presence of context implies this connection has been logged into
|
||||
|
||||
if pending
|
||||
@log.debug "cleaned up not-logged-in client #{client}"
|
||||
else
|
||||
@log.debug "cleanup up logged-in client #{client}"
|
||||
|
||||
context = @clients.delete(client)
|
||||
|
||||
if context
|
||||
remove_client(client.client_id)
|
||||
remove_user(context)
|
||||
else
|
||||
@log.warn "skipping duplicate cleanup attempt of logged-in client"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
private
|
||||
|
||||
def sane_logging(&blk)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ module JamWebsockets
|
|||
connect_time_stale_browser = options[:connect_time_stale_browser].to_i
|
||||
connect_time_expire_browser = options[:connect_time_expire_browser].to_i
|
||||
max_connections_per_user = options[:max_connections_per_user].to_i
|
||||
gateway_name = options[:gateway_name]
|
||||
rabbitmq_host = options[:rabbitmq_host]
|
||||
rabbitmq_port = options[:rabbitmq_port].to_i
|
||||
calling_thread = options[:calling_thread]
|
||||
|
|
@ -34,9 +35,11 @@ module JamWebsockets
|
|||
}
|
||||
|
||||
EventMachine.run do
|
||||
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user) do
|
||||
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name) do
|
||||
start_connection_expiration
|
||||
start_client_expiration
|
||||
start_connection_flagger
|
||||
start_stats_dump
|
||||
start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
|
||||
calling_thread.wakeup if calling_thread
|
||||
end
|
||||
|
|
@ -55,7 +58,7 @@ module JamWebsockets
|
|||
|
||||
def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug)
|
||||
EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
|
||||
@log.info "new client #{ws}"
|
||||
#@log.info "new client #{ws}"
|
||||
@router.new_client(ws, false)
|
||||
end
|
||||
EventMachine::WebSocket.run(:host => listen_ip, :port => trust_port, :debug => emwebsocket_debug) do |ws|
|
||||
|
|
@ -74,35 +77,34 @@ module JamWebsockets
|
|||
|
||||
def start_connection_expiration
|
||||
# one cleanup on startup
|
||||
expire_stale_connections
|
||||
@router.periodical_check_connections
|
||||
|
||||
EventMachine::PeriodicTimer.new(2) do
|
||||
sane_logging { expire_stale_connections }
|
||||
sane_logging { @router.periodical_check_connections }
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def expire_stale_connections
|
||||
clients = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
clients = connection_manager.stale_connection_client_ids
|
||||
def start_client_expiration
|
||||
# one cleanup on startup
|
||||
@router.periodical_check_clients
|
||||
|
||||
EventMachine::PeriodicTimer.new(30) do
|
||||
sane_logging { @router.periodical_check_clients }
|
||||
end
|
||||
@router.cleanup_clients_with_ids(clients)
|
||||
end
|
||||
|
||||
def start_connection_flagger
|
||||
# one cleanup on startup
|
||||
flag_stale_connections
|
||||
@router.periodical_flag_connections
|
||||
|
||||
EventMachine::PeriodicTimer.new(2) do
|
||||
sane_logging { flag_stale_connections }
|
||||
sane_logging { @router.periodical_flag_connections }
|
||||
end
|
||||
end
|
||||
|
||||
def flag_stale_connections()
|
||||
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.flag_stale_connections
|
||||
def start_stats_dump
|
||||
EventMachine::PeriodicTimer.new(60) do
|
||||
@router.periodical_stats_dump
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -1,17 +1,30 @@
|
|||
#!/bin/bash -l
|
||||
|
||||
# default config values
|
||||
BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER`
|
||||
usage()
|
||||
{
|
||||
echo "pass one numerical argument representing the instance of this websocket-gateway"
|
||||
}
|
||||
|
||||
CONFIG_FILE="/etc/websocket-gateway/upstart.conf"
|
||||
if [ -e "$CONFIG_FILE" ]; then
|
||||
. "$CONFIG_FILE"
|
||||
fi
|
||||
usage main()
|
||||
{
|
||||
JAM_INSTANCE=$1
|
||||
|
||||
# I don't like doing this, but the next command (bundle exec) retouches/generates
|
||||
# the gemfile. This unfortunately means the next debian update doesn't update this file.
|
||||
# Ultimately this means an old Gemfile.lock is left behind for a new package,
|
||||
# and bundle won't run because it thinks it has the wrong versions of gems
|
||||
rm -f Gemfile.lock
|
||||
# default config values
|
||||
BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER`
|
||||
|
||||
CONFIG_FILE="/etc/websocket-gateway/upstart.conf"
|
||||
if [ -e "$CONFIG_FILE" ]; then
|
||||
. "$CONFIG_FILE"
|
||||
fi
|
||||
|
||||
# I don't like doing this, but the next command (bundle exec) retouches/generates
|
||||
# the gemfile. This unfortunately means the next debian update doesn't update this file.
|
||||
# Ultimately this means an old Gemfile.lock is left behind for a new package,
|
||||
# and bundle won't run because it thinks it has the wrong versions of gems
|
||||
rm -f Gemfile.lock
|
||||
|
||||
JAM_INSTANCE=$JAM_INSTANCE BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway
|
||||
}
|
||||
|
||||
[ "$#" -ne 1 ] && ( usage && exit 1 ) || main
|
||||
|
||||
BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway
|
||||
|
|
|
|||
|
|
@ -1,18 +1,22 @@
|
|||
description "websocket-gateway"
|
||||
|
||||
start on startup
|
||||
start on runlevel [2345]
|
||||
stop on runlevel [016]
|
||||
#start on startup
|
||||
#start on runlevel [2345]
|
||||
# stop on runlevel [016]
|
||||
stop on stopping gateways
|
||||
|
||||
limit nofile 20000 20000
|
||||
limit core unlimited unlimited
|
||||
|
||||
respawn
|
||||
respawn limit 10 5
|
||||
|
||||
instance $N
|
||||
|
||||
pre-start script
|
||||
set -e
|
||||
mkdir -p /var/run/websocket-gateway
|
||||
chown websocket-gateway:websocket-gateway /var/run/websocket-gateway
|
||||
end script
|
||||
|
||||
exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh
|
||||
exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh $N
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ FactoryGirl.define do
|
|||
ip_address '1.1.1.1'
|
||||
as_musician true
|
||||
client_type 'client'
|
||||
gateway 'gateway1'
|
||||
scoring_timeout Time.now
|
||||
sequence(:channel_id) { |n| "Channel#{n}"}
|
||||
end
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ describe Router do
|
|||
@router.max_connections_per_user = 10
|
||||
@router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2
|
||||
@router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672)
|
||||
@router.gateway_name = 'gateway1'
|
||||
end
|
||||
|
||||
subject { @router }
|
||||
|
|
@ -122,6 +123,41 @@ describe Router do
|
|||
|
||||
end
|
||||
|
||||
describe "periodical_check_clients" do
|
||||
let(:user) { FactoryGirl.create(:user) }
|
||||
it "with no data" do
|
||||
@router.client_lookup.length.should == 0
|
||||
@router.periodical_check_clients
|
||||
done
|
||||
end
|
||||
|
||||
it "with one OK client" do
|
||||
client = double("client")
|
||||
client.should_receive(:context=).any_number_of_times
|
||||
conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1")
|
||||
@router.add_tracker(user, client, 'client', conn1.client_id)
|
||||
@router.client_lookup[conn1.client_id].should_not be_nil
|
||||
@router.periodical_check_clients
|
||||
@router.client_lookup[conn1.client_id].should_not be_nil
|
||||
done
|
||||
end
|
||||
|
||||
it "with one missing client" do
|
||||
client = double("client")
|
||||
client.should_receive(:context=).any_number_of_times
|
||||
context = ClientContext.new(user, client, "client")
|
||||
client.should_receive(:context).any_number_of_times.and_return(context)
|
||||
client.should_receive(:close)
|
||||
conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1")
|
||||
client.should_receive(:client_id).and_return(conn1.client_id)
|
||||
@router.add_tracker(user, client, 'client', conn1.client_id)
|
||||
conn1.delete
|
||||
@router.client_lookup[conn1.client_id].should_not be_nil
|
||||
@router.periodical_check_clients
|
||||
@router.client_lookup[conn1.client_id].should be_nil
|
||||
done
|
||||
end
|
||||
end
|
||||
|
||||
describe "serviceability" do
|
||||
it "should start and stop", :mq => true do
|
||||
|
|
|
|||
Loading…
Reference in New Issue