-- Pasted Jun 04, 2019, 08:02 PM
-- Switch the client statement terminator to ";;" so that the trigger body
-- later in this script can contain ordinary ";" statement terminators.
DELIMITER ;;

-- Remove any previous copy of the events table so the script can be re-run.
DROP TABLE IF EXISTS replication_events;;
5
-- Queue of pending replication events.
-- Rows are inserted by triggers (see TR_leads_UPDT) and read back by
-- filtering on rep_ready_time.
CREATE TABLE replication_events
(
    -- Internal ID for this table.
    -- A UUID would be preferable here: if you fail over to a snapshot and lose
    -- data, an auto-increment counter can go backward and reuse IDs, whereas
    -- UUIDs would not be reused. However, MySQL has poor support for
    -- efficient UUID data types.
    -- Written out in full (instead of SERIAL, which only adds a UNIQUE index)
    -- so the table has an explicit PRIMARY KEY.
    rep_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,

    -- When the record was inserted. This is used for latency monitoring.
    -- Writers should pass SYSDATE() since it is closer to the end of the
    -- transaction than CURRENT_TIMESTAMP.
    -- The DEFAULT is stated explicitly, with no ON UPDATE clause: otherwise
    -- MySQL's legacy TIMESTAMP handling may implicitly add
    -- "DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" to the first
    -- TIMESTAMP column, and any later UPDATE of the row would silently
    -- overwrite the insert time.
    rep_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- If non-null, duplicate inserts will fail (NULL tokens are never
    -- considered duplicates of each other).
    rep_deduplication_token VARCHAR(500) UNIQUE,

    -- When this event should be processed. Set it in the future when using a
    -- deduplication token, so duplicate events have a time window in which
    -- to coalesce. Defaults to "ready immediately".
    rep_ready_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- JSON string.
    -- Must contain replication_type='xxx'
    -- For call_logs, it also contains twilio_id and row_version.
    rep_data VARCHAR(8000) NOT NULL
)
;;
32
-- B-tree index on the ready time, the column the event queries filter on.
CREATE INDEX idx_replication_events_ready_time
    ON replication_events (rep_ready_time)
    USING BTREE
;;
35
36
37
38
-- Drop earlier versions of the leads update triggers so the script can be
-- re-run. TR_leads_UPDT_Report is not recreated by this script
-- (presumably a superseded trigger name -- confirm before removing this line).
DROP TRIGGER IF EXISTS TR_leads_UPDT_Report
;;
DROP TRIGGER IF EXISTS TR_leads_UPDT
;;
41
-- Queues replication events whenever a row in `leads` is updated.
--
-- For each updated row, two rows are written to replication_events:
--   1. An event built from the OLD column values. It carries a deduplication
--      token derived from the old location_id and ad_id and becomes ready
--      5 minutes in the future, so repeated updates for the same
--      location/ad within that window collapse into a single event.
--   2. An event built from the NEW column values, with no deduplication
--      token and the table's default ready time.
--
-- NULL column values are rendered as empty strings in the JSON payload.
-- Values are concatenated without JSON escaping.
CREATE TRIGGER TR_leads_UPDT
    AFTER UPDATE ON leads
    FOR EACH ROW
