jam-cloud/lib/jam_ruby/connection_manager.rb

370 lines
15 KiB
Ruby

module JamRuby
# All writes should occur through the ConnectionManager for the connection table
# Reads can occur freely elsewhere, though
# Because connections are tied to the websocket-connection and we bookkeep them in the database purely
# for 'SQL convenience', this is a obvious place we can go away from a database
# as an optimization if we find it's too much db traffic created'
# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes,
# all clients should reconnect and restablish their connection anyway
#
# All methods in here could also be refactored as stored procedures, if we stick with a database.
# This may make sense in the short term if we are still managing connections in the database, but
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
# Or of course we could just port the relevant methods to node-js
class ConnectionManager < BaseManager
def initialize(options={})
super(options)
@log = Logging.logger[self]
end
def update_staleness()
#TODO
end
##### TODO: refactored to notification.rb but left here for backwards compatibility w/ connection_manager_spec.rb
def gather_friends(connection, user_id)
friend_ids = []
connection.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
friend_results.each do |friend_result|
friend_ids.push(friend_result['friend_id'])
end
end
return friend_ids
end
def reconnect(conn)
sql =<<SQL
UPDATE connections SET (aasm_state,updated_at) = ('#{Connection::CONNECT_STATE.to_s}', NOW())
WHERE
client_id = '#{conn.client_id}'
SQL
# @log.info("*** reconnect: client_id = #{conn.client_id}")
self.pg_conn.exec(sql)
end
def flag_connection_stale_with_client_id(client_id)
sql =<<SQL
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
WHERE
client_id = '#{client_id}' AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
SQL
# @log.info("*** flag_connection_stale_with_client_id: client_id = #{client_id}; sql = #{sql}")
self.pg_conn.exec(sql)
end
# flag connections as stale
def flag_stale_connections(max_seconds)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT count(user_id) FROM connections
WHERE
updated_at < (NOW() - interval '#{max_seconds} second') AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
SQL
conn.exec(sql) do |result|
count = result.getvalue(0, 0)
# @log.info("flag_stale_connections: flagging #{count} stale connections")
if 0 < count.to_i
# @log.info("flag_stale_connections: flagging #{count} stale connections")
sql =<<SQL
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
WHERE
updated_at < (NOW() - interval '#{max_seconds} second') AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
SQL
conn.exec(sql)
end
end
end
end
# NOTE this is only used for testing purposes; actual deletes will be processed in the websocket context which cleans up dependencies
def expire_stale_connections(max_seconds)
self.stale_connection_client_ids(max_seconds).each { |cid| self.delete_connection(cid) }
end
# expiring connections in stale state, which deletes them
def stale_connection_client_ids(max_seconds)
client_ids = []
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT client_id FROM connections
WHERE
updated_at < (NOW() - interval '#{max_seconds} second') AND
aasm_state = '#{Connection::STALE_STATE.to_s}'
SQL
conn.exec(sql) do |result|
result.each { |row| client_ids << row['client_id'] }
# @log.debug("*** stale_connection_client_ids: client_ids = #{client_ids.inspect}")
end
end
client_ids
end
def create_connection(user_id, client_id, ip_address)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
lock_connections(conn)
conn.exec("INSERT INTO connections (user_id, client_id, ip_address, aasm_state) VALUES ($1, $2, $3, $4)",
[user_id, client_id, ip_address, Connection::CONNECT_STATE.to_s]).clear
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "1"
# send notification
Notification.send_friend_update(user_id, true, conn)
end
end
return Connection.find_by_client_id!(client_id)
end
end
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
# this code is responsible for all cleanup logic associated with a connection going away
def delete_connection(client_id)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
user_id = nil
music_session_id = nil
lock_connections(conn)
previous_music_session_id = check_already_session(conn, client_id)
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
if result.cmd_tuples == 0
# the client is already gone from the database... do nothing but log error
@log.warn("unable to delete client #{client_id}")
return
elsif result.cmd_tuples == 1
user_id = result[0]['user_id']
music_session_id = result[0]['client_id']
else
raise Exception, 'uniqueness constraint has been lost on client_id'
end
end
session_checks(conn, previous_music_session_id, user_id)
# since we did delete a row, check and see if any more connections for that user exist
# if we are down to zero, send out user gone message
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "0"
# send notification
Notification.send_friend_update(user_id, false, conn)
end
end
# same for session-if we are down to the last participant, delete the session
unless music_session_id.nil?
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
end
end
end
def destroy_if_empty(conn, music_session_id)
num_participants = nil
conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [music_session_id]) do |result|
num_participants = result.getvalue(0, 0).to_i
end
if num_participants == 0
# delete the music_session
conn.exec("DELETE from music_sessions WHERE id = $1", [music_session_id]) do |result|
if result.cmd_tuples == 0
# no music session deleted. do nothing
elsif result.cmd_tuples == 1
# music session deleted!
@log.debug("deleted music session #{music_session_id}")
else
@log.error("music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}")
raise Exception, "music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}"
end
end
end
end
def check_already_session(conn, client_id)
conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
if result.num_tuples == 1
previous_music_session_id = result.getvalue(0, 0)
return previous_music_session_id
elsif result.num_tuples == 0
# there is no connection found matching this criteria; we are done.
@log.debug("when checking for existing session, no connection found with client=#{client_id}")
return nil
else
@log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
end
end
end
def session_checks(conn, previous_music_session_id, user_id)
unless previous_music_session_id.nil?
# TODO: send notification to friends that this user left this session?
@log.debug("user #{user_id} left music_session #{previous_music_session_id}")
destroy_if_empty(conn, previous_music_session_id)
end
end
def join_music_session(user, client_id, music_session, as_musician, tracks)
connection = nil
user_id = user.id
music_session_id = music_session.id
ConnectionManager.active_record_transaction do |connection_manager|
db_conn = connection_manager.pg_conn
connection = Connection.find_by_client_id_and_user_id!(client_id, user_id)
connection.music_session_id = music_session_id
connection.as_musician = as_musician
connection.joining_session = true
associate_tracks(connection, tracks)
connection.save
if connection.errors.any?
raise ActiveRecord::Rollback
else
if as_musician && music_session.musician_access
Notification.send_musician_session_join(music_session, connection, user)
Notification.send_friend_session_join(db_conn, connection, user)
end
MusicSessionUserHistory.save(music_session_id, user_id, client_id)
end
end
return connection
end
def join_music_session_old(user_id, client_id, music_session_id, as_musician)
conn = @pg_conn
lock_connections(conn)
previous_music_session_id = check_already_session(conn, client_id)
user = User.find(user_id)
if as_musician != true && as_musician != false # checks that a boolean was passed in
raise JamArgumentError, "as_musician incorrectly specified"
end
# determine if the user can join; if not, throw a PermissionError
music_session = MusicSession.find(music_session_id)
unless music_session.can_join?(user, as_musician)
@log.debug "user can not join a session user_id=#{user_id} and client_id=#{client_id}"
raise PermissionError, "unable to join the specified session"
end
begin
# we include user_id in the query as an act of security, so that a user can't access someone else's client connection
conn.exec("UPDATE connections SET music_session_id = $1, as_musician = $2 WHERE client_id = $3 and user_id = $4", [music_session_id, as_musician, client_id, user_id]) do |result|
if result.cmd_tuples == 1
@log.debug "associated music_session with connection for client=#{client_id}, music_session=#{music_session_id}, and user=#{user_id}"
session_checks(conn, previous_music_session_id, user_id)
elsif result.cmd_tuples == 0
@log.debug "join_music_session no connection found with client_id=#{client_id} and user_id=#{user_id}"
raise ActiveRecord::RecordNotFound
else
@log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)"
raise Exception, "locked table changed state"
end
end
rescue PG::Error => pg_error
if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"")
# if there is no music session that exists, we will receive this message
@log.debug "music_session does not exist. music_session=#{music_session_id}"
raise StateError, "music_session does not exist"
else
raise pg_error
end
end
# end
end
def leave_music_session(user, connection, music_session)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
lock_connections(conn)
music_session_id = music_session.id
user_id = user.id
client_id = connection.client_id
previous_music_session_id = check_already_session(conn, client_id)
if previous_music_session_id == nil
@log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
raise StateError, "not in session"
elsif previous_music_session_id != music_session_id
@log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
raise StateError, "in a session different than that specified"
end
# can throw exception if the session is deleted just before this
conn.exec("UPDATE connections SET music_session_id = NULL, as_musician = NULL WHERE client_id = $1 AND user_id =$2", [client_id, user_id]) do |result|
if result.cmd_tuples == 1
@log.debug("disassociated music_session with connection for client_id=#{client_id}, user_id=#{user_id}")
session_checks(conn, previous_music_session_id, user_id)
Notification.send_musician_session_depart(music_session, connection, user)
elsif result.cmd_tuples == 0
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
raise ActiveRecord::RecordNotFound
else
@log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
raise Exception, "locked table changed state"
end
end
end
end
def lock_connections(conn)
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
end
def associate_tracks(connection, tracks)
@log.debug "Tracks:"
@log.debug tracks
connection.tracks.clear()
unless tracks.nil?
tracks.each do |track|
instrument = Instrument.find(track["instrument_id"])
t = Track.new
t.instrument = instrument
t.connection = connection
t.sound = track["sound"]
t.save
connection.tracks << t
end
end
end
end
end