178 lines
5.8 KiB
Ruby
178 lines
5.8 KiB
Ruby
require 'em-websocket'
|
|
require 'bugsnag'
|
|
module JamWebsockets
|
|
|
|
class Server
|
|
|
|
# Creates the server's collaborators: a logger, a Router, and a handle on the
# ActiveRecord::Base logger.
#
# Side effect: sets the process-wide EM::WebSocket.close_timeout to 10 seconds.
#
# options - accepted for call compatibility; this method does not read it.
def initialize(options = {})
  EM::WebSocket.close_timeout = 10 # the default of 60 is pretty intense

  @log = Logging.logger[self]
  @count = 0
  @router = Router.new
  # Logger whose level sane_logging temporarily changes; sane_logging copes with it being nil.
  @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]

  # Time of the most recent connection-expiration tick; nil until that timer is started.
  @last_conn_check = nil
end
|
|
|
|
# Starts the EventMachine reactor, starts the router and, from the router's
# start callback, the periodic timers and the two websocket listeners.
# EventMachine.run is the last call made here, so this method returns only when
# that call does (see #stop).
#
# options (Hash):
#   :port                         - public websocket port; Integer or numeric String.
#                                   The trusted listener uses port + 1.
#   :connect_time_stale_client, :connect_time_expire_client,
#   :connect_time_stale_browser, :connect_time_expire_browser
#                                 - passed to the router (coerced with to_i)
#   :max_connections_per_user     - passed to the router (to_i)
#   :gateway_name                 - passed to the router as :gateway
#   :rabbitmq_host, :rabbitmq_port - passed to the router as :host / :port
#   :allow_dynamic_registration   - passed to the router; treated as true when nil
#   :cidr                         - passed to TrustCheck for the trusted listener
#   :emwebsocket_debug            - passed to em-websocket as :debug
#   :calling_thread               - optional Thread woken up once the listeners are started
# The whole hash is also handed to Stats.init.
#
# Raises TypeError/ArgumentError if :port is missing or not a valid integer.
def run(options = {})
  host = "0.0.0.0"
  # Integer() takes 6767 and "6767" alike, and refuses nil instead of silently becoming 0.
  port = Integer(options[:port])
  trust_port = port + 1
  connect_time_stale_client = options[:connect_time_stale_client].to_i
  connect_time_expire_client = options[:connect_time_expire_client].to_i
  connect_time_stale_browser = options[:connect_time_stale_browser].to_i
  connect_time_expire_browser = options[:connect_time_expire_browser].to_i
  max_connections_per_user = options[:max_connections_per_user].to_i
  gateway_name = options[:gateway_name]
  rabbitmq_host = options[:rabbitmq_host]
  rabbitmq_port = options[:rabbitmq_port].to_i
  # only nil means "not specified"; an explicit false must be kept
  allow_dynamic_registration = options[:allow_dynamic_registration]
  allow_dynamic_registration = true if allow_dynamic_registration.nil?

  Stats.init(options)

  calling_thread = options[:calling_thread]
  trust_check = TrustCheck.new(trust_port, options[:cidr])

  @log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port} gateway_name=#{gateway_name}"

  EventMachine.error_handler do |e|
    puts "unhandled error #{e}"
    @log.error "unhandled error #{e}"
    Bugsnag.notify(e)
  end

  EventMachine.run do
    @router.start(connect_time_stale_client,
                  connect_time_expire_client,
                  connect_time_stale_browser,
                  connect_time_expire_browser,
                  host: rabbitmq_host,
                  port: rabbitmq_port,
                  max_connections_per_user: max_connections_per_user,
                  gateway: gateway_name,
                  allow_dynamic_registration: allow_dynamic_registration) do
      start_connection_expiration
      start_client_expiration
      start_connection_flagger
      start_stats_dump
      start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
      calling_thread.wakeup if calling_thread
    end

    # if you don't do this, the app won't exit unless you kill -9
    at_exit do
      @log.info "cleaning up server"
      @router.cleanup
    end
  end
end
|
|
|
|
# Asks the EventMachine reactor to stop.
#
# Does nothing when the reactor is not running (for example when called before
# #run, or a second time), so shutdown paths can call it unconditionally.
def stop
  EventMachine.stop_event_loop if EventMachine.reactor_running?
end
|
|
|
|
|
|
# Logs an error when more than timer + 1 seconds have passed since the
# previous check, then records the current time as the new reference point.
#
# timer - the interval, in seconds, at which the caller is expected to run.
#
# When no previous check has been recorded (@last_conn_check is nil) there is
# nothing to compare against, so only the reference point is set.
def check_for_em_drift(timer)
  now = Time.now

  if @last_conn_check
    elapsed = now - @last_conn_check
    # if our timer check is a full second off, say what's up
    if elapsed > timer + 1
      @log.error("significant drift! Should be #{timer} seconds. Instead was: #{elapsed}")
    end
  end

  @last_conn_check = now
