jam-cloud/lib/jam_websockets/router.rb

365 lines
9.3 KiB
Ruby
Raw Normal View History

require 'set'
require 'hot_bunnies'
include Jampb
module JamWebsockets
  # Routes protobuf messages between websocket clients and the AMQP exchanges.
  #
  # A client starts out "pending" (connected, not logged in). After a valid
  # Login message it is moved to @clients, keyed by the websocket connection,
  # with a ClientContext holding its user, queue and subscriptions.
  class Router
    def initialize(options = {})
      @log = Logging.logger[self]
      @pending_clients = Set.new # clients that have connected to server, but not logged in.
      @clients = {} # clients that have logged in
      @sessions = nil
      @connection = nil
      @channel = nil
      @endpoints = nil
      @message_factory = JamRuby::MessageFactory.new
    end

    # Connects to the broker and declares the two exchanges used for routing.
    #
    # options[:host] / options[:port] are handed to HotBunnies.connect.
    # On any failure, whatever was already opened is cleaned up and the
    # original error is re-raised.
    def start(options = {})
      @log.debug "startup"
      begin
        @connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
        @channel = @connection.create_channel
        @channel.prefetch = 10
        @endpoints = @channel.exchange('client_endpoints', :type => :direct)
        @sessions = @channel.exchange('client_sessions', :type => :topic)
      rescue => e
        cleanup
        raise e
      end
    end

    # Tears down every known client (logged in or still pending), then closes
    # the channel and the connection if they were opened.
    def cleanup
      begin
        # Iterate over a snapshot: cleanup_client removes entries from both
        # @clients and @pending_clients while we walk them.
        (@clients.keys + @pending_clients.to_a).each do |client|
          begin
            cleanup_client(client)
          rescue => e
            # best effort: one misbehaving client must not block shutdown
            @log.error "unable to clean up client #{client}: #{e}"
          end
        end
      ensure
        begin
          @channel.close unless @channel.nil?
        ensure
          @connection.close unless @connection.nil?
        end
      end
    end

    # Shuts the router down by cleaning up all clients and broker resources.
    def stop
      @log.debug "shutdown"
      cleanup
    end

    # Registers a freshly connected websocket as a pending client and installs
    # its open/close/error/message callbacks.
    def new_client(client)
      @pending_clients.add(client)

      client.onopen do
        @log.debug "client connected #{client}"
      end

      client.onclose do
        @log.debug "Connection closed"
        cleanup_client(client)
      end

      client.onerror do |error|
        if error.kind_of?(EM::WebSocket::WebSocketError)
          @log.error "websockets error: #{error}"
        else
          @log.error "generic error: #{error} #{error.backtrace}"
        end
        cleanup_client(client)
      end

      client.onmessage do |msg|
        @log.debug("msg received")
        # TODO: set a max message size before we put it through PB?
        # TODO: rate limit?
        begin
          pb_msg = Jampb::ClientMessage.parse(msg.to_s)
          self.route(pb_msg, client)
        rescue => e
          # Any routing/parsing error ends the client's session.
          @log.debug "ending client session due to error: #{e.to_s}"
          @log.debug e
          begin
            # wrap the message up and send it down
            client.send(@message_factory.server_generic_error(e.to_s).to_s)
          ensure
            cleanup_client(client)
          end
        end
      end
    end

    # Forgets the client, cancels its subscriptions (if it was logged in) and
    # always closes the websocket, even if cancelling a subscription failed.
    def cleanup_client(client)
      @log.debug "cleaning up client #{client}"
      begin
        @pending_clients.delete(client)
        context = @clients.delete(client)
        unless context.nil?
          begin
            if context.subscription.active?
              @log.debug "cleaning up user subscription"
              context.subscription.shutdown!
            end
            if !context.session_topic.nil? && context.session_topic.active?
              @log.debug "cleaning up session subscription"
              context.session_topic.shutdown!
            end
          rescue => e
            @log.debug "unable to cancel subscription on cleanup: #{e}"
          end
          mark_context_for_deletion(context)
        end
      ensure
        client.close_websocket
      end
    end

    # We eventually want to defer this work until a grace reconnect period has elapsed.
    def mark_context_for_deletion(context)
      # TODO handle notifies to sessions and friends about disconnect
    end

    # Returns the payload stored under the tag matching client_msg.type.
    # Raises if the envelope carries no such payload.
    def extract_inner_message(client_msg)
      msg = client_msg.value_for_tag(client_msg.type)
      if msg.nil?
        raise "inner message is null. type: #{client_msg.type}, target: #{client_msg.target}"
      end
      msg
    end

    # Dispatches a parsed client message according to its target
    # (server, session or user). Raises on a missing/unknown target, or when
    # a client that has not logged in sends anything other than Login.
    def route(client_msg, client)
      @log.debug("msg #{client_msg.type} #{client_msg.target}")

      raise 'client_msg.target is null' if client_msg.target.nil?

      if @pending_clients.include?(client) && client_msg.type != ClientMessage::Type::LOGIN
        # this client has not logged in and is trying to send a non-login message
        raise "must 'Login' first"
      end

      if @message_factory.server_directed? client_msg
        handle_server_directed(client_msg, client)
      elsif @message_factory.session_directed? client_msg
        # strip the target prefix to obtain the bare session id
        session = client_msg.target[JamRuby::MessageFactory::SESSION_TARGET_PREFIX.length..-1]
        handle_session_directed(session, client_msg, client)
      elsif @message_factory.user_directed? client_msg
        # strip the target prefix to obtain the bare user id
        user = client_msg.target[JamRuby::MessageFactory::USER_PREFIX_TARGET.length..-1]
        handle_user_directed(user, client_msg, client)
      else
        raise "client_msg.target is unknown type: #{client_msg.target}"
      end
    end

    # Handles messages addressed to the server itself, by message type.
    # Raises for message types the server does not understand.
    def handle_server_directed(client_msg, client)
      # called for validation only: raises when the payload is missing
      extract_inner_message(client_msg)

      if client_msg.type == ClientMessage::Type::LOGIN
        handle_login(client_msg.login, client)
      elsif client_msg.type == ClientMessage::Type::HEARTBEAT
        handle_heartbeat(client_msg.heartbeat, client)
      elsif client_msg.type == ClientMessage::Type::LOGIN_JAM_SESSION
        handle_join_jam_session(client_msg.login_jam_session, client)
      elsif client_msg.type == ClientMessage::Type::LEAVE_JAM_SESSION
        handle_leave_jam_session(client_msg.leave_jam_session, client)
      else
        raise "unknown message type '#{client_msg.type}' for #{client_msg.target}-directed message"
      end
    end

    # Authenticates the client, binds a per-user queue to the endpoints
    # exchange, starts forwarding queued messages to the websocket and
    # promotes the client from pending to logged in.
    # Raises 'invalid login' when the credentials do not match a user.
    def handle_login(login, client)
      user = valid_login(login.username, login.password, login.token)
      raise 'invalid login' unless user

      @log.debug "user #{user.email} logged in"

      # create a queue for just this user
      queue = @channel.queue(user.id)
      queue.bind(@endpoints, :routing_key => user.id)
      queue.purge

      # TODO: alert friends

      # subscribe for any messages to self; ack each one after forwarding it
      subscription = queue.subscribe(:ack => true, :blocking => false) do |headers, msg|
        client.send(msg)
        headers.ack
      end

      # respond with LOGIN_ACK to let client know it was successful
      client.send(@message_factory.login_ack(client.request["origin"]).to_s)

      # remove from pending_queue
      @pending_clients.delete(client)

      # add a tracker for this user
      @clients[client] = ClientContext.new(user, queue, subscription)
    end

    def handle_heartbeat(heartbeat, client)
      # todo
    end

    # Subscribes a logged-in client to a jam session's topic after checking
    # access, replacing any previous session subscription, and acks the result.
    # On an access failure a failure ack is sent and the connection stays open.
    def handle_join_jam_session(join_jam_session, client)
      context = client_context(client)
      session_id = join_jam_session.jam_session

      # verify that the current user has the rights to actually join the jam session
      begin
        access_jam_session?(session_id, context.user)
      rescue => e
        # send back a failure ack and bail
        @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
        client.send(@message_factory.login_jam_session_ack(true, e.to_s).to_s)
        return
      end

      # NOTE(review): the queue is named after the session id, so every client
      # joining the same session appears to share (and purge) one queue --
      # confirm this gives the intended delivery to all session members.
      topic = @channel.queue(session_id)
      topic.bind(@sessions, :routing_key => session_id)
      topic.purge

      # subscribe for any messages to session
      subscription = topic.subscribe(:ack => false, :blocking => false) do |headers, msg|
        client.send(msg)
        #headers.ack
      end

      old_session = context.session_topic
      if !old_session.nil? && old_session.active?
        # remove subscription to previous session
        @log.debug "auto-removing user from previous session"
        old_session.shutdown!
      end
      context.session_topic = subscription

      # respond with LOGIN_JAM_SESSION_ACK to let client know it was successful
      # (serialized with to_s like every other outgoing message)
      client.send(@message_factory.login_jam_session_ack(false, nil).to_s)

      # send 'new client' message
    end

    # Leaving a session is not implemented yet; always raises 'unsupported'.
    def handle_leave_jam_session(leave_jam_session, client)
      raise 'unsupported'
    end

    # Looks up the user for the supplied credentials.
    #
    # Username + password takes precedence; otherwise the remember token is
    # used. Returns the User on success and nil when the credentials do not
    # match. Raises when neither form of credential was supplied.
    def valid_login(username, password, token)
      if !username.nil? && !password.nil?
        # attempt login with username and password
        user = User.find_by_email(username)
        if !user.nil? && user.authenticate(password)
          @log.debug "#{username} login via password"
          return user
        end
        @log.debug "#{username} login failure"
        nil
      elsif !token.nil?
        # attempt login with token
        user = User.find_by_remember_token(token)
        if user.nil?
          @log.debug "no user found with token"
          return nil
        end
        @log.debug "#{user.email} login via token"
        user
      else
        raise 'no login data was found in Login message'
      end
    end

    # Returns the JamSession when it exists and the user may access it;
    # raises otherwise.
    def access_jam_session?(jam_session_id, user)
      jam_session = JamSession.find_by_id(jam_session_id)
      raise 'specified session not found' if jam_session.nil?
      raise 'not allowed to join the specified session' unless jam_session.access? user
      jam_session
    end

    # Publishes a client's message on the sessions exchange, routed by session id.
    def handle_session_directed(session, client_msg, client)
      # called for validation only: raises when the payload is missing
      extract_inner_message(client_msg)
      context = client_context(client)

      # by not catching any exception here, this will kill the connection
      # if for some reason the client is trying to send to a session that it doesn't
      # belong to
      access_jam_session?(session, context.user)

      # put it on the topic exchange for sessions
      @sessions.publish(client_msg.to_s, :routing_key => session)
    end

    # Currently only validates that the message carries a payload;
    # nothing is delivered to the target user yet.
    def handle_user_directed(user, client_msg, client)
      extract_inner_message(client_msg)
    end

    private

    # Returns the context of a logged-in client, raising a descriptive error
    # instead of letting a nil context surface as a NoMethodError.
    def client_context(client)
      context = @clients[client]
      raise "must 'Login' first" if context.nil?
      context
    end
  end
end