· 6 years ago · Oct 13, 2019, 11:42 PM
1/*
2 2.
3 There is a table of non-intersecting intervals T ( id bigint not null, start_date timestamptz, end_date timestamptz ).
4 For example, assigning employees to positions (two assignments on one position cannot be at the same time) or action price period
5 (similarly, two prices cannot be simultaneous) - this is irrelevant, just for understanding..
6 The task: create a function for (a) adding and (b) modifying of intervals, provided that they must not intersect.
7*/
8
9
10CREATE TABLE IF NOT EXISTS non_intersecting_intervals
11(
12 id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
13 start_date TIMESTAMP NOT NULL,
14 end_date TIMESTAMP NOT NULL
15);
16
17
18/* 2 - CREATE TRIGGER :: BEGIN */
19CREATE OR REPLACE FUNCTION non_intersecting_intervals__before__insert_update()
20RETURNS TRIGGER
21AS $$
22BEGIN
23
24 IF NEW.start_date IS NULL THEN
25 RAISE EXCEPTION 'start_date can not be null';
26 END IF;
27 IF NEW.end_date IS NULL THEN
28 RAISE EXCEPTION 'end_date can not be null';
29 END IF;
30
31 IF NEW.start_date > NEW.end_date THEN
32 RAISE EXCEPTION '% is older then %', NEW.start_date, NEW.end_date;
33 END IF;
34
35 IF (TG_OP = 'INSERT') THEN
36 IF EXISTS (SELECT nii.id FROM non_intersecting_intervals AS nii WHERE (nii.start_date, nii.end_date) OVERLAPS (NEW.start_date, NEW.end_date)) THEN
37 RAISE EXCEPTION '% and % are overlaping with other time intervals', NEW.start_date, NEW.end_date;
38 END IF;
39 ELSIF (TG_OP = 'UPDATE') THEN -- in case of update do not check record we want to update
40 IF EXISTS (SELECT nii.id FROM non_intersecting_intervals AS nii WHERE nii.id != NEW.id AND (nii.start_date, nii.end_date) OVERLAPS (NEW.start_date, NEW.end_date)) THEN
41 RAISE EXCEPTION '% and % are overlaping with other time intervals', NEW.start_date, NEW.end_date;
42 END IF;
43 END IF;
44
45 RETURN NEW;
46
47END;
48$$ LANGUAGE plpgsql;
49
50
51
52CREATE TRIGGER non_intersecting_intervals_audit
53BEFORE INSERT OR UPDATE ON non_intersecting_intervals
54FOR EACH ROW EXECUTE PROCEDURE non_intersecting_intervals__before__insert_update();
55/* 2 - CREATE TRIGGER :: END */
56
57
58/* 2 - CREATE INSERT FUNCTION :: BEGIN */
59CREATE OR REPLACE FUNCTION "public"."non_intersecting_intervals__insert"(p_start_date TIMESTAMP, p_end_date TIMESTAMP)
60RETURNS SETOF non_intersecting_intervals
61AS $$
62BEGIN
63
64 RETURN QUERY
65 INSERT INTO non_intersecting_intervals (start_date, end_date)
66 VALUES (p_start_date, p_end_date)
67 RETURNING *;
68
69END;
70$$ LANGUAGE plpgsql;
71/* 2 - CREATE INSERT FUNCTION :: END */
72
73
74/* 2 - CREATE UPDATE FUNCTION :: BEGIN */
75CREATE OR REPLACE FUNCTION "public"."non_intersecting_intervals__update"(p_id INTEGER, p_start_date TIMESTAMP, p_end_date TIMESTAMP)
76RETURNS SETOF non_intersecting_intervals
77AS $$
78BEGIN
79
80 RETURN QUERY
81 UPDATE non_intersecting_intervals
82 SET start_date = p_start_date, end_date = p_end_date
83 WHERE id = p_id
84 RETURNING *;
85
86END;
87$$ LANGUAGE plpgsql;
88/* 2 - CREATE UPDATE FUNCTION :: END */
89
90
91
92/* 2 - TEST CASES :: BEGIN */
93
94SELECT * FROM "public"."non_intersecting_intervals__insert"(CAST('29-09-2019' AS TIMESTAMP), CAST('30-10-2019' AS TIMESTAMP));
95
96INSERT INTO non_intersecting_intervals (start_date, end_date)
97VALUES (CAST('30-09-2019' AS TIMESTAMP), NULL);
98
99SELECT * FROM "public"."non_intersecting_intervals__insert"(CAST('30-09-2019' AS TIMESTAMP), CAST('29-10-2019' AS TIMESTAMP));
100
101INSERT INTO non_intersecting_intervals (start_date, end_date)
102VALUES (NULL, CAST('30-09-2019' AS TIMESTAMP));
103
104SELECT * FROM "public"."non_intersecting_intervals__insert"(CAST('29-10-2019' AS TIMESTAMP), CAST('30-09-2019' AS TIMESTAMP));
105
106INSERT INTO non_intersecting_intervals (start_date, end_date)
107VALUES (CAST('30-01-2019' AS TIMESTAMP), CAST('28-02-2019' AS TIMESTAMP));
108
109SELECT * FROM "public"."non_intersecting_intervals__insert"(CAST('29-01-2018' AS TIMESTAMP), CAST('30-09-2018' AS TIMESTAMP));
110
111SELECT * FROM "public"."non_intersecting_intervals__update"(7, CAST('29-11-2018' AS TIMESTAMP), CAST('30-11-2018' AS TIMESTAMP));
112
113/* 2 - TEST CASES :: END */
114
115
116
117
118/*
119 3. There is a constant stream of some events for logging (suppose, the source_name, event_name, service_data in json).
120 Periodically (once a week) old data (older than 1 month) must be moved to the archive (another table) and finally deleted after 3 months.
121 At the same time, there is no way to interrupt or somehow delay the flow.
122 Access to current data should be all this time without having to rewrite select (i.e., they can always be obtained from the same table).
123 The task: create (a) table (tables), (b) functions for archiving and (c) deleting old data.
124*/
125
126--pgAgent is a job scheduling agent for Postgres databases, capable of running multi-step batch or shell scripts and SQL tasks on complex schedules.
127CREATE EXTENSION pgagent;
128
129CREATE TABLE logs
130(
131 log_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
132 source_name TEXT NOT NULL,
133 event_name TEXT NOT NULL,
134 service_data JSON NOT NULL,
135 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
136);
137
138CREATE TABLE logs_archive
139(
140 log_id BIGINT NOT NULL,
141 source_name TEXT NOT NULL,
142 event_name TEXT NOT NULL,
143 service_data JSON NOT NULL,
144 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
145);
146
147
148-- INSIDE THIS PG AGENT JOB IS UNSAFE APPROACH OF ARCHIVING, IF INSERT STATMENT EXECUTION IS FEW SECONDS,
149-- WE CAN LEAVE SOME OF THE LOGS INSIDE LOG TABLE (IF THERE ARE SOME LOGS CATCHED AT THAT EXACT SAME TIME)
150-- FUNCTIONS BELOW DEMONSTRATE DELETING ONLY WHAT WE INSERTED IN ARCHIVE
151
152/* 3 - PGAGENT - JOB CREATION :: BEGIN */
153UPDATE pgagent.pga_jobstep
154SET jstcode =
155'INSERT INTO logs_archive (log_id, source_name, event_name, service_data, created_at)
156SELECT l.log_id, l.source_name, l.event_name, l.service_data, l_created_at
157FROM logs AS l
158WHERE l.created_at <= NOW() - interval ''1 month'';
159
160DELETE
161FROM logs AS l
162WHERE l.created_at <= NOW() - interval ''1 month'';'
163WHERE jstid=1::integer AND jstjobid=1::integer;
164
165INSERT INTO pgagent.pga_jobstep (
166 jstjobid, jstname, jstenabled, jstkind,
167 jstconnstr, jstdbname, jstonerror,
168 jstcode, jstdesc
169)
170VALUES
171(
1721, 'logs_archive__delete_3months_old_data'::text, true, 's'::character(1), ''::text, 'mop_testdb'::name, 'f'::character(1),
173'DELETE
174FROM logs_archive AS la
175WHERE la.created_at <= NOW() - interval ''3 month'';'::text, ''::text
176) RETURNING jstid;
177
178DO $$
179DECLARE
180 scid integer;
181BEGIN
182-- Inserting a schedule (jobid: 1)
183INSERT INTO pgagent.pga_schedule(
184 jscjobid, jscname, jscdesc, jscenabled,
185 jscstart, jscminutes, jschours, jscweekdays, jscmonthdays, jscmonths
186) VALUES (
187 1, 'weekly__sunday_04_00'::text, ''::text, true,
188 '2019-10-01 02:00:00 +02:00'::timestamp with time zone,
189 -- Minutes
190 ARRAY[true,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false]::boolean[],
191 -- Hours
192 ARRAY[false,false,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false]::boolean[],
193 -- Week days
194 ARRAY[true,false,false,false,false,false,false]::boolean[],
195 -- Month days
196 ARRAY[false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false]::boolean[],
197 -- Months
198 ARRAY[false,false,false,false,false,false,false,false,false,false,false,false]::boolean[]
199) RETURNING jscid INTO scid;END
200$$;
201/* 3 - PGAGENT - JOB CREATION :: END */
202
203
204/* 3 - LOG ARCHIVING FUNCTIONS :: BEGIN */
205CREATE OR REPLACE FUNCTION "public"."logs_archive__insert_from_logs__olderthen_xmonths"(p_months_old INTEGER default 1)
206RETURNS SETOF BIGINT
207AS $$
208BEGIN
209
210 RETURN QUERY
211 INSERT INTO logs_archive (log_id, source_name, event_name, service_data, created_at)
212 SELECT l.log_id, l.source_name, l.event_name, l.service_data, l.created_at
213 FROM logs AS l
214 WHERE l.created_at <= NOW() - (interval '1 month' * p_months_old)
215 RETURNING log_id;
216
217END;
218$$ LANGUAGE plpgsql;
219
220
221CREATE OR REPLACE FUNCTION "public"."logs__move_to_archive__olderthen_xmonths"(p_months_old INTEGER default 1)
222RETURNS SETOF BIGINT
223AS $$
224DECLARE
225 l_inserted_ids BIGINT ARRAY := NULL;
226BEGIN
227
228 l_inserted_ids := ARRAY(SELECT * FROM "public"."logs_archive__insert_from_logs__olderthen_xmonths"(p_months_old));
229
230 RETURN QUERY
231 DELETE
232 FROM logs AS l
233 WHERE l.log_id IN
234 (
235 SELECT UNNEST(l_inserted_ids)
236 ) RETURNING log_id;
237
238END;
239$$ LANGUAGE plpgsql;
240/* 3 - LOG ARCHIVING FUNCTIONS :: END */
241
242
243/* 3 - ARCHIVE CLEANING FUNCTION :: BEGIN */
244CREATE OR REPLACE FUNCTION "public"."logs_archive__delete__olderthen_xmonths"(p_months_old INTEGER default 3)
245RETURNS SETOF BIGINT
246AS $$
247BEGIN
248
249 RETURN QUERY
250 DELETE
251 FROM logs_archive AS la
252 WHERE la.created_at <= NOW() - (interval '1 month' * p_months_old)
253 RETURNING log_id;
254
255END;
256$$ LANGUAGE plpgsql;
257/* 3 - ARCHIVE CLEANING FUNCTION :: END */
258
259
260/* 3 - INSERTING TEST DATA :: BEGIN */
261INSERT INTO logs (source_name, event_name, service_data, created_at)
262VALUES ('balance_changes', 'insert', '{ "test" : 1 }', CURRENT_TIMESTAMP - interval '2 month'),
263 ('balance_changes', 'insert', '{ "test" : 1 }', CURRENT_TIMESTAMP - interval '1 month'),
264 ('balance_changes', 'insert', '{ "test" : 1 }', CURRENT_TIMESTAMP),
265 ('balance_changes', 'insert', '{ "test" : 1 }', CURRENT_TIMESTAMP - interval '4 month'),
266 ('balance_changes', 'insert', '{ "test" : 1 }', CURRENT_TIMESTAMP - interval '11 day')
267/* 3 - INSERTING TEST DATA :: END */
268
269
270/* 3 - TESTING LOG ARCHIVING AND LOG ARCHIVE CLEANING :: BEGIN */
271SELECT * FROM "public"."logs__move_to_archive__olderthen_xmonths"(1);
272SELECT * FROM "public"."logs_archive__delete__olderthen_xmonths"(3);
273/* 3 - TESTING LOG ARCHIVING AND LOG ARCHIVE CLEANING :: END */
274
275/* 3 - CHECK FOR REMAINING DATA IN BOTH TABLES :: BEGIN */
276SELECT * FROM logs;
277
278SELECT * FROM logs_archive;
279/* 3 - CHECK FOR REMAINING DATA IN BOTH TABLES :: END */
280
281
282
283
284/*
285
286 5. There is a table of balance_changes ( id bigint not null, user_id bigint not null, event_date timestamptz not null, amount numeric not null )
287 (+ funding, - withdrawal); and table of current balances ( user_id bigint not null, balance numeric not null ).
288 The task: create a query (or function), returning balances of a specified user for the last 30 days.
289
290*/
291
292
293CREATE TABLE balance_changes
294(
295 id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
296 user_id BIGINT NOT NULL,
297 event_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
298 amount NUMERIC NOT NULL
299);
300
301CREATE TABLE current_balances
302(
303 user_id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
304 balance NUMERIC NOT NULL -- I suppose that negative balance is allowed
305);
306
307
308CREATE OR REPLACE FUNCTION balance_changes__after__insert_update_delete()
309RETURNS TRIGGER
310AS $$
311BEGIN
312
313 IF (TG_OP = 'INSERT') THEN
314 UPDATE current_balances
315 SET balance = balance + NEW.amount
316 WHERE user_id = NEW.user_id;
317 ELSIF (TG_OP IN ('UPDATE', 'DELETE')) THEN -- in case of update do not check record we want to update
318 RAISE EXCEPTION '% command is not allowed', TG_OP;
319 END IF;
320
321 RETURN NEW;
322
323END;
324$$ LANGUAGE plpgsql;
325
326
327CREATE TRIGGER balance_changes_audit
328BEFORE INSERT OR UPDATE OR DELETE ON balance_changes
329FOR EACH ROW EXECUTE PROCEDURE balance_changes__after__insert_update_delete();
330
331
332
333
334/* 5 - INSERTING TEST DATA :: BEGIN */
335INSERT INTO current_balances (balance)
336VALUES (0);
337
338INSERT INTO balance_changes (user_id, event_date, amount)
339VALUES (1, CURRENT_TIMESTAMP - interval '21 days', +200),
340 (1, CURRENT_TIMESTAMP - interval '17 days', -450),
341 (1, CURRENT_TIMESTAMP - interval '7 days', +300);
342/* 5 - INSERTING TEST DATA :: BEGIN */
343
344
345/* 5 - CREATE FUNCTION :: BEGIN */
346CREATE OR REPLACE FUNCTION "public"."balance_changes__select__sum_last_xdays__byuserid"(p_user_id integer, p_days integer default 30)
347RETURNS TABLE("date" date, "sum" numeric)
348AS $$
349DECLARE
350 l_current_balance NUMERIC := (SELECT balance FROM current_balances WHERE user_id = p_user_id);
351 l_to_date DATE := CURRENT_DATE - (interval '1 days' * p_days);
352 l_current_date DATE := CURRENT_DATE;
353BEGIN
354
355 CREATE TEMP TABLE IF NOT EXISTS temp_balances (balance_date DATE, amount NUMERIC);
356
357 CREATE TEMP TABLE temp_balance_changes AS
358 SELECT event_date::DATE as event_date, SUM(amount) as amount
359 FROM balance_changes
360 WHERE user_id = p_user_id AND event_date::DATE BETWEEN l_to_date AND l_current_date
361 GROUP BY event_date::DATE;
362
363 INSERT INTO temp_balances (balance_date, amount)
364 VALUES (l_current_date, l_current_balance);
365
366 WHILE l_current_date > l_to_date LOOP
367
368 l_current_balance := l_current_balance - COALESCE((SELECT amount FROM temp_balance_changes WHERE event_date = l_current_date), 0);
369 l_current_date := l_current_date - interval '1 day';
370
371 INSERT INTO temp_balances (balance_date, amount)
372 VALUES (l_current_date, l_current_balance);
373
374 END LOOP;
375
376 RETURN QUERY
377 SELECT balance_date AS "date", amount AS "sum"
378 FROM temp_balances;
379
380 DROP TABLE IF EXISTS temp_balance_changes;
381 DROP TABLE IF EXISTS temp_balances;
382
383END;
384$$ LANGUAGE plpgsql
385/* 5 - CREATE FUNCTION :: END */
386
387
388/* 5 - SELECTING TEST DATA USING FUNCTIONS :: BEGIN */
389SELECT * FROM "public"."balance_changes__select__sum_last_xdays__byuserid"(1);
390
391
392INSERT INTO balance_changes (user_id, event_date, amount)
393VALUES (1, CURRENT_TIMESTAMP, +100);
394
395
396SELECT * FROM "public"."balance_changes__select__sum_last_xdays__byuserid"(1, 40);
397/* 5 - SELECTING TEST DATA USING FUNCTIONS :: END */