jam-cloud/ruby/lib/jam_ruby/models/email_batch_scheduled_sessi...

294 lines
10 KiB
Ruby

module JamRuby
# Thin wrapper around a single raw SQL result row. It mixes in the ActiveModel
# naming, translation, validation and conversion modules and always reports
# itself as not persisted.
class ResultStub
  extend ActiveModel::Naming
  extend ActiveModel::Translation
  include ActiveModel::Validations
  include ActiveModel::Conversion

  # The raw row object this stub was built from.
  attr_accessor :vals

  # Executes +sql+ on the ActiveRecord connection and returns one ResultStub
  # per returned row.
  def self.stubs(sql)
    rows = ActiveRecord::Base.connection.execute(sql)
    rows.map { |row| new(row) }
  end

  # vals - the row to wrap.
  def initialize(vals)
    @vals = vals
  end

  # Always false.
  def persisted?
    false
  end
end
# Temporary Tables created by this class:
# tmp_candidate_sessions
# ----------------------
#
# These are 'open' sessions that have any open slots left, and fall within a certain start time.
# The session creator must also have a locidispid.
#
# session_id - music_session.id
# creator_id - music_session.user_id
# creator_score_idx - this is the creator's users.last_jam_locidispid
# instrument_id - instruments that are open as gleaned from the RSVP. If this is NULL, it means 'ANY INSTRUMENT'
# invited_user_id - the ID of a user who was invited. Can be NULL.
#
# tmp_candidate_recipients
# ------------------------
#
# These are musicians, that allow email notifications, that have an instrument which matches the session's open RSVP slot's instrument.
# The musician must also have a locidispid.
#
# receiver_id - user ID that could be in the session
# receiver_score_idx - the user's last_jam_locidispid
# instrument_id - the user's matching instrument for an open session slot. If this is NULL, it means 'ANY INSTRUMENT'
# invited_user_id
#
# tmp_matches
# -----------
#
# These are 'candidate_recipients' that have a decent enough score with the creator of the music sessions in tmp_candidate_sessions
#
# receiver_id - the user.id that should receive a Daily Session email
# session_id - the music_session.id for the email
# latency - the score.score between the creator and the candidate (needs to be full score soon)
class EmailBatchScheduledSessions < EmailBatchPeriodic
# Default page size used by fetch_recipients when paging through matched recipients.
BATCH_SIZE = 500
# Number of days handed to time_since_last_batch when computing the earliest session creation time.
SINCE_DAYS = 2
# Hours added to this batch's created_at to obtain the earliest acceptable session start time.
MIN_HOURS_START = 2
# Name of the ENV variable that can override the maximum latency score (only read for snapshot batches).
ENV_MAX_LATENCY = 'env_max_latency'
# Name of the ENV variable that can cap the rows selected into each temp table (only read for snapshot batches).
ENV_QUERY_LIMIT = 'env_query_limit'
# NOTE(review): not referenced anywhere in the code visible here - confirm whether it is still used.
SNAPSHOT_QUERY_LIMIT = '500'
# Replaces the snapshot batch: destroys one existing record whose aasm_state
# is 'snapshot' (if there is one), creates a new batch, calls snapshoting!
# on it and returns the new batch.
def self.refresh_snapshot!
  stale = where(:aasm_state => 'snapshot').limit(1).first
  stale.try(:destroy)
  create.tap { |fresh| fresh.snapshoting! }
end
# Returns the subject line text for this batch type.
def self.subject
  'New sessions have been scheduled that may be a good match for you!'
end
# Lower bound on a session's creation time for this batch, as returned by
# time_since_last_batch for a window of SINCE_DAYS days.
def earliest_session_create_time
  window_days = SINCE_DAYS
  time_since_last_batch(window_days)
end
# Upper bound on a session's creation time for this batch: the batch's own
# created_at value.
def latest_session_create_time
  created_at
end
# Earliest scheduled start a session may have to be included: the batch's
# created_at plus MIN_HOURS_START hours.
def earliest_session_start_time
  lead_time = MIN_HOURS_START.hours
  created_at + lead_time
