/*

SNOWFLAKE INCREMENTAL MATERIALIZATION BY PERIOD

Dated Feb 03, 2019, 10:58 AM.

*/

{% macro create_state_table_if_not_exists(state_schema) -%}

    {#
        Ensure the state table <state_schema>.dbt_state exists, creating it
        only when it is missing.

        Columns:
          - m_id: name of the model the row belongs to
          - last_processed_timestamp: start of the most recently committed period
          - custom_stop_timestamp: optional stop override maintained outside dbt

        The table may also be created ahead of time so state can be seeded
        with specific timestamps before the first run.
    #}

    {%- call statement('state_table', fetch_result=False) -%}
        create table if not exists {{ state_schema }}.dbt_state (
            m_id                     varchar not null,
            last_processed_timestamp timestamp,
            custom_stop_timestamp    timestamp
        )
    {%- endcall %}

{%- endmacro %}

{% macro get_period_boundaries(state_schema, timestamp_field, start_date, stop_date, period) -%}

    {#
        Work out which range of periods the current model still has to process,
        based on its row in <state_schema>.dbt_state.

        Produces: no direct return value. It runs the 'period_boundaries'
        statement, whose first result row is
        (start_timestamp, stop_timestamp, num_periods). Callers read it with
        load_result('period_boundaries').

        Notes:
          - Requires the state table to exist (created by the materialization
            or ahead of time).
          - If the state table has no row for this model, start_date and the
            default stop timestamp (below) are used.
          - start_timestamp falls back to start_date when
            last_processed_timestamp is null. Otherwise it is
            last_processed_timestamp plus one period.
          - stop_timestamp is the first non-null of:
              * 1 ms before custom_stop_timestamp. This can be updated outside
                dbt before running, e.g. from the last snapshot date of the raw
                source database.
              * 1 ms before stop_date, when stop_date is non-empty.
              * the current timestamp.
          - num_periods is the period datediff between start and stop, plus one.
          - timestamp_field is accepted for interface compatibility but is not
            used here.
    #}

    {#- Stop timestamp used when custom_stop_timestamp is absent or null. -#}
    {%- set default_stop_sql -%}
        coalesce(
            {{ dbt_utils.dateadd('millisecond', -1, "nullif('" ~ stop_date ~ "','')::timestamp") }},
            {{ dbt_utils.current_timestamp() }}
        )
    {%- endset -%}

    {% call statement('period_boundaries', fetch_result=True) -%}

        with ts_transform as (
            -- boundaries derived from this model's saved state, if any
            select
                coalesce(
                    {{ dbt_utils.dateadd(period, interval=1, from_date_or_timestamp='last_processed_timestamp') }},
                    '{{ start_date }}'
                )::timestamp as start_timestamp,
                coalesce(
                    {{ dbt_utils.dateadd('millisecond', -1, 'custom_stop_timestamp') }},
                    {{ default_stop_sql }}
                ) as stop_timestamp
            from {{ state_schema }}.dbt_state
            where m_id = '{{ this.name }}'
        ),

        default_values as (
            -- boundaries used when the model has no saved state
            select
                '{{ start_date }}'::timestamp as start_timestamp,
                {{ default_stop_sql }} as stop_timestamp
        ),

        boundaries as (
            -- at most one branch yields rows (NOT EXISTS guard), so no dedup is needed
            select start_timestamp, stop_timestamp
            from ts_transform
            union all
            select start_timestamp, stop_timestamp
            from default_values
            where not exists (select 1 from ts_transform)
        )

        select
            start_timestamp,
            stop_timestamp,
            {{ dbt_utils.datediff('start_timestamp', 'stop_timestamp', period) }} + 1 as num_periods
        from boundaries;

    {%- endcall %}

{%- endmacro %}

{% macro get_period_sql(target_cols_csv, sql, period, start_timestamp, stop_timestamp, offset) -%}

    {#
        Build the SELECT used to populate the temporary table for one period.

        The __START_TS__ and __END_TS__ placeholders in the model SQL are
        replaced with the bounds of window number `offset` (0-based). The window
        is counted from `start_timestamp` in units of `period`. The result is
        wrapped so only `target_cols_csv` is selected.

        Both bounds are computed directly from start_timestamp:
          start = start_timestamp + offset periods
          end   = start_timestamp + (offset + 1) periods
        This makes the end of window N identical to the start of window N + 1.
        Adding the intervals in sequence (start + offset periods + 1 period) is
        not equivalent for month periods when the start day does not exist in
        an intermediate month (Jan 31 + 1 month + 1 month lands on Mar 28, but
        Jan 31 + 2 months is Mar 31). That previously left gaps between
        consecutive windows.

        stop_timestamp is accepted for interface compatibility but is unused.

        returns: select statement text
    #}

    {%- set period_start -%}
        '{{ start_timestamp }}'::timestamp + interval '{{ offset | int }} {{ period }}'
    {%- endset -%}
    {%- set period_end -%}
        '{{ start_timestamp }}'::timestamp + interval '{{ (offset | int) + 1 }} {{ period }}'
    {%- endset -%}
    {%- set filtered_sql = sql | replace("__START_TS__", period_start) | replace("__END_TS__", period_end) -%}

    select
        {{ target_cols_csv }}
    from (
        {{ filtered_sql }}
    )

{%- endmacro %}

{% macro dbt__incremental_delete_based_on_timestamp(target_relation, timestamp_field, start_timestamp, offset, period) -%}

    {#
        Build the DELETE half of the per-period 'UPSERT'. It removes rows of
        target_relation for the period that is about to be re-inserted.

        The period start is start_timestamp + offset periods. Only rows whose
        timestamp_field equals that period start exactly are deleted.

        NOTE(review): this assumes the model emits timestamp_field aligned to
        the period start (e.g. one bucket per period). If a model emits
        timestamps anywhere inside the window, reruns would leave the
        non-aligned rows in place and duplicate them. Confirm against the
        models using this materialization.

        returns: delete statement text
    #}

    {%- set period_start_expr = "'" ~ start_timestamp ~ "'::timestamp + interval '" ~ offset ~ " " ~ period ~ "'" -%}

    delete from {{ target_relation }}
    where {{ timestamp_field }} = {{ period_start_expr }};

{%- endmacro %}

{% macro dbt__incremental_commit_state(state_schema, start_timestamp, offset, period) -%}

    {#
        Record start_timestamp + offset periods as the last processed period
        for the current model in <state_schema>.dbt_state. Rows are keyed by
        this.name.

        - No row for this model yet: insert one, with custom_stop_timestamp
          set to null.
        - Row exists: update only last_processed_timestamp, leaving any
          externally maintained custom_stop_timestamp untouched.

        The MERGE must join the source row's m_id to the target's
        (ds.m_id = us.m_id). The previous condition ds.m_id = ds.m_id is true
        for every existing row. After the first model inserted its row, every
        later model therefore updated that row instead of inserting its own.
        That clobbered the first model's state and left the others with no
        state, so they restarted from start_date on every run.

        returns: merge statement text
    #}

    merge into {{ state_schema }}.dbt_state as ds
    using (
        select
            '{{ this.name }}' as m_id,
            '{{ start_timestamp }}'::timestamp + interval '{{ offset }} {{ period }}' as last_processed_timestamp,
            null::timestamp as custom_stop_timestamp
    ) us
    on ds.m_id = us.m_id
    when not matched then
        insert (m_id, last_processed_timestamp, custom_stop_timestamp)
        values (us.m_id, us.last_processed_timestamp, us.custom_stop_timestamp)
    when matched then
        update set last_processed_timestamp = us.last_processed_timestamp;

{%- endmacro %}

{% materialization snowflake_incremental_by_period, adapter='snowflake' %}

    {#
        Incremental materialization that loads a Snowflake table one period at
        a time, committing each period in its own transaction.

        Model config:
          - timestamp_field (required): field used for the per-period 'UPSERT'
          - start_date (required): default start date when no state exists
          - stop_date (optional): stop date if needed; defaults to the current
            timestamp
          - period (optional, default 'week'): interval used for the loop

        The model SQL must contain the __START_TS__ and __END_TS__ placeholders,
        which are replaced with each period's bounds.

        Progress is stored per model name in <schema>.dbt_state. State and data
        for a period are committed together, so a failed run resumes after the
        last committed period.
    #}

    {# set params from config section of calling sql file #}
    {%- set timestamp_field = config.require('timestamp_field') -%}
    {%- set start_date = config.require('start_date') -%}
    {%- set stop_date = config.get('stop_date') or '' -%}
    {%- set period = config.get('period') or 'week' -%}

    {# validate that the timestamp placeholders exist in the select statement #}
    {%- if sql.find('__START_TS__') == -1 or sql.find('__END_TS__') == -1 -%}
        {%- set error_message -%}
            Model '{{ model.unique_id }}' does not include the required strings in its sql
        {%- endset -%}
        {{ exceptions.raise_compiler_error(error_message) }}
    {%- endif -%}

    {# standard dbt relation / mode bookkeeping #}
    {%- set identifier = model['name'] -%}

    {%- set old_relation = adapter.get_relation(schema=schema, identifier=identifier) -%}
    {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}

    {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
    {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

    {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
    {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

    {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
    {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
    {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

    {# truncate or drop the table based on full refresh, table existence and non-destructive mode #}
    {% if old_relation is none -%}
        {# noop #}
    {%- elif should_truncate -%}
        {{ adapter.truncate_relation(old_relation) }}
    {%- elif should_drop -%}
        {{ adapter.drop_relation(old_relation) }}
        {%- set old_relation = none -%}
    {%- endif %}

    {# run pre_hooks #}
    {{ run_hooks(pre_hooks, inside_transaction=False) }}

    {# Snowflake DDL always auto-commits a statement, so an explicit begin is
       issued first so that the following commit does not raise. #}
    {{ run_hooks(pre_hooks, inside_transaction=True) }}
    {% call statement() -%}
        begin;
    {%- endcall %}
    {{ adapter.commit() }}

    {# 1) Build the destination table. The placeholders are pinned to the
          epoch so the model SQL can be used to create the table structure. #}
    {% if force_create or old_relation is none -%}
        {%- call statement('main', fetch_result=False) -%}
            {%- set epoch_ts = "'" ~ modules.datetime.datetime(1970, 1, 1, 0, 0, 0) ~ "'::timestamp" -%}
            {%- set empty_sql = sql | replace("__START_TS__", epoch_ts) | replace("__END_TS__", epoch_ts) -%}
            {{ create_table_as(False, target_relation, empty_sql) }}
        {%- endcall -%}
    {%- endif %}

    {# 2) Create the state table if it does not exist #}
    {{ dbt_utils.create_state_table_if_not_exists(schema) }}

    {# 2b) On a full refresh the target was just dropped or truncated, so
           clear this model's progress. Otherwise processing would resume after
           the stale last_processed_timestamp and the rebuilt table would be
           missing all earlier periods. custom_stop_timestamp is preserved.
           The reset is committed immediately so it cannot be rolled back
           after the (auto-committed) drop/truncate. #}
    {% if full_refresh_mode -%}
        {%- call statement('reset_state', fetch_result=False) -%}
            update {{ schema }}.dbt_state
            set last_processed_timestamp = null
            where m_id = '{{ this.name }}'
        {%- endcall -%}
        {{ adapter.commit() }}
    {%- endif %}

    {# 3) Get the dates to process; the macro runs the 'period_boundaries' statement #}
    {% set period_dates = dbt_utils.get_period_boundaries(schema,
                                                          timestamp_field,
                                                          start_date,
                                                          stop_date,
                                                          period) %}

    {%- set boundary_row = load_result('period_boundaries')['data'][0] -%}
    {%- set start_timestamp = boundary_row[0] | string -%}
    {%- set stop_timestamp = boundary_row[1] | string -%}
    {%- set num_periods = boundary_row[2] | int -%}

    {% set target_columns = adapter.get_columns_in_table(schema, identifier) %}
    {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
    {# a dict is used because a plain `set` inside a for loop does not leak out of it #}
    {%- set loop_vars = {'sum_rows_inserted': 0} -%}

    {# 4) Using the period dates, commit each period as a separate transaction #}
    {% for i in range(num_periods) -%}
        {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%}
        {{ log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True) }}

        {# create a temp table with this period's intermediate results #}
        {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
        {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
                                                   schema=schema, type='table') -%}

        {% call statement(fetch_result=True) -%}
            {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv,
                                                            sql,
                                                            period,
                                                            start_timestamp,
                                                            stop_timestamp,
                                                            i) %}
            {{ dbt.create_table_as(True, tmp_relation, tmp_table_sql) }}
        {%- endcall %}

        {{ adapter.expand_target_column_types(temp_table=tmp_identifier,
                                              to_schema=schema,
                                              to_table=identifier) }}
        {%- set name = 'main-' ~ i -%}

        {# 'UPSERT' into the target inside one transaction: record state,
           delete the period's existing rows, insert the temp table's rows #}
        {% call statement(name, fetch_result=False, auto_begin=True) -%}
            {{ dbt_utils.dbt__incremental_commit_state(schema, start_timestamp, i, period) }}
            {{ dbt_utils.dbt__incremental_delete_based_on_timestamp(target_relation, timestamp_field, start_timestamp, i, period) }}
            insert into {{ target_relation }} ({{ target_cols_csv }})
            (
                select
                    {{ target_cols_csv }}
                from {{ tmp_relation.include(schema=False) }}
            );
        {%- endcall %}
        {{ adapter.commit() }}

        {# log progress; the second token of the statement status is read as the row count #}
        {%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[1] | int -%}
        {{ log(rows_inserted) }}
        {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
        {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

        {%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
        {{ log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True) }}
    {%- endfor %}

    {# run post_hooks #}
    {{ run_hooks(post_hooks, inside_transaction=True) }}
    {# same explicit begin as for the pre_hooks, so the commit does not raise #}
    {% call statement() -%}
        begin;
    {%- endcall %}
    {{ adapter.commit() }}

    {{ run_hooks(post_hooks, inside_transaction=False) }}

    {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

    {% call noop_statement(name='main', status=status_string) -%}
        -- no-op
    {%- endcall %}

{%- endmaterialization %}