-- Posted May 15, 2020, 10:40 PM
-- namespace for every message-related object
create schema if not exists api;

-- citext: case-insensitive text, used for service names and message types
create extension if not exists citext with schema public;

-- pgcrypto: provides gen_random_uuid() for the uuid message ids
create extension if not exists pgcrypto with schema public;
7
-- parent partition for all messages
-- rows are routed by receiver into one partition per service
create table if not exists api.messages (
    -- these ids will not conflict across services
    id uuid not null default gen_random_uuid(),

    -- id of the message this one replies to; null when it is not a reply
    reply uuid,

    -- when this message was initially created by the sender
    -- (read by api.latency together with received_at)
    sent_at timestamptz not null default now(),

    -- when this message arrived at the receiver; null until it is stamped
    received_at timestamptz,

    -- the sending service's name (defaults to the current database name)
    sender citext not null default current_database(),

    -- the receiving service's name; also the partition key
    receiver citext not null,

    -- type of the message (services define these)
    type citext not null,

    -- message payload
    data jsonb not null default '{}'::jsonb,

    -- payloads must be hash objects
    constraint messages_data_check check (jsonb_typeof(data) = 'object'),

    -- partition keys must be part of the primary key
    primary key (receiver, id)

-- automatically write messages to service specific tables
) partition by list (receiver);
39
40
-- generate the composite primary key ("<receiver>-<id>") as text, for use
-- as a notification payload; callable with attribute notation:
--   select m.*, m.pk from api.messages m;
-- "create or replace" keeps the script re-runnable like the rest of the DDL
create or replace function api.pk(api.messages)
returns text stable language sql as $$
    select $1.receiver || '-' || $1.id::text;
$$;
47
48
-- inbox/outbox table partitions for our "payments" and "store" services:
-- one list partition per receiver name
-- note: there's only one inbox per database
create table if not exists api.payments
    partition of api.messages
    for values in ('payments');

create table if not exists api.store
    partition of api.messages
    for values in ('store');
53
54
-- create a store replication user in the payments database
-- NOTE: the password equals the role name; only acceptable for a local demo
create role store with replication login password 'store';
grant all privileges on database payments to store; -- TODO: fine tune
-- table privileges cannot be exercised without usage on the containing schema
grant usage on schema api to store;
grant all privileges on all tables in schema api to store; -- TODO: fine tune

-- create a payments replication user in the store database
-- NOTE: the password equals the role name; only acceptable for a local demo
create role payments with replication login password 'payments';
grant all privileges on database store to payments; -- TODO: fine tune
-- table privileges cannot be exercised without usage on the containing schema
grant usage on schema api to payments;
grant all privileges on all tables in schema api to payments; -- TODO: fine tune
64
65
-- (payments db) publish outgoing store messages;
-- only inserts and deletes are replicated
create publication store
    for table api.store
    with (publish = 'delete, insert');

-- (store db) publish outgoing payments messages;
-- only inserts and deletes are replicated
create publication payments
    for table api.payments
    with (publish = 'delete, insert');
71
72
-- (store db) subscribe to the messages the payments database publishes for us
-- NOTE(review): this connects to the payments host as role "payments", while
-- the role created in the payments database earlier in this file is "store";
-- confirm which account is intended
create subscription payments_to_store
    connection 'host=payments port=5432 password=payments user=payments dbname=payments'
    publication store;

-- (payments db) subscribe to the messages the store database publishes for us
-- NOTE(review): this connects to the store host as role "store", while the
-- role created in the store database earlier in this file is "payments";
-- confirm which account is intended
create subscription store_to_payments
    connection 'host=store port=5432 password=store user=store dbname=store'
    publication payments;
