646 lines
20 KiB
Ruby
646 lines
20 KiB
Ruby
require 'pry'
|
|
require 'set'
|
|
require 'hot_bunnies'
|
|
require 'thread'
|
|
require 'json'
|
|
require 'eventmachine'
|
|
|
|
import java.util.concurrent.Executors
|
|
|
|
include Jampb
|
|
|
|
# add new field to client connection
|
|
module EventMachine
|
|
module WebSocket
|
|
class Connection < EventMachine::Connection
|
|
attr_accessor :encode_json, :client_id # client_id is uuid we give to each client to track them as we like
|
|
end
|
|
end
|
|
end
|
|
|
|
module JamWebsockets
|
|
|
|
class Router
|
|
|
|
attr_accessor :user_context_lookup
|
|
|
|
def initialize(options={})
|
|
@log = Logging.logger[self]
|
|
@pending_clients = Set.new # clients that have connected to server, but not logged in.
|
|
@clients = {} # clients that have logged in
|
|
@user_context_lookup = {} # lookup a set of client_contexts by user_id
|
|
@client_lookup = {} # lookup a client by client_id
|
|
@connection = nil
|
|
@channel = nil
|
|
@users_exchange = nil
|
|
@message_factory = JamRuby::MessageFactory.new
|
|
@semaphore = Mutex.new
|
|
@user_topic = nil
|
|
@user_subscription = nil
|
|
@client_topic = nil
|
|
@client_subscription = nil
|
|
@thread_pool = nil
|
|
end
|
|
|
|
def start(options = {})
|
|
|
|
@log.info "startup"
|
|
|
|
begin
|
|
@thread_pool = Executors.new_fixed_thread_pool(8)
|
|
@connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
|
|
@channel = @connection.create_channel
|
|
@channel.prefetch = 10
|
|
|
|
register_topics
|
|
rescue => e
|
|
cleanup
|
|
raise e
|
|
end
|
|
|
|
end
|
|
|
|
def add_client(client_id, client)
|
|
|
|
# should never occur
|
|
if @client_lookup.has_key?(client_id)
|
|
@log.warn "client_id #{client_id} connected while the old connection has not yet been terminated"
|
|
end
|
|
|
|
@client_lookup[client_id] = client
|
|
end
|
|
|
|
def remove_client(client_id, client)
|
|
deleted = @client_lookup.delete(client_id)
|
|
|
|
if deleted.nil?
|
|
@log.warn "unable to delete #{client_id} from client_lookup"
|
|
elsif deleted != client
|
|
# put it back--this is only possible if add_client hit the 'old connection' path
|
|
# so in other words if this happens:
|
|
# add_client(1, clientX)
|
|
# add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time
|
|
# remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash
|
|
@client_lookup[client_id] = client
|
|
@log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}"
|
|
else
|
|
@log.debug "cleaned up @client_lookup for #{client_id}"
|
|
end
|
|
|
|
end
|
|
|
|
def add_user(context)
|
|
user_contexts = @user_context_lookup[context.user.id]
|
|
if user_contexts.nil?
|
|
user_contexts = Set.new
|
|
@user_context_lookup[context.user.id] = user_contexts
|
|
end
|
|
|
|
user_contexts.add(context)
|
|
end
|
|
|
|
def remove_user(client_context)
|
|
user_contexts = @user_context_lookup[client_context.user.id]
|
|
|
|
if user_contexts.nil?
|
|
@log.warn "user can not be removed #{client_context}"
|
|
else
|
|
# delete the context from set of user contexts
|
|
user_contexts.delete(client_context)
|
|
|
|
# if last user context, delete entire set (memory leak concern)
|
|
if user_contexts.length == 0
|
|
@user_context_lookup.delete(client_context.user.id)
|
|
end
|
|
|
|
client_context.user = nil
|
|
end
|
|
end
|
|
|
|
# register topic for user messages and session messages
|
|
def register_topics
|
|
|
|
######################## USER MESSAGING ###########################
|
|
|
|
# create user exchange
|
|
@users_exchange = @channel.exchange('users', :type => :topic)
|
|
# create user messaging topic
|
|
@user_topic = @channel.queue("", :auto_delete => true)
|
|
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
|
@user_topic.purge
|
|
|
|
# subscribe for any messages to users
|
|
@user_subscription = @user_topic.subscribe(:ack => false)
|
|
|
|
# this code serves as a callback that dequeues messages and processes them
|
|
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
|
begin
|
|
routing_key = headers.envelope.routing_key
|
|
user_id = routing_key["user.".length..-1]
|
|
|
|
@semaphore.synchronize do
|
|
contexts = @user_context_lookup[user_id]
|
|
|
|
if !contexts.nil?
|
|
|
|
@log.debug "received user-directed message for user: #{user_id}"
|
|
|
|
msg = Jampb::ClientMessage.parse(msg)
|
|
|
|
contexts.each do |context|
|
|
EM.schedule do
|
|
@log.debug "sending user message to #{context}"
|
|
send_to_client(context.client, msg)
|
|
end
|
|
end
|
|
else
|
|
@log.debug "Context is null"
|
|
end
|
|
end
|
|
|
|
rescue => e
|
|
@log.error "unhandled error in messaging to client"
|
|
@log.error e
|
|
end
|
|
end
|
|
|
|
############## CLIENT MESSAGING ###################
|
|
|
|
@clients_exchange = @channel.exchange('clients', :type => :topic)
|
|
|
|
@client_topic = @channel.queue("", :auto_delete => true)
|
|
@client_topic.bind(@clients_exchange, :routing_key => "client.#")
|
|
@client_topic.purge
|
|
|
|
# subscribe for any p2p messages to a client
|
|
@client_subscription = @client_topic.subscribe(:ack => false)
|
|
@client_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
|
begin
|
|
routing_key = headers.envelope.routing_key
|
|
client_id = routing_key["client.".length..-1]
|
|
@semaphore.synchronize do
|
|
client = @client_lookup[client_id]
|
|
|
|
msg = Jampb::ClientMessage.parse(msg)
|
|
|
|
@log.debug "p2p message received from #{msg.from} to client #{client_id}"
|
|
|
|
unless client.nil?
|
|
|
|
EM.schedule do
|
|
@log.debug "sending p2p message to #{client_id}"
|
|
send_to_client(client, msg)
|
|
end
|
|
else
|
|
@log.debug "p2p message unroutable to disconnected client #{client_id}"
|
|
end
|
|
end
|
|
rescue => e
|
|
@log.error "unhandled error in messaging to client"
|
|
@log.error e
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
def new_client(client)
|
|
|
|
@semaphore.synchronize do
|
|
@pending_clients.add(client)
|
|
end
|
|
|
|
# default to using json instead of pb
|
|
client.encode_json = true
|
|
|
|
client.onopen {
|
|
#binding.pry
|
|
@log.debug "client connected #{client}"
|
|
|
|
# check for '?pb' or '?pb=true' in url query parameters
|
|
query_pb = client.request["query"]["pb"]
|
|
|
|
if !query_pb.nil? && (query_pb == "" || query_pb == "true")
|
|
client.encode_json = false
|
|
end
|
|
|
|
}
|
|
|
|
client.onclose {
|
|
@log.debug "Connection closed"
|
|
|
|
cleanup_client(client)
|
|
}
|
|
|
|
client.onerror { |error|
|
|
if error.kind_of?(EM::WebSocket::WebSocketError)
|
|
@log.error "websockets error: #{error}"
|
|
else
|
|
@log.error "generic error: #{error} #{error.backtrace}"
|
|
end
|
|
|
|
cleanup_client(client)
|
|
client.close_websocket
|
|
}
|
|
|
|
client.onmessage { |msg|
|
|
@log.debug("msg received")
|
|
|
|
# TODO: set a max message size before we put it through PB?
|
|
# TODO: rate limit?
|
|
|
|
pb_msg = nil
|
|
|
|
begin
|
|
if client.encode_json
|
|
#example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
|
|
parse = JSON.parse(msg)
|
|
pb_msg = Jampb::ClientMessage.json_create(parse)
|
|
self.route(pb_msg, client)
|
|
else
|
|
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
|
|
self.route(pb_msg, client)
|
|
end
|
|
rescue SessionError => e
|
|
@log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
|
|
begin
|
|
# wrap the message up and send it down
|
|
error_msg = @message_factory.server_rejection_error(e.to_s)
|
|
send_to_client(client, error_msg)
|
|
ensure
|
|
client.close_websocket
|
|
cleanup_client(client)
|
|
end
|
|
rescue PermissionError => e
|
|
@log.info "permission error. reason=#{e.to_s}"
|
|
@log.info e
|
|
|
|
# wrap the message up and send it down
|
|
error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s)
|
|
send_to_client(client, error_msg)
|
|
rescue => e
|
|
@log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
|
|
@log.error e
|
|
|
|
begin
|
|
# wrap the message up and send it down
|
|
error_msg = @message_factory.server_generic_error(e.to_s)
|
|
send_to_client(client, error_msg)
|
|
ensure
|
|
client.close_websocket
|
|
cleanup_client(client)
|
|
end
|
|
end
|
|
}
|
|
end
|
|
|
|
|
|
def send_to_client(client, msg)
|
|
@log.debug "SEND TO CLIENT START"
|
|
if client.encode_json
|
|
client.send(msg.to_json.to_s)
|
|
else
|
|
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
|
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
|
end
|
|
@log.debug "SEND TO CLIENT STOP"
|
|
end
|
|
|
|
def cleanup()
|
|
# shutdown topic listeners and mq connection
|
|
begin
|
|
if !@user_subscription.nil? && @user_subscription.active?
|
|
@log.debug "cleaning up user subscription"
|
|
@user_subscription.cancel
|
|
@user_subscription.shutdown!
|
|
end
|
|
|
|
if !@client_subscription.nil? && @client_subscription.active?
|
|
@log.debug "cleaning up client subscription"
|
|
@client_subscription.cancel
|
|
@client_subscription.shutdown!
|
|
end
|
|
|
|
rescue => e
|
|
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
|
end
|
|
|
|
@thread_pool.shutdown
|
|
|
|
if !@channel.nil?
|
|
@channel.close
|
|
end
|
|
|
|
if !@connection.nil?
|
|
@connection.close
|
|
end
|
|
|
|
# tear down each individual client
|
|
@clients.each do |client, context|
|
|
cleanup_client(client)
|
|
end
|
|
end
|
|
|
|
def stop
|
|
@log.info "shutdown"
|
|
cleanup
|
|
end
|
|
|
|
# removes all resources associated with a client
|
|
def cleanup_client(client)
|
|
|
|
@semaphore.synchronize do
|
|
pending = @pending_clients.delete?(client)
|
|
|
|
if !pending.nil?
|
|
@log.debug "cleaning up pending client #{client}"
|
|
else
|
|
|
|
@log.debug "cleanup up logged-in client #{client}"
|
|
|
|
remove_client(client.client_id, client)
|
|
|
|
context = @clients.delete(client)
|
|
|
|
if !context.nil?
|
|
|
|
# remove this connection from the database
|
|
if !context.user.nil? && !context.client.nil?
|
|
JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'"
|
|
end
|
|
|
|
send_friend_update(context.user, false, context.client)
|
|
|
|
remove_user(context)
|
|
else
|
|
@log.debug "skipping duplicate cleanup attempt of logged-in client"
|
|
end
|
|
|
|
end
|
|
end
|
|
end
|
|
|
|
def route(client_msg, client)
|
|
message_type = @message_factory.get_message_type(client_msg)
|
|
|
|
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
|
|
|
|
@log.debug("msg received #{message_type}")
|
|
|
|
raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil?
|
|
|
|
if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN
|
|
# this client has not logged in and is trying to send a non-login message
|
|
raise SessionError, "must 'Login' first"
|
|
end
|
|
|
|
if @message_factory.server_directed? client_msg
|
|
|
|
handle_server_directed(client_msg, client)
|
|
|
|
elsif @message_factory.client_directed? client_msg
|
|
|
|
to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
|
|
handle_client_directed(to_client_id, client_msg, client)
|
|
|
|
elsif @message_factory.session_directed? client_msg
|
|
|
|
session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
|
|
handle_session_directed(session_id, client_msg, client)
|
|
|
|
elsif @message_factory.user_directed? client_msg
|
|
|
|
user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
|
|
handle_user_directed(user_id, client_msg, client)
|
|
|
|
else
|
|
raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
|
|
end
|
|
|
|
end
|
|
|
|
def handle_server_directed(client_msg, client)
|
|
|
|
if client_msg.type == ClientMessage::Type::LOGIN
|
|
|
|
handle_login(client_msg.login, client)
|
|
|
|
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
|
|
|
|
handle_heartbeat(client_msg.heartbeat, client)
|
|
|
|
else
|
|
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
|
|
end
|
|
end
|
|
|
|
def handle_login(login, client)
|
|
|
|
username = login.username if login.value_for_tag(1)
|
|
password = login.password if login.value_for_tag(2)
|
|
token = login.token if login.value_for_tag(3)
|
|
client_id = login.client_id if login.value_for_tag(4)
|
|
|
|
# you don't have to supply client_id in login--if you don't, we'll generate one
|
|
if client_id.nil? || client_id.empty?
|
|
# give a unique ID to this client. This is used to prevent session messages
|
|
# from echoing back to the sender, for instance.
|
|
client_id = UUIDTools::UUID.random_create.to_s
|
|
end
|
|
|
|
client.client_id = client_id
|
|
|
|
user = valid_login(username, password, token, client_id)
|
|
|
|
if !user.nil?
|
|
|
|
@log.debug "user #{user.email} logged in"
|
|
|
|
# respond with LOGIN_ACK to let client know it was successful
|
|
#binding.pry
|
|
remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
|
|
login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token)
|
|
send_to_client(client, login_ack)
|
|
|
|
@semaphore.synchronize do
|
|
# remove from pending_queue
|
|
@pending_clients.delete(client)
|
|
|
|
# add a tracker for this user
|
|
context = ClientContext.new(user, client)
|
|
@clients[client] = context
|
|
add_user(context)
|
|
add_client(client_id, client) # TODO
|
|
|
|
# log this connection in the database
|
|
connection = JamRuby::Connection.new(:user => user, :client_id => client.client_id)
|
|
@log.debug "Created connection => #{connection.user}, #{connection.client_id}"
|
|
|
|
if connection.save
|
|
send_friend_update(user, true, context.client)
|
|
end
|
|
end
|
|
else
|
|
raise SessionError, 'invalid login'
|
|
end
|
|
end
|
|
|
|
def send_friend_update(user, online, client)
|
|
@log.debug "sending friend update for user #{user} online = #{online}"
|
|
|
|
if !user.nil? && user.friends.exists?
|
|
@log.debug "user has friends - sending friend updates"
|
|
|
|
# create the friend_update message
|
|
friend_update_msg = @message_factory.friend_update(user.id, online)
|
|
|
|
# send the friend_update to each friend that has active connections
|
|
user.friends.each do |friend|
|
|
@log.debug "sending friend update message to #{friend}"
|
|
|
|
handle_user_directed(friend.id, friend_update_msg, client)
|
|
end
|
|
end
|
|
end
|
|
|
|
def handle_heartbeat(heartbeat, client)
|
|
context = @clients[client]
|
|
@log.debug "updating timestamp for user #{context}"
|
|
connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
|
|
|
|
unless connection.nil?
|
|
connection.updated_at = DateTime.now
|
|
connection.save
|
|
end
|
|
end
|
|
|
|
def valid_login(username, password, token, client_id)
|
|
|
|
if !token.nil? && token != ''
|
|
@log.debug "logging in via token"
|
|
# attempt login with token
|
|
user = User.find_by_remember_token(token)
|
|
|
|
if user.nil?
|
|
@log.debug "no user found with token"
|
|
return false
|
|
else
|
|
@log.debug "#{user} login via token"
|
|
return user
|
|
end
|
|
|
|
elsif !username.nil? and !password.nil?
|
|
|
|
@log.debug "logging in via user/pass '#{username}' '#{password}'"
|
|
# attempt login with username and password
|
|
user = User.find_by_email(username)
|
|
|
|
if !user.nil? && user.authenticate(password)
|
|
@log.debug "#{user} login via password"
|
|
return user
|
|
else
|
|
@log.debug "#{username} login failure"
|
|
return nil
|
|
end
|
|
else
|
|
raise SessionError, 'no login data was found in Login message'
|
|
end
|
|
end
|
|
|
|
def access_music_session(music_session_id, user)
|
|
music_session = MusicSession.find_by_id(music_session_id)
|
|
|
|
if music_session.nil?
|
|
raise SessionError, 'specified session not found'
|
|
end
|
|
|
|
if !music_session.access? user
|
|
raise SessionError, 'not allowed to join the specified session'
|
|
end
|
|
|
|
return music_session
|
|
end
|
|
|
|
# client_id = the id of the client being accessed
|
|
# client = the current client
|
|
def access_p2p(client_id, user, msg)
|
|
|
|
# ping_request and ping_ack messages are special in that they are simply allowed
|
|
if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
|
|
return nil
|
|
end
|
|
|
|
music_session_client = MusicSessionClient.find_by_client_id(client_id)
|
|
|
|
if music_session_client.nil?
|
|
raise PermissionError, 'specified client not found'
|
|
end
|
|
|
|
if !music_session_client.access_p2p? user
|
|
raise SessionError, 'not allowed to message this client'
|
|
end
|
|
end
|
|
|
|
|
|
def handle_client_directed(to_client_id, client_msg, client)
|
|
context = @clients[client]
|
|
|
|
# by not catching any exception here, a PermissionError will be thrown if this isn't valid
|
|
# if for some reason the client is trying to send to a client that it doesn't
|
|
# belong to
|
|
access_p2p(to_client_id, context.user, client_msg)
|
|
|
|
# populate routing data
|
|
client_msg.from = client.client_id
|
|
|
|
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"
|
|
|
|
# put it on the topic exchange for clients
|
|
@clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
|
|
end
|
|
|
|
def handle_user_directed(user_id, client_msg, client)
|
|
|
|
@log.debug "publishing to user #{user_id} from client_id #{client.client_id}"
|
|
|
|
# put it on the topic exchange for users
|
|
@users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
|
|
end
|
|
|
|
def handle_session_directed(session_id, client_msg, client)
|
|
context = @clients[client]
|
|
|
|
user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id)
|
|
end
|
|
|
|
# sends a message to a session on behalf of a user
|
|
# if this is originating in the context of a client, it should be specified as :client_id => "value"
|
|
# client_msg should be a well-structure message (jam-pb message)
|
|
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
|
|
music_session = access_music_session(music_session_id, user)
|
|
|
|
# gather up client_ids in the session
|
|
client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] }
|
|
|
|
publish_to_session(music_session.id, client_ids, client_msg.to_s, sender)
|
|
end
|
|
|
|
|
|
# sends a message to a session with no checking of permissions
|
|
# this method deliberately has no database interactivity/active_record objects
|
|
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
|
|
|
|
EM.schedule do
|
|
sender_client_id = sender[:client_id]
|
|
|
|
# iterate over each person in the session, and send a p2p message
|
|
client_ids.each do |client_id|
|
|
|
|
@@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}"
|
|
# put it on the topic exchange3 for clients
|
|
self.class.client_exchange.publish(client_msg, :routing_key => "client.#{music_session_id}")
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|