461 lines
19 KiB
Ruby
461 lines
19 KiB
Ruby
module JamRuby
|
|
# All writes should occur through the ConnectionManager for the connection table
|
|
# Reads can occur freely elsewhere, though
|
|
# Because connections are tied to the websocket-connection and we bookkeep them in the database purely
|
|
# for 'SQL convenience', this is a obvious place we can go away from a database
|
|
# as an optimization if we find it's too much db traffic created'
|
|
# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes,
|
|
# all clients should reconnect and re-establish their connection anyway
|
|
#
|
|
# All methods in here could also be refactored as stored procedures, if we stick with a database.
|
|
# This may make sense in the short term if we are still managing connections in the database, but
|
|
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
|
|
# Or of course we could just port the relevant methods to node-js
|
|
#
|
|
# Also we don't send notifications from ConnectionManager;
|
|
# we just return enough data so that a caller can make the determination if it needs to
|
|
|
|
class ConnectionManager < BaseManager
|
|
|
|
def initialize(options={})
|
|
super(options)
|
|
@log = Logging.logger[self]
|
|
end
|
|
|
|
def update_staleness()
|
|
#TODO
|
|
end
|
|
|
|
##### TODO: refactored to notification.rb but left here for backwards compatibility w/ connection_manager_spec.rb
|
|
def gather_friends(connection, user_id)
|
|
friend_ids = []
|
|
connection.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
|
|
friend_results.each do |friend_result|
|
|
friend_ids.push(friend_result['friend_id'])
|
|
end
|
|
end
|
|
return friend_ids
|
|
end
|
|
|
|
# this simulates music_session destroy callbacks with activerecord
|
|
def before_destroy_music_session(music_session_id)
|
|
music_session = MusicSession.find_by_id(music_session_id)
|
|
music_session.before_destroy if music_session
|
|
end
|
|
|
|
# reclaim the existing connection, if ip_address is not nil then perhaps a new address as well
|
|
def reconnect(conn, reconnect_music_session_id, ip_address, &blk)
|
|
music_session_id = nil
|
|
reconnected = false
|
|
|
|
# we will reconnect the same music_session that the connection was previously in,
|
|
# if it matches the same value currently in the database for music_session_id
|
|
music_session_id_expression = 'NULL'
|
|
unless reconnect_music_session_id.nil?
|
|
music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)"
|
|
end
|
|
|
|
if ip_address and !ip_address.eql?(conn.ip_address)
|
|
# turn ip_address string into a number, then fetch the isp and block records and update location info
|
|
|
|
addr = JamIsp.ip_to_num(ip_address)
|
|
#puts("============= JamIsp.ip_to_num returns #{addr} for #{ip_address} =============")
|
|
|
|
isp = JamIsp.lookup(addr)
|
|
#puts("============= JamIsp.lookup returns #{isp.inspect} for #{addr} =============")
|
|
if isp.nil? then ispid = 0 else ispid = isp.coid end
|
|
|
|
block = GeoIpBlocks.lookup(addr)
|
|
#puts("============= GeoIpBlocks.lookup returns #{block.inspect} for #{addr} =============")
|
|
if block.nil? then locid = 0 else locid = block.locid end
|
|
|
|
location = GeoIpLocations.lookup(locid)
|
|
if location.nil?
|
|
# todo what's a better default location?
|
|
locidispid = 0
|
|
latitude = 0.0
|
|
longitude = 0.0
|
|
countrycode = 'US'
|
|
region = 'TX'
|
|
city = 'Austin'
|
|
else
|
|
locidispid = locid*1000000+ispid
|
|
latitude = location.latitude
|
|
longitude = location.longitude
|
|
countrycode = location.countrycode
|
|
region = location.region
|
|
city = location.city
|
|
end
|
|
|
|
conn.ip_address = ip_address
|
|
conn.addr = addr
|
|
conn.locidispid = locidispid
|
|
conn.latitude = latitude
|
|
conn.longitude = longitude
|
|
conn.countrycode = countrycode
|
|
conn.region = region
|
|
conn.city = city
|
|
conn.save!(validate: false)
|
|
|
|
# we're passing all this stuff so that the user record might be updated as well...
|
|
blk.call(addr, locidispid, latitude, longitude, countrycode, region, city) unless blk.nil?
|
|
end
|
|
|
|
sql =<<SQL
|
|
UPDATE connections SET (aasm_state, updated_at, music_session_id) = ('#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression})
|
|
WHERE
|
|
client_id = '#{conn.client_id}'
|
|
RETURNING music_session_id
|
|
SQL
|
|
self.pg_conn.exec(sql) do |result|
|
|
if result.cmd_tuples == 1
|
|
music_session_id = result[0]['music_session_id']
|
|
end
|
|
end
|
|
|
|
# we tell the client they reconnected if they specified a reconnect music_session_id, and if that is now the
|
|
# current value in the database
|
|
reconnected = true if !reconnect_music_session_id.nil? && reconnect_music_session_id == music_session_id
|
|
|
|
return music_session_id, reconnected
|
|
end
|
|
|
|
# returns the music_session_id, if any, associated with the client
|
|
def flag_connection_stale_with_client_id(client_id)
|
|
music_session_id = nil
|
|
sql =<<SQL
|
|
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
|
|
WHERE
|
|
client_id = '#{client_id}' AND
|
|
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
|
RETURNING music_session_id
|
|
SQL
|
|
# @log.info("*** flag_connection_stale_with_client_id: client_id = #{client_id}; sql = #{sql}")
|
|
self.pg_conn.exec(sql) do |result|
|
|
|
|
# if we did update a client to stale, retriee music_session_id
|
|
if result.cmd_tuples == 1
|
|
music_session_id = result[0]['music_session_id']
|
|
end
|
|
end
|
|
|
|
music_session_id
|
|
end
|
|
|
|
# flag connections as stale
|
|
def flag_stale_connections(max_seconds)
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
conn = connection_manager.pg_conn
|
|
sql =<<SQL
|
|
SELECT count(user_id) FROM connections
|
|
WHERE
|
|
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
|
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
|
SQL
|
|
conn.exec(sql) do |result|
|
|
count = result.getvalue(0, 0)
|
|
# @log.info("flag_stale_connections: flagging #{count} stale connections")
|
|
if 0 < count.to_i
|
|
# @log.info("flag_stale_connections: flagging #{count} stale connections")
|
|
sql =<<SQL
|
|
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
|
|
WHERE
|
|
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
|
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
|
SQL
|
|
conn.exec(sql)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
# NOTE this is only used for testing purposes;
|
|
# actual deletes will be processed in the websocket context which cleans up dependencies
|
|
def expire_stale_connections(max_seconds)
|
|
self.stale_connection_client_ids(max_seconds).each { |cid| self.delete_connection(cid) }
|
|
end
|
|
|
|
# expiring connections in stale state, which deletes them
|
|
def stale_connection_client_ids(max_seconds)
|
|
client_ids = []
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
conn = connection_manager.pg_conn
|
|
sql =<<SQL
|
|
SELECT client_id, music_session_id, user_id FROM connections
|
|
WHERE
|
|
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
|
aasm_state = '#{Connection::STALE_STATE.to_s}'
|
|
SQL
|
|
conn.exec(sql) do |result|
|
|
result.each { |row|
|
|
client_id = row['client_id']
|
|
music_session_id = row['music_session_id']
|
|
user_id = row['user_id']
|
|
|
|
client_ids << client_id
|
|
|
|
}
|
|
end
|
|
end
|
|
client_ids
|
|
end
|
|
|
|
|
|
# returns the number of connections that this user currently has across all clients
|
|
# this number is used by notification logic elsewhere to know
|
|
# 'oh the user joined for the 1st time, so send a friend update', or
|
|
# 'don't bother because the user has connected somewhere else already'
|
|
def create_connection(user_id, client_id, ip_address, &blk)
|
|
count = 0
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
conn = connection_manager.pg_conn
|
|
|
|
# turn ip_address string into a number, then fetch the isp and block records
|
|
|
|
addr = JamIsp.ip_to_num(ip_address)
|
|
#puts("============= JamIsp.ip_to_num returns #{addr} for #{ip_address} =============")
|
|
|
|
isp = JamIsp.lookup(addr)
|
|
#puts("============= JamIsp.lookup returns #{isp.inspect} for #{addr} =============")
|
|
if isp.nil? then ispid = 0 else ispid = isp.coid end
|
|
|
|
block = GeoIpBlocks.lookup(addr)
|
|
#puts("============= GeoIpBlocks.lookup returns #{block.inspect} for #{addr} =============")
|
|
if block.nil? then locid = 0 else locid = block.locid end
|
|
|
|
location = GeoIpLocations.lookup(locid)
|
|
if location.nil?
|
|
# todo what's a better default location?
|
|
locidispid = 0
|
|
latitude = 0.0
|
|
longitude = 0.0
|
|
countrycode = 'US'
|
|
region = 'TX'
|
|
city = 'Austin'
|
|
else
|
|
locidispid = locid*1000000+ispid
|
|
latitude = location.latitude
|
|
longitude = location.longitude
|
|
countrycode = location.countrycode
|
|
region = location.region
|
|
city = location.city
|
|
end
|
|
|
|
lock_connections(conn)
|
|
|
|
conn.exec("INSERT INTO connections (user_id, client_id, ip_address, addr, locidispid, latitude, longitude, countrycode, region, city, aasm_state) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
|
|
[user_id, client_id, ip_address, addr, locidispid, latitude, longitude, countrycode, region, city, Connection::CONNECT_STATE.to_s]).clear
|
|
|
|
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
|
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
|
count = result.getvalue(0, 0) .to_i
|
|
# we're passing all this stuff so that the user record might be updated as well...
|
|
blk.call(conn, count, addr, locidispid, latitude, longitude, countrycode, region, city) unless blk.nil?
|
|
end
|
|
return count
|
|
end
|
|
end
|
|
|
|
|
|
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
|
|
# this code is responsible for all cleanup logic associated with a connection going away
|
|
# returns how many connections are left for this user; this data is used by callers to know whether
|
|
# to tell friends if the user is offline (count==0) or not (count > 0)
|
|
# If a blk is passed in, on success, count is also passed back an the db connection, allowing for
|
|
# notifications to go out within the table log. music_session_id is also passed, if the music_session still exists
|
|
# and this connection was in a session
|
|
def delete_connection(client_id, &blk)
|
|
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
conn = connection_manager.pg_conn
|
|
count = 0
|
|
user_id = nil
|
|
music_session_id = nil
|
|
|
|
lock_connections(conn)
|
|
|
|
previous_music_session_id = check_already_session(conn, client_id)
|
|
|
|
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
|
|
|
|
if result.cmd_tuples == 0
|
|
# the client is already gone from the database... do nothing but log error
|
|
@log.warn("unable to delete client #{client_id}")
|
|
return
|
|
elsif result.cmd_tuples == 1
|
|
user_id = result[0]['user_id']
|
|
music_session_id = result[0]['music_session_id']
|
|
|
|
else
|
|
raise Exception, 'uniqueness constraint has been lost on client_id'
|
|
end
|
|
end
|
|
|
|
session_checks(conn, previous_music_session_id, user_id)
|
|
|
|
# since we did delete a row, check and see if any more connections for that user exist
|
|
# if we are down to zero, send out user gone message
|
|
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
|
|
count = result.getvalue(0, 0).to_i
|
|
end
|
|
|
|
# same for session-if we are down to the last participant, delete the session
|
|
unless music_session_id.nil?
|
|
before_destroy_music_session(music_session_id)
|
|
result = conn.exec("DELETE FROM music_sessions WHERE id = $1 AND 0 = (select count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id])
|
|
if result.cmd_tuples == 1
|
|
music_session_id = nil
|
|
end
|
|
end
|
|
|
|
blk.call(conn, count, music_session_id, user_id) unless blk.nil?
|
|
return count
|
|
end
|
|
end
|
|
|
|
def check_already_session(conn, client_id)
|
|
conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
|
|
if result.num_tuples == 1
|
|
previous_music_session_id = result.getvalue(0, 0)
|
|
return previous_music_session_id
|
|
elsif result.num_tuples == 0
|
|
# there is no connection found matching this criteria; we are done.
|
|
@log.debug("when checking for existing session, no connection found with client=#{client_id}")
|
|
return nil
|
|
else
|
|
@log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
|
|
raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
|
|
end
|
|
end
|
|
end
|
|
|
|
def session_checks(conn, previous_music_session_id, user_id)
|
|
unless previous_music_session_id.nil?
|
|
# TODO: send notification to friends that this user left this session?
|
|
@log.debug("user #{user_id} left music_session #{previous_music_session_id}")
|
|
|
|
# destroy the music_session if it's empty
|
|
num_participants = nil
|
|
conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1",
|
|
[previous_music_session_id]) do |result|
|
|
num_participants = result.getvalue(0, 0).to_i
|
|
end
|
|
if num_participants == 0
|
|
# delete the music_session
|
|
before_destroy_music_session(previous_music_session_id)
|
|
conn.exec("DELETE from music_sessions WHERE id = $1",
|
|
[previous_music_session_id]) do |result|
|
|
if result.cmd_tuples == 1
|
|
# music session deleted!
|
|
@log.debug("deleted music session #{previous_music_session_id}")
|
|
JamRuby::MusicSessionHistory.removed_music_session(previous_music_session_id)
|
|
elsif 1 < result.cmd_tuples
|
|
msg = "music_sessions table data integrity violation; multiple rows found with music_session_id=#{previous_music_session_id}"
|
|
@log.error(msg)
|
|
raise Exception, msg
|
|
end
|
|
end
|
|
else
|
|
# there are still people in the session
|
|
|
|
#ensure that there is no active claimed recording if the owner of that recording left the session
|
|
conn.exec("UPDATE music_sessions set claimed_recording_id = NULL, claimed_recording_initiator_id = NULL where claimed_recording_initiator_id = $1 and id = $2",
|
|
[user_id, previous_music_session_id])
|
|
end
|
|
|
|
end
|
|
end
|
|
|
|
# if a blk is passed in, upon success, it will be called and you can issue notifications
|
|
# within the connection table lock
|
|
def join_music_session(user, client_id, music_session, as_musician, tracks, &blk)
|
|
connection = nil
|
|
user_id = user.id
|
|
music_session_id = music_session.id
|
|
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
db_conn = connection_manager.pg_conn
|
|
|
|
connection = Connection.find_by_client_id_and_user_id!(client_id, user_id)
|
|
connection.music_session_id = music_session_id
|
|
connection.as_musician = as_musician
|
|
connection.joining_session = true
|
|
associate_tracks(connection, tracks)
|
|
connection.save
|
|
|
|
if connection.errors.any?
|
|
raise ActiveRecord::Rollback
|
|
end
|
|
end
|
|
|
|
connection
|
|
end
|
|
|
|
# if a blk is passed in, upon success, it will be called and you can issue notifications
|
|
# within the connection table lock
|
|
def leave_music_session(user, connection, music_session, &blk)
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
|
|
conn = connection_manager.pg_conn
|
|
|
|
lock_connections(conn)
|
|
|
|
music_session_id = music_session.id
|
|
user_id = user.id
|
|
client_id = connection.client_id
|
|
|
|
previous_music_session_id = check_already_session(conn, client_id)
|
|
|
|
if previous_music_session_id == nil
|
|
@log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
|
|
raise StateError, "not in session"
|
|
elsif previous_music_session_id != music_session_id
|
|
@log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
|
|
raise StateError, "in a session different than that specified"
|
|
end
|
|
|
|
# can throw exception if the session is deleted just before this
|
|
conn.exec("UPDATE connections SET music_session_id = NULL, as_musician = NULL WHERE client_id = $1 AND user_id =$2", [client_id, user_id]) do |result|
|
|
if result.cmd_tuples == 1
|
|
@log.debug("disassociated music_session with connection for client_id=#{client_id}, user_id=#{user_id}")
|
|
|
|
JamRuby::MusicSessionUserHistory.removed_music_session(user_id, music_session_id)
|
|
session_checks(conn, previous_music_session_id, user_id)
|
|
blk.call() unless blk.nil?
|
|
|
|
elsif result.cmd_tuples == 0
|
|
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
|
|
raise ActiveRecord::RecordNotFound
|
|
else
|
|
@log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
|
|
raise Exception, "locked table changed state"
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def lock_connections(conn)
|
|
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
|
|
end
|
|
|
|
def associate_tracks(connection, tracks)
|
|
@log.debug "Tracks:"
|
|
@log.debug tracks
|
|
connection.tracks.clear()
|
|
|
|
unless tracks.nil?
|
|
tracks.each do |track|
|
|
instrument = Instrument.find(track["instrument_id"])
|
|
t = Track.new
|
|
t.instrument = instrument
|
|
t.connection = connection
|
|
t.sound = track["sound"]
|
|
t.client_track_id = track["client_track_id"]
|
|
t.save
|
|
connection.tracks << t
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|