-- Paste metadata: 6 years ago · Aug 19, 2019, 01:30 AM
-- load_foreign_schema_postgresql_stock
--
-- Materializes foreign tables as ordinary local tables. For every foreign
-- table in the foreign-table schema that belongs to _server (and passes the
-- name filter in the main loop), the same-named table in the target schema is
-- dropped, rebuilt with CREATE TABLE ... AS SELECT * from the foreign table,
-- optionally given the primary key found in the remote catalogs, and ANALYZEd.
-- The procedure COMMITs after every table, so it has to be invoked with CALL
-- outside of an explicit transaction block.
--
-- Parameters:
--   _server                foreign server whose foreign tables are loaded;
--                          must not be NULL.
--   _remote_schema         schema name on the remote side; used to look up
--                          primary keys and to derive the two defaults below.
--   _dl_schema             local schema receiving the copies
--                          (default: _remote_schema).
--   _foreign_table_schema  local schema holding the foreign tables
--                          (default: 'ft_' || _remote_schema).
--   check_ft               accepted for interface compatibility; not
--                          referenced by this body.
--   recreate_primary_keys  when true, remote primary keys are re-created on
--                          the loaded tables.
--   log_table_schema_name, log_table_name
--                          accepted for interface compatibility; not
--                          referenced by this body.
CREATE OR REPLACE PROCEDURE public.load_foreign_schema_postgresql_stock(
    _server text,
    _remote_schema text,
    _dl_schema text DEFAULT NULL::text,
    _foreign_table_schema text DEFAULT NULL::text,
    check_ft boolean DEFAULT true,
    recreate_primary_keys boolean DEFAULT true,
    log_table_schema_name text DEFAULT NULL::text,
    log_table_name text DEFAULT NULL::text)
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
    ft_row   RECORD;   -- one foreign table to load
    pkey_row RECORD;   -- one generated ALTER TABLE ... ADD PRIMARY KEY statement

    started_at TIMESTAMPTZ;

    -- Per-run names of the helper foreign tables that expose the remote
    -- catalogs. The timestamp suffix keeps concurrent/previous runs apart.
    run_suffix      CONSTANT TEXT := now()::TEXT;
    ft_pg_class     CONSTANT TEXT := 'pg_class_stock' || run_suffix;
    ft_pg_namespace CONSTANT TEXT := 'pg_namespace_stock' || run_suffix;
    ft_pg_index     CONSTANT TEXT := 'pg_index_stock' || run_suffix;
    ft_pg_attribute CONSTANT TEXT := 'pg_attribute_stock' || run_suffix;

    ft_schema     TEXT;   -- schema holding the foreign tables
    target_schema TEXT;   -- schema receiving the local copies

    nr_rows BIGINT;

    -- Measured per table but not read anywhere in this body; presumably
    -- intended for the log table parameters -- TODO confirm.
    duration_table_load     INTERVAL;
    duration_index_creation INTERVAL;
    duration_analyze        INTERVAL;

    pkey_query TEXT;
BEGIN
    -- A NULL server can never match a foreign table and would otherwise only
    -- surface as an obscure format() error (or a silent no-op).
    IF _server IS NULL THEN
        RAISE EXCEPTION '_server must not be NULL';
    END IF;

    ft_schema     := COALESCE(_foreign_table_schema, 'ft_' || _remote_schema);
    target_schema := COALESCE(_dl_schema, _remote_schema);

    IF recreate_primary_keys THEN
        -- Expose the remote system catalogs needed to reconstruct primary keys.
        EXECUTE format($ddl$
            CREATE FOREIGN TABLE %1$I.%2$I
            (
                oid          OID  OPTIONS (COLUMN_NAME 'oid'),
                relname      NAME OPTIONS (COLUMN_NAME 'relname'),
                relnamespace OID  OPTIONS (COLUMN_NAME 'relnamespace'),
                relkind      CHAR OPTIONS (COLUMN_NAME 'relkind')
            )
            SERVER %3$I
            OPTIONS (SCHEMA_NAME 'pg_catalog', TABLE_NAME 'pg_class')
            $ddl$, ft_schema, ft_pg_class, _server);

        EXECUTE format($ddl$
            CREATE FOREIGN TABLE %1$I.%2$I
            (
                oid     OID  OPTIONS (COLUMN_NAME 'oid'),
                nspname TEXT OPTIONS (COLUMN_NAME 'nspname')
            )
            SERVER %3$I
            OPTIONS (SCHEMA_NAME 'pg_catalog', TABLE_NAME 'pg_namespace')
            $ddl$, ft_schema, ft_pg_namespace, _server);

        -- pg_index has no oid column (it is keyed by indexrelid), so none is
        -- declared here; the lookup below never needed one.
        EXECUTE format($ddl$
            CREATE FOREIGN TABLE %1$I.%2$I
            (
                indexrelid   OID        OPTIONS (COLUMN_NAME 'indexrelid'),
                indrelid     OID        OPTIONS (COLUMN_NAME 'indrelid'),
                indisunique  BOOLEAN    OPTIONS (COLUMN_NAME 'indisunique'),
                indisprimary BOOLEAN    OPTIONS (COLUMN_NAME 'indisprimary'),
                indkey       INT2VECTOR OPTIONS (COLUMN_NAME 'indkey')
            )
            SERVER %3$I
            OPTIONS (SCHEMA_NAME 'pg_catalog', TABLE_NAME 'pg_index')
            $ddl$, ft_schema, ft_pg_index, _server);

        EXECUTE format($ddl$
            CREATE FOREIGN TABLE %1$I.%2$I
            (
                attrelid OID      OPTIONS (COLUMN_NAME 'attrelid'),
                attname  NAME     OPTIONS (COLUMN_NAME 'attname'),
                attnum   SMALLINT OPTIONS (COLUMN_NAME 'attnum')
            )
            SERVER %3$I
            OPTIONS (SCHEMA_NAME 'pg_catalog', TABLE_NAME 'pg_attribute')
            $ddl$, ft_schema, ft_pg_attribute, _server);

        COMMIT;
    END IF;

    FOR ft_row IN
        SELECT ft.foreign_table_schema,
               ft.foreign_table_name
        FROM information_schema.foreign_tables AS ft
        WHERE ft.foreign_table_schema = ft_schema
          AND ft.foreign_server_name = _server
          -- never load this run's own catalog helper tables
          AND ft.foreign_table_name NOT IN (ft_pg_class, ft_pg_attribute, ft_pg_namespace, ft_pg_index)
          -- NOTE(review): hard-coded single-table filter kept as found; it looks
          -- like a debugging leftover -- confirm whether it should be removed
          -- or turned into a parameter.
          AND ft.foreign_table_name = 'customer_access'
        ORDER BY ft.foreign_table_schema, ft.foreign_table_name
    LOOP
        RAISE NOTICE 'Loading table %.% into %.%', ft_row.foreign_table_schema, ft_row.foreign_table_name, target_schema, ft_row.foreign_table_name;
        EXECUTE format('DROP TABLE IF EXISTS %I.%I;', target_schema, ft_row.foreign_table_name);

        started_at := clock_timestamp();
        EXECUTE format('CREATE TABLE %1$I.%2$I AS
                        SELECT *
                        FROM %3$I.%2$I;', target_schema, ft_row.foreign_table_name, ft_row.foreign_table_schema);
        -- Read the row count directly after the EXECUTE it belongs to.
        GET DIAGNOSTICS nr_rows := ROW_COUNT;
        duration_table_load := clock_timestamp() - started_at;

        RAISE NOTICE '% rows loaded into %.%', nr_rows, target_schema, ft_row.foreign_table_name;

        duration_index_creation := NULL;

        IF recreate_primary_keys THEN
            started_at := clock_timestamp();

            -- Builds one "ALTER TABLE ... ADD PRIMARY KEY (...)" statement for
            -- the remote table of the same name in _remote_schema. Key columns
            -- are listed in index-key order (pos). The target schema is handed
            -- to the inner format() as a literal argument so that unusual
            -- schema names cannot break the generated statement.
            pkey_query := format($q$
                WITH pkey_index AS (
                    SELECT c.relname AS tablename,
                           x.indrelid,
                           x.indkey
                    FROM %1$I.%5$I AS x
                    INNER JOIN %1$I.%2$I AS c ON c.oid = x.indrelid
                    INNER JOIN %1$I.%2$I AS i ON i.oid = x.indexrelid
                    INNER JOIN %1$I.%4$I AS n ON n.oid = c.relnamespace
                    WHERE c.relkind = ANY (ARRAY ['r'::"char", 'm'::"char"])
                      AND i.relkind = 'i'::"char"
                      AND x.indisprimary
                      AND n.nspname = %6$L
                      AND c.relname = %7$L
                ),
                pkey_column AS (
                    SELECT p.tablename,
                           p.indrelid,
                           k.pos,
                           p.indkey[k.pos] AS attnum
                    FROM pkey_index AS p
                    CROSS JOIN LATERAL generate_subscripts(p.indkey, 1) AS k(pos)
                )
                SELECT format('ALTER TABLE %%1$I.%%2$I ADD PRIMARY KEY (%%3$s)',
                              %8$L,
                              pc.tablename,
                              string_agg(format('%%I', att.attname::TEXT), ', ' ORDER BY pc.pos)) AS indexdef
                FROM pkey_column AS pc
                INNER JOIN %1$I.%3$I AS att
                    ON att.attrelid = pc.indrelid
                   AND att.attnum = pc.attnum
                GROUP BY pc.tablename
                $q$,
                ft_row.foreign_table_schema, ft_pg_class, ft_pg_attribute, ft_pg_namespace, ft_pg_index,
                _remote_schema, ft_row.foreign_table_name, target_schema);

            FOR pkey_row IN EXECUTE pkey_query
            LOOP
                RAISE NOTICE '%', pkey_row.indexdef;
                EXECUTE pkey_row.indexdef;
            END LOOP;

            duration_index_creation := clock_timestamp() - started_at;
        END IF;

        RAISE NOTICE 'Analyzing %.%', target_schema, ft_row.foreign_table_name;
        started_at := clock_timestamp();
        EXECUTE format('ANALYZE %I.%I', target_schema, ft_row.foreign_table_name);
        duration_analyze := clock_timestamp() - started_at;
        RAISE NOTICE 'Done analyzing %.%', target_schema, ft_row.foreign_table_name;

        -- Make each loaded table durable on its own.
        COMMIT;
    END LOOP;

    IF recreate_primary_keys THEN
        -- Remove this run's catalog helper tables. Left behind, they would pile
        -- up and would not be covered by a later run's NOT IN exclusion, which
        -- only knows that run's own names. If the procedure aborts earlier with
        -- an error, they remain and have to be dropped manually.
        EXECUTE format('DROP FOREIGN TABLE IF EXISTS %1$I.%2$I, %1$I.%3$I, %1$I.%4$I, %1$I.%5$I',
                       ft_schema, ft_pg_class, ft_pg_namespace, ft_pg_index, ft_pg_attribute);
        COMMIT;
    END IF;
END
$BODY$;