#!/usr/bin/env python
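"""Sync CSV exports in S3 into the pub_master table in Redshift.

The script lists date-partitioned CSVs in S3, compares them against what is
already loaded in Redshift, and re-copies any org/date combinations that are
missing or stale via a staging-table COPY / DELETE / INSERT cycle, with a
periodic VACUUM and ANALYZE to keep the table healthy.
"""
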
import sys
import click
import atexit
import os
import logging
import re
import subprocess
from boto.s3.connection import S3Connection

from tools.bi_db_consts import RedShiftSpectrum
from tools.bi_db import DataB
from datetime import datetime as dt
from retrying import retry

# global declaration of redshift vars for db maintenance
rshift = None
vacuum_running = False
analyze_running = False

logger = logging.getLogger("sync_s3_redshift_mgr")
formatter = logging.Formatter('%(process)d|%(asctime)s|%(name)s|%(levelname)s|%(message)s')

error_handler = logging.FileHandler("error.log")
error_handler.setFormatter(formatter)
error_handler.setLevel(logging.ERROR)

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)

logger.addHandler(stream_handler)
logger.addHandler(error_handler)
logger.setLevel(logging.DEBUG)


def vacuum_db():
    global vacuum_running
    if not vacuum_running:
        vacuum_running = True
        logger.info('Running: {}'.format('fn - vacuum_db'))
        rshift_4_vacuum = DataB(db_choice=RedShiftSpectrum, sql_language=RedShiftSpectrum.LANGUAGE)
        rshift_4_vacuum.create_conn()
        # VACUUM cannot run inside a transaction block, so close any open one first
        rshift_4_vacuum.conn.execute("""END TRANSACTION; VACUUM pub_master""")
        rshift_4_vacuum.kill_conn()
        vacuum_running = False


def re_analyze_db():
    global analyze_running
    if not analyze_running:
        analyze_running = True
        logger.info('Running: {}'.format('fn - re_analyze_db'))
        # dedicated connection; renamed so the local no longer shadows this function
        rshift_4_analyze = DataB(db_choice=RedShiftSpectrum, sql_language=RedShiftSpectrum.LANGUAGE)
        rshift_4_analyze.create_conn()
        rshift_4_analyze.conn.execute("""END TRANSACTION; ANALYZE pub_master""")
        rshift_4_analyze.kill_conn()
        analyze_running = False


def create_conn():
    logger.info('Running: {}'.format('fn - create_conn'))
    global rshift
    rshift = DataB(db_choice=RedShiftSpectrum, sql_language=RedShiftSpectrum.LANGUAGE)
    rshift.create_conn()
    return rshift


def get_csvs_currently_in_s3():
    logger.info('Running: {}'.format('fn - get_csvs_currently_in_s3'))
    s3_dir = subprocess.check_output("s3cmd ls --recursive s3://temp/".split(" "))
    csvs = []
    for x in s3_dir.split("\n"):
        if len(x) == 0:
            continue
        # skip listing lines that do not look like date-partitioned csv keys
        match = re.match(".*s3://.*date=(?P<date>[^/]+)/(?P<org_name>.*)", x)
        if match:
            csvs.append(match.groups())
    return csvs


def close_all_open_db_connections():
    logger.info('Running: {}'.format('fn - close_all_open_db_connections'))
    if rshift:
        rshift.kill_all_db()


def get_date_org_list_from_redshift():
    logger.info('Running: {}'.format('fn - get_date_org_list_from_redshift'))
    # Pulls the (start_date, org_name, inserted_at) rows that already exist in Redshift
    create_conn()
    rshift_select = """
        select start_date, org_name, inserted_at
        from pub_master
        group by 1, 2, 3
        order by start_date desc
    """
    result = rshift.conn.execute(rshift_select)
    return result.fetchall()


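# An org/date CSV is selected for copy when the org is missing from Redshift for
# that date, when the S3 file was re-uploaded more than 5 days after the last
# recorded load, or when the CSV's date differs from the date recorded for that
# org in org_meta_data (see the condition inside the loop below).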
def get_org_and_date_to_copy_from_s3_to_redshift(csvs_in_s3, org_dict, org_meta_data, process_by_date):
    logger.info('Running: {}'.format('fn - get_org_and_date_to_copy_from_s3_to_redshift'))
    conn = S3Connection()
    bucket = conn.get_bucket('30d-retention-us-west-2')
    list_of_org_date_dicts_to_instruct_copy_from_s3_to_redshift = []

    if process_by_date:
        logger.debug('Dates that will be updated in bulk: {}'.format(org_dict.keys()))
        return org_dict.keys()

    for csv in csvs_in_s3:
        try:
            key_meta = bucket.get_key('temp/date={}/{}'.format(csv[0], csv[1]))
            most_recent_uploaded_to_s3 = dt.strptime(key_meta.last_modified, '%a, %d %b %Y %H:%M:%S %Z')
            previous_upload_to_s3 = org_meta_data[csv[1]]['previous_upload_to_s3']

            if csv[1] not in org_dict.get(csv[0], []) or ((csv[0] == org_meta_data[csv[1]]['date']) and (
                    most_recent_uploaded_to_s3 - previous_upload_to_s3).days > 5
            ) or (csv[0] != org_meta_data[csv[1]]['date']):

                key_meta = bucket.get_key('temp/date={}/{}'.format(csv[0], csv[1]))
                if key_meta.size:
                    list_of_org_date_dicts_to_instruct_copy_from_s3_to_redshift.append({
                        'org_name': csv[1],
                        'date': csv[0]
                    })
                    # logger.debug('Updated Needed on {} for {}{}'.format(csv[0],
                    #     '{} b/c data was not found in Redshift but s3 file found w/ {}kb of data'.format(
                    #         csv[1], key_meta.size) if bool(csv[1] not in org_dict.get(csv[0], [])) else '',
                    #     '{} b/c data was refreshed {} days ago'.format(
                    #         csv[1], (most_recent_uploaded_to_s3 - previous_upload_to_s3).days) if bool((most_recent_uploaded_to_s3 - previous_upload_to_s3).days > 5) else ''))
        except Exception:
            # these are the dates that are not in org_dict as keys
            pass
    return list_of_org_date_dicts_to_instruct_copy_from_s3_to_redshift


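# Heavy DELETE/INSERT cycles leave dead rows and stale statistics behind in
# Redshift, so every ~30 copy instructions (see copy_s3_data_to_redshift) all
# connections are closed and VACUUM plus ANALYZE are run against pub_master
# before a fresh connection is created.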
def maintain_db_health(counter):
    logger.info('Running: {}'.format('fn - maintain_db_health'))
    DataB.kill_all_db()
    logger.debug('Ended all connections, starting vacuum')
    vacuum_db()
    logger.debug('Vacuum completed, starting db analyze')
    re_analyze_db()
    logger.debug('Finished db analyze command, creating new conn')
    global rshift
    rshift = create_conn()
    logger.debug('Continuing redshift work')


def drop_staging():
    logger.info('Running: {}'.format('fn - drop_staging'))
    rshift.conn.execute("""DROP TABLE IF EXISTS pub_staging;""")


def create_temp_staging():
    logger.info('Running: {}'.format('fn - create_temp_staging'))
    rshift.conn.execute("""
        CREATE TEMP TABLE pub_staging (LIKE pub_master);
    """)


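# COPY loads either a whole date partition or a single org's CSV into the
# staging table. CSV IGNOREHEADER 1 skips each file's header row, TIMEFORMAT
# sets how timestamp columns are parsed, and MAXERROR AS 10 tolerates a handful
# of bad rows before failing the load. The IAM role ARN comes from the ROLE
# environment variable.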
def copy_from_s3_to_redshift(date, org_name=None):
    logger.info('Running: {}'.format('fn - copy_from_s3_to_redshift'))
    if org_name:
        s3_url = 's3://temp/date={}/{}'.format(date, org_name)
    else:
        s3_url = 's3://temp/date={}'.format(date)
    copy_command = """
        COPY pub_staging (start_date, org_id, org_name, inserted_at, org_partner_cost,
            organization_cost, uan_cost, app, source, os, platform, country_field, adn_sub_campaign_name,
            adn_sub_adnetwork_name, adn_original_currency, adn_campaign_name, keyword, publisher_id, publisher_site_name,
            unified_campaign_name, organization_currency, adn_cost, adn_impressions, custom_clicks,
            custom_installs, adn_original_cost, adn_clicks, adn_installs, revenue_1, revenue_1_original,
            revenue_7, revenue_7_original, revenue_14, revenue_14_original, revenue_30, revenue_30_original)
        FROM '{}'
        IAM_ROLE '{}'
        CSV IGNOREHEADER 1
        TIMEFORMAT AS 'YYYY-MM-DD HH24:MI:SS'
        MAXERROR AS 10;
    """.format(
        s3_url,
        os.getenv('ROLE'))

    rshift.conn.execute(copy_command)


def delete_from_redshift_where_updates_are_present():
    logger.info('Running: {}'.format('fn - delete_from_redshift_where_updates_are_present'))
    rshift.conn.execute("""BEGIN TRANSACTION;""")
    rshift.conn.execute("""
        DELETE FROM pub_master
        USING pub_staging
        WHERE pub_master.start_date = pub_staging.start_date
        AND pub_master.org_id = pub_staging.org_id
        AND pub_master.org_name = pub_staging.org_name
    """)


def insert_into_redshift_from_staging():
    logger.info('Running: {}'.format('fn - insert_into_redshift_from_staging'))
    rshift.conn.execute("""
        INSERT INTO pub_master
        SELECT start_date, org_id, org_name, inserted_at, org_partner_cost, organization_cost, uan_cost,
            app, source, os, platform, country_field, adn_sub_campaign_name, adn_sub_adnetwork_name,
            adn_original_currency, adn_campaign_name, keyword, publisher_id, publisher_site_name,
            unified_campaign_name, organization_currency, adn_cost, adn_impressions, custom_clicks,
            custom_installs, adn_original_cost, adn_clicks, adn_installs, revenue_1, revenue_1_original,
            revenue_7, revenue_7_original, revenue_14, revenue_14_original, revenue_30, revenue_30_original
        FROM pub_staging;
    """)
    rshift.conn.execute("""END TRANSACTION;""")


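# do_work follows the usual Redshift merge pattern: COPY into a staging table,
# DELETE the matching rows from pub_master, then INSERT the staged rows, all
# within one transaction. The retry decorator re-runs the whole unit once with
# exponential backoff (1s base, capped at 60s) if it raises.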
@retry(wait_exponential_multiplier=1000, wait_exponential_max=60000, stop_max_attempt_number=2)
def do_work(date, org_name=None):
    logger.info("Copying data from S3 to Redshift for: {}{}".format(
        org_name if org_name else '', ' - ' + date if org_name else date))
    trans = rshift.conn.begin()
    try:
        drop_staging()
        create_temp_staging()
        copy_from_s3_to_redshift(date, org_name)
        delete_from_redshift_where_updates_are_present()
        insert_into_redshift_from_staging()
        trans.commit()
        rshift.kill_all_db()
        rshift.create_conn()
        logger.info('Finished: {}{}, Moving on to next instruction'.format(
            org_name if org_name else '', ' - ' + date if org_name else date))
    except Exception as err:
        logger.error("Rolling back: {}".format(err))
        trans.rollback()
        logger.error("Writing to errors file: {}".format(err))
        rshift.kill_all_db()
        rshift.create_conn()
    except KeyboardInterrupt:
        sys.exit()


@click.group()
def cli():
    pass


@cli.command()
@click.option('--process-by', type=click.Choice(['date', 'dateorg']), default='date')
def copy_s3_data_to_redshift(process_by):
    logger.info('Running: {}'.format('fn - copy_s3_data_to_redshift'))
    process_by_date = process_by == 'date'
    csvs_in_s3 = get_csvs_currently_in_s3()

    data_from_redshift = get_date_org_list_from_redshift()

    redshift_orgs_organized_by_dict_key_date = {}
    org_meta_data = {}
    for date_org_tup in data_from_redshift:
        org_date = dt.strftime(date_org_tup[0], '%Y-%m-%d')
        org_name = date_org_tup[1]
        org_meta_data.update({
            date_org_tup[1]: {
                'date': dt.strftime(date_org_tup[0], '%Y-%m-%d'),
                'previous_upload_to_s3': date_org_tup[2]
            }
        })

        if org_date in redshift_orgs_organized_by_dict_key_date:
            redshift_orgs_organized_by_dict_key_date[org_date].append(org_name)
        else:
            redshift_orgs_organized_by_dict_key_date.update({
                org_date: [org_name]
            })

    list_of_org_date_dicts_to_instruct_copy_from_s3_to_redshift = get_org_and_date_to_copy_from_s3_to_redshift(
        csvs_in_s3, redshift_orgs_organized_by_dict_key_date, org_meta_data, process_by_date
    )

    counter = 0
    for instruction in list_of_org_date_dicts_to_instruct_copy_from_s3_to_redshift:
        date = instruction if process_by_date else instruction['date']
        name = None if process_by_date else instruction['org_name']

        counter += 1
        if counter % 30 == 0 or counter == 1:
            maintain_db_health(counter)

        do_work(date, name)


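# Usage note: the cli group above is defined but never invoked below; the
# copy_s3_data_to_redshift command is called directly, so click only parses its
# own options, e.g. (script filename is a placeholder):
#   python <script>.py                       # process whole date partitions
#   python <script>.py --process-by dateorg  # process individual org CSVs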
if __name__ == '__main__':
    atexit.register(close_all_open_db_connections)
    copy_s3_data_to_redshift()