import os
import sys
import math
import json
import pytz
import awswrangler as wr
import pandas as pd
from PostgresConn import open_conn, close_conn
from datetime import datetime, timedelta
from dotenv import load_dotenv
from twilio.rest import Client

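# PostgresConn is a local helper module that is not included in this paste. A
# minimal sketch of what it is assumed to provide, using psycopg2 (the
# connection settings here are an assumption, not the author's code):
#
#   import psycopg2
#
#   def open_conn():
#       connection = psycopg2.connect(os.getenv("DATABASE_URL"))
#       return connection, connection.cursor()
#
#   def close_conn(connection, cursor):
#       cursor.close()
#       connection.close()
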
class TwilioJob:
    """Twilio APIs Implementation"""

    FILE_PATH = "./data/"
    # strftime prefix for log file names; the 15-minute bucket suffix is
    # appended per call in log() below
    LOG_FILE_FORMAT = "%m-%d-%Y-%H-"
    S3_BUCKET = "s3://datalakeingest/twilio_calls/"
    syncUpto = None

    client = None
    connection = None
    cursor = None

    schemaName = "twilio"
    tableName = "calls"
    tableSynced = False
    fieldPrefix = "call_"
    fieldTypes = {
        "sid": "CHAR(40)",
        "date_created": "TIMESTAMPTZ",
        "date_updated": "TIMESTAMPTZ",
        "parent_call_sid": "CHAR(40)",
        "account_sid": "CHAR(40)",
        "to": "CHAR(40)",
        "to_formatted": "CHAR(40)",
        "from_": "CHAR(40)",
        "from_formatted": "CHAR(40)",
        "phone_number_sid": "CHAR(40)",
        "status": "CHAR(40)",
        "start_time": "TIMESTAMPTZ",
        "end_time": "CHAR(40)",
        "duration": "CHAR(40)",
        "price": "CHAR(40)",
        "price_unit": "CHAR(20)",
        "direction": "CHAR(40)",
        "answered_by": "CHAR(40)",
        "annotation": "CHAR(40)",
        "api_version": "CHAR(40)",
        "forwarded_from": "CHAR(40)",
        "group_sid": "CHAR(40)",
        "caller_name": "CHAR(40)",
        "queue_time": "CHAR(40)",
        "trunk_sid": "CHAR(40)",
        "uri": "CHAR(255)",
    }
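    # Columns are created lazily in pushToPostgres() with the "call_" prefix,
    # so with the map above the first sync would issue statements like (a
    # sketch of the generated DDL, not captured output):
    #   ALTER TABLE twilio.calls ADD COLUMN call_sid CHAR(40)
    #   ALTER TABLE twilio.calls ADD COLUMN call_price_unit CHAR(20)
    # Keys missing from the map fall back to CHAR(255).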

    def __init__(self):
        load_dotenv()

        # Only calls created within the last 15 minutes are processed this run
        self.syncUpto = datetime.now(pytz.utc) - timedelta(minutes=15)
        account_sid = os.getenv("TWILIO_LANDING_ACCOUNT_SID")
        auth_token = os.getenv("TWILIO_LANDING_AUTH_TOKEN")
        self.client = Client(account_sid, auth_token)
        self.connection, self.cursor = open_conn()

    def process(self):
        # The calls stream yields the most recent calls first, so stop once
        # it reaches calls older than the sync cut-off
        for call in self.client.calls.stream():
            if call.date_created < self.syncUpto:
                break

            # Flatten the raw record: serialize datetimes, drop nested dicts
            # (e.g. subresource_uris)
            data = {}
            for key in iter(call._properties):
                val = call._properties[key]
                if type(val).__name__ == "datetime":
                    data[key] = val.isoformat("T")
                elif type(val).__name__ != "dict":
                    data[key] = val

            jsonData = json.dumps(data)
            self.log(call.date_created, jsonData)

        # Pending files to be processed
        pendingFiles = [
            f
            for f in os.listdir(self.FILE_PATH)
            if os.path.isfile(os.path.join(self.FILE_PATH, f))
        ]
        for file in pendingFiles:
            # Read source file contents
            srcFilePath = self.FILE_PATH + file
            with open(srcFilePath, "r") as fileObj:
                sourceData = "[\n" + fileObj.read() + "{}\n]"
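
            # Each line written by log() ends with a comma, so bracketing the
            # content and appending "{}" to absorb the trailing comma yields a
            # valid JSON array. An illustrative (not actual) processed file:
            #
            #   [
            #   {"sid": "CAxxxx", "status": "completed", "duration": "42"},
            #   {}
            #   ]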

            # Write file contents to destination as valid JSON; the processed/
            # directory is created on first use
            destFilePath = self.FILE_PATH + "processed/" + file
            os.makedirs(os.path.dirname(destFilePath), exist_ok=True)
            with open(destFilePath, "w") as fileObj:
                fileObj.write(sourceData)

            self.pushToPostgres(destFilePath)
            self.pushToAWS(destFilePath)

            # Remove source file after processing
            os.remove(srcFilePath)

    def pushToAWS(self, filePath):
        # Upload only when the "push2aws" flag is passed on the command line
        if "push2aws" in sys.argv[1:]:
            df = pd.read_json(filePath)
            s3Path = self.S3_BUCKET + os.path.basename(filePath)
            wr.s3.to_json(df, s3Path)
            print(s3Path)

    def pushToPostgres(self, filePath):
        # Load only when the "push2db" flag is passed on the command line
        if "push2db" in sys.argv[1:]:
            with open(filePath) as json_file:
                records = json.load(json_file)

            filename = os.path.basename(filePath)
            insertBatchSql = ""
            for data in records:
                # Skip the empty "{}" sentinel appended during processing
                if not data:
                    continue

                if not self.tableSynced:
                    # Create the base table if it does not exist
                    schema = "CREATE TABLE IF NOT EXISTS {}.{}({}id SERIAL PRIMARY KEY, s3_filename CHAR(40))".format(
                        self.schemaName, self.tableName, self.fieldPrefix
                    )
                    self.cursor.execute(schema)

                    sql = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';".format(
                        self.schemaName, self.tableName
                    )
                    self.cursor.execute(sql)

                    # Alter table with additional fields, skipping any field
                    # that already exists (row[3] is COLUMN_NAME)
                    fields = []
                    row = self.cursor.fetchone()
                    while row is not None:
                        fields.append(row[3])
                        row = self.cursor.fetchone()

                    for key in data:
                        fieldName = self.fieldPrefix + key
                        if fieldName not in fields:
                            addField = "ALTER TABLE {}.{} ADD COLUMN {} {}".format(
                                self.schemaName,
                                self.tableName,
                                fieldName,
                                self.fieldTypes.get(key, "CHAR(255)"),
                            )
                            self.cursor.execute(addField)
                            self.connection.commit()

                    self.tableSynced = True
                    print("Schema is ready")

                if not insertBatchSql:
                    # Column list comes from the first record; all records in
                    # a file are assumed to share the same keys
                    columns = tuple(self.fieldPrefix + sub for sub in data.keys())
                    insertBatchSql = "INSERT INTO {}.{} ({}) VALUES ".format(
                        self.schemaName,
                        self.tableName,
                        ", ".join(columns) + ", s3_filename",
                    )

                # Stringify values and escape embedded single quotes so the
                # batched SQL string stays valid
                values = tuple(
                    str(item).replace("'", "''") if item is not None else ""
                    for item in data.values()
                )
                insertBatchSql += "('" + "', '".join(values) + "', '" + filename + "'),"

            if insertBatchSql:
                # Drop the trailing comma and run the batched insert
                self.cursor.execute(insertBatchSql[:-1])
                self.connection.commit()

                print("Inserted {} rows".format(self.cursor.rowcount))

    def log(self, time, data):
        # Bucket log files into 15-minute windows based on the call's own
        # timestamp, e.g. a call created at 03:58 lands in the "-45" file
        bucket = math.floor(time.minute / 15) * 15
        fileLocation = os.path.abspath(
            self.FILE_PATH
            + "calls-"
            + time.strftime(self.LOG_FILE_FORMAT)
            + str(bucket)
            + ".log"
        )
        if not os.path.exists(fileLocation):
            print("Created {}".format(os.path.basename(fileLocation)))

        # Append mode creates the file on first write
        with open(fileLocation, "a") as myfile:
            myfile.write(data + ",\n")


twilioJob = TwilioJob()
twilioJob.process()
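
# The push targets are opt-in via command-line flags (script name assumed):
#   python twilio_job.py                   # fetch calls and stage JSON files
#   python twilio_job.py push2db           # also insert rows into Postgres
#   python twilio_job.py push2db push2aws  # also upload the JSON to S3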