2012-10-21 15:05:06 +00:00
module JamRuby
2012-11-11 01:07:17 +00:00
# All writes should occur through the ConnectionManager for the connection table
2012-10-21 15:05:06 +00:00
# Reads can occur freely elsewhere, though
# Because connections are tied to the websocket-connection and we bookkeep them in the database purely
# for 'SQL convenience', this is an obvious place where we can move away from a database
# as an optimization if we find it creates too much db traffic.
# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes,
# all clients should reconnect and reestablish their connection anyway
#
# All methods in here could also be refactored as stored procedures, if we stick with a database.
# This may make sense in the short term if we are still managing connections in the database but
# move the websocket gateway to node-js (because the websocket gateway needs to call some of these methods).
# Or of course we could just port the relevant methods to node-js
2012-11-11 01:07:17 +00:00
class ConnectionManager < BaseManager
2012-10-21 15:05:06 +00:00
2012-11-11 01:07:17 +00:00
# MQRouter instance (created in initialize) that publishes friend updates; readable and writable from outside.
attr_accessor :mq_router
2012-10-21 15:05:06 +00:00
2012-10-23 00:59:35 +00:00
# Sets up the manager: hands the options to BaseManager, then creates the
# logger and the two collaborators used to build and publish friend updates.
#
# options - Hash forwarded unchanged to BaseManager#initialize (defaults to {}).
def initialize(options = {})
  super(options)

  @log = Logging.logger[self]
  @mq_router = MQRouter.new
  @message_factory = MessageFactory.new
end
2012-10-21 01:55:49 +00:00
2012-10-21 15:05:06 +00:00
# Placeholder with no behavior yet; takes no arguments and returns nil.
def update_staleness
  # TODO
end
2012-10-21 15:05:06 +00:00
# remove stale connections
#
# Finds every connection whose updated_at is older than +max_seconds+ seconds and
# removes each one through #delete_connection, so the per-connection cleanup
# implemented there runs for every stale client.
#
# max_seconds - maximum allowed age of a connection, in seconds (numeric).
#
# Returns the array of client_ids that were considered stale.
def remove_stale_connections(max_seconds)
  # The threshold is bound as a query parameter rather than interpolated into the SQL text,
  # so a malformed value can never alter the statement itself.
  sql = "SELECT client_id FROM connections " \
        "WHERE updated_at < (NOW() - ($1::double precision * interval '1 second'))"

  # block form of exec returns the block's value; collect the ids straight from the result rows
  stale_clients = @pg_conn.exec(sql, [max_seconds]) do |result|
    result.map { |row| row['client_id'] }
  end

  @log.debug(" deleting #{stale_clients.length} stale connections ")

  stale_clients.each { |client_id| delete_connection(client_id) }
end
2012-10-23 00:59:35 +00:00
2012-10-21 15:05:06 +00:00
# Records a new connection row and, when it is the user's only connection,
# publishes a friend update (online = true) to the user's friends.
#
# user_id    - id of the user that owns the connection.
# client_id  - id of the client the connection belongs to.
# ip_address - address stored alongside the connection.
def create_connection(user_id, client_id, ip_address)
  conn = @pg_conn

  lock_connections(conn)

  conn.exec(" INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3) ",
            [user_id, client_id, ip_address]).clear

  # we just created a new connection-if this is the first time the user has shown up,
  # we need to send out a message to his friends
  conn.exec(" SELECT count(user_id) FROM connections WHERE user_id = $1 ", [user_id]) do |result|
    # the count is compared numerically; comparing against a padded string literal never matched
    if result.getvalue(0, 0).to_i == 1
      # get all friend user_ids using the same query rails does for @user.friends
      friend_update = @message_factory.friend_update(user_id, true)
      friend_ids = gather_friends(conn, user_id)
      @mq_router.publish_to_friends(friend_ids, friend_update, user_id)
    end
  end
