VRFS-1942 refactoring scheduled session emails; new admin view into batch email data queries
This commit is contained in:
parent
6a037d9987
commit
ff73c170bd
|
|
@ -0,0 +1,79 @@
|
||||||
|
ActiveAdmin.register JamRuby::EmailBatchScheduledSessions, :as => 'Daily Sessions' do
|
||||||
|
|
||||||
|
menu :label => 'Daily Sessions', :parent => 'Email'
|
||||||
|
|
||||||
|
config.sort_order = 'updated_at DESC'
|
||||||
|
config.filters = false
|
||||||
|
config.batch_actions = false
|
||||||
|
config.clear_action_items!
|
||||||
|
|
||||||
|
index do
|
||||||
|
column 'Created' do |bb| bb.created_at end
|
||||||
|
column 'Status' do |bb| bb.aasm_state end
|
||||||
|
column 'Sent Count' do |bb| bb.sent_count end
|
||||||
|
column 'Counters' do |bb| bb.test_emails end
|
||||||
|
column 'Started' do |bb| bb.started_at end
|
||||||
|
column 'Completed' do |bb| bb.completed_at end
|
||||||
|
end
|
||||||
|
|
||||||
|
action_item :only => [:index] do
|
||||||
|
link_to('Daily Session Snapshot', new_admin_daily_session_path)
|
||||||
|
end
|
||||||
|
|
||||||
|
show :title => "Daily Session Snapshot" do |obj|
|
||||||
|
h3 "Session created range: (#{obj.earliest_session_create_time}, #{obj.latest_session_create_time}); Earliest session start: #{obj.earliest_session_start_time}"
|
||||||
|
h3 "Max Latency Score: #{params[:max_score] ? params[:max_score] : Score::MAX_YELLOW_LATENCY}"
|
||||||
|
h4 "(append URL with ?max_score=NN to change max latency from default (#{Score::MAX_YELLOW_LATENCY}))"
|
||||||
|
|
||||||
|
num, objs = obj.snapshot_scored_recipients
|
||||||
|
panel "Session & Scoring Matches (#{num})" do
|
||||||
|
table_for(objs) do
|
||||||
|
column :receiver_id do |oo|
|
||||||
|
link_to(oo.vals['receiver_id'], admin_user_path(oo.vals['receiver_id']), {:target => '_blank'})
|
||||||
|
end
|
||||||
|
column :session_id do |oo|
|
||||||
|
link_to(oo.vals['session_id'], admin_music_session_path(oo.vals['session_id']), {:target => '_blank'})
|
||||||
|
end
|
||||||
|
column :latency_score do |oo| oo.vals['latency'] end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
num, objs = obj.snapshot_eligible_sessions
|
||||||
|
panel "Eligible Sessions (#{num})" do
|
||||||
|
table_for(objs) do
|
||||||
|
column :session_id do |oo|
|
||||||
|
link_to(oo.vals['session_id'], admin_music_session_path(oo.vals['session_id']), {:target => '_blank'})
|
||||||
|
end
|
||||||
|
column :creator_id do |oo|
|
||||||
|
link_to(oo.vals['creator_id'], admin_user_path(oo.vals['creator_id']), {:target => '_blank'})
|
||||||
|
end
|
||||||
|
column :creator_score_idx do |oo| oo.vals['creator_score_idx'] end
|
||||||
|
column :instrument_id do |oo| oo.vals['instrument_id'] end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
num, objs = obj.snapshot_eligible_recipients
|
||||||
|
panel "Eligible Recipients (#{num})" do
|
||||||
|
table_for(objs) do
|
||||||
|
column :receiver_id do |oo|
|
||||||
|
link_to(oo.vals['receiver_id'], admin_user_path(oo.vals['receiver_id']), {:target => '_blank'})
|
||||||
|
end
|
||||||
|
column :receiver_score_idx do |oo| oo.vals['receiver_score_idx'] end
|
||||||
|
column :instrument_id do |oo| oo.vals['instrument_id'] end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
controller do
|
||||||
|
def new
|
||||||
|
if 0 < (max_score = params[:max_score].to_i)
|
||||||
|
ENV[EmailBatchScheduledSessions::ENV_MAX_LATENCY] = max_score.to_s
|
||||||
|
else
|
||||||
|
ENV[EmailBatchScheduledSessions::ENV_MAX_LATENCY] = '0'
|
||||||
|
end
|
||||||
|
set_resource_ivar(EmailBatchScheduledSessions.refresh_snapshot!)
|
||||||
|
render active_admin_template('show')
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
@ -33,6 +33,7 @@ FOO
|
||||||
state :delivering
|
state :delivering
|
||||||
state :delivered
|
state :delivered
|
||||||
state :disabled
|
state :disabled
|
||||||
|
state :snapshot
|
||||||
|
|
||||||
event :enable do
|
event :enable do
|
||||||
transitions :from => :disabled, :to => :pending
|
transitions :from => :disabled, :to => :pending
|
||||||
|
|
@ -55,6 +56,9 @@ FOO
|
||||||
event :disable do
|
event :disable do
|
||||||
transitions :from => [:pending, :tested, :delivered], :to => :disabled
|
transitions :from => [:pending, :tested, :delivered], :to => :disabled
|
||||||
end
|
end
|
||||||
|
event :snapshoting, :after => :take_snapshot do
|
||||||
|
transitions :from => [:pending], :to => :snapshot
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.new(*args)
|
def self.new(*args)
|
||||||
|
|
@ -184,6 +188,9 @@ FOO
|
||||||
self.update_with_conflict_validation({ :completed_at => Time.now })
|
self.update_with_conflict_validation({ :completed_at => Time.now })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def take_snapshot
|
||||||
|
end
|
||||||
|
|
||||||
def clone
|
def clone
|
||||||
bb = EmailBatch.new
|
bb = EmailBatch.new
|
||||||
bb.subject = self.subject
|
bb.subject = self.subject
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ module JamRuby
|
||||||
def time_since_last_batch_query
|
def time_since_last_batch_query
|
||||||
self.class
|
self.class
|
||||||
.where(['created_at < ?', self.created_at])
|
.where(['created_at < ?', self.created_at])
|
||||||
|
.where(:aasm_state => 'delivered')
|
||||||
.order('created_at DESC')
|
.order('created_at DESC')
|
||||||
.limit(1)
|
.limit(1)
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,24 @@
|
||||||
module JamRuby
|
module JamRuby
|
||||||
|
|
||||||
|
class ResultStub
|
||||||
|
extend ActiveModel::Naming
|
||||||
|
extend ActiveModel::Translation
|
||||||
|
include ActiveModel::Validations
|
||||||
|
include ActiveModel::Conversion
|
||||||
|
|
||||||
|
attr_accessor :vals
|
||||||
|
|
||||||
|
def initialize(vals)
|
||||||
|
@vals = vals
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.stubs(sql)
|
||||||
|
ActiveRecord::Base.connection.execute(sql).collect { |rr| self.new(rr) }
|
||||||
|
end
|
||||||
|
|
||||||
|
def persisted?; false; end
|
||||||
|
end
|
||||||
|
|
||||||
class EmailBatchScheduledSessions < EmailBatchPeriodic
|
class EmailBatchScheduledSessions < EmailBatchPeriodic
|
||||||
|
|
||||||
BATCH_SIZE = 500
|
BATCH_SIZE = 500
|
||||||
|
|
@ -9,106 +29,56 @@ module JamRuby
|
||||||
TMP_RECIP = 'tmp_candidate_recipients'
|
TMP_RECIP = 'tmp_candidate_recipients'
|
||||||
TMP_MATCH = 'tmp_matches'
|
TMP_MATCH = 'tmp_matches'
|
||||||
|
|
||||||
|
ENV_MAX_LATENCY = 'env_max_latency'
|
||||||
|
|
||||||
|
def self.refresh_snapshot!
|
||||||
|
self.where(:aasm_state => 'snapshot').limit(1).first.try(:destroy)
|
||||||
|
oo = self.create
|
||||||
|
oo.snapshoting!
|
||||||
|
oo
|
||||||
|
end
|
||||||
|
|
||||||
def self.subject
|
def self.subject
|
||||||
"New sessions have been scheduled that may be a good match for you!"
|
"New sessions have been scheduled that may be a good match for you!"
|
||||||
end
|
end
|
||||||
|
|
||||||
# inserts eligible sessions to temp table
|
def earliest_session_create_time
|
||||||
def _collect_eligible_sessions
|
time_since_last_batch(SINCE_DAYS)
|
||||||
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_SESS}")
|
|
||||||
sql =<<SQL
|
|
||||||
SELECT
|
|
||||||
msess.id AS session_id,
|
|
||||||
msess.user_id AS creator_id,
|
|
||||||
users.last_jam_locidispid AS creator_score_idx,
|
|
||||||
rs.instrument_id
|
|
||||||
INTO TEMP TABLE #{TMP_SESS}
|
|
||||||
FROM music_sessions msess
|
|
||||||
INNER JOIN users ON users.id = msess.user_id
|
|
||||||
INNER JOIN rsvp_slots AS rs ON rs.music_session_id = msess.id
|
|
||||||
LEFT JOIN rsvp_requests_rsvp_slots AS rrrs ON rrrs.rsvp_slot_id = rs.id
|
|
||||||
WHERE
|
|
||||||
musician_access = 't' AND
|
|
||||||
approval_required = 'f' AND
|
|
||||||
users.last_jam_locidispid IS NOT NULL AND
|
|
||||||
msess.created_at > '#{time_since_last_batch(SINCE_DAYS)}' AND
|
|
||||||
msess.created_at < '#{self.created_at}' AND
|
|
||||||
scheduled_start >= '#{Time.now() + MIN_HOURS_START.hours}' AND
|
|
||||||
(rrrs.rsvp_slot_id IS NULL OR rrrs.chosen != 't')
|
|
||||||
SQL
|
|
||||||
ActiveRecord::Base.connection.execute(sql)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def _collect_eligible_recipients
|
def latest_session_create_time
|
||||||
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_RECIP}")
|
self.created_at
|
||||||
# load eligible recipients into tmp table
|
|
||||||
sql =<<SQL
|
|
||||||
SELECT
|
|
||||||
users.id AS receiver_id,
|
|
||||||
users.last_jam_locidispid AS receiver_score_idx
|
|
||||||
INTO TEMP TABLE #{TMP_RECIP}
|
|
||||||
FROM users
|
|
||||||
INNER JOIN musicians_instruments AS mi ON mi.user_id = users.id
|
|
||||||
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.instrument_id = mi.instrument_id
|
|
||||||
WHERE
|
|
||||||
users.last_jam_locidispid IS NOT NULL AND
|
|
||||||
users.musician = 't' AND
|
|
||||||
users.subscribe_email = 't'
|
|
||||||
SQL
|
|
||||||
ActiveRecord::Base.connection.execute(sql)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def _collect_scored_recipients
|
def earliest_session_start_time
|
||||||
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_MATCH}")
|
self.created_at + MIN_HOURS_START.hours
|
||||||
sql =<<SQL
|
|
||||||
SELECT
|
|
||||||
DISTINCT #{TMP_RECIP}.receiver_id,
|
|
||||||
#{TMP_SESS}.session_id,
|
|
||||||
scores.score AS latency
|
|
||||||
INTO TEMP TABLE #{TMP_MATCH}
|
|
||||||
FROM scores
|
|
||||||
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.creator_score_idx = scores.alocidispid
|
|
||||||
INNER JOIN #{TMP_RECIP} ON #{TMP_RECIP}.receiver_score_idx = scores.blocidispid
|
|
||||||
WHERE
|
|
||||||
scores.score < #{Score::MAX_YELLOW_LATENCY} AND
|
|
||||||
#{TMP_RECIP}.receiver_id != #{TMP_SESS}.creator_id
|
|
||||||
GROUP BY
|
|
||||||
#{TMP_RECIP}.receiver_id,
|
|
||||||
#{TMP_SESS}.session_id,
|
|
||||||
latency
|
|
||||||
SQL
|
|
||||||
ActiveRecord::Base.connection.execute(sql)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# select recipients whose score is below minimum threshold
|
def snapshot_eligible_sessions
|
||||||
def _select_scored_recipients(offset=0)
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_SESS}")
|
||||||
if 0 > offset
|
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_SESS}")]
|
||||||
sql = "SELECT COUNT(DISTINCT receiver_id) AS num FROM #{TMP_MATCH}"
|
end
|
||||||
rr = ActiveRecord::Base.connection.execute(sql)
|
|
||||||
return 0 < rr.count ? rr[0]['num'].to_i : 0
|
def snapshot_eligible_recipients
|
||||||
else
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_RECIP}")
|
||||||
sql =<<SQL
|
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_RECIP}")]
|
||||||
SELECT DISTINCT receiver_id
|
end
|
||||||
FROM #{TMP_MATCH}
|
|
||||||
ORDER BY receiver_id ASC
|
def snapshot_scored_recipients
|
||||||
LIMIT #{@per_page}
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_MATCH}")
|
||||||
OFFSET #{offset}
|
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_MATCH}")]
|
||||||
SQL
|
end
|
||||||
return ActiveRecord::Base.connection.execute(sql)
|
|
||||||
end
|
def take_snapshot
|
||||||
|
_load_recipients
|
||||||
|
_count_recipients
|
||||||
|
self.update_attribute(:test_emails, @counters.inspect)
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_recipients(per_page=BATCH_SIZE)
|
def fetch_recipients(per_page=BATCH_SIZE)
|
||||||
objs = []
|
objs = []
|
||||||
|
|
||||||
# load eligible sessions into tmp table
|
_load_recipients
|
||||||
self._collect_eligible_sessions
|
|
||||||
|
|
||||||
# load eligible mail recipients into tmp table
|
|
||||||
self._collect_eligible_recipients
|
|
||||||
|
|
||||||
# load mail recipients with minimum score into tmp table
|
|
||||||
self._collect_scored_recipients
|
|
||||||
|
|
||||||
@per_page = per_page
|
@per_page = per_page
|
||||||
num_recip = _select_scored_recipients(-1)
|
num_recip = _select_scored_recipients(-1)
|
||||||
|
|
@ -136,6 +106,9 @@ SQL
|
||||||
bset = EmailBatchSet.scheduled_session_set(self, receiver, sessions_and_latency)
|
bset = EmailBatchSet.scheduled_session_set(self, receiver, sessions_and_latency)
|
||||||
UserMailer.scheduled_session_daily(receiver, sessions_and_latency).deliver
|
UserMailer.scheduled_session_daily(receiver, sessions_and_latency).deliver
|
||||||
end
|
end
|
||||||
|
|
||||||
|
self.test_emails = _count_recipients.inspect
|
||||||
|
|
||||||
self.sent_count = self.opt_in_count
|
self.sent_count = self.opt_in_count
|
||||||
self.save
|
self.save
|
||||||
self.did_batch_run!
|
self.did_batch_run!
|
||||||
|
|
@ -147,5 +120,125 @@ SQL
|
||||||
oo
|
oo
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
# inserts eligible sessions to temp table
|
||||||
|
def _collect_eligible_sessions
|
||||||
|
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_SESS}")
|
||||||
|
sql =<<SQL
|
||||||
|
SELECT
|
||||||
|
msess.id AS session_id,
|
||||||
|
msess.user_id AS creator_id,
|
||||||
|
users.last_jam_locidispid AS creator_score_idx,
|
||||||
|
rs.instrument_id
|
||||||
|
INTO TEMP TABLE #{TMP_SESS}
|
||||||
|
FROM music_sessions msess
|
||||||
|
INNER JOIN users ON users.id = msess.user_id
|
||||||
|
INNER JOIN rsvp_slots AS rs ON rs.music_session_id = msess.id
|
||||||
|
LEFT JOIN rsvp_requests_rsvp_slots AS rrrs ON rrrs.rsvp_slot_id = rs.id
|
||||||
|
WHERE
|
||||||
|
musician_access = 't' AND
|
||||||
|
approval_required = 'f' AND
|
||||||
|
users.last_jam_locidispid IS NOT NULL AND
|
||||||
|
msess.created_at > '#{earliest_session_create_time}' AND
|
||||||
|
msess.created_at < '#{latest_session_create_time}' AND
|
||||||
|
scheduled_start >= '#{earliest_session_start_time}' AND
|
||||||
|
(rrrs.rsvp_slot_id IS NULL OR rrrs.chosen != 't')
|
||||||
|
SQL
|
||||||
|
ActiveRecord::Base.connection.execute(sql)
|
||||||
|
end
|
||||||
|
|
||||||
|
def _collect_eligible_recipients
|
||||||
|
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_RECIP}")
|
||||||
|
# load eligible recipients into tmp table
|
||||||
|
sql =<<SQL
|
||||||
|
SELECT
|
||||||
|
users.id AS receiver_id,
|
||||||
|
users.last_jam_locidispid AS receiver_score_idx,
|
||||||
|
mi.instrument_id
|
||||||
|
INTO TEMP TABLE #{TMP_RECIP}
|
||||||
|
FROM users
|
||||||
|
INNER JOIN musicians_instruments AS mi ON mi.user_id = users.id
|
||||||
|
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.instrument_id = mi.instrument_id
|
||||||
|
WHERE
|
||||||
|
users.last_jam_locidispid IS NOT NULL AND
|
||||||
|
users.musician = 't' AND
|
||||||
|
users.subscribe_email = 't'
|
||||||
|
SQL
|
||||||
|
ActiveRecord::Base.connection.execute(sql)
|
||||||
|
end
|
||||||
|
|
||||||
|
def _collect_scored_recipients
|
||||||
|
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_MATCH}")
|
||||||
|
if 0 == (max_score = ENV[ENV_MAX_LATENCY].to_i)
|
||||||
|
max_score = Score::MAX_YELLOW_LATENCY
|
||||||
|
end
|
||||||
|
sql =<<SQL
|
||||||
|
SELECT
|
||||||
|
DISTINCT #{TMP_RECIP}.receiver_id,
|
||||||
|
#{TMP_SESS}.session_id,
|
||||||
|
scores.score AS latency
|
||||||
|
INTO TEMP TABLE #{TMP_MATCH}
|
||||||
|
FROM scores
|
||||||
|
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.creator_score_idx = scores.alocidispid
|
||||||
|
INNER JOIN #{TMP_RECIP} ON #{TMP_RECIP}.receiver_score_idx = scores.blocidispid
|
||||||
|
WHERE
|
||||||
|
scores.score < #{max_score} AND
|
||||||
|
#{TMP_RECIP}.receiver_id != #{TMP_SESS}.creator_id
|
||||||
|
GROUP BY
|
||||||
|
#{TMP_RECIP}.receiver_id,
|
||||||
|
#{TMP_SESS}.session_id,
|
||||||
|
latency
|
||||||
|
SQL
|
||||||
|
ActiveRecord::Base.connection.execute(sql)
|
||||||
|
end
|
||||||
|
|
||||||
|
# select recipients whose score is below minimum threshold
|
||||||
|
def _select_scored_recipients(offset=0)
|
||||||
|
if 0 > offset
|
||||||
|
sql = "SELECT COUNT(DISTINCT receiver_id) AS num FROM #{TMP_MATCH}"
|
||||||
|
rr = ActiveRecord::Base.connection.execute(sql)
|
||||||
|
return 0 < rr.count ? rr[0]['num'].to_i : 0
|
||||||
|
else
|
||||||
|
sql =<<SQL
|
||||||
|
SELECT DISTINCT receiver_id
|
||||||
|
FROM #{TMP_MATCH}
|
||||||
|
ORDER BY receiver_id ASC
|
||||||
|
LIMIT #{@per_page}
|
||||||
|
OFFSET #{offset}
|
||||||
|
SQL
|
||||||
|
return ActiveRecord::Base.connection.execute(sql)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def _load_recipients
|
||||||
|
# load eligible sessions into tmp table
|
||||||
|
_collect_eligible_sessions
|
||||||
|
|
||||||
|
# load eligible mail recipients into tmp table
|
||||||
|
_collect_eligible_recipients
|
||||||
|
|
||||||
|
# load mail recipients with minimum score into tmp table
|
||||||
|
_collect_scored_recipients
|
||||||
|
end
|
||||||
|
|
||||||
|
def _count_recipients(load_tmp_tables = false)
|
||||||
|
return @counters if @counters || !self.snapshot?
|
||||||
|
_load_recipients if load_tmp_tables
|
||||||
|
|
||||||
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_SESS}")
|
||||||
|
session_count = 0 < rr.count ? rr[0]['num'].to_i : 0
|
||||||
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_RECIP}")
|
||||||
|
receiver_candidate_count = 0 < rr.count ? rr[0]['num'].to_i : 0
|
||||||
|
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_MATCH}")
|
||||||
|
receiver_match_count = 0 < rr.count ? rr[0]['num'].to_i : 0
|
||||||
|
|
||||||
|
@counters = {
|
||||||
|
:sessions => session_count,
|
||||||
|
:receiver_candidates => receiver_candidate_count,
|
||||||
|
:receiver_match => receiver_match_count
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue