# Dated Jan 26, 2019, 10:32 PM
1from airflow.models import DAG
2from datetime import datetime, timedelta
3import textwrap
4from airflow.operators import HiveOperator, SQLCheckOperator, PartitionSwapOperator
5from lyftdata.airflow.plugins.lyft_toolkit import QAOperator
6from lyftdata.airflow.plugins.sensors.lyft_metastore_partition_sensor import LyftHivePartitionSensor
7from lyftdata.airflow.util.workflow import table_pipeline
8
9
# PUT YOUR USERNAME HERE: it is used below to build both the task owner and
# the failure-alert email address.
USERNAME = 'gguy'

# DAG-level settings, expanded directly into the DAG(...) constructor.
dag_args = {
    'concurrency': 12,
    'catchup': False,
    'max_active_runs': 8,
    # Written as ``1`` rather than ``01``: a leading-zero integer literal is a
    # SyntaxError on Python 3 (it was an octal literal on Python 2).
    'start_date': datetime(2018, 11, 1),
}

# Per-task defaults handed to every operator via ``default_args``.
# DAG-level knobs (``max_active_runs`` / ``concurrency``) intentionally live in
# ``dag_args`` above: Airflow only applies default_args entries that match an
# operator's constructor parameters, so placing them here had no effect and
# contradicted the values that are actually used.
default_args = {
    'owner': '{}@lyft.com'.format(USERNAME),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'email': '{}@lyft.com'.format(USERNAME),
    'email_on_failure': True,
    'email_on_retry': False,
}
31
# Template parameters shared by the tasks; the SQL templates read them as
# ``{{ params.<key> }}``.
params = dict(
    prod_schema='locations',   # schema of the production table
    schema_stg='etl',          # schema holding the stg_* staging tables
    table='location_refined',  # base table name; staging tables add a stg_ prefix
    location='PRODUCTION',     # intended S3 path segment (reference as params.location)
)
38
39
# Stage the day's rides from core.fact_rides (driver_key exposed as
# user_lyft_id, plus the ride milestone timestamps) into a small partitioned
# table that the location staging query map-joins against.
# Fix: the S3 LOCATION used ``{{ location }}``, which is not an Airflow template
# variable and rendered as an empty path segment under the default (non-strict)
# Jinja settings; the intended value lives in ``params.location``.
stg_rides_mapjoin_sql = textwrap.dedent("""\
    CREATE EXTERNAL TABLE IF NOT EXISTS {{ params.schema_stg }}.stg_rides_{{ params.table }} (
        user_lyft_id bigint,
        ride_id bigint,
        requested_at timestamp,
        picked_up_at timestamp,
        arrived_at timestamp,
        dropped_off_at timestamp
    )
    PARTITIONED BY (ds STRING)
    STORED AS PARQUET
    LOCATION 's3://lyftqubole-iad/qubole/t/sedirt/{{ params.location }}/{{ params.schema_stg }}/table_name=stg_rides_{{ params.table }}'
    TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY');

    INSERT OVERWRITE TABLE {{ params.schema_stg }}.stg_rides_{{ params.table }} PARTITION (ds = '{{ ds }}')
    SELECT
        driver_key as user_lyft_id
        ,ride_id
        ,requested_at
        ,picked_up_at
        ,arrived_at
        ,dropped_off_at
    FROM core.fact_rides
    where
        ds = '{{ ds }}'
        -- and driver_key % 1000 == 2
    """)