end
2012-10-21 15:05:06 +00:00
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
# this code is responsible for all cleanup logic associated with a connection going away
#
# Steps, all performed after locking the connections table:
#   1. remember which music session (if any) the client was in
#   2. delete the connection row, capturing its user_id and music_session_id
#   3. run session_checks for the session the client was in
#   4. if the user has no connections left, publish a friend update (online = false)
#   5. delete the music session the row pointed at if no connection references it
#
# client_id - id of the client whose connection went away.
#
# Returns nil without further work when no row exists for client_id.
# Raises Exception when more than one row was deleted for client_id.
# NOTE(review): raising Exception (not a StandardError subclass) is kept as-is so
# existing rescuers keep working.
def delete_connection(client_id)
  user_id = nil
  music_session_id = nil

  conn = @pg_conn

  lock_connections(conn)

  previous_music_session_id = check_already_session(conn, client_id)

  conn.exec(" DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id ", [client_id]) do |result|
    case result.cmd_tuples
    when 0
      # the client is already gone from the database... do nothing but log error
      @log.warn(" unable to delete client #{client_id} ")
      return
    when 1
      deleted_row = result[0]
      user_id = deleted_row['user_id']
      # read the session from its own column (it used to be read from 'client_id')
      music_session_id = deleted_row['music_session_id']
    else
      raise Exception, 'uniqueness constraint has been lost on client_id'
    end
  end

  session_checks(conn, previous_music_session_id, user_id)

  # since we did delete a row, check and see if any more connections for that user exist
  # if we are down to zero, send out user gone message
  conn.exec(" SELECT count(user_id) FROM connections where user_id = $1 ", [user_id]) do |result|
    # compare the count numerically; the database hands it back as text
    if result.getvalue(0, 0).to_i == 0
      friend_update = @message_factory.friend_update(user_id, false)
      friend_ids = gather_friends(conn, user_id)
      @mq_router.publish_to_friends(friend_ids, friend_update, user_id)
    end
  end

  # same for session-if we are down to the last participant, delete the session
  unless music_session_id.nil?
    # the statement previously lacked its WHERE keyword and was rejected by the database
    conn.exec(" DELETE FROM music_sessions WHERE id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1) ",
              [music_session_id]).clear
  end
end
# Deletes the given music session when no connection row references it any more.
#
# conn             - database connection used for both statements.
# music_session_id - id of the music session to inspect.
#
# Raises Exception when the delete reports more than one removed row.
def destroy_if_empty(conn, music_session_id)
  remaining = nil
  conn.exec(" SELECT count(*) FROM connections WHERE music_session_id = $1 ", [music_session_id]) do |counted|
    remaining = counted.getvalue(0, 0).to_i
  end

  # at least one connection still points at the session; leave it in place
  return unless remaining == 0

  # delete the music_session
  conn.exec(" DELETE from music_sessions WHERE id = $1 ", [music_session_id]) do |deleted|
    case deleted.cmd_tuples
    when 0
      # no music session deleted. do nothing
    when 1
      # music session deleted!
      @log.debug(" deleted music session #{music_session_id} ")
    else
      @log.error(" music_sessions table data integrity violation; multiple rows found with music_session_id= #{music_session_id} ")
      raise Exception, " music_sessions table data integrity violation; multiple rows found with music_session_id= #{music_session_id} "
    end
  end
end
2012-10-21 01:55:49 +00:00
2012-10-21 15:05:06 +00:00
# Looks up the music session currently recorded on a client's connection row.
#
# conn      - database connection to query.
# client_id - id of the client whose connection row is inspected.
#
# Returns the row's music_session_id value when exactly one row matches, or nil
# when no connection row exists for client_id.
# Raises Exception when more than one row matches client_id.
def check_already_session(conn, client_id)
  conn.exec(" SELECT music_session_id FROM connections WHERE client_id = $1 ", [client_id]) do |result|
    case result.num_tuples
    when 1
      return result.getvalue(0, 0)
    when 0
      # there is no connection found matching this criteria; we are done.
      @log.debug(" when checking for existing session, no connection found with client= #{client_id} ")
      return nil
    else
      @log.error(" connection table data integrity violation; multiple rows found. client_id= #{client_id} ")
      raise Exception, " connection table data integrity violation; multiple rows found. client_id= #{client_id} "
    end
  end
end
2012-10-21 15:05:06 +00:00
# Follow-up work for a client that was in a music session: logs the departure and
# removes the session if nobody is left in it (via destroy_if_empty).
#
# conn                      - database connection handed on to destroy_if_empty.
# previous_music_session_id - session the client was in, or nil when it was in none.
# user_id                   - id of the user, used for logging only.
def session_checks(conn, previous_music_session_id, user_id)
  # nothing to do when the client was not in a session
  return if previous_music_session_id.nil?

  # TODO: send notification to friends that this user left this session?
  @log.debug(" user #{user_id} left music_session #{previous_music_session_id} ")
  destroy_if_empty(conn, previous_music_session_id)
