# Snapshot residue from the original paste: Oct 20, 2019, 04:30 PM
1from django.conf import settings as cfg
2from pandas.io import gbq
3from pandas_gbq import gbq
4from ..helpers import huub_gbq as hbq
5import time
6import logging
7from google.cloud import bigquery
8
logger = logging.getLogger(__name__)
# Module-level retry counters shared across calls: they cap how many times
# get_query_results / insert_rows re-attempt after transient 500/502 errors
# (limit is cfg.BIG_QUERY_RETRY). NOTE(review): being global, the budget is
# shared by all callers in the process rather than per call — confirm intent.
retry_bigquery_get = 0
retry_bigquery_ins = 0
12
13############################################################
14# INSERT QUERY #
15############################################################
16
17
18# function that insert rows on any table on Big Query database
19# dataset_id is the dataset where the table resides
20# table_id is the table name where the data should be inserted
21# rows_to_insert is a list with all the data to be inserted
22# client_big_query is the Big Query connector
23# the if_exists argument has a default value 'append' that append the data to the existing table (table_id)
24# or create the table and insert data
# if_exists has 2 other options: 'fail', which makes the insert fail if the table already exists,
26# and 'replace' that will delete the existing table and will create another one with the table_id name and
27# the schema passed on the df_to_insert; this option will make a data delay of 2 minutes due to a Google Big Query issue
def insert_rows(dataset_id, table_id, df_to_insert, if_exists='append', env='uat'):
    """Insert a DataFrame into a BigQuery table via pandas-gbq.

    dataset_id / table_id identify the destination table; df_to_insert is the
    DataFrame to load. if_exists is forwarded to gbq.to_gbq: 'append'
    (default) adds rows, creating the table when missing; 'fail' aborts if
    the table exists; 'replace' drops and recreates it (which incurs roughly
    a 2-minute data delay on BigQuery's side). env selects the credential
    set inside cfg.BIGQUERY_CONNECTOR.

    Transient 500/502 errors are retried up to cfg.BIG_QUERY_RETRY times
    with a 5-second back-off. Returns None on success, or the string
    'Got an error on insert' on failure.
    """
    global retry_bigquery_ins
    try:
        # BigQuery rejects embedded newlines in string cells; strip them.
        df = df_to_insert.replace('\n', '', regex=True)
        gbq.to_gbq(dataframe=df,
                   destination_table='{}.{}'.format(dataset_id, table_id),
                   project_id=cfg.BIGQUERY_CONNECTOR[env][0]['BIG_QUERY_PROJECT'],
                   private_key=cfg.BIGQUERY_CONNECTOR[env][0]['BIGQUERY_CONNECTOR_CREDENTIALS'],
                   if_exists=if_exists)
        # Reset the retry budget after a success; previously the global
        # counter only ever grew, eventually disabling retries process-wide.
        retry_bigquery_ins = 0
        return None

    except Exception as e:
        if ('500' in str(e) or '502' in str(e)) and retry_bigquery_ins < cfg.BIG_QUERY_RETRY:
            retry_bigquery_ins = retry_bigquery_ins + 1
            logger.warning('Got 500 error - Error: {0}'.format(e))
            time.sleep(5)
            # BUG FIX: propagate the retry's outcome and keep the same env.
            # The original dropped both — a failed retry still returned None
            # (reported as success) and re-ran against the default 'uat'.
            return insert_rows(dataset_id, table_id, df_to_insert, if_exists, env)
        else:
            logger.error('Got an error on insert', exc_info=True)
            return 'Got an error on insert'