68
# DDL for the production table that the daily staging partition is swapped
# into; its column list mirrors the stg_ table built by the staging query.
# Fix: the S3 LOCATION referenced ``{{ location }}`` (not an Airflow template
# variable) and ``{{ params.schema }}`` (``params`` has no ``schema`` key), so
# both rendered as empty path segments under the default (non-strict) Jinja
# settings. They now use ``params.location`` and ``params.prod_schema``.
create_location_refined_sql = textwrap.dedent("""\
    CREATE EXTERNAL TABLE IF NOT EXISTS {{ params.prod_schema }}.{{ params.table }} (
        user_lyft_id BIGINT,
        ride_id BIGINT,
        model STRING COMMENT 'model used',
        ingest_region STRING,

        osm_segment_start_ID BIGINT,
        osm_segment_start_lat FLOAT,
        osm_segment_start_lng FLOAT,

        osm_segment_end_ID BIGINT,
        osm_segment_end_lat FLOAT,
        osm_segment_end_lng FLOAT,

        RECORDED_AT TIMESTAMP,
        CREATED_AT TIMESTAMP,
        LOGGED_AT TIMESTAMP,

        lat float,
        lng float,

        before_requested tinyint,
        before_picked_up tinyint,
        before_arrived tinyint,
        before_dropped_off tinyint
    )
    COMMENT 'Location ETL job output locations for each ride'
    PARTITIONED BY (ds STRING)
    STORED AS PARQUET
    LOCATION 's3://lyftqubole-iad/qubole/t/sedirt/{{ params.location }}/{{ params.prod_schema }}/table_name={{ params.table }}'
    TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY')
    """)
105
106
107
# Stage the day's refined locations. Each location event from
# default.event_location_processed is joined to staged rides of the same
# user_lyft_id (fact_rides.driver_key in the rides staging query). Only events
# recorded between 120 seconds before the request and 120 seconds after the
# drop-off are kept (timestamps cast to double are epoch seconds in Hive).
# Each kept point is flagged by which ride milestones it precedes. Only models
# whose name starts with 'rt' or 'v' are kept. INSERT is positional, so the
# startNode*/endNode* aliases land in the osm_segment_* columns.
# Fixes: the S3 LOCATION used the undefined ``{{ location }}`` (now
# ``params.location``). ``rides.ds`` is now filtered explicitly so the
# map-joined staging table is partition-pruned; results are unchanged because
# ``a.ds = rides.ds`` and ``a.ds = '{{ ds }}'`` already imply it. ``model`` is
# qualified with its table alias.
stg_location_refined_sql = textwrap.dedent("""\
    CREATE EXTERNAL TABLE IF NOT EXISTS {{ params.schema_stg }}.stg_{{ params.table }} (
        user_lyft_id BIGINT,
        ride_id BIGINT,
        model STRING COMMENT 'model used',
        ingest_region STRING,

        osm_segment_start_ID BIGINT,
        osm_segment_start_lat FLOAT,
        osm_segment_start_lng FLOAT,

        osm_segment_end_ID BIGINT,
        osm_segment_end_lat FLOAT,
        osm_segment_end_lng FLOAT,

        RECORDED_AT TIMESTAMP,
        CREATED_AT TIMESTAMP,
        LOGGED_AT TIMESTAMP,

        lat float,
        lng float,

        before_requested tinyint,
        before_picked_up tinyint,
        before_arrived tinyint,
        before_dropped_off tinyint
    )
    COMMENT 'Location ETL job output locations for each ride'
    PARTITIONED BY (ds STRING)
    STORED AS PARQUET
    LOCATION 's3://lyftqubole-iad/qubole/t/sedirt/{{ params.location }}/{{ params.schema_stg }}/table_name=stg_{{ params.table }}'
    TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY');

    INSERT OVERWRITE TABLE {{ params.schema_stg }}.stg_{{ params.table }} PARTITION (ds='{{ ds }}')
    SELECT /*+ MAPJOIN(rides) */
        a.user_lyft_id
        ,rides.ride_id
        ,a.model
        ,a.ingest_region
        ,a.osm_segment_start_ID as startNodeID
        ,a.osm_segment_start_lat as startNodeLat
        ,a.osm_segment_start_lng as startNodeLng
        ,a.osm_segment_end_ID as endNodeID
        ,a.osm_segment_end_lat as endNodeLat
        ,a.osm_segment_end_lng as endNodeLng
        ,a.RECORDED_AT
        ,a.created_at
        ,a.LOGGED_AT
        ,a.lat
        ,a.lng
        , CASE WHEN a.recorded_at < rides.requested_at then 1 else 0 end as before_requested
        , CASE WHEN a.recorded_at < rides.picked_up_at then 1 else 0 end as before_picked_up
        , CASE WHEN a.recorded_at < rides.arrived_at then 1 else 0 end as before_arrived
        , CASE WHEN a.recorded_at < rides.dropped_off_at then 1 else 0 end as before_dropped_off

    FROM default.event_location_processed a
    JOIN {{ params.schema_stg }}.stg_rides_{{ params.table }} rides
        on a.ds = rides.ds
        and a.user_lyft_id = rides.user_lyft_id
    where (substring(a.model,1,2) = 'rt' or substring(a.model,1,1) = 'v' )
        and a.recorded_at is not null
        and a.ds = '{{ ds }}'
        and rides.ds = '{{ ds }}'
        -- and a.user_lyft_id % 1000 == 2
        and cast(a.RECORDED_AT as double) between (cast(rides.requested_at as double) - 120) and (cast(rides.dropped_off_at as double) + 120)
    """)
