· 6 years ago · Apr 22, 2019, 04:18 PM
1--/
2CREATE OR REPLACE FUNCTION my_versioning()
3RETURNS TRIGGER AS $$
4DECLARE
5 sys_period text;
6 history_table text;
7 manipulate jsonb;
8 commonColumns text[];
9 -- name of the column that holds timestamp provided in source record
10 timestamp_name text;
11 -- holds timestampt provided in source record in timestamp_name column
12 timestamp_to_use timestamptz;
13 -- which timestamp use for closing valid records, if closing_timestamp sesson variable is set, we usi it otherwise we use current_timestamp
14 closing_timestamp timestamptz;
15 range_lower timestamptz;
16 transaction_info txid_snapshot;
17 existing_range tstzrange;
18 holder record;
19 holder2 record;
20 pg_version integer;
21 debug1 text;
22 debug2 timestamptz;
23BEGIN
24 -- version 0.2.1
25-- RAISE INFO 'Type of operation: %', TG_OP ;
26 IF TG_WHEN != 'BEFORE' OR TG_LEVEL != 'ROW' THEN
27 RAISE TRIGGER_PROTOCOL_VIOLATED USING
28 MESSAGE = 'function "versioning" must be fired BEFORE ROW';
29 END IF;
30
31 IF TG_OP != 'INSERT' AND TG_OP != 'UPDATE' AND TG_OP != 'DELETE' THEN
32 RAISE TRIGGER_PROTOCOL_VIOLATED USING
33 MESSAGE = 'function "versioning" must be fired for INSERT or UPDATE or DELETE';
34 END IF;
35
36 IF TG_NARGS != 4 THEN
37 RAISE INVALID_PARAMETER_VALUE USING
38 MESSAGE = 'wrong number of parameters for function "versioning"',
39 HINT = 'expected 4 parameters but got ' || TG_NARGS;
40 END IF;
41
42 sys_period := TG_ARGV[0];
43 history_table := TG_ARGV[1];
44 --history_table := TG_ARGV[3];
45 timestamp_name := TG_ARGV[2];
46 --timestamp_to_use := now();
47
48 -- check if sys_period exists on original table
49 SELECT atttypid, attndims INTO holder FROM pg_attribute WHERE attrelid = TG_RELID AND attname = sys_period AND NOT attisdropped;
50 IF NOT FOUND THEN
51 RAISE 'column "%" of relation "%" does not exist', sys_period, TG_TABLE_NAME USING
52 ERRCODE = 'undefined_column';
53 END IF;
54 IF holder.atttypid != to_regtype('tstzrange') THEN
55 IF holder.attndims > 0 THEN
56 RAISE 'system period column "%" of relation "%" is not a range but an array', sys_period, TG_TABLE_NAME USING
57 ERRCODE = 'datatype_mismatch';
58 END IF;
59
60 SELECT rngsubtype INTO holder2 FROM pg_range WHERE rngtypid = holder.atttypid;
61 IF FOUND THEN
62 RAISE 'system period column "%" of relation "%" is not a range of timestamp with timezone but of type %', sys_period, TG_TABLE_NAME, format_type(holder2.rngsubtype, null) USING
63 ERRCODE = 'datatype_mismatch';
64 END IF;
65
66 RAISE 'system period column "%" of relation "%" is not a range but type %', sys_period, TG_TABLE_NAME, format_type(holder.atttypid, null) USING
67 ERRCODE = 'datatype_mismatch';
68 END IF;
69
70 IF TG_OP = 'UPDATE' OR TG_OP = 'DELETE' THEN
71 -- Ignore rows already modified in this transaction - will stop multiple updates in batch
72/* transaction_info := txid_current_snapshot();
73 IF OLD.xmin::text >= (txid_snapshot_xmin(transaction_info) % (2^32)::bigint)::text
74 AND OLD.xmin::text <= (txid_snapshot_xmax(transaction_info) % (2^32)::bigint)::text THEN
75 IF TG_OP = 'DELETE' THEN
76 RETURN OLD;
77 END IF;
78
79 RETURN NEW;
80 END IF;*/
81
82 SELECT current_setting('server_version_num')::integer
83 INTO pg_version;
84
85 -- to support postgres < 9.6
86 IF pg_version < 90600 THEN
87 -- check if history table exits
88 IF to_regclass(history_table::cstring) IS NULL THEN
89 RAISE 'relation "%" does not exist', history_table;
90 END IF;
91 ELSE
92 IF to_regclass(history_table) IS NULL THEN
93 RAISE 'relation "%" does not exist', history_table;
94 END IF;
95 END IF;
96
97 -- check if history table has sys_period
98 IF NOT EXISTS(SELECT * FROM pg_attribute WHERE attrelid = history_table::regclass AND attname = sys_period AND NOT attisdropped) THEN
99 RAISE 'history relation "%" does not contain system period column "%"', history_table, sys_period USING
100 HINT = 'history relation must contain system period column with the same name and data type as the versioned one';
101 END IF;
102
103 EXECUTE format('SELECT $1.%I', sys_period) USING OLD INTO existing_range;
104 --RAISE INFO 'EE existing_range: %', existing_range;
105
106 IF TG_OP = 'UPDATE' THEN
107 EXECUTE format('SELECT $1.%I', timestamp_name) USING NEW INTO timestamp_to_use; -- for UPDATE, may not work for insert (works), but fails for delete
108 ELSEIF TG_OP = 'DELETE' THEN
109 EXECUTE 'show session.closing_timestamp' INTO closing_timestamp;
110 EXECUTE 'SELECT COALESCE($1, CURRENT_TIMESTAMP)' USING closing_timestamp INTO timestamp_to_use; -- for UPDATE, may not work for insert (works), but fails for delete
111 END IF;
112-- RAISE INFO 'EE timestamp_to_use: %', timestamp_to_use;
113
114 IF existing_range IS NULL THEN
115 RAISE 'system period column "%" of relation "%" must not be null', sys_period, TG_TABLE_NAME USING
116 ERRCODE = 'null_value_not_allowed';
117 END IF;
118
119 IF isempty(existing_range) OR NOT upper_inf(existing_range) THEN
120 RAISE 'system period column "%" of relation "%" contains invalid value', sys_period, TG_TABLE_NAME USING
121 ERRCODE = 'data_exception',
122 DETAIL = 'valid ranges must be non-empty and unbounded on the high side';
123 END IF;
124 debug1=TG_ARGV[3];
125/* IF TG_ARGV[4] = True THEN
126 RAISE INFO 'TRUE BOOL';
127 END IF;*/
128 IF TG_ARGV[3] = 'true' THEN
129 -- mitigate update conflicts
130 range_lower := lower(existing_range);
131 IF range_lower >= timestamp_to_use THEN -- we should split the interval as it is wrong ordering
132 RAISE WARNING 'WRONG ORDER Record update % has to be skipped', NEW;
133 RETURN NULL;
134 timestamp_to_use := range_lower + interval '1 microseconds';
135 END IF;
136 END IF;
137
138 WITH history AS
139 (SELECT attname, atttypid
140 FROM pg_attribute
141 WHERE attrelid = history_table::regclass
142 AND attnum > 0
143 AND NOT attisdropped),
144 main AS
145 (SELECT attname, atttypid
146 FROM pg_attribute
147 WHERE attrelid = TG_RELID
148 AND attnum > 0
149 AND NOT attisdropped)
150 SELECT
151 history.attname AS history_name,
152 main.attname AS main_name,
153 history.atttypid AS history_type,
154 main.atttypid AS main_type
155 INTO holder
156 FROM history
157 INNER JOIN main
158 ON history.attname = main.attname
159 WHERE
160 history.atttypid != main.atttypid;
161
162 IF FOUND THEN
163 RAISE 'column "%" of relation "%" is of type % but column "%" of history relation "%" is of type %',
164 holder.main_name, TG_TABLE_NAME, format_type(holder.main_type, null), holder.history_name, history_table, format_type(holder.history_type, null)
165 USING ERRCODE = 'datatype_mismatch';
166 END IF;
167
168 WITH history AS
169 (SELECT attname
170 FROM pg_attribute
171 WHERE attrelid = history_table::regclass
172 AND attnum > 0
173 AND NOT attisdropped),
174 main AS
175 (SELECT attname
176 FROM pg_attribute
177 WHERE attrelid = TG_RELID
178 AND attnum > 0
179 AND NOT attisdropped)
180 SELECT array_agg(quote_ident(history.attname)) INTO commonColumns
181 FROM history
182 INNER JOIN main
183 ON history.attname = main.attname
184 AND history.attname not in ( sys_period, timestamp_name);
185
186 EXECUTE ('INSERT INTO ' ||
187 CASE split_part(history_table, '.', 2)
188 WHEN '' THEN
189 quote_ident(history_table)
190 ELSE
191 quote_ident(split_part(history_table, '.', 1)) || '.' || quote_ident(split_part(history_table, '.', 2))
192 END ||
193 '(' ||
194 array_to_string(commonColumns , ',') ||
195 ',' ||
196 quote_ident(sys_period) ||
197 ') VALUES ($1.' ||
198 array_to_string(commonColumns, ',$1.') ||
199 ',tstzrange($2, $3, ''[)''))')
200 USING OLD, range_lower, timestamp_to_use;
201
202 -- debug
203
204 SELECT ('INSERT INTO ' ||
205 CASE split_part(history_table, '.', 2)
206 WHEN '' THEN
207 quote_ident(history_table)
208 ELSE
209 quote_ident(split_part(history_table, '.', 1)) || '.' || quote_ident(split_part(history_table, '.', 2))
210 END ||
211 '(' ||
212 array_to_string(commonColumns , ',') ||
213 ',' ||
214 quote_ident(sys_period) ||
215 ') VALUES ($1.' ||
216 array_to_string(commonColumns, ',$1.') ||
217 ',tstzrange($2, $3, ''[)''))')
218 --USING OLD, range_lower, time_stamp_to_use
219 INTO debug1;
220 --RAISE INFO 'debug1: %', debug1;
221 END IF;
222
223 IF TG_OP = 'UPDATE' OR TG_OP = 'INSERT' THEN
224-- debug2 = NEW.ted;
225 EXECUTE format('SELECT $1.%I', timestamp_name) USING NEW INTO timestamp_to_use; -- for INSERT
226-- RAISE INFO 'EE timestamp_to_use: %', timestamp_to_use;
227
228 manipulate := jsonb_set('{}'::jsonb, ('{' || sys_period || '}')::text[], to_jsonb(tstzrange(timestamp_to_use, null, '[)')));
229-- RAISE INFO 'manipulate: %', manipulate;
230-- RAISE '%', NEW.state;
231-- RAISE INFO 'return: %', NEW;
232 RETURN jsonb_populate_record(NEW, manipulate);
233 END IF;
234
235 RETURN OLD;
236END;
237$$ LANGUAGE plpgsql;
238/