· 7 years ago · Mar 02, 2019, 06:26 PM
1# MIT License
2
3# Copyright (c) 2019 Bellhops Inc.
4
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
22import logging
23
24from datetime import datetime
25from airflow.hooks.postgres_hook import PostgresHook
26from airflow.operators.python_operator import PythonOperator
27from airflow import utils as airflow_utils
28
29
class Status(object):
    """Track ETL run status in a Postgres table keyed by ``run_time``.

    Constructing an instance connects through ``PostgresHook``, creates the
    status table when it is missing, and marks every unfinished run whose
    ``run_time`` is older than this one as FAILED.

    Table columns: ``id`` (SERIAL), ``status``, ``run_time``, ``start_time``,
    ``end_time``, with a UNIQUE constraint on ``(run_time, status)``.

    NOTE: schema/table names and values are interpolated directly into SQL
    text, so they must come from trusted configuration.
    """

    def __init__(self, status_conn_id, status_schema_name, status_table_name, run_time):
        """
        :param status_conn_id: Airflow connection id handed to ``PostgresHook``.
        :param status_schema_name: schema that holds the status table.
        :param status_table_name: name of the status table.
        :param run_time: value stored in / matched against the ``run_time``
            column (presumably the DAG run's execution date -- confirm with callers).
        """
        self.table_name = status_table_name
        self.conn_id = status_conn_id
        self.schema_name = status_schema_name
        self.run_time = run_time

        self.hook = PostgresHook(postgres_conn_id=status_conn_id)
        self.engine = self.hook.get_sqlalchemy_engine()
        self.initialize()
        self.mark_old_runs_failed()

    def table_exists(self):
        """Return True if ``schema_name.table_name`` exists in the database."""
        return self.engine.has_table(self.table_name, schema=self.schema_name)

    def run_time_exists(self):
        """Return True if at least one status row has this instance's ``run_time``."""
        sql = "SELECT COUNT(*) FROM {schema_name}.{table_name} where run_time='{run_time}'".format(
            schema_name=self.schema_name,
            table_name=self.table_name,
            run_time=self.run_time
        )
        # scalar() reads the single COUNT(*) value and closes the result, so
        # the connection is released instead of left on a half-read cursor.
        count = self.run_sql(sql).scalar()
        return count > 0

    def initialize(self):
        """Create the status table if it does not already exist."""
        if self.table_exists():
            logging.info("Status table already present")
            return
        sql = ('CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}'
               '(id SERIAL,'
               'status varchar(1000),'
               'run_time TIMESTAMP,'
               'start_time TIMESTAMP,'
               'end_time TIMESTAMP, '
               'UNIQUE (run_time, status))').format(schema_name=self.schema_name,
                                                    table_name=self.table_name)
        self.run_sql(sql)

    def run_sql(self, sql):
        """Log ``sql`` and execute it on the engine; return the result object."""
        logging.info("Running %s", sql)
        return self.engine.execute(sql)

    def mark_old_runs_failed(self):
        """Mark runs older than ``run_time`` that never finished as FAILED.

        Runs already ENDED or FAILED are left alone, so the ``end_time`` of a
        previously failed run is not overwritten on every instantiation.
        """
        logging.info("Marking old runs as failed.")
        # NOTE(review): datetime.now() is naive local time while run_time may
        # be timezone-aware UTC -- confirm the intended clock for end_time.
        end_time = datetime.now()
        sql = ("UPDATE {schema_name}.{table_name} SET "
               "status = '{status}', "
               "end_time = '{end_time}' "
               "WHERE run_time < '{run_time}' "
               "AND status NOT IN ('ENDED', 'FAILED')").format(
            schema_name=self.schema_name,
            table_name=self.table_name,
            status='FAILED',
            run_time=self.run_time,
            end_time=end_time
        )
        self.run_sql(sql)

    def update(self, status):
        """Record ``status`` for this run.

        * Run exists and ``status == 'ENDED'``: set status and ``end_time``.
        * Run exists, any other status: no-op (e.g. a repeated 'STARTED').
        * Run missing and ``status == 'STARTED'``: insert a row with ``start_time``.

        :raises ValueError: the run is missing and ``status`` is not 'STARTED'.
        """
        if self.run_time_exists():
            if status == 'ENDED':
                end_time = datetime.now()
                sql = ("UPDATE {schema_name}.{table_name} SET "
                       "status = '{status}', "
                       "end_time = '{end_time}' "
                       "WHERE run_time = '{run_time}' ").format(
                    schema_name=self.schema_name,
                    table_name=self.table_name,
                    status=status,
                    run_time=self.run_time,
                    end_time=end_time
                )
                self.run_sql(sql)
            return

        start_time = datetime.now()
        sql = ("INSERT INTO {schema_name}.{table_name}(status, run_time, start_time) VALUES("
               "'{status}',"
               "'{run_time}',"
               "'{start_time}')").format(
            schema_name=self.schema_name,
            table_name=self.table_name,
            status=status,
            run_time=self.run_time,
            start_time=start_time,
        )
        if status != 'STARTED':
            raise ValueError("Error updating status. Trying to update status for non-existent ETL run. {sql}".format(sql=sql))
        self.run_sql(sql)
129
class StatusOperator(PythonOperator):
    """PythonOperator that records ``current_status`` for the DAG run in a status table.

    ``status_config`` must contain the keys ``status_conn_id``,
    ``status_schema_name`` and ``status_table_name``; they are forwarded to
    ``Status`` each time the task runs.
    """

    @airflow_utils.apply_defaults
    def __init__(self, dag, task_id, status_config, current_status, **kwargs):
        self.status_conn_id = status_config['status_conn_id']
        self.status_schema_name = status_config['status_schema_name']
        self.status_table_name = status_config['status_table_name']
        self.current_status = current_status

        # status_update() reads execution_date from the task context. The
        # Airflow 1.x PythonOperator only passes the context to the callable
        # when provide_context=True, so default it on; an explicit value from
        # the caller still takes precedence.
        kwargs.setdefault('provide_context', True)

        super(StatusOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            python_callable=self.status_update,
            op_kwargs={
                'current_status': current_status
            },
            **kwargs
        )

    def status_update(self, current_status, **kwargs):
        """Write ``current_status`` for the run identified by ``execution_date``.

        :param current_status: status string passed to ``Status.update``.
        :param kwargs: Airflow task context; must contain ``execution_date``.
        :raises ValueError: if the context has no ``execution_date``.
        """
        run_time = kwargs.get('execution_date')
        if run_time is None:
            # Without this the string 'None' would be sent as a TIMESTAMP value.
            raise ValueError("StatusOperator needs 'execution_date' in the task context; "
                             "make sure provide_context is enabled.")
        status = Status(status_conn_id=self.status_conn_id,
                        status_schema_name=self.status_schema_name,
                        status_table_name=self.status_table_name,
                        run_time=run_time)
        status.update(current_status)