175
# QA query: yields a single boolean that is true when the staging partition
# for the run's ds contains at least one row.
not_empty_check_sql = (
    "SELECT COUNT(*) > 0\n"
    "FROM {{ params.schema_stg }}.stg_{{ params.table }}\n"
    "WHERE ds = '{{ ds }}'\n"
)
181
182
with DAG('location_refined', default_args=default_args, **dag_args) as dag:
    # Partition spec for both upstream sensors; same value as the expression
    # '%%ds={}%%'.format('{{ ds }}'), with ds filled in by Jinja at run time.
    ds_partition_spec = '%%ds={{ ds }}%%'

    # Keyword arguments shared by every task that renders SQL templates.
    templated = dict(dag=dag, params=params)

    # Wait for the day's rides, then stage them for the map join.
    wait_for_rides = LyftHivePartitionSensor(
        task_id='wait_for_fact_rides',
        schema='core',
        table='fact_rides',
        partition=ds_partition_spec,
        dag=dag,
    )
    stg_rides_map_join = HiveOperator(
        task_id='stg_rides_map_join',
        hql=stg_rides_mapjoin_sql,
        **templated
    )

    # Wait for processed location events, then build the staging partition.
    wait_for_location_processed = LyftHivePartitionSensor(
        task_id='wait_for_location_process',
        schema='default',
        table='event_location_processed',
        partition=ds_partition_spec,
        **templated
    )
    stg_location_refined = HiveOperator(
        task_id='stg_location_refined',
        hql=stg_location_refined_sql,
        **templated
    )

    # Refuse to promote an empty staging partition.
    not_empty_check = QAOperator(
        task_id='QA_hive.non_zero',
        qubole_conn_id='qubole_default',
        cluster_label='default',
        command_type='hivecmd',
        query=not_empty_check_sql,
        fetch_logs=True,
        sla=timedelta(minutes=30),
        **templated
    )

    # Ensure the production table exists, then swap the staged partition in.
    create_location_refined = HiveOperator(
        task_id='create_location_refined',
        hql=create_location_refined_sql,
        **templated
    )
    location_refined_promoted = PartitionSwapOperator(
        task_id="location_refined_promoted",
        from_schema='{{ params.schema_stg }}',
        from_table='stg_{{ params.table }}',
        to_schema='{{ params.prod_schema }}',
        to_table='{{ params.table }}',
        partitions={'ds': '{{ ds }}'},
        **templated
    )

    # Staging needs both source partitions; promotion needs the QA pass and
    # the production table.
    wait_for_rides >> stg_rides_map_join >> stg_location_refined
    wait_for_location_processed >> stg_location_refined
    stg_location_refined >> not_empty_check
    [create_location_refined, not_empty_check] >> location_refined_promoted