50
51############################################################
52# EXECUTE QUERY #
53############################################################
54
55
56# This function returns the result from a query executed on Big Query database
57# query is the query to run on the database
58# result_type is the data type that the result set must have
59# we have 3 possibilities: dataframe, dict and json; dict is the default value
# the return value is None if an error occurred during the query execution
61# the last argument (expected_return) is used to know if the query to run must return a dataset (select clause)
62# or not (insert from select clause or update from select clause)
def get_query_results(query, result_type='dict', expected_return=True, env='uat'):
    """Execute a query on BigQuery and return its result set.

    result_type selects the shape of the result: 'dataframe', 'dict'
    (a list of dicts — the default) or 'json' (a JSON array string).
    expected_return=True means the query yields rows (a SELECT clause);
    False means it only has side effects (insert/update from select), in
    which case the helper connector is used and 'Success' is returned.
    env selects the credential set inside cfg.BIGQUERY_CONNECTOR.

    Transient 500/502 errors are retried up to cfg.BIG_QUERY_RETRY times
    with a 5-second back-off. Returns None when the query could not be run.
    """
    global retry_bigquery_get
    try:
        # NOTE(review): this function indexes cfg.BIGQUERY_CONNECTOR[env]
        # directly while the other helpers use [env][0] — confirm which
        # shape the settings actually have; one of the two is likely wrong.
        if expected_return:
            # BUG FIX: pandas-gbq's read_gbq has no 'expected_return'
            # parameter; forwarding it raised TypeError on every SELECT,
            # which the broad handler below then swallowed into a None.
            results_df = gbq.read_gbq(query=query,
                                      project_id=cfg.BIGQUERY_CONNECTOR[env]['BIG_QUERY_PROJECT'],
                                      private_key=cfg.BIGQUERY_CONNECTOR[env]['BIGQUERY_CONNECTOR_CREDENTIALS'],
                                      dialect='standard')

        else:
            # hbq is the in-house wrapper; it does accept expected_return.
            results_df = hbq.read_gbq(query=query,
                                      expected_return=expected_return,
                                      project_id=cfg.BIGQUERY_CONNECTOR[env]['BIG_QUERY_PROJECT'],
                                      private_key=cfg.BIGQUERY_CONNECTOR[env]['BIGQUERY_CONNECTOR_CREDENTIALS'],
                                      dialect='standard')

        if result_type == 'dataframe' and expected_return is True:
            return results_df
        elif result_type == 'dict' and expected_return is True:
            # the result is a list of dicts
            return results_df.to_dict(orient='records')
        elif result_type == 'json' and expected_return is True:
            # the result is a json_array
            return results_df.to_json(orient='records')
        else:
            return 'Success'

    except OverflowError as e:
        # Known pandas/gbq quirk when a column exceeds the C long range;
        # the side-effect query itself succeeded, so report success.
        if str(e) == 'Python int too large to convert to C long':
            logger.warning('Got an overflow error - Error: {0}'.format(e))
            return 'Success'
        else:
            logger.exception('Different overflow error - analyse - Error: {0}'.format(e))
            return None
    except Exception as e:
        if ('500' in str(e) or '502' in str(e)) and retry_bigquery_get < cfg.BIG_QUERY_RETRY:
            retry_bigquery_get = retry_bigquery_get + 1
            logger.warning('Got 500 error - Error: {0}'.format(e))
            time.sleep(5)
            # BUG FIX: propagate the retry's outcome and keep the same env
            # (the original dropped both, always returning None after retry).
            return get_query_results(query, result_type, expected_return, env)
        else:
            logger.exception('Could not execute query on Big Query - Error: {0}'.format(e))
            return None
107
108
def stream_rows(rows_to_insert, dataset_id, table_id, env='uat'):
    """Stream rows_to_insert into dataset_id.table_id via the streaming API.

    rows_to_insert is a list of row tuples/dicts matching the table schema
    (fetched live with get_table). env selects the credential set inside
    cfg.BIGQUERY_CONNECTOR. Returns None on success, 'Failed' on error
    (the error is logged with traceback).
    """
    try:
        client = bigquery.Client.from_service_account_json(json_credentials_path=cfg.BIGQUERY_CONNECTOR[env][0]['BIGQUERY_CONNECTOR_CREDENTIALS'],
                                                           project=cfg.BIGQUERY_CONNECTOR[env][0]['BIG_QUERY_PROJECT'])
        table_ref = client.dataset(dataset_id).table(table_id)
        table = client.get_table(table_ref)  # API request: fetch live schema
        errors = client.insert_rows(table, rows_to_insert)  # API request
        # insert_rows returns a list of per-row error dicts; idiomatic
        # truthiness replaces the original `not errors == []`.
        if errors:
            raise Exception(errors)
        return None
    except Exception as e:
        logger.exception('Could not stream data to Big Query - Error: {0}'.format(e))
        return 'Failed'
122
123
def insert_from_table(sql, dataset_id, table_id, env='uat'):
    """Run *sql* on BigQuery and write its result set into dataset_id.table_id.

    Blocks until the query job completes. env selects the credential set
    inside cfg.BIGQUERY_CONNECTOR. Returns None on success, 'Failed' when
    anything goes wrong (the error is logged with traceback).
    """
    try:
        connector_cfg = cfg.BIGQUERY_CONNECTOR[env][0]
        bq_client = bigquery.Client.from_service_account_json(
            json_credentials_path=connector_cfg['BIGQUERY_CONNECTOR_CREDENTIALS'],
            project=connector_cfg['BIG_QUERY_PROJECT'])
        # Route the query's output into the destination table.
        job_config = bigquery.QueryJobConfig()
        job_config.destination = bq_client.dataset(dataset_id).table(table_id)
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        query_job = bq_client.query(sql,
                                    location=cfg.BIG_QUERY_LOCATION,
                                    job_config=job_config)  # API request - starts the query
        query_job.result()  # block until the job finishes
        return None
    except Exception as e:
        logger.exception('Could not insert data from persistent table - Error: {0}'.format(e))
        return 'Failed'