· 7 years ago · Dec 09, 2018, 02:22 PM
1###############################################################################
2#
# This single-operator pipeline manages a dim table tracking XWF access
# points whose listed location we believe is significantly different from
# their real-world location.
6#
7###############################################################################
8
9
10from __future__ import absolute_import
11from __future__ import division
12from __future__ import print_function
13from __future__ import unicode_literals
14
15from dataswarm.operators import (
16 GlobalDefaults,
17 PrestoInsertOperator,
18)
19
20from dataswarm_commons.iorg.pipeline_constants import DE_XWF_OWNER
21
# Pipeline-wide defaults, registered once through GlobalDefaults.
GlobalDefaults.set(
    # Ownership and execution context
    user='benjyb',
    oncall=DE_XWF_OWNER,
    secure_group='internet_org_pipelines',
    pool='cea.highpri_pipelines',
    # Daily schedule; the default partition is keyed on ds=<DATEID>
    schedule='@daily',
    partition={'ds': '<DATEID>'},
    depends_on_past=True,
    fail_on_future=False,
    # Retry policy
    num_retries=5,
    retry_wait=60,  # NOTE(review): unit is not visible in this file -- confirm
    notify_on_retry=True,
)
35
# Table macros; the SQL in this file refers to them as <XWF_APS>,
# <FB4A_APS>, <TECHNICIAN_APS> and <INACCURATE_LOCATIONS>.
GlobalDefaults.add_macros(
    # Table written by this pipeline
    INACCURATE_LOCATIONS='<TABLE:dim_xwf_inaccurate_ap_locations>',
    # Tables read by this pipeline
    XWF_APS='dim_xwf_access_points_2:cea',
    FB4A_APS='dim_wifi_ap_location',
    TECHNICIAN_APS='fbobject_fbtype_16313:di',
)
44
45
# DDL for the output dim table <INACCURATE_LOCATIONS>.
# The column order mirrors the final SELECT of `select_query`: the ten
# ap_recommendations columns, followed by score, row and ds.
# Note that the current coordinates are stored as varchar while the
# recommended coordinates are doubles.
# The table is partitioned by `ds` and declares retention_days = 90.
create_table_statement = '''
CREATE TABLE IF NOT EXISTS <INACCURATE_LOCATIONS> (
 partner varchar,
 access_point_id bigint,
 access_point_status varchar,
 current_latitude varchar,
 current_longitude varchar,
 recommended_latitude double,
 recommended_longitude double,
 dist_between_points double,
 source varchar,
 multiple_ap_same_location bigint,
 score double,
 row bigint,
 ds varchar
)
WITH (
 partitioned_by = ARRAY['ds'],
 retention_days = 90
)
'''
67
# Query that populates <INACCURATE_LOCATIONS>. Stages (one CTE each):
#
#   multiple_locations
#       (latitude, longitude) pairs listed by more than one
#       commercially_active/installed access point that has MAC addresses.
#   xwf_ap_locations
#       One row per (access point, lower-cased interface MAC) for
#       commercially_active/installed access points, annotated with the
#       shared-location count from multiple_locations (NULL if not shared).
#   fb4a_locations / technician_app_locations
#       Lower-cased MAC -> observed lat/long from <FB4A_APS> (rows with
#       exceeds_quality_threshold = 1 only) and from <TECHNICIAN_APS>.
#   dist_ap_from_fb4a / dist_ap_from_technician_app
#       FB_GREAT_CIRCLE_DIST between the listed and the observed location.
#       NOTE(review): the distance unit is not visible in this file.
#   minimum_dist_fb4a / minimum_dist_technician_app
#       Per access point, the observed location with the smallest distance.
#   recommendation_location_and_dist
#       Uses the fb4a result when it has a distance, otherwise falls back to
#       the technician-app result; `source` records which one was used.
#   ap_recommendations
#       Joins the recommendation back to the access point attributes
#       (DISTINCT collapses the per-MAC duplicates).
#   thresholds_and_score
#       Keeps rows with dist >= 0.15, or with more than 2 access points at
#       the same location and dist >= 0.03 or unknown. Score is
#       0.6 * min(dist, 1) (NULL -> 0.03) + 0.4 * min(count, 20) / 20.1
#       (NULL count -> 1).
#   row_number_table
#       Ranks rows per partner by descending score; the final SELECT keeps
#       rows with ROW < 101, i.e. at most 100 per partner.
#
# The `ds` output column is emitted as the quoted literal '<DATEID>'. The
# target column is varchar, and every other ds-valued macro in this file is
# used inside quotes; an unquoted date-shaped expansion (e.g. 2018-12-09)
# would be parsed by Presto as integer subtraction rather than a date string.
# NOTE(review): assumes <DATEID> expands to a bare date -- confirm.
select_query = '''
--
WITH multiple_locations AS (
 SELECT
 latitude,
 longitude,
 COUNT(DISTINCT access_point_id) multiple_ap_same_location
 FROM <XWF_APS> a
 WHERE
 a.ds = '<LATEST_DS:<XWF_APS>>'
 AND a.ap_interface_mac_addresses IS NOT NULL
 AND a.access_point_status IN ('commercially_active', 'installed')
 AND latitude IS NOT NULL
 AND longitude IS NOT NULL
 GROUP BY
 1, 2
 HAVING
 COUNT(DISTINCT access_point_id) > 1
),
--
xwf_ap_locations AS (
 SELECT
 a.partner,
 a.access_point_status,
 a.latitude,
 a.longitude,
 a.access_point_id,
 multiple_ap_same_location,
 LOWER(ap_interface_mac_address) AS ap_interface_mac_address
 FROM <XWF_APS> a
 LEFT JOIN multiple_locations b
 ON a.latitude = b.latitude
 AND a.longitude = b.longitude
 CROSS JOIN
 UNNEST(ap_interface_mac_addresses) AS t (ap_interface_mac_address)
 WHERE
 a.ds = '<LATEST_DS:<XWF_APS>>'
 AND a.ap_interface_mac_addresses IS NOT NULL
 AND a.access_point_status IN ('commercially_active', 'installed')
),
--
fb4a_locations AS (
 SELECT
 LOWER(hardware_address) AS hardware_address,
 isconnected_latitude AS fb_lat,
 isconnected_longitude AS fb_long
 FROM <FB4A_APS>
 WHERE
 ds = '<LATEST_DS:<FB4A_APS>>'
 AND exceeds_quality_threshold = 1
),
--
technician_app_locations AS (
 SELECT
 LOWER(mac_address) AS hardware_address,
 latitude AS fb_lat,
 longitude AS fb_long
 FROM <TECHNICIAN_APS>
 WHERE
 ds = '<LATEST_DS:<TECHNICIAN_APS>>'
),
--
dist_ap_from_fb4a AS (
 SELECT
 latitude,
 longitude,
 ap_interface_mac_address,
 hardware_address,
 access_point_id,
 fb_lat,
 fb_long,
 FB_GREAT_CIRCLE_DIST(
 CAST(a.latitude AS DOUBLE),
 CAST(a.longitude AS DOUBLE),
 CAST(b.fb_lat AS DOUBLE),
 CAST(b.fb_long AS DOUBLE)
 ) dist1
 FROM xwf_ap_locations a
 LEFT JOIN fb4a_locations b
 ON a.ap_interface_mac_address = b.hardware_address
),
--
dist_ap_from_technician_app AS (
 SELECT
 latitude,
 longitude,
 ap_interface_mac_address,
 hardware_address,
 access_point_id,
 fb_lat,
 fb_long,
 FB_GREAT_CIRCLE_DIST(
 CAST(a.latitude AS DOUBLE),
 CAST(a.longitude AS DOUBLE),
 CAST(b.fb_lat AS DOUBLE),
 CAST(b.fb_long AS DOUBLE)
 ) dist1
 FROM xwf_ap_locations a
 LEFT JOIN technician_app_locations b
 ON a.ap_interface_mac_address = b.hardware_address
),
--
minimum_dist_fb4a AS (
 SELECT
 access_point_id,
 min_by(fb_lat, dist1) AS min_fb_lat,
 min_by(fb_long, dist1) AS min_fb_long,
 MIN(dist1) AS dist1
 FROM dist_ap_from_fb4a a
 GROUP BY
 1
),
--
minimum_dist_technician_app AS (
 SELECT
 access_point_id,
 min_by(fb_lat, dist1) AS min_fb_lat,
 min_by(fb_long, dist1) AS min_fb_long,
 MIN(dist1) AS dist1
 FROM dist_ap_from_technician_app a
 GROUP BY
 1
),
--
recommendation_location_and_dist AS (
 SELECT
 a.access_point_id,
 CASE
 WHEN a.dist1 IS NOT NULL
 THEN a.min_fb_lat
 ELSE CAST(b.min_fb_lat AS DOUBLE)
 END AS recommended_latitude,
 CASE
 WHEN a.dist1 IS NOT NULL
 THEN a.min_fb_long
 ELSE CAST(b.min_fb_long AS DOUBLE)
 END AS recommended_longitude,
 CASE
 WHEN a.dist1 IS NOT NULL
 THEN a.dist1
 ELSE b.dist1
 END AS dist_between_points,
 CASE
 WHEN a.dist1 IS NOT NULL
 THEN 'fb4a'
 WHEN b.dist1 IS NOT NULL
 THEN 'technician_app'
 END AS source
 FROM minimum_dist_fb4a a
 LEFT JOIN minimum_dist_technician_app b
 ON a.access_point_id = b.access_point_id
),
--
ap_recommendations AS (
 SELECT
 DISTINCT
 partner,
 a.access_point_id,
 access_point_status,
 b.latitude AS current_latitude,
 b.longitude AS current_longitude,
 a.recommended_latitude,
 a.recommended_longitude,
 dist_between_points,
 source,
 multiple_ap_same_location
 FROM recommendation_location_and_dist a
 LEFT JOIN xwf_ap_locations b
 ON a.access_point_id = b.access_point_id
),
--
thresholds_and_score AS(
 SELECT
 *,
 (
 CAST(
 CASE
 WHEN dist_between_points > 1
 THEN 1
 WHEN dist_between_points IS NULL
 THEN 0.03
 ELSE dist_between_points
 END AS DOUBLE
 ) / 1.0
 ) * 0.6 + (
 CAST(
 CASE
 WHEN multiple_ap_same_location >= 20
 THEN 20
 WHEN multiple_ap_same_location IS NULL
 THEN 1
 ELSE multiple_ap_same_location
 END AS DOUBLE
 ) / 20.1
 ) * 0.4 AS score
 FROM ap_recommendations
 WHERE
 dist_between_points >= 0.15
 OR (
 multiple_ap_same_location > 2
 AND (
 dist_between_points >= 0.03
 OR dist_between_points IS NULL
 )
 )
),
--
row_number_table AS(
 SELECT
 *,
 ROW_NUMBER() OVER (
 PARTITION BY
 partner
 ORDER BY
 score DESC
 ) AS ROW
 FROM thresholds_and_score
) --final
SELECT
 *,
 '<DATEID>' AS ds
FROM row_number_table
WHERE
 ROW < 101
'''
293
# The pipeline's single operator: hands the DDL and the query defined above
# to a PrestoInsertOperator targeting <INACCURATE_LOCATIONS>.
create_inaccurate_ap_dim_table = PrestoInsertOperator(
    # Destination
    table='<INACCURATE_LOCATIONS>',
    namespace='locations',
    partition={'ds': '<DATEID>'},
    # SQL to run
    create=create_table_statement,
    select=select_query,
    # Empty dependency list
    dep_list=[],
)