diff --git a/Gemfile b/Gemfile index 4bfca7659..3c442ecfe 100644 --- a/Gemfile +++ b/Gemfile @@ -13,6 +13,7 @@ gem 'bcrypt-ruby', '3.0.1' gem 'ruby-protocol-buffers', '1.2.2' gem 'eventmachine' gem 'amqp' +gem 'pry' group :test do gem 'jam_db', :path=> "#{workspace}/jam-db/target/ruby_package" diff --git a/lib/jam_ruby.rb b/lib/jam_ruby.rb index 9fb8eeca8..bd1056063 100644 --- a/lib/jam_ruby.rb +++ b/lib/jam_ruby.rb @@ -4,9 +4,9 @@ require "jampb" require "uuidtools" require "logging" require "jam_ruby/mq_router" +require "jam_ruby/connection_manager" require "jam_ruby/version" require "jam_ruby/message_factory" -require "jam_ruby/models/music_session_client" require "jam_ruby/models/user" require "jam_ruby/models/musician" require "jam_ruby/models/band" diff --git a/lib/jam_ruby/connection_cleaner.rb b/lib/jam_ruby/connection_cleaner.rb deleted file mode 100644 index 2bee3b57b..000000000 --- a/lib/jam_ruby/connection_cleaner.rb +++ /dev/null @@ -1,51 +0,0 @@ -class ConnectionCleaner - - def initialize(db_options) - @mq_router = MQRouter.new - @pg_conn = PG::Connection.new(db_options) # just give a dedicated connection to this process - - unless PG.threadsafe - raise Exception "a non-threadsafe build of libpq is present." - end - end - - def remove_stale_connections() - @pg_conn.transaction do |conn| - - end - end - - - # once a connection is known gone (whether timeout or because a TCP connection is observed lost) - # this code is responsible for all cleanup logic associated with a connection going away - def remove_connection(client_id) - - @pg_conn.transaction do |conn| - - # FOR UPDATE - lock the connection and related jam_sessions in question - conn.exec("SELECT user_id FROM connections LEFT OUTER JOIN music_sessions WHERE client_id = $1 ON connections.music_session_id = music_sessions.id FOR UPDATE", [client_id]) do |result| - - conn.exec("DELETE FROM connections WHERE client_id = $1", [client_id]) do |result| - - if result.cmd_tuples == 0 - # the client is already gone from the database... do nothing - - elsif result.cmd_tuples == 1 - - # if we did delete a row, check and see if any more connections for that user exist - # if we are down to zero, send out user gone message - conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [result[0][0]]) do |result| - @mq_router.server_publish_to_user() - end - else - raise Exception 'uniqueness constraint has been lost on client_id' - end - end - end - - end - - end - - -end \ No newline at end of file diff --git a/lib/jam_ruby/connection_manager.rb b/lib/jam_ruby/connection_manager.rb new file mode 100644 index 000000000..a9c47f8de --- /dev/null +++ b/lib/jam_ruby/connection_manager.rb @@ -0,0 +1,103 @@ +class ConnectionManager + + attr_accessor :mq_router + + def initialize(conn) + @log = Logging.logger[self] + @mq_router = MQRouter.new + @pg_conn = conn + @message_factory = MessageFactory.new + + + unless PG.threadsafe? + raise Exception "a non-threadsafe build of libpq is present." + end + end + + def remove_stale_connections() + @pg_conn.transaction do |conn| + + end + end + + def create_connection(user_id, client_id, ip_address) + @pg_conn.transaction do |conn| + + lock_connections(conn) + + conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear + + # we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends + conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "1" + # get all friend user_ids using the same query rails does for @user.friends + friend_update = @message_factory.friend_update(user_id, true) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + end + end + end + end + + + # once a connection is known gone (whether timeout or because a TCP connection is observed lost) + # this code is responsible for all cleanup logic associated with a connection going away + def delete_connection(client_id) + + user_id = nil + music_session_id = nil + + @pg_conn.transaction do |conn| + + lock_connections(conn) + + conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result| + + if result.cmd_tuples == 0 + # the client is already gone from the database... do nothing but log error + @log.error("unable to delete client #{client_id}") + return + elsif result.cmd_tuples == 1 + user_id = result[0]['user_id'] + music_session_id = result[0]['client_id'] + + else + raise Exception 'uniqueness constraint has been lost on client_id' + end + end + + + # since we did delete a row, check and see if any more connections for that user exist + # if we are down to zero, send out user gone message + conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "0" + friend_update = @message_factory.friend_update(user_id, false) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + end + end + + # same for session-if we are down to the last participant, delete the session + unless music_session_id.nil? + conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear + end + end + end + + def lock_connections(conn) + conn.exec("LOCK connections IN ACCESS EXCLUSIVE MODE").clear + end + + + def gather_friends(conn, user_id) + friend_ids = [] + conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results| + friend_results.each do |friend_result| + friend_ids.push(friend_result['friend_id']) + end + end + return friend_ids + end +end \ No newline at end of file diff --git a/lib/jam_ruby/message_factory.rb b/lib/jam_ruby/message_factory.rb index 96912edee..630a9f03a 100644 --- a/lib/jam_ruby/message_factory.rb +++ b/lib/jam_ruby/message_factory.rb @@ -127,7 +127,7 @@ # create a friend update message def friend_update(user_id, online) friend = Jampb::FriendUpdate.new(:user_id => user_id, :online => online) - return Jampb::ClientMessage.new(:type => ClientMessage::Type::FRIEND_UPDATE, :target => CLIENT_TARGET, :friend_update => friend) + return Jampb::ClientMessage.new(:type => ClientMessage::Type::FRIEND_UPDATE, :route_to => CLIENT_TARGET, :friend_update => friend) end # is this message directed to the server? diff --git a/lib/jam_ruby/models/connection.rb b/lib/jam_ruby/models/connection.rb index 7d6f4382c..45755df5a 100644 --- a/lib/jam_ruby/models/connection.rb +++ b/lib/jam_ruby/models/connection.rb @@ -2,7 +2,14 @@ module JamRuby class Connection < ActiveRecord::Base self.primary_key = 'id' - belongs_to :user, :class_name => "JamRuby::User" + belongs_to :user, :class_name => "JamRuby::User" + belongs_to :music_session, :class_name => "JamRuby::MusicSession" + + # decides if a given user can access this client with p2p messaging + # the answer is yes if the user is in the same music session + def access_p2p?(user) + return self.music_session.users.exists?(user) + end end end \ No newline at end of file diff --git a/lib/jam_ruby/models/music_session.rb b/lib/jam_ruby/models/music_session.rb index ff65d19a6..bf2e35054 100644 --- a/lib/jam_ruby/models/music_session.rb +++ b/lib/jam_ruby/models/music_session.rb @@ -4,8 +4,8 @@ module JamRuby self.primary_key = 'id' belongs_to :creator, :inverse_of => :music_sessions, :class_name => "JamRuby::User", :foreign_key => "user_id" - has_many :music_session_clients, :class_name => "JamRuby::MusicSessionClient" - has_many :users, :through => :music_session_clients, :class_name => "JamRuby::User" + has_many :connections, :class_name => "JamRuby::Connection" + has_many :users, :through => :connections, :class_name => "JamRuby::User" # Verifies that the specified user can join this jam session def access?(user) diff --git a/lib/jam_ruby/models/music_session_client.rb b/lib/jam_ruby/models/music_session_client.rb deleted file mode 100644 index 5099cc14f..000000000 --- a/lib/jam_ruby/models/music_session_client.rb +++ /dev/null @@ -1,22 +0,0 @@ -# called 'participant' currently in the REST APIs - -module JamRuby - class MusicSessionClient < ActiveRecord::Base - self.primary_key = 'id' - - belongs_to :user, :class_name => "JamRuby::User" - belongs_to :music_session, :class_name => "JamRuby::MusicSession" - - validates :client_id, :presence => true - - def to_s - return "#{self.user.to_s}:#{self.ip_address}" - end - - # decides if a given user can access this client with p2p messaging - # the answer is yes if the user is in the same music session - def access_p2p?(user) - return self.music_session.users.exists?(user) - end - end -end diff --git a/lib/jam_ruby/mq_router.rb b/lib/jam_ruby/mq_router.rb index 91e808cb4..4ca06e774 100644 --- a/lib/jam_ruby/mq_router.rb +++ b/lib/jam_ruby/mq_router.rb @@ -11,6 +11,7 @@ class MQRouter @@log = Logging.logger[MQRouter] end + def access_music_session(music_session, user) if music_session.nil? @@ -27,32 +28,68 @@ class MQRouter # sends a message to a session on behalf of a user # if this is originating in the context of a client, it should be specified as :client_id => "value" # client_msg should be a well-structure message (jam-pb message) - def user_publish_to_session(music_session, user, client_msg, sender = {:client_id => "" }) + def user_publish_to_session(music_session, user, client_msg, sender = {:client_id => ""}) access_music_session(music_session, user) # gather up client_ids in the session - client_ids = music_session.music_session_clients.map {|client| client.client_id }.reject {|client_id| client_id == sender[:client_id] } + client_ids = music_session.connections.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] } publish_to_session(music_session.id, client_ids, client_msg.to_s, sender) end - # sends a message to a session with no checking of permissions + # sends a message to a client with no checking of permissions (RAW USAGE) # this method deliberately has no database interactivity/active_record objects - def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => "" }) + def publish_to_client(client_id, client_msg, sender = {:client_id => ""}) EM.schedule do - sender_client_id = sender[:client_id] + sender_client_id = sender[:client_id] - # iterate over each person in the session, and send a p2p message - client_ids.each do |client_id| - - @@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}" - # put it on the topic exchange3 for clients - self.class.client_exchange.publish(client_msg, :routing_key => "client.#{client_id}") - end + @@log.debug "publishing to client:#{client_id} from client:#{sender_client_id}" + # put it on the topic exchange for clients + self.class.client_exchange.publish(client_msg, :routing_key => "client.#{client_id}") end end + # sends a message to a session with no checking of permissions (RAW USAGE) + # this method deliberately has no database interactivity/active_record objects + def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""}) + + EM.schedule do + sender_client_id = sender[:client_id] + + # iterate over each person in the session, and send a p2p message + client_ids.each do |client_id| + + @@log.debug "publishing to session:#{music_session_id} / client:#{client_id} from client:#{sender_client_id}" + # put it on the topic exchange for clients + self.class.client_exchange.publish(client_msg, :routing_key => "client.#{client_id}") + end + end + end + + # sends a message to a user with no checking of permissions (RAW USAGE) + # this method deliberately has no database interactivity/active_record objects + def publish_to_user(user_id, user_msg) + + EM.schedule do + @@log.debug "publishing to user:#{user_id} from server" + # put it on the topic exchange for users + self.class.client_exchange.publish(user_msg, :routing_key => "client.#{user_id}") + end + end + + # sends a message to a list of friends with no checking of permissions (RAW USAGE) + # this method deliberately has no database interactivity/active_record objects + def publish_to_friends(friend_ids, user_msg, from_user_id) + + EM.schedule do + friend_ids.each do |friend_id| + @@log.debug "publishing to friend:#{friend_id} from user #{from_user_id}" + # put it on the topic exchange for users + self.class.client_exchange.publish(user_msg, :routing_key => "user.#{friend_id}") + end + end + end end \ No newline at end of file diff --git a/spec/factories.rb b/spec/factories.rb index 4250c349a..75e7f1be2 100644 --- a/spec/factories.rb +++ b/spec/factories.rb @@ -14,6 +14,7 @@ FactoryGirl.define do sequence(:description) { |n| "Music Session #{n}" } end - factory :music_session_client, :class => JamRuby::MusicSessionClient do + factory :connection, :class => JamRuby::Connection do + end end \ No newline at end of file diff --git a/spec/jam_ruby/connection_manager_spec.rb b/spec/jam_ruby/connection_manager_spec.rb new file mode 100644 index 000000000..876987b8e --- /dev/null +++ b/spec/jam_ruby/connection_manager_spec.rb @@ -0,0 +1,148 @@ +require 'spec_helper' + +# these tests avoid the use of ActiveRecord and FactoryGirl to do blackbox, non test-instrumented tests +describe ConnectionManager do + + before do + + @conn = PG::Connection.new(:dbname => SpecDb::TEST_DB_NAME, :user => "postgres", :password => "postgres", :host => "localhost") + @connman = ConnectionManager.new(@conn) + @message_factory = MessageFactory.new + end + + def create_user(name, email) + @conn.exec("INSERT INTO users (name, email, password_digest) VALUES ($1, $2, $3) RETURNING id", [name, email, '1']) do |result| + return result.getvalue(0,0) + end + end + + it "can't create bogus user_id" do + + expect { @connman.create_connection("aeonuthaoentuh", "client_id", "1.1.1.1") }.to raise_error(PG::Error) + end + + it "can't create two client_ids of same value" do + + client_id = "client_id1" + user_id = create_user("user1", "user1@jamkazam.com") + @connman.create_connection(user_id, client_id, "1.1.1.1") + expect { @connman.create_connection(user_id, client_id, "1.1.1.1") }.to raise_error(PG::Error) + + end + + it "create connection then delete it" do + + client_id = "client_id2" + user_id = create_user("user2", "user2@jamkazam.com") + @connman.create_connection(user_id, client_id, "1.1.1.1") + + # make sure the connection is seen + @conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result| + result.getvalue(0, 0).should == "1" + end + + @connman.delete_connection(client_id) + @conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result| + result.getvalue(0,0).should == "0" + end + end + + it "create connection creates user joined message appropriately" do + + client_id = "client_id3" + client_id2 = "client_id3_1" + + user_id = create_user("user3", "user3@jamkazam.com") + + # we should get a message saying that this user is online + friend_update = @message_factory.friend_update(user_id, true) + @connman.mq_router.should_receive(:publish_to_friends).with([], friend_update, user_id) + + @connman.create_connection(user_id, client_id, "1.1.1.1") + + + # but a second connection from the same user should cause no such message + @connman.should_receive(:publish_to_friends).exactly(0).times + + @connman.create_connection(user_id, client_id2, "1.1.1.1") + + end + + + it "deletes connection creates user left message appropriately" do + + client_id = "client_id4" + client_id2 = "client_id4_1" + + user_id = create_user("user4", "user4@jamkazam.com") + + # we should get a message saying that this user is online + + @connman.create_connection(user_id, client_id, "1.1.1.1") + @connman.create_connection(user_id, client_id2, "1.1.1.1") + + # deleting one of the two connections should cause no messages + @connman.should_receive(:publish_to_friends).exactly(0).times + + @connman.delete_connection(client_id) + + # but deleting the final connection should cause a left message + friend_update = @message_factory.friend_update(user_id, false) + @connman.mq_router.should_receive(:publish_to_friends).with([], friend_update, user_id) + + @connman.delete_connection(client_id2) + + end + + it "lookup of friends should find mutual friends only" do + + def create_friend(user_id, friend_id) + @conn.exec("INSERT INTO friendships(user_id, friend_id) VALUES ($1, $2)", [user_id, friend_id]) + end + + def delete_friend(user_id, friend_id) + @conn.exec("DELETE FROM friendships WHERE user_id = $1 AND friend_id = $2", [user_id, friend_id]) + end + + client_id = "client_id5" + + user_id1 = create_user("user5", "user5@jamkazam.com") + user_id2 = create_user("user6", "user6@jamkazam.com") + user_id3 = create_user("user7", "user7@jamkazam.com") + + @connman.gather_friends(@conn, user_id1).should == [] + @connman.gather_friends(@conn, user_id2).should == [] + @connman.gather_friends(@conn, user_id3).should == [] + + # create one-way link + create_friend(user_id1, user_id2) + + @connman.gather_friends(@conn, user_id1).should == [] + @connman.gather_friends(@conn, user_id2).should == [] + @connman.gather_friends(@conn, user_id3).should == [] + + # create one-way link back the other way + create_friend(user_id2, user_id1) + + @connman.gather_friends(@conn, user_id1).should == [user_id2] + @connman.gather_friends(@conn, user_id2).should == [user_id1] + @connman.gather_friends(@conn, user_id3).should == [] + + # make sure a new link to user 1 > user 3 doesn't disrupt anything + create_friend(user_id1, user_id3) + + @connman.gather_friends(@conn, user_id1).should == [user_id2] + @connman.gather_friends(@conn, user_id2).should == [user_id1] + @connman.gather_friends(@conn, user_id3).should == [] + + # make sure a new link to user 1 > user 3 doesn't disrupt anything + create_friend(user_id3, user_id1) + + @connman.gather_friends(@conn, user_id1).should =~ [user_id2, user_id3] + @connman.gather_friends(@conn, user_id2).should == [user_id1] + @connman.gather_friends(@conn, user_id3).should == [user_id1] + + + end +end + diff --git a/spec/jam_ruby/models/music_session_spec.rb b/spec/jam_ruby/models/music_session_spec.rb index ad93f9558..4560a57d0 100644 --- a/spec/jam_ruby/models/music_session_spec.rb +++ b/spec/jam_ruby/models/music_session_spec.rb @@ -10,8 +10,8 @@ describe MusicSession do music_session = FactoryGirl.create(:music_session, :creator => user1) - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") music_session.access?(user1).should == true music_session.access?(user2).should == true diff --git a/spec/jam_ruby/mq_router_spec.rb b/spec/jam_ruby/mq_router_spec.rb index 37bf54288..2e1b36750 100644 --- a/spec/jam_ruby/mq_router_spec.rb +++ b/spec/jam_ruby/mq_router_spec.rb @@ -13,8 +13,8 @@ describe MQRouter do music_session = FactoryGirl.create(:music_session, :creator => user1) - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") @mq_router.should_receive(:publish_to_session).with(music_session.id, [music_session_member2.client_id], "a message", :client_id => music_session_member1.client_id) @@ -28,8 +28,8 @@ describe MQRouter do music_session = FactoryGirl.create(:music_session, :creator => user1) - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :ip_address => "1.1.1.1", :client_id => "1") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :ip_address => "2.2.2.2", :client_id => "2") EM.run do @@ -41,7 +41,6 @@ describe MQRouter do @mq_router.user_publish_to_session(music_session, user1, "a message", :client_id => music_session_member1.client_id) EM.stop - end end diff --git a/spec/spec_db.rb b/spec/spec_db.rb index 275a1b485..b825973f5 100644 --- a/spec/spec_db.rb +++ b/spec/spec_db.rb @@ -2,6 +2,7 @@ class SpecDb TEST_DB_NAME="jam_ruby_test" + TEST_USER_ID = "1" #test@jamkazam.com def self.recreate_database conn = PG::Connection.open("dbname=postgres user=postgres password=postgres host=localhost") conn.exec("DROP DATABASE IF EXISTS #{TEST_DB_NAME}") diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b794c7707..bef938bf1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,6 +5,7 @@ require 'spec_db' # recreate test database and migrate it SpecDb::recreate_database + # initialize ActiveRecord's db connection ActiveRecord::Base.establish_connection(YAML::load(File.open('config/database.yml'))["test"]) @@ -70,5 +71,4 @@ end Spork.each_run do # This code will be run each time you run your specs. - end \ No newline at end of file