· 5 years ago · May 15, 2020, 10:48 PM
1-- PostgreSQL logical replication for async service communication
2--
3-- Additional context, diagrams, and discussions
4--
5-- * https://gist.github.com/shuber/8e53d42d0de40e90edaf4fb182b59dfc#gistcomment-3305006
6-- * https://old.reddit.com/r/PostgreSQL/comments/gkdp6p/logical_replication_for_async_service/
7-- * https://www.postgresql.org/message-id/CAM8f5Mi1Ftj%2B48PZxN1AbM-P%3D4YMLENY5zRaPwTbmbkFwCsTkA%40mail.gmail.com
8
9
10
-- namespace holding every message-related object
create schema if not exists api;

-- citext: case-insensitive text, used for the service-name and message-type columns
create extension if not exists citext
    with schema public;

-- pgcrypto: installed for uuid generation (message ids default to gen_random_uuid())
create extension if not exists pgcrypto
    with schema public;
17
-- parent partition for all messages
--
-- One row per message exchanged between two services. Rows are routed by
-- `receiver` into one list partition per service, so every column that is part
-- of the partition key must also be part of the primary key.
create table if not exists api.messages (
    -- these ids will not conflict across services
    id uuid not null default gen_random_uuid(),

    -- specified when it's a reply to another message
    reply uuid,

    -- when the sending service initially created this message
    sent_at timestamptz not null default now(),

    -- when the receiving service took delivery of this message; api.latency
    -- returns received_at - sent_at and stays null while this column is null
    -- NOTE(review): nothing in this script assigns received_at yet; the
    -- receiving side has to stamp it on delivery for latency to be reported
    received_at timestamptz,

    -- the sending service's name
    sender citext not null default current_database(),

    -- the receiving service's name
    receiver citext not null,

    -- type of the message (services define these)
    type citext not null,

    -- message payload
    data jsonb not null default '{}'::jsonb,

    -- payloads must be hash objects
    check (jsonb_typeof(data) = 'object'),

    -- partition keys must be part of the primary key
    primary key (receiver, id)

-- automatically write messages to service specific tables
) partition by list (receiver);
49
50
-- generate composite primary key for notifications
--
-- Takes a message row and returns "<receiver>-<id>" as text. Because the only
-- argument is the table's row type, it can be called with attribute notation
-- in SQL:
--   select m.*, m.pk from api.messages m;
--
-- "create or replace" keeps the script re-runnable, like the rest of the setup.
create or replace function api.pk(api.messages)
returns text
stable
language sql
as $$
    select $1.receiver || '-' || $1.id::text;
$$;
57
58
-- inbox/outbox table partitions for our "payments" and "store" services
-- note: there's only one inbox per database

-- messages addressed to the payments service
create table if not exists api.payments
    partition of api.messages
    for values in ('payments');

-- messages addressed to the store service
create table if not exists api.store
    partition of api.messages
    for values in ('store');
63
64
-- create a store replication user in the payments database
--
-- The role only needs to connect, resolve objects in the api schema, and read
-- the single table published to it (api.store) so the initial table copy of
-- the subscription succeeds; it gets nothing beyond that.
-- NOTE(review): the password is a placeholder equal to the role name and is
-- repeated in the subscription connection string; change both together.
create role store with replication login password 'store';
grant connect on database payments to store;
grant usage on schema api to store;
grant select on api.store to store;

-- create a payments replication user in the store database
--
-- Mirror image of the role above: read access to api.payments only.
-- NOTE(review): same placeholder-password caveat as above.
create role payments with replication login password 'payments';
grant connect on database store to payments;
grant usage on schema api to payments;
grant select on api.payments to payments;
74
75
-- publish outgoing store messages in the payments database
-- (only inserts and deletes are published; updates are not)
create publication store
    for table api.store
    with (publish = 'delete, insert');

-- publish outgoing payments messages in the store database
-- (only inserts and deletes are published; updates are not)
create publication payments
    for table api.payments
    with (publish = 'delete, insert');
81
82
-- subscribe to payments messages in the store database
-- (connects to the payments host as role "payments" and follows publication "store")
create subscription payments_to_store
    connection 'host=payments port=5432 password=payments user=payments dbname=payments'
    publication store;

-- subscribe to store messages in the payments database
-- (connects to the store host as role "store" and follows publication "payments")
create subscription store_to_payments
    connection 'host=store port=5432 password=store user=store dbname=store'
    publication payments;
92
93
-- message acknowledgement, notification, and cleanup
--
-- Row-level trigger function meant to run BEFORE INSERT OR DELETE on a
-- service's inbox partition. It handles three cases:
--   * DELETE            - the sender removed a message from its outbox; drop the
--                         matching ack from our outbox and keep our inbox row.
--   * INSERT of an ack  - the other side confirmed one of our messages; drop
--                         the original from our outbox, notify, discard the ack.
--   * INSERT otherwise  - a new message arrived; queue an ack, notify, store it.
--
-- Returning null skips the triggering operation for that row; returning new
-- lets the insert proceed.
create or replace function api.ack()
returns trigger language plpgsql as $$
declare
    -- notification payload "<receiver>-<id>", the same text api.pk() produces.
    -- It is built inline because NEW is a trigger record: "new.pk" would be
    -- resolved as a record field and fail, as the row has no such column.
    message_key text;
begin
    -- The service deleted a message they sent us from their outbox
    if TG_OP = 'DELETE' then
        if old.type != 'ack' then
            -- Delete the associated ack message from our outbox
            delete from api.messages as m
            where m.reply = old.id
              and m.type = 'ack'
              and m.receiver = old.sender
              and m.sender = old.receiver;
        end if;

        -- Don't delete the message from our inbox
        return null;
    end if;

    -- Only inserts reach this point, so NEW is populated from here on
    message_key := new.receiver || '-' || new.id::text;

    -- We're receiving acknowledgement of a message we sent earlier
    if new.type = 'ack' then
        -- Delete the original message from our outbox
        delete from api.messages as m
        where m.id = new.reply
          and m.receiver = new.sender
          and m.sender = new.receiver;

        -- Notify workers/threads waiting for acknowledgement of specific messages
        perform pg_notify(new.receiver || '.ack.' || new.reply, message_key);

        -- Don't save this acknowledgement message
        return null;
    end if;

    -- We're receiving a new message

    -- Acknowledge receipt of the message
    insert into api.messages (sender, receiver, type, reply)
    values (new.receiver, new.sender, 'ack', new.id);

    -- Notify workers/threads waiting for replies to specific messages
    if new.reply is not null then
        perform pg_notify(new.receiver || '.reply.' || new.reply, message_key);
    end if;

    -- Notify workers/threads waiting for new messages
    perform pg_notify(new.receiver::text, message_key);

    -- Deliver the message to our inbox
    return new;
end;
$$;
145
146
-- enable trigger on the inbox table in the payments db
create trigger ack
    before delete or insert
    on api.payments
    for each row
    execute procedure api.ack();

-- enable trigger on the inbox table in the store db
create trigger ack
    before delete or insert
    on api.store
    for each row
    execute procedure api.ack();

-- since these aren't enabled by default for replication
-- ("enable always" makes the trigger fire for replicated changes too)
alter table api.payments
    enable always trigger ack;

alter table api.store
    enable always trigger ack;
158
159
-- calculate delivery latency for testing below
--
-- Takes a message row and returns received_at - sent_at as an interval.
-- The result is null whenever either timestamp is null (e.g. the message has
-- not been marked as received), because the subtraction itself yields null;
-- no explicit null check is needed.
--
-- Usage (attribute notation on the row alias):
--   select m.*, m.latency from api.messages m;
--
-- "create or replace" keeps the script re-runnable, like the rest of the setup.
create or replace function api.latency(api.messages)
returns interval
stable
language sql
as $$
    select $1.received_at - $1.sent_at;
$$;
166
167
-- let's send a message to the payments service from the store database
-- (id, sender and the remaining columns fall back to their column defaults)
insert into api.messages (
    receiver,
    type,
    data
)
values (
    'payments',
    'charge',
    '{"usd":100,"user":123}'::jsonb
);
170
171
-- (payments db) select m.*, m.latency from api.messages m;
173 id | reply | sent_at | received_at | sender | receiver | type | data | latency
174--------------------------------------+-------+-------------------------------+-------------------------------+--------+----------+--------+------------------------+-----------------
175 30c60d61-886d-42d5-a5ff-0adca10d6eba | | 2020-05-13 04:11:28.341731+00 | 2020-05-13 04:11:28.347607+00 | store | payments | charge | {"usd":100,"user":123} | 00:00:00.005876
176
177
-- (store db) select m.*, m.latency from api.messages m;
179 id | reply | sent_at | received_at | sender | receiver | type | data
180----+-------+---------+-------------+--------+----------+------+------
181(0 rows)