# Source snapshot pasted Jan 11, 2021, 11:24 PM (paste artifact kept as a comment).
1from google.cloud import storage
2from google.cloud import pubsub_v1
3
4import logging
5import os
6import io
7import json
8import csv
9import re
10import datetime
11
# Module-wide logging: the level comes from the LOGLEVEL environment
# variable (e.g. DEBUG, INFO, WARNING); defaults to INFO.
LOGLEVEL = os.environ.get('LOGLEVEL', 'info').upper()
logging.basicConfig(level=LOGLEVEL)
logger = logging.getLogger(__name__)
15
16
def epochToIso(epochTime):
    """Convert a Unix epoch timestamp to an ISO-8601 UTC midnight string.

    Fix: the previous implementation used date.fromtimestamp(), which
    interprets the epoch in the server's *local* timezone while the 'Z'
    suffix claims UTC; converting with an explicit UTC tz makes the label
    truthful regardless of the server's timezone.
    """
    dateObj = datetime.datetime.fromtimestamp(
        epochTime, tz=datetime.timezone.utc).date()
    return dateObj.isoformat() + 'T00:00:00Z'
20
def tsIso():
    """Return the current UTC time as an ISO-8601 string (no tz suffix)."""
    return datetime.datetime.utcnow().isoformat()
24
def tsStr():
    """Return the current UTC time compacted to 'YYYYMMDDHHMM'."""
    now = datetime.datetime.utcnow()
    return now.strftime("%Y%m%d%H%M")