end
|
|
|
|
|
|
|
|
# Opens two websocket listeners on listen_ip:
#   * port       - every connection is handed to the router as a regular client
#   * trust_port - the peer IP is checked with trust_check first; trusted
#                  connections are handed to the router flagged as trusted,
#                  all others are logged and closed
#
# listen_ip         - address to bind both listeners to
# port              - port of the regular listener
# trust_port        - port of the trusted listener
# trust_check       - object responding to trusted?(ip, port)
# emwebsocket_debug - passed to em-websocket as its :debug option
def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug)
  EventMachine::WebSocket.run(host: listen_ip, port: port, debug: emwebsocket_debug) do |ws|
    @router.new_client(ws, false)
  end

  EventMachine::WebSocket.run(host: listen_ip, port: trust_port, debug: emwebsocket_debug) do |ws|
    @log.info "new latency_tester client #{ws}"

    # verify this connection came in from a valid subnet, if specified
    ip = extract_ip(ws)
    if trust_check.trusted?(ip, trust_port)
      @router.new_client(ws, true)
    else
      @log.warn("untrusted client attempted to connect to #{listen_ip}:#{trust_port} from #{ip}")
      ws.close
    end
  end

  @log.debug("started websocket")
end
|
|
|
|
# Runs the router's connection check once immediately, then schedules it on a
# periodic EventMachine timer. Each tick first checks for timer drift and then
# runs the check wrapped in time_it, safety_net and sane_logging.
#
# interval - seconds between ticks (default 2)
#
# Returns the created EventMachine::PeriodicTimer.
def start_connection_expiration(interval = 2)
  # one cleanup on startup
  @router.periodical_check_connections

  # reference point for the first drift check
  @last_conn_check = Time.now

  EventMachine::PeriodicTimer.new(interval) do
    check_for_em_drift(interval)
    time_it('conn_expire') { safety_net { sane_logging { @router.periodical_check_connections } } }
  end
end
|
|
|
|
# Runs the router's client check once immediately, then schedules it on a
# periodic EventMachine timer, wrapped in time_it, safety_net and sane_logging.
#
# interval - seconds between ticks (default 30)
#
# Returns the created EventMachine::PeriodicTimer.
def start_client_expiration(interval = 30)
  # one cleanup on startup
  @router.periodical_check_clients

  EventMachine::PeriodicTimer.new(interval) do
    time_it('client_expire') { safety_net { sane_logging { @router.periodical_check_clients } } }
  end
end
|
|
|
|
# Runs the router's connection flagging once immediately, then schedules it on
# a periodic EventMachine timer, wrapped in time_it, safety_net and sane_logging.
#
# interval - seconds between ticks (default 2)
#
# Returns the created EventMachine::PeriodicTimer.
def start_connection_flagger(interval = 2)
  # one cleanup on startup
  @router.periodical_flag_connections

  EventMachine::PeriodicTimer.new(interval) do
    time_it('conn_flagger') { safety_net { sane_logging { @router.periodical_flag_connections } } }
  end
end
|
|
|
|
# Schedules the router's stats dump on a periodic EventMachine timer, wrapped
# in time_it and safety_net. Unlike the other timers there is no run at startup.
#
# interval - seconds between ticks (default 60)
#
# Returns the created EventMachine::PeriodicTimer.
def start_stats_dump(interval = 60)
  EventMachine::PeriodicTimer.new(interval) do
    time_it('stats_dump') { safety_net { @router.periodical_stats_dump } }
  end
end
|
|
|
|
# this was added for this reason: https://jamkazam.atlassian.net/browse/VRFS-2425
|
|
# if an unhandled exception occurs in PeriodicTimer, it just kills all future timers; doesn't kill the app.
|
|
# not really what you want.
|
|
|
|
# so, we signal to Bugsnag, so we know really bad stuff is happening, but we also move on so that later timer runs still happen
|
|
# Runs the block and keeps any StandardError from escaping: the error is
# written to the log and reported to Bugsnag instead.
#
# The error is logged before it is reported, and a failure of the report itself
# is also caught and logged, so no exception leaves this method on the error path.
#
# Returns the block's value, or nil when the block raised.
def safety_net(&blk)
  blk.call
rescue => e
  @log.error("unhandled exception in EM Timer #{e}")
  begin
    Bugsnag.notify(e)
  rescue => notify_error
    @log.error("unable to report EM Timer exception to Bugsnag: #{notify_error}")
  end
  nil
end
|
|
|
|
# Runs the block and logs a warning when it took longer than one second.
#
# The duration is taken from the monotonic clock, so an adjustment of the
# system time while the block runs can neither fake nor hide a slow run.
#
# cat - label that identifies the measured work in the log line
#
# The block's return value is not passed on. If the block raises, the
# exception propagates and nothing is logged.
def time_it(cat, &blk)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  blk.call

  time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  @log.warn("LONG TIME #{cat}: #{time}") if time > 1
end
|
|
|
|
|
|
# Runs the block with the ActiveRecord::Base logger set to :info and puts the
# previous level back afterwards, also when the block raises.
#
# used around repeated transactions that cause too much ActiveRecord::Base logging
# example is handling heartbeats
#
# When no ActiveRecord::Base logger is available the block is simply run.
#
# Returns the block's value.
def sane_logging(&blk)
  original_level = @ar_base_logger.level if @ar_base_logger
  @ar_base_logger.level = :info if @ar_base_logger
  blk.call
ensure
  @ar_base_logger.level = original_level if @ar_base_logger
end
|
|
|
|
private
|
|
# Returns the IP address of the remote end of a connection, as a String.
#
# client - object whose get_peername returns a packed socket address.
#
# Socket.unpack_sockaddr_in yields [port, ip]; only the ip is of interest here.
def extract_ip(client)
  _port, ip = Socket.unpack_sockaddr_in(client.get_peername)
  ip
end
|
|
end
|
|
|
|
end
|