end
2012-11-18 02:59:59 +00:00
# Associates a client's connection with a music session.
#
# user_id          - id of the joining user; also part of the UPDATE filter.
# client_id        - id of the client connection being updated.
# music_session_id - id of the session to join.
# as_musician      - must be exactly true or false.
#
# Raises JamArgumentError when as_musician is not a boolean.
# Raises PermissionError when a non-musician asks to join as a musician, or when
#   the session's can_join? check refuses the user.
# Raises ActiveRecord::RecordNotFound when no connection row matches client_id and user_id.
# Raises StateError when the database rejects the update because the music session row
#   does not exist (foreign key connections_music_session_id_fkey).
# Raises Exception when the update touches more than one row.
def join_music_session(user_id, client_id, music_session_id, as_musician)
  conn = @pg_conn

  lock_connections(conn)

  previous_music_session_id = check_already_session(conn, client_id)

  user = User.find(user_id)
  unless [true, false].include?(as_musician) # checks that a boolean was passed in
    raise JamArgumentError, " as_musician incorrectly specified "
  end
  if as_musician && !user.musician
    raise PermissionError, " a fan can not join a music session as a musician "
  end

  # determine if the user can join; if not, throw a PermissionError
  music_session = MusicSession.find(music_session_id)
  unless music_session.can_join?(user, as_musician)
    @log.debug " user can not join a session user_id= #{user_id} and client_id= #{client_id} "
    raise PermissionError, " unable to join the specified session "
  end

  begin
    # we include user_id in the query as an act of security, so that a user can't access someone else's client connection
    conn.exec(" UPDATE connections SET music_session_id = $1, as_musician = $2 WHERE client_id = $3 and user_id = $4 ",
              [music_session_id, as_musician, client_id, user_id]) do |result|
      case result.cmd_tuples
      when 1
        @log.debug " associated music_session with connection for client= #{client_id} , music_session= #{music_session_id} , and user= #{user_id} "

        session_checks(conn, previous_music_session_id, user_id)
      when 0
        @log.debug " join_music_session no connection found with client_id= #{client_id} and user_id= #{user_id} "
        raise ActiveRecord::RecordNotFound
      else
        @log.error " database failure or logic error; this path should be impossible if the table is locked (join_music_session) "
        raise Exception, " locked table changed state "
      end
    end
  rescue PG::Error => pg_error
    # Identify the failure by the constraint name alone: matching the full server message
    # is brittle, and the previous padded literal never equalled the real error text.
    if pg_error.to_s.include?('connections_music_session_id_fkey')
      # if there is no music session that exists, we will receive this message
      @log.debug " music_session does not exist. music_session= #{music_session_id} "
      raise StateError, " music_session does not exist "
    else
      # anything else is passed on untouched
      raise
    end
  end
end
# Detaches a client's connection from the music session it is currently in.
#
# user_id          - id of the user; also part of the UPDATE filter.
# client_id        - id of the client connection being updated.
# music_session_id - session the caller believes the client is in.
#
# Raises StateError when the client is in no session, or in a different session
#   than music_session_id.
# Raises ActiveRecord::RecordNotFound when no row matches client_id and user_id.
# Raises Exception when the update touches more than one row.
def leave_music_session(user_id, client_id, music_session_id)
  conn = @pg_conn

  lock_connections(conn)

  previous_music_session_id = check_already_session(conn, client_id)

  if previous_music_session_id.nil?
    @log.debug " the client is not in a session. user= #{user_id} , client= #{client_id} , music_session= #{music_session_id} "
    raise StateError, " not in session "
  end

  if previous_music_session_id != music_session_id
    @log.debug " the client is in a different session. user= #{user_id} , client= #{client_id} , music_session= #{music_session_id} "
    raise StateError, " in a session different than that specified "
  end

  # can throw exception if the session is deleted just before this
  conn.exec(" UPDATE connections SET music_session_id = NULL, as_musician = NULL WHERE client_id = $1 AND user_id =$2 ",
            [client_id, user_id]) do |result|
    case result.cmd_tuples
    when 1
      @log.debug(" deassociated music_session with connection for client_id= #{client_id} , user_id= #{user_id} ")

      session_checks(conn, previous_music_session_id, user_id)
    when 0
      @log.debug " leave_music_session no connection found with client_id= #{client_id} "
      raise ActiveRecord::RecordNotFound
    else
      @log.error(" database failure or logic error; this path should be impossible if the table is locked (leave_music_session) ")
      raise Exception, " locked table changed state "
    end
  end
end
2012-10-23 00:59:35 +00:00
2012-10-23 11:37:25 +00:00
# Issues LOCK ... IN EXCLUSIVE MODE on the connections table and clears the result.
# NOTE(review): presumably the caller is already inside a transaction, since the lock
# is only useful for the statements that follow - confirm against BaseManager.
#
# conn - database connection on which the lock statement is executed.
def lock_connections(conn)
  lock_result = conn.exec(" LOCK connections IN EXCLUSIVE MODE ")
  lock_result.clear
end
2012-10-21 15:05:06 +00:00
2012-10-23 11:37:25 +00:00
# Collects the ids of a user's friends, counting only friendships recorded in both
# directions (the user lists the friend AND the friend lists the user).
#
# conn    - database connection to query.
# user_id - id of the user whose friends are wanted.
#
# Returns an Array of friend_id values (empty when the user has no mutual friendships).
def gather_friends(conn, user_id)
  sql = " SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1) "

  # block form of exec returns the block's value, so the mapped ids are the method's result
  conn.exec(sql, [user_id]) do |friend_results|
    friend_results.map { |friend_result| friend_result['friend_id'] }
  end
end
2012-10-23 11:37:25 +00:00
2012-10-21 01:55:49 +00:00
end
end