28
class Helper(object):
    """Stateless helpers for pulling iTunes Connect analytics and storing
    the results in Google Cloud Storage, with Pub/Sub notifications.

    Every method is a @staticmethod; the class serves only as a namespace.
    """

    # https://stackoverflow.com/questions/52233949/passing-variables-to-google-cloud-functions

    @staticmethod
    def getParam(request, param):
        """Resolve *param* from an HTTP request or the environment.

        Lookup order: GET query string -> POST form -> POST JSON body ->
        environment variable.  Returns the first value found, or False when
        the parameter is present nowhere.

        Fix: the original returned the first lookup's result even when it
        was None (parameter absent), so the fallback chain never advanced
        past request.args on a normal request object.
        """
        # Via a GET with query parameters:
        try:
            value = request.args.get(param)
            if value is not None:
                return value
        except Exception:
            pass

        # Via a POST with a form:
        try:
            value = request.form.get(param)
            if value is not None:
                return value
        except Exception:
            pass

        # Via a POST with JSON:
        try:
            value = request.get_json().get(param)
            if value is not None:
                return value
        except Exception:
            pass

        # Finally, fall back to the process environment.
        value = os.environ.get(param)
        if value is not None:
            return value

        return False

    @staticmethod
    def sendNotification(message, level="INFO"):
        """Publish a JSON notification to the Pub/Sub topic in $TOPIC_PATH.

        TOPIC_PATH is in the form `projects/{project_id}/topics/{topic_id}`;
        the literal value "none" disables publishing.  Best-effort: returns
        True on success (or when disabled) and False on any failure --
        errors are logged, never raised, so callers cannot be broken by a
        notification problem.
        """
        try:
            topic_path = os.environ["TOPIC_PATH"]
            if topic_path == "none":
                return True

            package = {
                'sender': "itunes_to_gcs",
                'level': level,
                'message': message,
            }
            publisher = pubsub_v1.PublisherClient()
            data = json.dumps(package).encode("utf-8")
            future = publisher.publish(topic_path, data)

            # Block on the future so publish failures surface inside this try.
            logger.info(future.result())
            logger.info("Published messages.")
            return True
        except Exception as e:
            # Deliberately broad: notifications are best-effort only.
            logger.warning("Unable send notification to pub/sub topic, error: {}".format(e))
            logger.warning(e)
            return False

    @staticmethod
    def save_data_to_file(data, url):
        """Write *data* to the local path *url*.

        Fix: use a context manager so the handle is closed even when the
        write raises (the original leaked the handle on error).
        """
        with open(url, "w") as text_file:
            text_file.write(data)

    @staticmethod
    def saveDictAsCSVToGCS(data, url):
        """Serialise one dict as a header+single-row CSV and store it at
        *url* (GCS when the url starts with gs://, local file otherwise)."""
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=data.keys())
        writer.writeheader()
        writer.writerow(data)
        if url.startswith("gs://"):
            Helper.upload_data_to_gcs(output.getvalue(), url, contentType='text/csv')
        else:
            Helper.save_data_to_file(output.getvalue(), url)

    @staticmethod
    def saveListOfDictAsCSVToGCS(data, url):
        """Serialise a list of dicts as CSV and store it at *url* (GCS when
        the url starts with gs://, local file otherwise).

        Fix: the header is the union of all rows' keys (first-seen order),
        not just the first row's -- DictWriter.writerow raises ValueError
        when a later row contains a key missing from fieldnames, which can
        happen when per-day measure dicts differ.  Missing cells become "".
        """
        if not data:
            logger.warning("Empty list can't be saved to csv!")
            return

        csv_columns = list(data[0].keys())
        seen = set(csv_columns)
        for row in data[1:]:
            for key in row:
                if key not in seen:
                    seen.add(key)
                    csv_columns.append(key)

        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=csv_columns)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

        if url.startswith("gs://"):
            Helper.upload_data_to_gcs(output.getvalue(), url, contentType='text/csv')
        else:
            Helper.save_data_to_file(output.getvalue(), url)

    @staticmethod
    def _filterFields(records, fieldList):
        """Project each record in *records* onto *fieldList*; absent fields
        become "".  An empty fieldList means 'keep every field'."""
        if not fieldList:
            return records
        return [{field: record.get(field, "") for field in fieldList}
                for record in records]

    @staticmethod
    def getDailyActiveDevicesBulk(session, epochTimeStart, epochTimeEnd, dataset, fieldList):
        '''
        Query the Itunes time-series API for rollingActiveDevices between
        epochTimeStart and epochTimeEnd (daily frequency) and write the
        result as CSV to dataset["fileName"] (templated with the report
        name and a timestamp).  fieldList, when non-empty, selects and
        orders the output columns.
        '''
        task = {
            'endpoint': 'https://analytics.itunes.apple.com/analytics/api/v1/data/time-series',
            'data': '{{"adamId":[{}],"frequency":"DAY","measures":["rollingActiveDevices"],"startTime":"{}","endTime":"{}"}}'.format(
                dataset["adamId"], epochToIso(epochTimeStart), epochToIso(epochTimeEnd)),
            'requestType': 'post'
        }

        response = session.run(task)
        results = json.loads(response['read'])['results']

        # Pivot into one record per day, keyed by BigQuery-style YYYYMMDD.
        datas = {}
        for result in results:
            for measureValue in result['data']:
                bqDate = measureValue['date'][:10].replace('-', '')
                if bqDate not in datas:
                    datas[bqDate] = {'date': bqDate, 'timestamp': tsIso()}
                for key, value in measureValue.items():
                    if key != 'date':
                        datas[bqDate][key] = value

        dataList = [datas[key] for key in sorted(datas, reverse=True)]
        dataList = Helper._filterFields(dataList, fieldList)

        url = dataset["fileName"].format("DailyActiveDevices", tsStr())
        Helper.saveListOfDictAsCSVToGCS(dataList, url)

    @staticmethod
    def getDailyMeasuresBulk(session, epochTimeStart, epochTimeEnd, dataset, fieldList):
        '''
        Query the Itunes app-detail API for every daily measure between
        epochTimeStart and epochTimeEnd and write the result as CSV to
        dataset["fileName"].  fieldList, when non-empty, selects and orders
        the output columns.
        '''
        # All measures in the Itunes API as of 12/11/2017.
        measures = ["impressionsTotal", "impressionsTotalUnique",
                    "pageViewCount", "pageViewUnique", "units", "iap",
                    "sales", "payingUsers", "installs", "uninstalls",
                    "sessions", "activeDevices", "crashes", "optin"]
        task = {
            'endpoint': 'https://analytics.itunes.apple.com/analytics/api/v1/data/app/detail/measures',
            'data': '{{"adamId":[{}],"frequency":"DAY","measures":{},"startTime":"{}","endTime":"{}"}}'.format(
                dataset["adamId"], json.dumps(measures),
                epochToIso(epochTimeStart), epochToIso(epochTimeEnd)),
            'requestType': 'post'
        }

        response = session.run(task)
        results = json.loads(response['read'])['results']

        # Pivot: one record per day; each result contributes one measure column.
        datas = {}
        for result in results:
            for measureValue in result['data']:
                bqDate = measureValue['date'][:10].replace('-', '')
                if bqDate not in datas:
                    datas[bqDate] = {'date': bqDate, 'timestamp': tsIso()}
                datas[bqDate][result['measure']] = measureValue['value']

        dataList = [datas[key] for key in sorted(datas, reverse=True)]
        dataList = Helper._filterFields(dataList, fieldList)

        url = dataset["fileName"].format("DailyMeasures", tsStr())
        Helper.saveListOfDictAsCSVToGCS(dataList, url)

    @staticmethod
    def _getDailyDimensions(session, epochTime, dataset, fieldList, measure):
        """Shared body of getDailyInstalls/getDailyUninstalls (they differed
        only in the measure name): query the dimensions endpoint for the day
        of *epochTime* and pivot the per-dimension rows into one record."""
        dateObj = datetime.date.fromtimestamp(epochTime)
        bqDate = dateObj.strftime("%Y%m%d")
        dateIso = dateObj.isoformat() + 'T00:00:00Z'

        task = {
            'endpoint': 'https://analytics.itunes.apple.com/analytics/api/v1/data/app/detail/dimensions',
            'data': '{{"adamId":[{}],"frequency":"DAY","measure":"{}","dimensions":["appVersion","source","appReferrer","domainReferrer","campaignId"],"startTime":"{}","endTime":"{}","limit":155,"hideEmptyValues":true}}'.format(
                dataset["adamId"], measure, dateIso, dateIso),
            'requestType': 'post'
        }

        response = session.run(task)
        results = json.loads(response['read'])['results']

        def dimensionData(dimension):
            # Rows of the first result for this dimension that clear the
            # privacy threshold, reshaped to {"key": ..., <measure>: value}.
            rows = [r for r in results if r['dimension'] == dimension][0]['data']
            return [{"key": row['key'], measure: row['value']}
                    for row in rows if row['meetsThreshold']]

        retval = {
            "date": bqDate,
            "timestamp": datetime.datetime.utcnow().isoformat(),
            "appVersion": dimensionData('appVersion'),
            "appReferrer": dimensionData('appReferrer'),
            "source": dimensionData('source'),
            "domainReferrer": dimensionData('domainReferrer'),
            "campaignId": dimensionData('campaignId'),
        }

        if fieldList:
            retval = {field: retval.get(field, "") for field in fieldList}

        return retval

    @staticmethod
    def getDailyInstalls(session, epochTime, dataset, fieldList):
        '''
        Queries Itunes API installs-by-dimension for the date specified by
        epochTime; returns one record (see _getDailyDimensions).
        '''
        return Helper._getDailyDimensions(session, epochTime, dataset, fieldList, "installs")

    @staticmethod
    def getDailyUninstalls(session, epochTime, dataset, fieldList):
        '''
        Queries Itunes API uninstalls-by-dimension for the date specified by
        epochTime; returns one record (see _getDailyDimensions).
        '''
        return Helper._getDailyDimensions(session, epochTime, dataset, fieldList, "uninstalls")

    @staticmethod
    def getDailyActiveDevices(session, epochTime, dataset, fieldList):
        '''
        Queries Itunes API rollingActiveDevices for the single date specified
        by epochTime and writes the one-row result as CSV to
        dataset["fileName"].
        '''
        dateObj = datetime.date.fromtimestamp(epochTime)
        dateIso = dateObj.isoformat() + 'T00:00:00Z'

        task = {
            'endpoint': 'https://analytics.itunes.apple.com/analytics/api/v1/data/time-series',
            'data': '{{"adamId":[{}],"frequency":"DAY","measures":["rollingActiveDevices"],"startTime":"{}","endTime":"{}"}}'.format(
                dataset["adamId"], dateIso, dateIso),
            'requestType': 'post'
        }

        response = session.run(task)
        results = json.loads(response['read'])['results']

        data = {}
        for result in results:
            data['date'] = result['data'][0]['date']
            data['timestamp'] = tsIso()
            data[result['totals']['key']] = result['totals']['value']

        # Normalise the API date (YYYY-MM-DD...) to BigQuery-style YYYYMMDD.
        data['date'] = data['date'][:10].replace('-', '')

        if fieldList:
            data = {field: data.get(field, "") for field in fieldList}

        url = dataset["fileName"].format("DailyActiveDevices", data['date'])
        Helper.saveDictAsCSVToGCS(data, url)

    @staticmethod
    def upload_data_to_gcs(
            data,
            url,
            contentType='application/octet-stream'):
        """Upload *data* to the gs:// *url* and return the blob's public URL.

        On failure the error is logged, an ERROR notification is sent and
        the exception is re-raised.  Fix: `base` is pre-initialised so the
        except path cannot hit an unbound name, and the unreachable
        `return None` after `raise` is gone.
        """
        base = url  # fallback label for error reporting before parsing succeeds
        try:
            logger.info(
                "Uploading file: url='{}',contentType='{}'".format(
                    url, contentType))

            bucket_name, remotePath, base = Helper.parseUrl(url)

            client = storage.Client()
            blob = client.bucket(bucket_name).blob(remotePath)
            blob.upload_from_string(data, content_type=contentType)
            Helper.sendNotification("File '{}' been uploaded to GCS '{}'.".format(base, url))
            return blob.public_url

        except Exception as e:
            logger.error(e)
            Helper.sendNotification("Error '{}' due to uploading file '{}' to GCS '{}'.".format(e, base, url), "ERROR")
            raise

    @staticmethod
    def getDicFromJson(fname):
        """Load the JSON file *fname* and return the parsed object.

        Fix: use a context manager so the file handle is always closed
        (the original leaked it).
        """
        with open(fname) as input_file:
            return json.load(input_file)

    @staticmethod
    def parseUrl(url):
        """Split a 'gs://bucket/path/to/file' url into
        (bucket, remotePath, base); returns (False, exception, False) when
        the url does not match."""
        try:
            m = re.match(r"gs://([^/]*)/(.*)$", url)
            bucket = m.group(1)
            remotePath = m.group(2)
            base = remotePath.split("/")[-1]
            logger.debug(
                "Parse URL '{}'. bucket='{}', remotePath='{}', base='{}'".format(
                    url, bucket, remotePath, base))
            return bucket, remotePath, base
        except Exception as e:
            # Fix: the original called logger.error(__name__, e), which makes
            # __name__ the log message and drops the actual error text.
            logger.error("%s: %s", __name__, e)
            return False, e, False

    @staticmethod
    def downloadFile(bucket_name, remoteFile, localFile):
        """Download gs://<bucket_name>/<remoteFile> to *localFile*;
        returns True on success, False on failure (error is logged)."""
        try:
            logger.info(
                "Start downloading file {}/{}".format(bucket_name, remoteFile))
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(remoteFile)
            blob.download_to_filename(localFile)
            logger.info("Downloaded to {}".format(localFile))
            return True
        except Exception as e:
            # Fix: logger.error(__name__, e) dropped the error text (see parseUrl).
            logger.error("%s: %s", __name__, e)
            return False
438
439
if __name__ == '__main__':
    # Library module: flag direct execution as a mistake.
    logger.error(
        "this module is Helper class definition, and not expected to be ran by itself.")
443