end
# Returns a two-element array for tmp_candidate_sessions: the row count
# (0 when the count query yields no rows) followed by every row wrapped in a
# ResultStub.
def snapshot_eligible_sessions
  count_rows = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM tmp_candidate_sessions")
  total = count_rows.count > 0 ? count_rows[0]['num'].to_i : 0
  stubs = ResultStub.stubs("SELECT * FROM tmp_candidate_sessions")
  [total, stubs]
end
# Returns a two-element array for tmp_candidate_recipients: the row count
# (0 when the count query yields no rows) followed by every row wrapped in a
# ResultStub.
def snapshot_eligible_recipients
  count_rows = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM tmp_candidate_recipients")
  total = count_rows.count > 0 ? count_rows[0]['num'].to_i : 0
  stubs = ResultStub.stubs("SELECT * FROM tmp_candidate_recipients")
  [total, stubs]
end
# Returns a two-element array for tmp_matches: the row count (0 when the
# count query yields no rows) followed by every row wrapped in a ResultStub.
def snapshot_scored_recipients
  count_rows = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM tmp_matches")
  total = count_rows.count > 0 ? count_rows[0]['num'].to_i : 0
  stubs = ResultStub.stubs("SELECT * FROM tmp_matches")
  [total, stubs]
end
# Rebuilds the temp tables, counts their rows and stores the inspected
# @counters value in the test_emails attribute via update_attribute.
def take_snapshot
  _load_recipients
  _count_recipients
  counters_dump = @counters.inspect
  update_attribute(:test_emails, counters_dump)
end
# Rebuilds the temp tables and walks the distinct receivers in tmp_matches one
# page at a time.
#
# per_page - number of receivers fetched per page query (default BATCH_SIZE).
#
# For every receiver that can still be loaded, looks up at most 20 of their
# matched music sessions ordered by latency. When a block is given each
# (receiver, sessions) pair is yielded and an empty array is returned;
# otherwise an array of [receiver, sessions] pairs is returned.
def fetch_recipients(per_page=BATCH_SIZE)
  objs = []
  _load_recipients
  @per_page = per_page
  num_recip = _select_scored_recipients(-1)
  # Ceiling division: exactly one query per page. The previous formula added
  # the raw remainder to the quotient and issued many empty page queries
  # whenever the remainder was larger than 1.
  page_count = (num_recip + @per_page - 1) / @per_page
  page_count.times do |page|
    offset = page * @per_page
    # now just get the sessions/latency for each distinct mail recipient
    _select_scored_recipients(offset).each do |result|
      receiver = User.find_by_id(result['receiver_id'])
      # find_by_id returns nil when the user row is gone; skip it instead of
      # failing the whole batch on receiver.id.
      next unless receiver
      sessions = MusicSession.select("music_sessions.*, tmp_matches.latency")
        .joins("INNER JOIN tmp_matches ON tmp_matches.session_id = music_sessions.id")
        .where(["tmp_matches.receiver_id = ?", receiver.id])
        .order('tmp_matches.latency')
        .limit(20)
        .includes([:genre, :creator])
      block_given? ? yield(receiver, sessions) : objs << [receiver, sessions]
    end
  end
  objs
end
# Sends the email to every recipient yielded by fetch_recipients.
#
# For each (recipient, sessions) pair: increments opt_in_count, calls
# EmailBatchSet.scheduled_session_set and delivers
# UserMailer.scheduled_session_daily. Afterwards stores the inspected result
# of _count_recipients in test_emails, copies opt_in_count into sent_count,
# saves the record and calls did_batch_run!.
def deliver_batch_sets!
  self.opt_in_count = 0
  fetch_recipients do |recipient, matched_sessions|
    self.opt_in_count += 1
    EmailBatchSet.scheduled_session_set(self, recipient, matched_sessions)
    UserMailer.scheduled_session_daily(recipient, matched_sessions).deliver
  end
  self.test_emails = _count_recipients.inspect
  self.sent_count = opt_in_count
  save
  did_batch_run!
end
# Creates a new batch record, calls deliver_batch on it and returns it.
def self.send_daily_session_batch
  create.tap { |batch| batch.deliver_batch }
end
private
# Rebuilds the tmp_candidate_sessions temp table.
#
# Drops any previous copy, then selects one row per music session / RSVP slot
# (left-joined to rsvp_requests_rsvp_slots, and to invitations only when
# open_rsvps is FALSE) where:
#   * the session is an unstructured RSVP, or the slot has no request row, or
#     its request is not chosen,
#   * the creator has a last_jam_locidispid,
#   * the session was created between earliest_session_create_time and
#     latest_session_create_time, and
#   * scheduled_start is not before earliest_session_start_time.
#
# For snapshot batches a positive integer in ENV[ENV_QUERY_LIMIT] caps the
# number of rows.
def _collect_eligible_sessions
  ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS tmp_candidate_sessions")
  # Interpolate the parsed integer, never the raw ENV string: a value such as
  # "5abc" passes a to_i check but would otherwise be pasted verbatim into SQL.
  query_limit = ENV[ENV_QUERY_LIMIT].to_i
  limit_sql = (snapshot? && query_limit > 0) ? "LIMIT #{query_limit}" : ''
  sql = <<SQL
SELECT
msess.id AS session_id,
msess.user_id AS creator_id,
users.last_jam_locidispid AS creator_score_idx,
rs.instrument_id,
invitations.receiver_id AS invited_user_id,
msess.is_unstructured_rsvp,
msess.open_rsvps
INTO TEMP TABLE tmp_candidate_sessions
FROM music_sessions msess
INNER JOIN users ON users.id = msess.user_id
INNER JOIN rsvp_slots AS rs ON rs.music_session_id = msess.id
LEFT JOIN rsvp_requests_rsvp_slots AS rrrs ON rrrs.rsvp_slot_id = rs.id
LEFT JOIN invitations ON open_rsvps = FALSE AND invitations.music_session_id = msess.id
WHERE
(msess.is_unstructured_rsvp = TRUE OR (rrrs.id IS NULL OR rrrs.chosen != TRUE)) AND
users.last_jam_locidispid IS NOT NULL AND
msess.created_at > '#{earliest_session_create_time}' AND
msess.created_at < '#{latest_session_create_time}' AND
scheduled_start >= '#{earliest_session_start_time}'
#{limit_sql}
SQL
  ActiveRecord::Base.connection.execute(sql)
end
# Rebuilds the tmp_candidate_recipients temp table from users,
# musicians_instruments and tmp_candidate_sessions.
#
# A user/instrument row is paired with a candidate session when the session is
# an unstructured RSVP, or it has open RSVPs and its slot instrument equals
# the user's instrument, or the user is the session's invited user. Only
# users that have a last_jam_locidispid, are musicians, have subscribe_email
# set and are not the session's creator are kept.
#
# For snapshot batches a positive integer in ENV[ENV_QUERY_LIMIT] caps the
# number of rows.
def _collect_eligible_recipients
  ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS tmp_candidate_recipients").check
  # Interpolate the parsed integer, never the raw ENV string: a value such as
  # "5abc" passes a to_i check but would otherwise be pasted verbatim into SQL.
  query_limit = ENV[ENV_QUERY_LIMIT].to_i
  limit_sql = (snapshot? && query_limit > 0) ? "LIMIT #{query_limit}" : ''
  # load eligible recipients into tmp table
  sql = <<SQL
SELECT
users.id AS receiver_id,
users.last_jam_locidispid AS receiver_score_idx,
mi.instrument_id,
tmp_candidate_sessions.invited_user_id,
tmp_candidate_sessions.session_id AS session_id,
tmp_candidate_sessions.creator_id AS creator_id,
tmp_candidate_sessions.creator_score_idx AS creator_score_idx
INTO TEMP TABLE tmp_candidate_recipients
FROM users
INNER JOIN musicians_instruments AS mi ON mi.user_id = users.id
INNER JOIN tmp_candidate_sessions ON tmp_candidate_sessions.is_unstructured_rsvp = TRUE OR
(tmp_candidate_sessions.open_rsvps = TRUE AND tmp_candidate_sessions.instrument_id = mi.instrument_id) OR
tmp_candidate_sessions.invited_user_id = users.id
WHERE
users.last_jam_locidispid IS NOT NULL AND
users.musician = TRUE AND
users.subscribe_email = TRUE AND
users.id != tmp_candidate_sessions.creator_id
#{limit_sql}
SQL
  ActiveRecord::Base.connection.execute(sql)
end
# Rebuilds the tmp_matches temp table: the distinct (receiver_id, session_id,
# latency) rows from tmp_candidate_recipients joined to nondirected_scores
# (a_userid = creator, b_userid = receiver) whose full_score is below the
# maximum score.
#
# The maximum is APP_CONFIG.max_yellow_full_score, except for snapshot batches
# where a non-zero integer in ENV[ENV_MAX_LATENCY] takes precedence. For
# snapshot batches a positive integer in ENV[ENV_QUERY_LIMIT] caps the number
# of rows.
def _collect_scored_recipients
  ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS tmp_matches")
  max_score = snapshot? ? ENV[ENV_MAX_LATENCY].to_i : 0
  max_score = APP_CONFIG.max_yellow_full_score if max_score == 0
  # Interpolate the parsed integer, never the raw ENV string: a value such as
  # "5abc" passes a to_i check but would otherwise be pasted verbatim into SQL.
  query_limit = ENV[ENV_QUERY_LIMIT].to_i
  limit_sql = (snapshot? && query_limit > 0) ? "LIMIT #{query_limit}" : ''
  sql = <<SQL
SELECT DISTINCT
tmp_candidate_recipients.receiver_id,
tmp_candidate_recipients.session_id,
nondirected_scores.full_score AS latency
INTO TEMP TABLE tmp_matches
FROM nondirected_scores
INNER JOIN tmp_candidate_recipients ON
tmp_candidate_recipients.creator_id = nondirected_scores.a_userid AND
tmp_candidate_recipients.receiver_id = nondirected_scores.b_userid
WHERE
nondirected_scores.full_score < #{max_score}
GROUP BY
tmp_candidate_recipients.receiver_id,
tmp_candidate_recipients.session_id,
latency
#{limit_sql}
SQL
  ActiveRecord::Base.connection.execute(sql)
end
# Reads the distinct receiver ids stored in tmp_matches.
#
# offset - a negative value asks for the number of distinct receiver_ids
#          (0 when the count query yields no rows); any other value returns
#          the raw query result for one page of @per_page receiver_ids,
#          ordered ascending and starting at that offset.
def _select_scored_recipients(offset=0)
  connection = ActiveRecord::Base.connection
  if 0 > offset
    count_rows = connection.execute("SELECT COUNT(DISTINCT receiver_id) AS num FROM tmp_matches")
    return count_rows.count > 0 ? count_rows[0]['num'].to_i : 0
  end
  page_sql = <<SQL
SELECT DISTINCT receiver_id
FROM tmp_matches
ORDER BY receiver_id ASC
LIMIT #{@per_page}
OFFSET #{offset}
SQL
  connection.execute(page_sql)
end
# Rebuilds the three temp tables in order: eligible sessions first, then the
# eligible mail recipients, then the recipients that pass the score filter.
# Returns whatever the last step returns.
def _load_recipients
  build_steps = [
    :_collect_eligible_sessions,
    :_collect_eligible_recipients,
    :_collect_scored_recipients
  ]
  build_steps.map { |step| send(step) }.last
end
# Counts the rows of the three temp tables and memoizes the result in
# @counters.
#
# load_tmp_tables - when true, the temp tables are rebuilt before counting.
#
# Returns the memoized @counters straight away when it is already set or when
# this batch is not a snapshot (nil in that case if nothing was counted yet).
# Otherwise returns a hash with :sessions, :receiver_candidates and
# :receiver_match counts.
def _count_recipients(load_tmp_tables = false)
  already_counted = @counters
  return already_counted if already_counted || !snapshot?
  _load_recipients if load_tmp_tables
  row_count = lambda do |table|
    rows = ActiveRecord::Base.connection.execute("SELECT COUNT(*) AS num FROM #{table}")
    rows.count > 0 ? rows[0]['num'].to_i : 0
  end
  @counters = {
    :sessions => row_count.call('tmp_candidate_sessions'),
    :receiver_candidates => row_count.call('tmp_candidate_recipients'),
    :receiver_match => row_count.call('tmp_matches')
  }
end
end
end