· 7 years ago · Feb 27, 2019, 05:24 PM
1from airflow import DAG
2from datetime import timedelta, datetime
3from acme.operators.dwh_operators import ProcessDimensionOperator
4
# Arguments applied to every task of the DAG below: owned by 'airflow',
# starting 2019-02-27, and each task instance waits for its previous run
# to have succeeded (depends_on_past).
default_args = dict(
    owner='airflow',
    start_date=datetime(2019, 2, 27),
    provide_context=True,
    depends_on_past=True,
)
11
import os

# Extra folder Jinja searches for templated files such as 'process_dimension.sql'.
# NOTE(review): the original passed an undefined name `tmpl_search_path`
# (NameError when the DAG file is imported). This placeholder points at the
# DAG file's own folder, which Airflow already searches by default; point it
# at the real SQL directory if the templates live elsewhere.
tmpl_search_path = os.path.dirname(os.path.abspath(__file__))

# The 'etl' DAG: no schedule (schedule_interval=None, so runs must be
# triggered), at most one active run at a time, and each run times out after
# 60 minutes.
dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1,
)
19
# Processes the `products` dimension: renders process_dimension.sql with the
# table name and column list below and runs it on the `mysql_dwh` connection.
process_product_dim = ProcessDimensionOperator(
    dag=dag,
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    database='dwh',
    t_name='products',
    sql='process_dimension.sql',
    col_names=['id', 'name', 'category', 'price', 'available', 'country'],
)
35
36from airflow.hooks.mysql_hook import MySqlHook
37from airflow.models import BaseOperator
38from airflow.utils.decorators import apply_defaults
39
class ProcessDimensionOperator(BaseOperator):
    """Run a templated SQL script that processes one dimension table in MySQL.

    The SQL template receives ``params.t_name``, ``params.col_names`` and
    ``params.match_statement`` (extra join predicates built from
    ``col_names``).

    :param sql: SQL text or path to a ``.sql`` template file (templated).
    :param t_name: table name substituted into the template.
    :param col_names: dimension column names; the first entry is treated as
        the join key and is excluded from ``match_statement``.
    :param database: schema used by the MySQL hook for the connection.
    :param mysql_conn_id: Airflow connection id of the MySQL database.
    :param parameters: optional bind parameters forwarded to ``hook.run``
        (templated).
    """

    template_fields = ('sql', 'parameters')
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            t_name,
            col_names,
            database,
            mysql_conn_id='mysql_default',
            parameters=None,
            *args, **kwargs):
        super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.t_name = t_name
        self.col_names = col_names
        self.database = database
        self.mysql_conn_id = mysql_conn_id
        # Was assigned from an undefined name, raising NameError on construction.
        self.parameters = parameters

        # Template params must exist before Airflow renders `sql`. Rendering
        # happens before `execute` is called, so setting them there is too late
        # and the template would see undefined (empty) values.
        self.params['col_names'] = self.col_names
        self.params['t_name'] = self.t_name
        self.params['match_statement'] = self.construct_match_statement(self.col_names)

    def execute(self, context):
        """Run the already-rendered SQL against the configured MySQL schema."""
        # `database` was previously stored but never used; MySqlHook takes it
        # as `schema` and uses it instead of the connection's default schema.
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
        hook.run(sql=self.sql, parameters=self.parameters)

    def construct_match_statement(self, cols):
        """Build extra join predicates comparing every non-key column.

        The first entry of ``cols`` is skipped because it is treated as the
        join key. Each remaining column yields
        ``and target.<col> = source.<col>``, matching the ``source`` and
        ``target`` table aliases used by the SQL template.

        NOTE(review): ``=`` never matches NULLs; consider MySQL's NULL-safe
        ``<=>`` if dimension columns may be NULL.

        :param cols: column names; the first one is treated as the join key.
        :return: the predicates joined by single spaces ('' when there are none).
        """
        return ' '.join(f'and target.{col} = source.{col}' for col in cols[1:])
76
-- Template rendered by Jinja with params.t_name (table name) and
-- params.match_statement (extra join predicates, which must use the
-- `source` / `target` aliases declared below).

-- MySQL's CREATE TABLE needs column definitions, LIKE, or AS SELECT; the bare
-- form is a syntax error.
-- NOTE(review): assumes the staging table should mirror the source table's
-- structure; confirm (it may be meant to copy the target table instead).
create table if not exists staging.{{ params.t_name }} like source.{{ params.t_name }};

-- Rows whose id exists in both source and target, further filtered by the
-- predicates in params.match_statement.
-- NOTE(review): this SELECT only returns rows; nothing in this script writes
-- them into staging or target, so the load step appears to be missing.
select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
    on source.id = target.id {{ params.match_statement }}