more changes for user presence
This commit is contained in:
parent
8451f8653a
commit
cbb9508b6b
|
|
@ -1,4 +1,4 @@
|
|||
module JamWebsockets
|
||||
module JamWebsockets
|
||||
class ClientContext
|
||||
|
||||
attr_accessor :user, :client, :msg_count, :session
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ end
|
|||
|
||||
module JamWebsockets
|
||||
|
||||
|
||||
class Router
|
||||
|
||||
attr_accessor :user_context_lookup, :session_context_lookup
|
||||
|
|
@ -62,75 +61,24 @@ module JamWebsockets
|
|||
|
||||
end
|
||||
|
||||
def add_user(context)
|
||||
user_contexts = @user_context_lookup[context.user.id]
|
||||
if user_contexts.nil?
|
||||
user_contexts = Set.new
|
||||
@user_context_lookup[context.user.id] = user_contexts
|
||||
end
|
||||
|
||||
user_contexts.add(context)
|
||||
end
|
||||
|
||||
def remove_user(context)
|
||||
user_contexts = @user_context_lookup[context.user.id]
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(context)
|
||||
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(context.user.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def add_session(context)
|
||||
session_contexts = @session_context_lookup[context.session.id]
|
||||
if session_contexts.nil?
|
||||
session_contexts = Set.new
|
||||
@session_context_lookup[context.session.id] = session_contexts
|
||||
end
|
||||
|
||||
session_contexts.add(context)
|
||||
end
|
||||
|
||||
def remove_session(context)
|
||||
session_contexts = @session_context_lookup[context.session.id]
|
||||
if session_contexts.nil?
|
||||
@log.warn "session can not be removed #{context}"
|
||||
else
|
||||
# delete the context from set of session contexts
|
||||
session_contexts.delete(context)
|
||||
|
||||
# if last session context, delete entire set (memory leak concern)
|
||||
if session_contexts.length == 0
|
||||
@session_context_lookup.delete(context.session.id)
|
||||
end
|
||||
|
||||
context.session = nil
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# register topic for user messages and session messages
|
||||
def register_topics
|
||||
|
||||
######################## USER MESSAGING ###########################
|
||||
|
||||
# create user exchange
|
||||
@users_exchange = @channel.exchange('users', :type => :topic)
|
||||
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
|
||||
|
||||
# create user messaging topic
|
||||
@user_topic = @channel.queue("", :auto_delete => true)
|
||||
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
||||
@user_topic.purge
|
||||
|
||||
# TODO: alert friends
|
||||
|
||||
# subscribe for any messages to users
|
||||
|
||||
# subscribe for any messages to users
|
||||
#@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg|
|
||||
@user_subscription = @user_topic.subscribe(:ack => false)
|
||||
|
||||
# this code serves as a callback that dequeues messages and processes them
|
||||
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.envelope.routing_key
|
||||
|
|
@ -140,7 +88,7 @@ module JamWebsockets
|
|||
|
||||
unless contexts.nil?
|
||||
|
||||
@log.debug "received user-directed message for session: #{user_id}"
|
||||
@log.debug "received user-directed message for user: #{user_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
contexts.each do |context|
|
||||
|
|
@ -157,6 +105,12 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
######################## SESSION MESSAGING ###########################
|
||||
|
||||
# create session exchange
|
||||
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
|
||||
|
||||
# create session messaging topic
|
||||
@session_topic = @channel.queue("", :auto_delete => true)
|
||||
@session_topic.bind(@sessions_exchange, :routing_key => "session.#")
|
||||
@session_topic.purge
|
||||
|
|
@ -164,6 +118,8 @@ module JamWebsockets
|
|||
# subscribe for any messages to session
|
||||
#@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg|
|
||||
@session_subscription = @session_topic.subscribe(:ack => false)
|
||||
|
||||
# this code serves as a callback that dequeues messages and processes them
|
||||
@session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.envelope.routing_key
|
||||
|
|
@ -174,8 +130,6 @@ module JamWebsockets
|
|||
unless contexts.nil?
|
||||
|
||||
@log.debug "received session-directed message for session: #{session_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
# ok, its very odd to have your own message that you sent bounce back to you.
|
||||
# In one small favor to the client, we purposefully disallow messages a client
|
||||
|
|
@ -187,7 +141,9 @@ module JamWebsockets
|
|||
# counter-intuitively, even though a string is passed in when you send the header, an (apparently) auto-generated class is sent back which, if you to_s, returns the original value
|
||||
origin_client_id = origin_client_id.to_s unless origin_client_id.nil?
|
||||
|
||||
@log.debug "message received from client #{origin_client_id}"
|
||||
@log.debug "session message received from client #{origin_client_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
contexts.each do |context|
|
||||
if context.client.client_id != origin_client_id
|
||||
EM.schedule do
|
||||
|
|
@ -205,55 +161,6 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def send_to_client(client, msg)
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
||||
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
begin
|
||||
if !@user_subscription.nil? && @user_subscription.active?
|
||||
@log.debug "cleaning up user subscription"
|
||||
@user_subscription.cancel
|
||||
@user_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@session_subscription.nil? && @session_subscription.active?
|
||||
@log.debug "cleaning up session subscription"
|
||||
@session_subscription.cancel
|
||||
@session_subscription.shutdown!
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
||||
end
|
||||
|
||||
@thread_pool.shutdown
|
||||
|
||||
if !@channel.nil?
|
||||
@channel.close
|
||||
end
|
||||
|
||||
if !@connection.nil?
|
||||
@connection.close
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
@clients.each do |client, context|
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@log.info "shutdown"
|
||||
cleanup
|
||||
end
|
||||
|
||||
def new_client(client)
|
||||
|
||||
# give a unique ID to this client. This is used to prevent session messages
|
||||
|
|
@ -342,6 +249,113 @@ module JamWebsockets
|
|||
|
||||
end
|
||||
|
||||
def add_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
user_contexts = Set.new
|
||||
@user_context_lookup[client_context.user.id] = user_contexts
|
||||
end
|
||||
|
||||
user_contexts.add(client_context)
|
||||
end
|
||||
|
||||
def remove_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(client_context)
|
||||
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(client_context.user.id)
|
||||
end
|
||||
|
||||
client_context.user = nil
|
||||
end
|
||||
end
|
||||
|
||||
def add_session(client_context)
|
||||
session_contexts = @session_context_lookup[client_context.session.id]
|
||||
|
||||
if session_contexts.nil?
|
||||
session_contexts = Set.new
|
||||
@session_context_lookup[client_context.session.id] = session_contexts
|
||||
end
|
||||
|
||||
session_contexts.add(client_context)
|
||||
end
|
||||
|
||||
def remove_session(client_context)
|
||||
session_contexts = @session_context_lookup[client_context.session.id]
|
||||
|
||||
if session_contexts.nil?
|
||||
@log.warn "session can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of session contexts
|
||||
session_contexts.delete(client_context)
|
||||
|
||||
# if last session context, delete entire set (memory leak concern)
|
||||
if session_contexts.length == 0
|
||||
@session_context_lookup.delete(client_context.session.id)
|
||||
end
|
||||
|
||||
client_context.session = nil
|
||||
end
|
||||
end
|
||||
|
||||
def send_to_client(client, msg)
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
||||
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
begin
|
||||
if !@user_subscription.nil? && @user_subscription.active?
|
||||
@log.debug "cleaning up user subscription"
|
||||
@user_subscription.cancel
|
||||
@user_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@session_subscription.nil? && @session_subscription.active?
|
||||
@log.debug "cleaning up session subscription"
|
||||
@session_subscription.cancel
|
||||
@session_subscription.shutdown!
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
||||
end
|
||||
|
||||
@thread_pool.shutdown
|
||||
|
||||
if !@channel.nil?
|
||||
@channel.close
|
||||
end
|
||||
|
||||
if !@connection.nil?
|
||||
@connection.close
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
@clients.each do |client, context|
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@log.info "shutdown"
|
||||
cleanup
|
||||
end
|
||||
|
||||
# removes all resources associated with a client
|
||||
def cleanup_client(client)
|
||||
|
||||
|
|
@ -358,8 +372,8 @@ module JamWebsockets
|
|||
remove_user(context)
|
||||
|
||||
# remove this connection from the database
|
||||
connection = Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'"
|
||||
send_friend_update(user, false)
|
||||
connection = JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'"
|
||||
send_friend_update(client, user, false)
|
||||
|
||||
if !context.session.nil?
|
||||
remove_session(context)
|
||||
|
|
@ -447,8 +461,8 @@ module JamWebsockets
|
|||
login_ack = @message_factory.login_ack(remote_ip)
|
||||
send_to_client(client, login_ack)
|
||||
|
||||
# remove from pending_queue
|
||||
@semaphore.synchronize do
|
||||
# remove from pending_queue
|
||||
@pending_clients.delete(client)
|
||||
|
||||
# add a tracker for this user
|
||||
|
|
@ -457,10 +471,10 @@ module JamWebsockets
|
|||
add_user(context)
|
||||
|
||||
# log this connection in the database
|
||||
connection = Connection.new(user.id, client.id)
|
||||
connection = JamRuby::Connection.new(user.id, client.id)
|
||||
|
||||
if connection.save?
|
||||
send_friend_update(user, true)
|
||||
send_friend_update(client, user, true)
|
||||
end
|
||||
end
|
||||
else
|
||||
|
|
@ -468,26 +482,19 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def send_friend_update(user, online)
|
||||
def send_friend_update(client, user, online)
|
||||
unless user.friends.nil?
|
||||
@log.debug "sending friend update message to friends"
|
||||
|
||||
# create the friend_update message
|
||||
friend_update = @message_factory.friend_update(user.id, online)
|
||||
friend_update_msg = @message_factory.friend_update(user.id, online)
|
||||
|
||||
# send the friend_update to each friend that has active connections
|
||||
user.friends.each do |friend|
|
||||
# only send to friends that have active connections
|
||||
active_connections = @user_context_lookup[friend.id]
|
||||
unless active_connections.nil?
|
||||
# send the update to each active connection of this friend
|
||||
active_connections.each do |context|
|
||||
EM.schedule do
|
||||
@log.debug "sending friend update message to #{friend}"
|
||||
send_to_client(context.client, friend_update)
|
||||
end
|
||||
end
|
||||
end
|
||||
@log.debug "sending friend update message to #{context}"
|
||||
|
||||
# put it on the topic exchange for users
|
||||
@users_exchange.publish(friend_update_msg.to_s, :routing_key => "user.#{friend.id}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
require 'em-websocket'
|
||||
|
||||
module JamWebsockets
|
||||
|
||||
class Server
|
||||
|
||||
def initialize(options={})
|
||||
|
|
|
|||
Loading…
Reference in New Issue