jam-cloud/ruby/lib/jam_ruby/models/email_batch_scheduled_sessi...

253 lines
7.8 KiB
Ruby

module JamRuby
class ResultStub
extend ActiveModel::Naming
extend ActiveModel::Translation
include ActiveModel::Validations
include ActiveModel::Conversion
attr_accessor :vals
def initialize(vals)
@vals = vals
end
def self.stubs(sql)
ActiveRecord::Base.connection.execute(sql).collect { |rr| self.new(rr) }
end
def persisted?; false; end
end
class EmailBatchScheduledSessions < EmailBatchPeriodic
BATCH_SIZE = 500
SINCE_DAYS = 2
MIN_HOURS_START = 2
TMP_SESS = 'tmp_candidate_sessions'
TMP_RECIP = 'tmp_candidate_recipients'
TMP_MATCH = 'tmp_matches'
ENV_MAX_LATENCY = 'env_max_latency'
ENV_QUERY_LIMIT = 'env_query_limit'
SNAPSHOT_QUERY_LIMIT = '500'
def self.refresh_snapshot!
self.where(:aasm_state => 'snapshot').limit(1).first.try(:destroy)
oo = self.create
oo.snapshoting!
oo
end
def self.subject
"New sessions have been scheduled that may be a good match for you!"
end
def earliest_session_create_time
time_since_last_batch(SINCE_DAYS)
end
def latest_session_create_time
self.created_at
end
def earliest_session_start_time
self.created_at + MIN_HOURS_START.hours
end
def snapshot_eligible_sessions
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_SESS}")
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_SESS}")]
end
def snapshot_eligible_recipients
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_RECIP}")
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_RECIP}")]
end
def snapshot_scored_recipients
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_MATCH}")
[0 < rr.count ? rr[0]['num'].to_i : 0, ResultStub.stubs("SELECT * FROM #{TMP_MATCH}")]
end
def take_snapshot
_load_recipients
_count_recipients
self.update_attribute(:test_emails, @counters.inspect)
end
def fetch_recipients(per_page=BATCH_SIZE)
objs = []
_load_recipients
@per_page = per_page
num_recip = _select_scored_recipients(-1)
loops = (num_recip / @per_page) + (num_recip % @per_page) - 1
0.upto(loops) do |nn|
offset = nn * @per_page
# now just get the sessions/latency for each distinct mail recipient
_select_scored_recipients(offset).each do |result|
receiver = User.find_by_id(result['receiver_id'])
sessions = MusicSession.select("music_sessions.*, #{TMP_MATCH}.latency")
.joins("INNER JOIN #{TMP_MATCH} ON #{TMP_MATCH}.session_id = music_sessions.id")
.where(["#{TMP_MATCH}.receiver_id = ?", receiver.id])
.includes([:genre, :creator])
block_given? ? yield(receiver, sessions) : objs << [receiver, sessions]
end
end
objs
end
def deliver_batch_sets!
self.opt_in_count = 0
self.fetch_recipients do |receiver, sessions_and_latency|
self.opt_in_count += 1
bset = EmailBatchSet.scheduled_session_set(self, receiver, sessions_and_latency)
UserMailer.scheduled_session_daily(receiver, sessions_and_latency).deliver
end
self.test_emails = _count_recipients.inspect
self.sent_count = self.opt_in_count
self.save
self.did_batch_run!
end
def self.send_daily_session_batch
oo = self.create
oo.deliver_batch
oo
end
private
# inserts eligible sessions to temp table
def _collect_eligible_sessions
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_SESS}")
limit_sql = (self.snapshot? && 0 < ENV[ENV_QUERY_LIMIT].to_i) ? "LIMIT #{ENV[ENV_QUERY_LIMIT]}" : ''
sql =<<SQL
SELECT
msess.id AS session_id,
msess.user_id AS creator_id,
users.last_jam_locidispid AS creator_score_idx,
rs.instrument_id
INTO TEMP TABLE #{TMP_SESS}
FROM music_sessions msess
INNER JOIN users ON users.id = msess.user_id
INNER JOIN rsvp_slots AS rs ON rs.music_session_id = msess.id
LEFT JOIN rsvp_requests_rsvp_slots AS rrrs ON rrrs.rsvp_slot_id = rs.id
WHERE
musician_access = 't' AND
approval_required = 'f' AND
users.last_jam_locidispid IS NOT NULL AND
msess.created_at > '#{earliest_session_create_time}' AND
msess.created_at < '#{latest_session_create_time}' AND
scheduled_start >= '#{earliest_session_start_time}' AND
(rrrs.rsvp_slot_id IS NULL OR rrrs.chosen != 't')
#{limit_sql}
SQL
ActiveRecord::Base.connection.execute(sql)
end
def _collect_eligible_recipients
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_RECIP}")
limit_sql = (self.snapshot? && 0 < ENV[ENV_QUERY_LIMIT].to_i) ? "LIMIT #{ENV[ENV_QUERY_LIMIT]}" : ''
# load eligible recipients into tmp table
sql =<<SQL
SELECT
users.id AS receiver_id,
users.last_jam_locidispid AS receiver_score_idx,
mi.instrument_id
INTO TEMP TABLE #{TMP_RECIP}
FROM users
INNER JOIN musicians_instruments AS mi ON mi.user_id = users.id
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.instrument_id = mi.instrument_id
WHERE
users.last_jam_locidispid IS NOT NULL AND
users.musician = 't' AND
users.subscribe_email = 't'
#{limit_sql}
SQL
ActiveRecord::Base.connection.execute(sql)
end
def _collect_scored_recipients
ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS #{TMP_MATCH}")
if !self.snapshot? || 0 == (max_score = ENV[ENV_MAX_LATENCY].to_i)
max_score = Score::MAX_YELLOW_LATENCY
end
limit_sql = (self.snapshot? && 0 < ENV[ENV_QUERY_LIMIT].to_i) ? "LIMIT #{ENV[ENV_QUERY_LIMIT]}" : ''
sql =<<SQL
SELECT
DISTINCT #{TMP_RECIP}.receiver_id,
#{TMP_SESS}.session_id,
scores.score AS latency
INTO TEMP TABLE #{TMP_MATCH}
FROM scores
INNER JOIN #{TMP_SESS} ON #{TMP_SESS}.creator_score_idx = scores.alocidispid
INNER JOIN #{TMP_RECIP} ON #{TMP_RECIP}.receiver_score_idx = scores.blocidispid
WHERE
scores.score < #{max_score} AND
#{TMP_RECIP}.receiver_id != #{TMP_SESS}.creator_id
GROUP BY
#{TMP_RECIP}.receiver_id,
#{TMP_SESS}.session_id,
latency
#{limit_sql}
SQL
ActiveRecord::Base.connection.execute(sql)
end
# select recipients whose score is below minimum threshold
def _select_scored_recipients(offset=0)
if 0 > offset
sql = "SELECT COUNT(DISTINCT receiver_id) AS num FROM #{TMP_MATCH}"
rr = ActiveRecord::Base.connection.execute(sql)
return 0 < rr.count ? rr[0]['num'].to_i : 0
else
sql =<<SQL
SELECT DISTINCT receiver_id
FROM #{TMP_MATCH}
ORDER BY receiver_id ASC
LIMIT #{@per_page}
OFFSET #{offset}
SQL
return ActiveRecord::Base.connection.execute(sql)
end
end
def _load_recipients
# load eligible sessions into tmp table
_collect_eligible_sessions
# load eligible mail recipients into tmp table
_collect_eligible_recipients
# load mail recipients with minimum score into tmp table
_collect_scored_recipients
end
def _count_recipients(load_tmp_tables = false)
return @counters if @counters || !self.snapshot?
_load_recipients if load_tmp_tables
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_SESS}")
session_count = 0 < rr.count ? rr[0]['num'].to_i : 0
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_RECIP}")
receiver_candidate_count = 0 < rr.count ? rr[0]['num'].to_i : 0
rr = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{TMP_MATCH}")
receiver_match_count = 0 < rr.count ? rr[0]['num'].to_i : 0
@counters = {
:sessions => session_count,
:receiver_candidates => receiver_candidate_count,
:receiver_match => receiver_match_count
}
end
end
end