BEGIN
    -- Local variables rather than @session variables, so the trigger neither
    -- leaks state into nor clobbers variables of the connection that issued
    -- the UPDATE.
    DECLARE v_ready_time DATETIME;
    DECLARE v_dedup_token VARCHAR(500);

    SET v_ready_time = DATE_ADD(NOW(), INTERVAL 5 MINUTE);

    SET v_dedup_token = CONCAT(
        'lead-',
        COALESCE(CAST(OLD.location_id AS CHAR(50)), ''),
        '#',
        COALESCE(CAST(OLD.ad_id AS CHAR(50)), ''));

    -- Replicate reports to S3 and ElasticSearch
    -- INSERT IGNORE: if an event with the same deduplication token is already
    -- queued, this insert is skipped instead of failing the UPDATE on leads.
    INSERT IGNORE INTO replication_events(rep_timestamp, rep_ready_time, rep_deduplication_token, rep_data)
    VALUES (SYSDATE(), v_ready_time, v_dedup_token, CONCAT(
        '{ "replication_type": "report", "event_type": "function_invocation", "Message": {"func_invoke_type": "db_trigger", "table_name" : "', 'leads',
        '", "primary_key" : "', 'lead_id',
        '", "primary_key_value" : "', CONCAT_WS('####',
            COALESCE(CAST(OLD.lead_id AS CHAR(50)), ''),
            COALESCE(CAST(OLD.location_id AS CHAR(50)), ''),
            COALESCE(CAST(OLD.employee_id AS CHAR(50)), ''),
            COALESCE(CAST(OLD.ad_id AS CHAR(50)), ''),
            COALESCE(CAST(OLD.lead_type AS CHAR(50)), ''),
            COALESCE(CAST(OLD.contact_date AS CHAR(50)), ''),
            COALESCE(CAST(OLD.daterented AS CHAR(50)), ''),
            COALESCE(CAST(OLD.date_removed AS CHAR(50)), '')),
        '", "operation" : "', 'update',
        -- COALESCE is required: CONCAT() returns NULL if any argument is
        -- NULL, which would turn the whole payload into NULL.
        '", "arg1" : "', COALESCE(OLD.location_id, ''),
        '", "arg2" : "', COALESCE(CAST(OLD.contact_date AS CHAR(50)), ''),
        '", "arg3" : "', COALESCE(CAST(OLD.daterented AS CHAR(50)), ''),
        '", "arg4" : "', COALESCE(CAST(OLD.date_removed AS CHAR(50)), ''),
        '"}}'));

    -- Replicate reports to S3 and ElasticSearch
    -- FIXME: Update the deduplication token (this event is currently never
    -- deduplicated and is ready immediately).
    INSERT INTO replication_events(rep_timestamp, rep_data)
    VALUES (SYSDATE(), CONCAT(
        '{ "replication_type": "report", "event_type": "function_invocation", "Message": {"func_invoke_type": "db_trigger", "table_name" : "', 'leads',
        '", "primary_key" : "', 'lead_id',
        '", "primary_key_value" : "', CONCAT_WS('####',
            COALESCE(CAST(NEW.lead_id AS CHAR(50)), ''),
            COALESCE(CAST(NEW.location_id AS CHAR(50)), ''),
            COALESCE(CAST(NEW.employee_id AS CHAR(50)), ''),
            COALESCE(CAST(NEW.ad_id AS CHAR(50)), ''),
            COALESCE(CAST(NEW.lead_type AS CHAR(50)), ''),
            COALESCE(CAST(NEW.contact_date AS CHAR(50)), ''),
            COALESCE(CAST(NEW.daterented AS CHAR(50)), ''),
            COALESCE(CAST(NEW.date_removed AS CHAR(50)), '')),
        '", "operation" : "', 'update',
        -- Without COALESCE a NULL location_id would make rep_data NULL,
        -- violate its NOT NULL constraint, and fail the UPDATE on leads.
        '", "arg1" : "', COALESCE(NEW.location_id, ''),
        '", "arg2" : "', COALESCE(CAST(NEW.contact_date AS CHAR(50)), ''),
        '", "arg3" : "', COALESCE(CAST(NEW.daterented AS CHAR(50)), ''),
        '", "arg4" : "', COALESCE(CAST(NEW.date_removed AS CHAR(50)), ''),
        '"}}'));
END
;;
84
-- Manual smoke test for TR_leads_UPDT: update the same lead twice, then
-- inspect the queued events. Every statement must end with ";;" because that
-- is the delimiter in effect for this script.
UPDATE leads SET phone = '815-677-6968' WHERE lead_id = 3139559;;
UPDATE leads SET phone = '815-677-6961' WHERE lead_id = 3139559;;

SELECT 'Check for events, the deuped message is not ready yet' AS message;;
SELECT * FROM replication_events WHERE rep_ready_time <= NOW();;

SELECT 'Simulate time passing for 5 minutes, deduped message should be ready' AS message;;
SELECT * FROM replication_events WHERE rep_ready_time <= DATE_ADD(NOW(), INTERVAL 5 MINUTE);;

SELECT 'Poller query should use an index' AS message;;
EXPLAIN SELECT * FROM replication_events WHERE rep_ready_time < NOW();;