103 lines
3.5 KiB
Ruby
103 lines
3.5 KiB
Ruby
|
|
class ConnectionManager
|
||
|
|
|
||
|
|
attr_accessor :mq_router
|
||
|
|
|
||
|
|
def initialize(conn)
|
||
|
|
@log = Logging.logger[self]
|
||
|
|
@mq_router = MQRouter.new
|
||
|
|
@pg_conn = conn
|
||
|
|
@message_factory = MessageFactory.new
|
||
|
|
|
||
|
|
|
||
|
|
unless PG.threadsafe?
|
||
|
|
raise Exception "a non-threadsafe build of libpq is present."
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
def remove_stale_connections()
|
||
|
|
@pg_conn.transaction do |conn|
|
||
|
|
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
def create_connection(user_id, client_id, ip_address)
|
||
|
|
@pg_conn.transaction do |conn|
|
||
|
|
|
||
|
|
lock_connections(conn)
|
||
|
|
|
||
|
|
conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear
|
||
|
|
|
||
|
|
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||
|
|
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||
|
|
count = result.getvalue(0, 0)
|
||
|
|
if count == "1"
|
||
|
|
# get all friend user_ids using the same query rails does for @user.friends
|
||
|
|
friend_update = @message_factory.friend_update(user_id, true)
|
||
|
|
friend_ids = gather_friends(conn, user_id)
|
||
|
|
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
|
||
|
|
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
|
||
|
|
# this code is responsible for all cleanup logic associated with a connection going away
|
||
|
|
def delete_connection(client_id)
|
||
|
|
|
||
|
|
user_id = nil
|
||
|
|
music_session_id = nil
|
||
|
|
|
||
|
|
@pg_conn.transaction do |conn|
|
||
|
|
|
||
|
|
lock_connections(conn)
|
||
|
|
|
||
|
|
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
|
||
|
|
|
||
|
|
if result.cmd_tuples == 0
|
||
|
|
# the client is already gone from the database... do nothing but log error
|
||
|
|
@log.error("unable to delete client #{client_id}")
|
||
|
|
return
|
||
|
|
elsif result.cmd_tuples == 1
|
||
|
|
user_id = result[0]['user_id']
|
||
|
|
music_session_id = result[0]['client_id']
|
||
|
|
|
||
|
|
else
|
||
|
|
raise Exception 'uniqueness constraint has been lost on client_id'
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
|
||
|
|
# since we did delete a row, check and see if any more connections for that user exist
|
||
|
|
# if we are down to zero, send out user gone message
|
||
|
|
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
|
||
|
|
count = result.getvalue(0, 0)
|
||
|
|
if count == "0"
|
||
|
|
friend_update = @message_factory.friend_update(user_id, false)
|
||
|
|
friend_ids = gather_friends(conn, user_id)
|
||
|
|
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
# same for session-if we are down to the last participant, delete the session
|
||
|
|
unless music_session_id.nil?
|
||
|
|
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
def lock_connections(conn)
|
||
|
|
conn.exec("LOCK connections IN ACCESS EXCLUSIVE MODE").clear
|
||
|
|
end
|
||
|
|
|
||
|
|
|
||
|
|
def gather_friends(conn, user_id)
|
||
|
|
friend_ids = []
|
||
|
|
conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
|
||
|
|
friend_results.each do |friend_result|
|
||
|
|
friend_ids.push(friend_result['friend_id'])
|
||
|
|
end
|
||
|
|
end
|
||
|
|
return friend_ids
|
||
|
|
end
|
||
|
|
end
|