From 01e50b45b380e3344e31bff738fc2cb2eb30be8c Mon Sep 17 00:00:00 2001 From: Seth Call Date: Thu, 16 Jan 2014 02:21:28 +0000 Subject: [PATCH] * fixes for EventMachine initialization to work in ruby-only context (without web) --- ruby/lib/jam_ruby.rb | 1 + ruby/lib/jam_ruby/lib/em_helper.rb | 93 ++++++++++++++++++++ ruby/spec/support/utilities.rb | 8 ++ web/Gemfile | 2 - web/config/environment.rb | 6 ++ web/config/initializers/eventmachine.rb | 108 ++---------------------- 6 files changed, 117 insertions(+), 101 deletions(-) create mode 100644 ruby/lib/jam_ruby/lib/em_helper.rb diff --git a/ruby/lib/jam_ruby.rb b/ruby/lib/jam_ruby.rb index 0a36ead74..b756eadb8 100755 --- a/ruby/lib/jam_ruby.rb +++ b/ruby/lib/jam_ruby.rb @@ -28,6 +28,7 @@ require "jam_ruby/lib/module_overrides" require "jam_ruby/lib/s3_util" require "jam_ruby/lib/s3_manager" require "jam_ruby/lib/profanity" +require "jam_ruby/lib/em_helper.rb" require "jam_ruby/resque/audiomixer" require "jam_ruby/resque/scheduled/audiomixer_retry" require "jam_ruby/mq_router" diff --git a/ruby/lib/jam_ruby/lib/em_helper.rb b/ruby/lib/jam_ruby/lib/em_helper.rb new file mode 100644 index 000000000..73a928a89 --- /dev/null +++ b/ruby/lib/jam_ruby/lib/em_helper.rb @@ -0,0 +1,93 @@ +require 'amqp' +require 'jam_ruby' + +# Creates a connection to RabbitMQ + +# On that single connection, a channel is created (which is a way to multiplex multiple queues/topics over the same TCP connection with rabbitmq) +# Then connections to the client_exchange and user_exchange are made, and put into the MQRouter static variables +# If this code completes (which implies that Rails can start to begin with, because this is in an initializer), +# then the Rails app itself is free to send messages over these exchanges + +# Also starts websocket-gateway +module JamWebEventMachine + + @@log = Logging.logger[JamWebEventMachine] + + # starts amqp & eventmachine up first. + # and then calls your block. + # After the supplied block is done, + # waits until all EM tasks scheduled in the supplied block are done, or timeout + def self.run_wait_stop(timeout = 30, &blk) + JamWebEventMachine.start + + thread = Thread.current + + blk.call + + # put our thread wake up event on the EM scheduler, + # meaning we go last (assuming that all EM tasks needed were scheduled in the blk) + EM.schedule do + thread.wakeup + end + + sleep timeout + end + + + def self.run_em + + EM.run do + # this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session' + $amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => APP_CONFIG.rabbitmq_host, :port => APP_CONFIG.rabbitmq_port) + $amqp_connection_manager.connect do |channel| + + AMQP::Exchange.new(channel, :topic, "clients") do |exchange| + @@log.debug("#{exchange.name} is ready to go") + MQRouter.client_exchange = exchange + end + + AMQP::Exchange.new(channel, :topic, "users") do |exchange| + @@log.debug("#{exchange.name} is ready to go") + MQRouter.user_exchange = exchange + end + end + end + end + + def self.die_gracefully_on_signal + @@log.debug("*** die_gracefully_on_signal") + Signal.trap("INT") { EM.stop } + Signal.trap("TERM") { EM.stop } + end + + def self.start + if defined?(PhusionPassenger) + @@log.debug("PhusionPassenger detected") + + PhusionPassenger.on_event(:starting_worker_process) do |forked| + # for passenger, we need to avoid orphaned threads + if forked && EM.reactor_running? + @@log.debug("stopping EventMachine") + EM.stop + end + @@log.debug("starting EventMachine") + Thread.new do + run_em + end + die_gracefully_on_signal + end + elsif defined?(Unicorn) + @@log.debug("Unicorn detected--do nothing at initializer phase") + else + @@log.debug("Development environment detected") + Thread.abort_on_exception = true + + # create a new thread separate from the Rails main thread that EventMachine can run on + Thread.new do + run_em + end + end + end +end + + diff --git a/ruby/spec/support/utilities.rb b/ruby/spec/support/utilities.rb index e1206735e..9ef5410ed 100644 --- a/ruby/spec/support/utilities.rb +++ b/ruby/spec/support/utilities.rb @@ -21,6 +21,14 @@ def app_config ENV['AUDIOMIXER_PATH'] || audiomixer_workspace_path || "/var/lib/audiomixer/audiomixer/audiomixerapp" end + def rabbitmq_host + "localhost" + end + + def rabbitmq_port + 5672 + end + private def audiomixer_workspace_path diff --git a/web/Gemfile b/web/Gemfile index 7e052d9d7..8132e1b52 100644 --- a/web/Gemfile +++ b/web/Gemfile @@ -62,9 +62,7 @@ gem 'resque' gem 'resque-retry' gem 'resque-failed-job-mailer' gem 'resque-dynamic-queues' - gem 'quiet_assets', :group => :development - gem "bugsnag" group :development, :test do diff --git a/web/config/environment.rb b/web/config/environment.rb index 061a53982..bd172639e 100644 --- a/web/config/environment.rb +++ b/web/config/environment.rb @@ -1,3 +1,5 @@ +require 'bugsnag' + # Load the rails application require File.expand_path('../application', __FILE__) @@ -5,5 +7,9 @@ Mime::Type.register "audio/ogg", :audio_ogg APP_CONFIG = Rails.application.config +EventMachine.error_handler { |e| + Bugsnag.notify(e) +} + # Initialize the rails application SampleApp::Application.initialize! diff --git a/web/config/initializers/eventmachine.rb b/web/config/initializers/eventmachine.rb index 097a332f6..1b4bd95ad 100644 --- a/web/config/initializers/eventmachine.rb +++ b/web/config/initializers/eventmachine.rb @@ -1,105 +1,15 @@ -require 'amqp' -require 'jam_ruby' -require 'bugsnag' +unless $rails_rake_task -# Creates a connection to RabbitMQ + JamWebEventMachine.start -# On that single connection, a channel is created (which is a way to multiplex multiple queues/topics over the same TCP connection with rabbitmq) -# Then connections to the client_exchange and user_exchange are made, and put into the MQRouter static variables -# If this code completes (which implies that Rails can start to begin with, because this is in an initializer), -# then the Rails app itself is free to send messages over these exchanges + if APP_CONFIG.websocket_gateway_enable && !$rails_rake_task -# Also starts websocket-gateway -module JamWebEventMachine - - # starts amqp & eventmachine up first. - # and then calls your block. - # After the supplied block is done, - # waits until all EM tasks scheduled in the supplied block are done, or timeout - def self.run_wait_stop(timeout = 30, &blk) - JamWebEventMachine.start - - thread = Thread.current - - blk.call - - # put our thread wake up event on the EM scheduler, - # meaning we go last (assuming that all EM tasks needed were scheduled in the blk) - EM.schedule do - thread.wakeup - end - - sleep timeout - end - - def self.run_em - - EventMachine.error_handler { |e| - Bugsnag.notify(e) - } - - EM.run do - # this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session' - $amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port) - $amqp_connection_manager.connect do |channel| - - AMQP::Exchange.new(channel, :topic, "clients") do |exchange| - Rails.logger.debug("#{exchange.name} is ready to go") - MQRouter.client_exchange = exchange - end - - AMQP::Exchange.new(channel, :topic, "users") do |exchange| - Rails.logger.debug("#{exchange.name} is ready to go") - MQRouter.user_exchange = exchange - end - end - - if Rails.application.config.websocket_gateway_enable && !$rails_rake_task - - Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port, - :emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug, - :connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale, - :connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire } - end - - end - end - - def self.die_gracefully_on_signal - Rails.logger.debug("*** die_gracefully_on_signal") - Signal.trap("INT") { EM.stop } - Signal.trap("TERM") { EM.stop } - end - - def self.start - if defined?(PhusionPassenger) - Rails.logger.debug("PhusionPassenger detected") - - PhusionPassenger.on_event(:starting_worker_process) do |forked| - # for passenger, we need to avoid orphaned threads - if forked && EM.reactor_running? - Rails.logger.debug("stopping EventMachine") - EM.stop - end - Rails.logger.debug("starting EventMachine") - Thread.new do - run_em - end - die_gracefully_on_signal - end - elsif defined?(Unicorn) - Rails.logger.debug("Unicorn detected--do nothing at initializer phase") - else - Rails.logger.debug("Development environment detected") - Thread.abort_on_exception = true - - # create a new thread separate from the Rails main thread that EventMachine can run on - Thread.new do - run_em - end + Thread.new do + JamWebsockets::Server.new.run( + :port => APP_CONFIG.websocket_gateway_port, + :emwebsocket_debug => APP_CONFIG.websocket_gateway_internal_debug, + :connect_time_stale => APP_CONFIG.websocket_gateway_connect_time_stale, + :connect_time_expire => APP_CONFIG.websocket_gateway_connect_time_expire) end end end - -JamWebEventMachine.start unless $rails_rake_task -