diff --git a/ruby/lib/jam_ruby/message_factory.rb b/ruby/lib/jam_ruby/message_factory.rb index 333006f2c..f5b59ae6e 100644 --- a/ruby/lib/jam_ruby/message_factory.rb +++ b/ruby/lib/jam_ruby/message_factory.rb @@ -475,8 +475,8 @@ module JamRuby def recording_master_mix_complete(receiver_id, recording_id, band_id, msg, notification_id, created_at) recording_master_mix_complete = Jampb::RecordingMasterMixComplete.new( - :band_invitation_id => invitation_id, - :photo_url => photo_url, + :recording_id => recording_id, + :band_id => band_id, :msg => msg, :notification_id => notification_id, :created_at => created_at diff --git a/ruby/lib/jam_ruby/mq_router.rb b/ruby/lib/jam_ruby/mq_router.rb index 087bace44..e117a0974 100644 --- a/ruby/lib/jam_ruby/mq_router.rb +++ b/ruby/lib/jam_ruby/mq_router.rb @@ -77,6 +77,8 @@ class MQRouter # sends a message to a user with no checking of permissions (RAW USAGE) # this method deliberately has no database interactivity/active_record objects def publish_to_user(user_id, user_msg) + @@log.warn "EM not running in publish_to_user" unless EM.reactor_running? + EM.schedule do @@log.debug "publishing to user:#{user_id} from server" # put it on the topic exchange for users diff --git a/ruby/lib/jam_ruby/resque/audiomixer.rb b/ruby/lib/jam_ruby/resque/audiomixer.rb index 5aab4abc7..e1bde1947 100644 --- a/ruby/lib/jam_ruby/resque/audiomixer.rb +++ b/ruby/lib/jam_ruby/resque/audiomixer.rb @@ -17,10 +17,14 @@ module JamRuby :error_reason, :error_detail def self.perform(mix_id, postback_output_url) - audiomixer = AudioMixer.new() - audiomixer.postback_output_url = postback_output_url - audiomixer.mix_id = mix_id - audiomixer.run + + JamWebEventMachine.run_wait_stop do + audiomixer = AudioMixer.new() + audiomixer.postback_output_url = postback_output_url + audiomixer.mix_id = mix_id + audiomixer.run + end + end def initialize diff --git a/web/config/initializers/eventmachine.rb b/web/config/initializers/eventmachine.rb index f09eb1811..097a332f6 100644 --- a/web/config/initializers/eventmachine.rb +++ b/web/config/initializers/eventmachine.rb @@ -12,60 +12,79 @@ require 'bugsnag' # Also starts websocket-gateway module JamWebEventMachine + # starts amqp & eventmachine up first. + # and then calls your block. + # After the supplied block is done, + # waits until all EM tasks scheduled in the supplied block are done, or timeout + def self.run_wait_stop(timeout = 30, &blk) + JamWebEventMachine.start + + thread = Thread.current + + blk.call + + # put our thread wake up event on the EM scheduler, + # meaning we go last (assuming that all EM tasks needed were scheduled in the blk) + EM.schedule do + thread.wakeup + end + + sleep timeout + end + def self.run_em - EventMachine.error_handler{|e| - Bugsnag.notify(e) - } + EventMachine.error_handler { |e| + Bugsnag.notify(e) + } - EM.run do - # this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session' - $amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port) - $amqp_connection_manager.connect do |channel| + EM.run do + # this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session' + $amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port) + $amqp_connection_manager.connect do |channel| - AMQP::Exchange.new(channel, :topic, "clients") do |exchange| - Rails.logger.debug("#{exchange.name} is ready to go") - MQRouter.client_exchange = exchange - end - - AMQP::Exchange.new(channel, :topic, "users") do |exchange| - Rails.logger.debug("#{exchange.name} is ready to go") - MQRouter.user_exchange = exchange - Rails.logger.debug "MQRouter.user_exchange = #{MQRouter.user_exchange}" - end + AMQP::Exchange.new(channel, :topic, "clients") do |exchange| + Rails.logger.debug("#{exchange.name} is ready to go") + MQRouter.client_exchange = exchange end - if Rails.application.config.websocket_gateway_enable && !$rails_rake_task + AMQP::Exchange.new(channel, :topic, "users") do |exchange| + Rails.logger.debug("#{exchange.name} is ready to go") + MQRouter.user_exchange = exchange + end + end - Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port, + if Rails.application.config.websocket_gateway_enable && !$rails_rake_task + + Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port, :emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug, :connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale, :connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire } - end - end + + end end def self.die_gracefully_on_signal Rails.logger.debug("*** die_gracefully_on_signal") - Signal.trap("INT") { EM.stop } + Signal.trap("INT") { EM.stop } Signal.trap("TERM") { EM.stop } end def self.start - if defined?(PhusionPassenger) + if defined?(PhusionPassenger) Rails.logger.debug("PhusionPassenger detected") PhusionPassenger.on_event(:starting_worker_process) do |forked| - # for passenger, we need to avoid orphaned threads + # for passenger, we need to avoid orphaned threads if forked && EM.reactor_running? - Rails.logger.debug("stopping EventMachine") + Rails.logger.debug("stopping EventMachine") EM.stop end - Rails.logger.debug("starting EventMachine") - Thread.new { - run_em - } + Rails.logger.debug("starting EventMachine") + Thread.new do + run_em + end die_gracefully_on_signal end elsif defined?(Unicorn) @@ -82,4 +101,5 @@ module JamWebEventMachine end end -JamWebEventMachine.start +JamWebEventMachine.start unless $rails_rake_task + diff --git a/web/config/initializers/websocket_gateway.rb b/web/config/initializers/websocket_gateway.rb deleted file mode 100644 index ee7691bf2..000000000 --- a/web/config/initializers/websocket_gateway.rb +++ /dev/null @@ -1,7 +0,0 @@ -if Rails.application.config.websocket_gateway_enable - - # Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port, - # :emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug, - # :connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale, - # :connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire } -end