115 lines
10 KiB
MySQL
115 lines
10 KiB
MySQL
|
|
-- try to make get_work faster by avoiding temporary tables
|
||
|
|
|
||
|
|
|
||
|
|
-- Scoring bookkeeping columns on connections.
-- All five columns are added by ONE ALTER TABLE so the table is locked (and, on
-- servers that must rewrite the table to apply a default, rewritten) a single
-- time instead of once per column.
ALTER TABLE connections
    -- flag set by the client if this connection is able to score
    ADD COLUMN udp_reachable BOOLEAN NOT NULL DEFAULT TRUE,
    -- this connection can't score until this time has elapsed
    ADD COLUMN scoring_timeout TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- counter of how many scoring failures have occurred on this connection;
    -- once it hits a certain number, scoring_timeout is set
    ADD COLUMN scoring_failures INTEGER NOT NULL DEFAULT 0,
    -- counter of how many times this connection has been put in scoring timeout
    ADD COLUMN scoring_timeout_occurrences INTEGER NOT NULL DEFAULT 0,
    -- relative offset for scoring_failures, used to avoid writes when the
    -- scoring_timeout period has elapsed
    ADD COLUMN scoring_failures_offset INTEGER NOT NULL DEFAULT 0;
|
||
|
|
|
||
|
|
|
||
|
|
--DROP FUNCTION IF EXISTS get_work (mylocidispid BIGINT, myaddr BIGINT);
|
||
|
|
-- get_work: choose connections the calling client should score against.
--
-- Parameters:
--   my_client_id  client_id of the caller; its own connection is never returned
--   mylocidispid  caller's locidispid; mylocidispid/1000000 is used as geoiplocations.locid
--   myaddr        caller's addr; connections with the same addr are skipped
--   return_rows   maximum number of rows to return
--   stale_score   a score from mylocidispid to a location that is younger than
--                 this interval makes that location ineligible
--
-- Returns at most one client_id per eligible locidispid, in random order,
-- limited to return_rows rows.
CREATE FUNCTION get_work (my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, return_rows INT, stale_score INTERVAL)
RETURNS TABLE (client_id VARCHAR(64)) VOLATILE AS $$
BEGIN
    RETURN QUERY
    WITH scorable_locations AS (
        -- distinct locations that have at least one eligible connection
        SELECT DISTINCT c.locidispid
        FROM connections c
        WHERE c.client_type = 'client'
          AND c.client_id != my_client_id
          AND c.addr != myaddr
          AND c.udp_reachable
          AND NOW() > c.scoring_timeout
          AND c.music_session_id IS NULL
          -- NOT EXISTS rather than NOT IN: with NOT IN a single NULL blocidispid
          -- in the subquery would make the predicate unknown for every row and
          -- the function would silently return no work at all.
          AND NOT EXISTS (
                SELECT 1
                FROM most_recent_scores mrs
                WHERE mrs.alocidispid = mylocidispid
                  AND mrs.blocidispid = c.locidispid
                  AND (current_timestamp - mrs.score_dt) < stale_score
              )
          -- only locations whose geog overlaps a buffer around the caller's location.
          -- NOTE(review): 4023360 is presumably metres (about 2,500 miles) if geog
          -- is a PostGIS geography column -- confirm.
          AND c.locidispid/1000000 IN (
                SELECT g.locid
                FROM geoiplocations g
                WHERE g.geog && st_buffer(
                          (SELECT home.geog FROM geoiplocations home WHERE home.locid = mylocidispid/1000000),
                          4023360)
              )
    )
    SELECT tmp.client_id
    FROM (
        SELECT c.client_id,
               random() AS r,
               -- no ORDER BY in the window: which connection gets rownum 1 within
               -- a location is left to the server
               row_number() OVER (PARTITION BY c.locidispid) AS rownum
        FROM connections c
        INNER JOIN scorable_locations sl
            ON c.locidispid = sl.locidispid
        WHERE c.client_type = 'client'
          AND c.client_id != my_client_id
          AND c.addr != myaddr
          AND c.udp_reachable
          AND NOW() > c.scoring_timeout
          AND c.music_session_id IS NULL
    ) tmp
    WHERE tmp.rownum <= 1
    ORDER BY tmp.r
    LIMIT return_rows;

    RETURN;
END;
$$ LANGUAGE plpgsql;
|
||
|
|
-- DROP FUNCTION get_work (my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, return_rows INT, stale_score INTERVAL)
|
||
|
|
|
||
|
|
-- when a connection is involved with a failed score, increment its scoring_failures column, and possibly put it in the 'doghouse'
|
||
|
|
-- DROP FUNCTION connection_failed_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64), timeout_time INTERVAL, failed_score_threshold INT);
|
||
|
|
-- connection_failed_score: record a failed score against both participants.
--
-- Parameters:
--   a_client_id, b_client_id  client_ids of the two connections involved
--   timeout_time              how long a connection is put in scoring timeout
--   failed_score_threshold    number of counted failures that triggers a timeout
--
-- For each connection (b first, then a), using the row's values before the update:
--   * scoring_failures is always incremented.
--   * If scoring_failures = scoring_failures_offset + failed_score_threshold - 1
--     (this failure reaches the threshold): scoring_timeout_occurrences is
--     incremented, scoring_timeout becomes CURRENT_TIMESTAMP + timeout_time and
--     scoring_failures_offset is set to the new failure count.
--   * Otherwise, if scoring_timeout is still in the future, scoring_failures_offset
--     is incremented too, so the failure does not count toward the threshold;
--     if the timeout has already passed the offset is left alone.
--
-- Returns whether a_client_id's connection is in timeout after the update
-- (NULL when no connections row matches a_client_id).
CREATE FUNCTION connection_failed_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64), timeout_time INTERVAL, failed_score_threshold INT)
RETURNS BOOLEAN VOLATILE AS $$
DECLARE
    a_in_timeout BOOLEAN;
BEGIN
    -- participant b
    UPDATE connections AS c
    SET scoring_failures = c.scoring_failures + 1,
        scoring_failures_offset =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN c.scoring_failures + 1
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP
                    THEN c.scoring_failures_offset
                ELSE c.scoring_failures_offset + 1
            END,
        scoring_timeout =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN CURRENT_TIMESTAMP + timeout_time
                ELSE c.scoring_timeout
            END,
        scoring_timeout_occurrences =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN c.scoring_timeout_occurrences + 1
                ELSE c.scoring_timeout_occurrences
            END
    WHERE c.client_id = b_client_id;

    -- participant a; its post-update timeout state is the function result
    UPDATE connections AS c
    SET scoring_failures = c.scoring_failures + 1,
        scoring_failures_offset =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN c.scoring_failures + 1
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP
                    THEN c.scoring_failures_offset
                ELSE c.scoring_failures_offset + 1
            END,
        scoring_timeout =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN CURRENT_TIMESTAMP + timeout_time
                ELSE c.scoring_timeout
            END,
        scoring_timeout_occurrences =
            CASE
                WHEN c.scoring_failures = c.scoring_failures_offset + failed_score_threshold - 1
                    THEN c.scoring_timeout_occurrences + 1
                ELSE c.scoring_timeout_occurrences
            END
    WHERE c.client_id = a_client_id
    RETURNING c.scoring_timeout > NOW() INTO a_in_timeout;

    RETURN a_in_timeout;
END;
$$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
|
||
|
|
-- when a connection is involved with a good score, reset its scoring_failures and scoring_failures_offset to 0, but only once its scoring_timeout has elapsed
|
||
|
|
-- DROP FUNCTION connection_good_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64));
|
||
|
|
-- connection_good_score: record a successful score for both participants.
--
-- Parameters:
--   a_client_id, b_client_id  client_ids of the two connections involved
--
-- For each connection (b first, then a): if its scoring_timeout is already in
-- the past, scoring_failures and scoring_failures_offset are reset to 0;
-- while the connection is still in timeout both counters are left unchanged.
--
-- Returns whether a_client_id's connection is currently in timeout
-- (NULL when no connections row matches a_client_id).
CREATE FUNCTION connection_good_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64))
RETURNS BOOLEAN VOLATILE AS $$
DECLARE
    a_in_timeout BOOLEAN;
BEGIN
    -- participant b
    UPDATE connections AS c
    SET scoring_failures_offset =
            CASE
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP THEN 0
                ELSE c.scoring_failures_offset
            END,
        scoring_failures =
            CASE
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP THEN 0
                ELSE c.scoring_failures
            END
    WHERE c.client_id = b_client_id;

    -- participant a; its timeout state is the function result
    UPDATE connections AS c
    SET scoring_failures_offset =
            CASE
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP THEN 0
                ELSE c.scoring_failures_offset
            END,
        scoring_failures =
            CASE
                WHEN c.scoring_timeout < CURRENT_TIMESTAMP THEN 0
                ELSE c.scoring_failures
            END
    WHERE c.client_id = a_client_id
    RETURNING c.scoring_timeout > NOW() INTO a_in_timeout;

    RETURN a_in_timeout;
END;
$$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
|
||
|
|
-- scorable_locations: every connection the given client could score against.
--
-- Parameters:
--   my_client_id  client_id of the caller; its own connection is excluded
--   mylocidispid  caller's locidispid; mylocidispid/1000000 is used as geoiplocations.locid
--   myaddr        caller's addr; connections with the same addr are excluded
--   stale_score   a score from mylocidispid to a location that is younger than
--                 this interval makes that location ineligible
--
-- Returns (locidispid, addr, client_id) for each eligible connection: a 'client'
-- connection that is udp_reachable, not in scoring timeout and not in a music session.
DROP FUNCTION IF EXISTS scorable_locations(my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, stale_score INTERVAL);
CREATE FUNCTION scorable_locations(my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, stale_score INTERVAL)
RETURNS TABLE (locidispid BIGINT, addr BIGINT, client_id VARCHAR(64)) VOLATILE AS $$
BEGIN
    -- Every column reference below is table-qualified: the RETURNS TABLE columns
    -- (locidispid, addr, client_id) are also PL/pgSQL variables in this body.
    RETURN QUERY
    SELECT c.locidispid, c.addr, c.client_id
    FROM connections c
    WHERE c.client_type = 'client'
      AND c.udp_reachable
      AND NOW() > c.scoring_timeout
      AND c.music_session_id IS NULL
      AND c.addr != myaddr
      AND c.client_id != my_client_id
      -- NOT EXISTS rather than NOT IN: with NOT IN a single NULL blocidispid in
      -- the subquery would make the predicate unknown for every row and the
      -- function would silently return nothing.
      AND NOT EXISTS (
            SELECT 1
            FROM most_recent_scores mrs
            WHERE mrs.alocidispid = mylocidispid
              AND mrs.blocidispid = c.locidispid
              AND (current_timestamp - mrs.score_dt) < stale_score
          )
      -- only locations whose geog overlaps a buffer around the caller's location.
      -- NOTE(review): 4023360 is presumably metres (about 2,500 miles) if geog is
      -- a PostGIS geography column -- confirm.
      AND c.locidispid/1000000 IN (
            SELECT g.locid
            FROM geoiplocations g
            WHERE g.geog && st_buffer(
                      (SELECT home.geog FROM geoiplocations home WHERE home.locid = mylocidispid/1000000),
                      4023360)
          );

    RETURN;
END;
$$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
|
||
|
|
--DROP FUNCTION IF EXISTS get_work_summary (stale_score INTERVAL);
|
||
|
|
-- get_work_summary: per-connection count of available scoring work.
--
-- Parameters:
--   stale_score  passed through to scorable_locations() for every connection
--
-- Each connection is left-joined to the rows scorable_locations() returns for it
-- whose locidispid differs from the connection's own. work_count is the number of
-- joined rows that have a test client, counted only while the connection is
-- udp_reachable, not in timeout and not in a session. Only 'client' connections
-- that have a matching users row are reported, highest work_count first.
CREATE FUNCTION get_work_summary (stale_score INTERVAL)
RETURNS TABLE (work_count BIGINT, client_id VARCHAR(64), email VARCHAR, first_name VARCHAR, last_name VARCHAR, user_id VARCHAR(64), udp_reachable BOOLEAN, in_timeout BOOLEAN, in_session BOOLEAN, scoring_failures INT, scoring_failures_offset INT, scoring_timeout_occurrences INT) VOLATILE AS $$
BEGIN
    RETURN QUERY
    SELECT
        SUM(CASE
                WHEN tmp.test_client_id IS NULL OR tmp.in_session OR tmp.in_timeout OR tmp.udp_reachable = FALSE THEN 0
                ELSE 1
            END) AS work_count,
        tmp.client_id AS client_id,
        u.email,
        u.first_name,
        u.last_name,
        u.id AS user_id,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.in_session,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    FROM (
        SELECT
            c.client_type,
            sl.client_id AS test_client_id,
            c.client_id AS client_id,
            c.user_id AS user_id,
            c.udp_reachable,
            c.scoring_timeout > NOW() AS in_timeout,
            c.music_session_id IS NOT NULL AS in_session,
            c.scoring_failures,
            c.scoring_failures_offset,
            c.scoring_timeout_occurrences,
            sl.client_id IS NULL AS same_client,
            row_number() OVER (PARTITION BY c.locidispid) AS rownum
        FROM connections c
        -- the function is evaluated once per connection, with that row's values
        LEFT JOIN scorable_locations(c.client_id, c.locidispid, c.addr, stale_score) sl
            ON c.locidispid != sl.locidispid
    ) tmp
    INNER JOIN users u
        ON tmp.user_id = u.id
    WHERE tmp.client_type = 'client'
    GROUP BY
        tmp.client_id,
        u.email,
        u.first_name,
        u.last_name,
        u.id,
        tmp.same_client,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.in_session,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    ORDER BY work_count DESC;

    RETURN;
END;
$$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
--DROP FUNCTION IF EXISTS get_work_summary_no_agg(stale_score INTERVAL);
|
||
|
|
-- useful for debugging get_work_summary
|
||
|
|
-- get_work_summary_no_agg: the un-aggregated rows behind get_work_summary.
--
-- Parameters:
--   stale_score  passed through to scorable_locations() for every connection
--
-- Returns one row per (connection, candidate test client) pair, ordered by the
-- connection's client_id. The client_type = 'client' test is part of the LEFT
-- JOIN's ON clause, so connections of other types are still listed, with
-- test_client_id NULL. Connections without a matching users row are dropped.
CREATE FUNCTION get_work_summary_no_agg (stale_score INTERVAL)
RETURNS TABLE (client_id VARCHAR(64), test_client_id VARCHAR(64), email VARCHAR, first_name VARCHAR, last_name VARCHAR, user_id VARCHAR(64), udp_reachable BOOLEAN, in_timeout BOOLEAN, scoring_failures INT, scoring_failures_offset INT, scoring_timeout_occurrences INT) VOLATILE AS $$
BEGIN
    RETURN QUERY
    SELECT
        tmp.client_id AS client_id,
        tmp.test_client_id,
        u.email,
        u.first_name,
        u.last_name,
        u.id AS user_id,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    FROM (
        SELECT
            sl.client_id AS test_client_id,
            c.client_id AS client_id,
            c.user_id AS user_id,
            c.udp_reachable,
            c.scoring_timeout > NOW() AS in_timeout,
            c.scoring_failures,
            c.scoring_failures_offset,
            c.scoring_timeout_occurrences,
            sl.client_id IS NULL AS same_client,
            row_number() OVER (PARTITION BY c.locidispid) AS rownum
        FROM connections c
        -- the function is evaluated once per connection, with that row's values
        LEFT JOIN scorable_locations(c.client_id, c.locidispid, c.addr, stale_score) sl
            ON c.locidispid != sl.locidispid
           AND c.client_type = 'client'
    ) tmp
    INNER JOIN users u
        ON tmp.user_id = u.id
    ORDER BY tmp.client_id;

    RETURN;
END;
$$ LANGUAGE plpgsql;
|