82
83
-- message acknowledgement, notification, and cleanup
--
-- Row-level BEFORE INSERT OR DELETE trigger function for an inbox partition.
--   * delete of a non-ack message: removes our matching ack from the outbox
--     and keeps the inbox row (returns null, so the delete is skipped)
--   * insert of an 'ack': removes the acknowledged message from our outbox,
--     notifies "<receiver>.ack.<reply>", and discards the ack row
--   * insert of any other message: queues an ack for the sender, notifies
--     "<receiver>.reply.<reply>" (replies only) and "<receiver>", then stores
--     the row
-- Notification payloads use the same "<receiver>-<id>" text as api.pk.
create or replace function api.ack()
returns trigger language plpgsql as $$
declare
    -- Built inline: in PL/pgSQL "new.pk" is a record-field lookup, not a call
    -- to api.pk, and fails at run time because the row has no such column.
    payload text;
begin
    -- The service deleted a message they sent us from their outbox
    if TG_OP = 'DELETE' then
        if old.type != 'ack' then
            -- Delete the associated ack message from our outbox
            delete from api.messages
            where reply = old.id
              and type = 'ack'
              and receiver = old.sender
              and sender = old.receiver;
        end if;

        -- Don't delete the message from our inbox
        return null;
    end if;

    -- From here on we are handling an insert, so "new" is populated
    payload := new.receiver || '-' || new.id::text;

    -- We're receiving acknowledgement of a message we sent earlier
    if new.type = 'ack' then
        -- Delete the original message from our outbox
        delete from api.messages
        where id = new.reply
          and receiver = new.sender
          and sender = new.receiver;

        -- Notify workers/threads waiting for acknowledgement of specific messages
        perform pg_notify(new.receiver || '.ack.' || new.reply, payload);

        -- Don't save this acknowledgement message
        return null;
    end if;

    -- We're receiving a new message: acknowledge receipt of it
    insert into api.messages (sender, receiver, type, reply)
    values (new.receiver, new.sender, 'ack', new.id);

    -- Notify workers/threads waiting for replies to specific messages
    if new.reply is not null then
        perform pg_notify(new.receiver || '.reply.' || new.reply, payload);
    end if;

    -- Notify workers/threads waiting for new messages
    perform pg_notify(new.receiver::text, payload);

    -- Deliver the message to our inbox
    return new;
end;
$$;
135
136
-- payments db: run api.ack for every row inserted into or deleted from the inbox
create trigger ack
    before insert or delete on api.payments
    for each row execute procedure api.ack();

-- store db: run api.ack for every row inserted into or deleted from the inbox
create trigger ack
    before insert or delete on api.store
    for each row execute procedure api.ack();

-- triggers aren't enabled by default for replication,
-- so mark them "always" to have them fire there too
alter table api.payments enable always trigger ack;
alter table api.store enable always trigger ack;
148
149
-- calculate delivery latency (received_at - sent_at) for testing below;
-- null while received_at is unset
--   select m.*, m.latency from api.messages m;
-- "create or replace" keeps the script re-runnable like the rest of the DDL
create or replace function api.latency(api.messages)
returns interval stable language sql as $$
    -- subtraction already yields null when received_at is null
    select $1.received_at - $1.sent_at;
$$;
156
157
-- (store db) send a "charge" message to the payments service;
-- id, sender and the creation timestamp come from the column defaults
insert into api.messages (receiver, type, data)
values (
    'payments',
    'charge',
    '{"usd":100,"user":123}'::jsonb
);
160
161
-- (payments db) select m.*, m.latency from api.messages m;
--                   id                  | reply |            sent_at            |          received_at          | sender | receiver |  type  |          data          |     latency
-- --------------------------------------+-------+-------------------------------+-------------------------------+--------+----------+--------+------------------------+-----------------
--  30c60d61-886d-42d5-a5ff-0adca10d6eba |       | 2020-05-13 04:11:28.341731+00 | 2020-05-13 04:11:28.347607+00 | store  | payments | charge | {"usd":100,"user":123} | 00:00:00.005876
166
167
-- (store db) select m.*, m.latency from api.messages m;
--  id | reply | sent_at | received_at | sender | receiver | type | data
-- ----+-------+---------+-------------+--------+----------+------+------
-- (0 rows)