· 4 years ago · Feb 16, 2021, 11:18 AM
1"""
2Author: Leonid Romanovsky
3Date: 2017-10-18
Description: Module aggregating helper functions for working with
 SQLAlchemy connections.
6
7"""
8
9import re
10
11import pandas as pd
12import sqlalchemy
13from numpy import int64, float64
14from sqlalchemy import VARCHAR, INT, BIGINT, FLOAT
15from sqlalchemy.sql import text
16
17from . import stuff as stf
18
19# Setups.
20marker_description = '-- DESCRIPTION:'
21
22
def execute(sql_engine, query, pass_errs=False):
    """
    Execute a single SQL query with autocommit.
    :param sql_engine: sqlalchemy engine (or connection) to run against.
    :param query: SQL statement text.
    :param pass_errs: when True, swallow execution errors and return None.
    :return: sqlalchemy result proxy, or None when an error was suppressed.
    """
    query = text(query)
    res = None
    try:
        res = sql_engine.execute(query.execution_options(autocommit=True))
    except Exception:
        if not pass_errs:
            raise  # bare raise keeps the original traceback intact
    return res
41
42
def get_query_descr(query, marker=None):
    """
    Try to extract the query description from a marker comment line.
    The first line starting with the marker wins.
    :param query: SQL text possibly containing a description line.
    :param marker: line prefix marking the description; defaults to the
        module-level `marker_description` (backward compatible).
    :return: stripped description string, or None when no marker line found.
    """
    if marker is None:
        marker = marker_description
    for line in query.splitlines():
        line = line.strip()
        if line.startswith(marker):
            return line[len(marker):].strip()
    return None
# def get_query_descr(query):
60
61
def execute_many(sql_engine,
                 _queries_text,
                 sep=';',
                 pass_errs=False,
                 log=None):
    """
    Execute multiple SQL queries divided by the `sep` separator.

    Comment lines and inline `--` comments are stripped first, so the
    separator search does not trip over separators inside comments.
    :param pass_errs: forwarded to `execute`; when True, per-query errors
        are swallowed.
    :param log: optional callable accepting a single message string.
    :param sep: statement separator (default ';').
    :param sql_engine: sqlalchemy engine.
    :param _queries_text: text holding one or more SQL statements.
    :return: list with one `execute` result per executed query.
    """
    # Remove unwanted comment lines.
    qlines = _queries_text.splitlines()
    qlines = [l.strip() for l in qlines]
    # Remove comment lines ('--' at the beginning) and markers.
    # NOTE(review): description marker lines also start with '--' and are
    # removed here (the exception below is commented out), so the
    # get_query_descr() call further down likely never finds one — confirm.
    qlines = [
        l for l in qlines
        if not l.startswith('--')  # Remove comment lines.
        # or l.strip().startswith(marker_description)  # Not remove description maker.
    ]
    # Trim inline comments: keep the code part plus the '--' marker itself;
    # lines containing '%' before the '--' do not match and are left as-is.
    qlines = [l.strip() for l in qlines]
    qlines = [re.sub(r'([^%]+)(--)(.*)', r'\1\2', l) for l in qlines]
    # NOTE(review): comment-only lines were already filtered out above, so
    # this `startswith('--')` branch appears unreachable — confirm.
    qlines = [l.replace(sep, '') if l.startswith('--') else l  # Remove separator from comment lines.
              for l in qlines]
    queries_text = '\n'.join(qlines)

    # Split on separate queries.
    queries = filter(len, queries_text.split(sep))
    queries = list(queries)
    results = []
    for i, q in enumerate(queries):  # For each query.
        q = str(q).strip()
        if len(q) == 0:
            continue  # Skip empty fragments between separators.
        if log:  # Logging message.
            msg = 'execute_many says: ' \
                  'Processing query %d of %d' \
                  % (i+1, len(queries))
            _descr = get_query_descr(q)  # Get query description.
            if _descr:  # Add description message.
                msg += '; description: ' + _descr
            else:
                msg += '.'
            log(msg)

        res = execute(sql_engine,  # Execute query.
                      q,
                      pass_errs=pass_errs)
        results.append(res)
    return results
116
117
def query_result(sql_engine, query, tupled=False):
    """
    Execute SQL query and return the fetched rows.
    :param tupled: when True return each row as a tuple of values,
        otherwise as a dict with lower-cased keys.
    :param sql_engine: sqlalchemy engine.
    :param query: SQL text; `SELECT *` is rejected to keep column sets explicit.
    :return: list of rows (dicts or tuples).
    :raises ValueError: when the query contains `SELECT *`.
    """
    if 'SELECT *' in query.upper():
        raise ValueError('`SELECT *` not allowed in sql statements.')
    res = sql_engine.execute(query)
    fres = []
    for row in res:
        row = dict(row)
        if tupled:
            # BUG FIX: previously returned a dict_values view, which is
            # neither indexable nor comparable like a tuple.
            row = tuple(row.values())
        else:
            row = {k.lower(): v for k, v in row.items()}
        fres.append(row)
    return fres
138
139
def join_quoted(arr):
    """
    Render each value single-quoted and comma-join the results.
    :param arr: iterable of values.
    :return: string like "'a', 'b', 'c'".
    """
    quoted = ["'%s'" % v for v in arr]
    return ", ".join(quoted)
147
148
def comma_join_parenthesis(arr):
    """
    Render each value wrapped in parentheses and comma-join the results.
    :param arr: iterable of values.
    :return: string like "(1), (2), (3)".
    """
    wrapped = ["({})".format(v) for v in arr]
    return ", ".join(wrapped)
156
157
def create_engine_db2(con_str):
    """
    Create a sqlalchemy engine from a standard db2 (ibm-db package)
    connection string of `KEY=VALUE;` pairs.
    :param con_str: db2 connection string.
    :return: sqlalchemy engine.
    :raises Exception: when the engine cannot be created (original
        error is chained as the cause).
    """
    # Positional indexes assume the key order DATABASE;HOSTNAME;PORT;
    # PROTOCOL;UID;PWD -- TODO(review): confirm against callers.
    acon_str_tmplt = 'ibm_db_sa://{4}:{5}@{1}:{2}/{0}'
    _tmp_creds = con_str.strip().split(';')
    _creds = [v.split('=')[1] for v in _tmp_creds if len(v) > 0]
    acon_str = acon_str_tmplt.format(*_creds)
    try:
        sql_engine = sqlalchemy.create_engine(
            acon_str,
            max_overflow=30,
            echo=False,
            echo_pool=False
        )
    except Exception as e:
        msg = 'Cannot connect to db2 ' \
              'due to: %s' % str(e)
        # Chain the cause so the underlying driver error is not lost.
        raise Exception(msg) from e
    return sql_engine
# def create_con_db2(con_str):
183
184
def result_chunked(
        db_engine,
        query_template,
        ids_list,
        other_args=None,
        chunk_size=10000,
        tupled=False,
        log_func=None,
        max_chunks=-1,
):
    """
    Loads data by chunks of ids.
    :param max_chunks: stop after this many chunks (<= 0 means no limit).
    :param log_func: optional callable for progress messages.
    :param tupled: forwarded to `query_result`.
    :param chunk_size: number of ids per chunk.
    :param db_engine: sqlalchemy engine.
    :param query_template: query text with an `{IDs}` placeholder.
    :param ids_list: list of id -- will be comma joined and put to query under `IDs` tag.
    :param other_args: extra `{tag}` substitutions for the template.
    :return: list of all rows collected over every chunk.
    """
    # BUG FIX: the old signature used a mutable default ({}) and mutated it
    # below (`other_args['IDs'] = ...`), leaking state between calls and
    # clobbering caller-supplied dicts.  Work on a private copy instead.
    other_args = dict(other_args) if other_args else {}
    param_str_builder = stf.comma_join_parenthesis_quoted

    # Splitting on chunks.
    chunks = stf.get_chunks(ids_list, chunk_size)

    res = []
    # Ceiling division: the old `len/chunk_size + 1` produced float counts
    # (e.g. 2.5) and overshot by one on exact multiples.
    cnt = -(-len(ids_list) // chunk_size)
    if max_chunks > 0:
        cnt = max_chunks
    cntr = 0
    start_t = stf.now()
    for chunk in chunks:
        cntr += 1
        if 0 < max_chunks < cntr:
            break  # Honour the chunk limit.
        if log_func:
            tl = stf.time_left(cnt, cntr, start_t)
            log_func('Processing %s of %s; '
                     '%s left.' % (cntr, cnt, tl))
        ids_param = param_str_builder(chunk)
        other_args['IDs'] = ids_param
        stmt = query_template.format(**other_args)
        chunk_res = query_result(db_engine,
                                 stmt,
                                 tupled)
        res.extend(chunk_res)
    return res
# def result_chunked(ids_list, other_args):
235
236
def drop_table(con,
               table_name,
               schema_name
               ):
    """
    Drop one or more tables from the given schema; errors from the
    individual DROP statements (e.g. table absent) are suppressed.
    :param con: sqlalchemy connection.
    :param table_name: single table name or list of names.
    :param schema_name: schema to drop from (`MENDATA` is protected).
    :raises ValueError: when the schema is `MENDATA`.
    """
    if schema_name.lower() == 'mendata':
        raise ValueError('Cannot drop in `MENDATA` schema.')
    tables = table_name if isinstance(table_name, list) else [table_name]
    for tbl in tables:
        query = 'DROP TABLE {SCHEMA}.{TABLE}'.format(SCHEMA=schema_name,
                                                     TABLE=tbl)
        execute(con, query, pass_errs=True)  # Best effort drop.
    return
# def drop_table(table_name,
253
254
def check_table_exists(db_engine, table_name, schema=None):
    """
    Check whether a table exists by probing it with a trivial SELECT.
    Any failure of the probe is treated as "table absent".
    NOTE(review): uses `LIMIT 1` -- older DB2 without MySQL compatibility
    would need `FETCH FIRST 1 ROWS ONLY`; confirm for the target server.
    :param db_engine: sqlalchemy engine.
    :param table_name: table to check.
    :param schema: optional schema name to qualify the table.
    :return: 1 when the probe query succeeded, else 0.
    """
    if schema:
        table_name = schema + '.' + table_name
    _q = 'SELECT 1 AS res FROM %s LIMIT 1'\
        % table_name
    try:
        execute(db_engine, _q)
        res = 1
    except Exception:  # deliberately broad: any failure means "absent"
        res = 0
    return res
# def check_table_exists(db_engine
274
275
def get_count(con, schema, table, condition=None):
    """
    Get rows count from a database table.
    :param con: sqlalchemy connection.
    :param schema: schema name.
    :param table: table name.
    :param condition: optional condition for a `WHERE` statement.
    :return: rows count.
    """
    query = "SELECT COUNT(*) AS cnt FROM %s.%s"
    # BUG FIX: this used to be a tuple, and tuple has no .append(), so any
    # call with `condition` raised AttributeError.
    sql_args = [schema, table]

    if condition:  # Add WHERE statement condition, if required.
        query += '\n\t\tWHERE %s'
        sql_args.append(condition)

    query = query % tuple(sql_args)
    res = query_result(con, query)
    cnt = res[0]['cnt']
    return cnt
# def get_count(
297
298
def get_sql_values(values_list):
    """
    Render a list of values for use after a SQL `VALUES` keyword.
    :param values_list: list of values to be used with `VALUES` statement.
    :return: comma-joined, parenthesised values string.
    """
    rendered = stf.comma_join_parenthesis(values_list)
    return rendered
# def get_sql_values(values_list):
307
308
def get_distinct(con,
                 col_name,
                 schema_name,
                 table_name):
    """
    Read the distinct values of a single column, cast to VARCHAR.
    :param con: sqlalchemy connection;
    :param col_name: column to read;
    :param schema_name: schema name;
    :param table_name: table containing the required column;
    :return: list of distinct values.
    """
    col = col_name.lower()
    query = 'SELECT DISTINCT VARCHAR({0}) AS {0} FROM {1}.{2}'.format(
        col, schema_name, table_name
    )
    frame = pd.read_sql(query, con)
    return frame[col].tolist()
327
328
def dataframe_to_sql_old(df, con, tablename, schemaname, dtypes, chunksize=10000):
    """
    Upload a dataframe to the database.  Created to extend the standard
    pandas `to_sql` with the `COMPRESS YES ADAPTIVE NOT LOGGED INITIALLY`
    clause to avoid database disk space and performance issues.
    NB! Not tested yet with string data types.
    :param df: dataframe to be uploaded;
    :param con: sqlalchemy database connection;
    :param tablename: database table name;
    :param schemaname: database schema name;
    :param dtypes: list or dict of sqlalchemy data type CLASSES
        (type instances such as VARCHAR(50) have no `__name__` and
        would fail below -- TODO(review): confirm intended input);
    :param chunksize: `to_sql` function chunksize argument;
    :raises ValueError: when `dtypes` has a wrong type or length.
    """
    if isinstance(dtypes, dict):  # was `type(...) is dict`; align with list branch
        _cols = list(dtypes.keys())
        _dtypes = list(dtypes.values())
    elif isinstance(dtypes, list):
        _cols = df.columns
        _dtypes = dtypes
    else:
        raise ValueError('`dtypes` should be dict or list.')
    if len(_cols) != len(_dtypes):
        raise ValueError('`dtypes` len does not match columns names.')

    # Build the column definition list, e.g. "col1 INT,\ncol2 FLOAT".
    columns = [
        _c + ' ' + _t.__name__
        for _c, _t in zip(_cols, _dtypes)
    ]
    columns = ',\n'.join(columns)
    # .format(**locals()) substitutes schemaname/tablename/columns.
    table_def = """
    CREATE TABLE {schemaname}.{tablename}(
    {columns}
    )
    COMPRESS YES ADAPTIVE
    NOT LOGGED INITIALLY
    """.format(**locals())
    drop_table(con, tablename, schemaname)  # Recreate the table from scratch.
    execute(con, table_def)
    df.to_sql(tablename, con, schema=schemaname, if_exists='append',
              index=False, chunksize=chunksize)
    return
# def dataframe_to_sql_old(df)
372
373
def sqlachemy_type_to_str(sqlalchemy_type):
    """
    Render a sqlalchemy type as a SQL DDL string.
    :param sqlalchemy_type: sqlalchemy type class, or a VARCHAR instance
        (which carries its length, e.g. VARCHAR(50)).
    :return: SQL type name string.
    :raises ValueError: for types without a known rendering.
    """
    import sqlalchemy.types as _st
    if type(sqlalchemy_type) is _st.VARCHAR:
        # VARCHAR instances stringify with their length.
        return str(sqlalchemy_type)
    # Class keys compare by identity, matching the original `==` checks.
    known = {
        _st.INT: 'INT',
        _st.BIGINT: 'BIGINT',
        _st.DATE: 'DATE',
        _st.TIMESTAMP: 'TIMESTAMP',
        _st.FLOAT: 'FLOAT',
        _st.SMALLINT: 'SMALLINT',
    }
    if sqlalchemy_type in known:
        return known[sqlalchemy_type]
    raise ValueError('Have not str parser for sqlalchemy type `%s`.' % str(sqlalchemy_type))
393
394
def create_index(con, schema_name, table_name, columns):
    """
    Create one single-column index per listed column, then refresh
    table statistics via `runstats`.
    :param con: sqlalchemy connection.
    :param schema_name: schema owning the table.
    :param table_name: table to index.
    :param columns: single column name or list of names.
    """
    if type(columns) is str:  # or type(columns) is unicode:
        columns = [columns]
    random_marker = stf.get_session_id()  # Makes index names unique.
    template = ('CREATE INDEX {schema}.idx_{table}_{marker}_{num} '
                'ON {schema}.{table}({column})')
    for num, column in enumerate(columns):
        statement = template.format(schema=schema_name,
                                    table=table_name,
                                    marker=random_marker,
                                    num=num,
                                    column=column)
        execute(con, statement)
    runstats(con, schema_name, table_name)
    return
# def create_index
414
415
def runstats(con, schema_name, table_name):
    """
    Execute a DB2 `RUNSTATS` on the table and all its indexes, then
    commit.  DB2 requires this after creating indices.
    :param con: sqlalchemy connection.
    :param schema_name: schema owning the table.
    :param table_name: table to refresh statistics for.
    """
    stmt = ("CALL ADMIN_CMD('RUNSTATS ON TABLE "
            "{schema_name}.{table_name} AND INDEXES ALL')")
    execute(con, stmt.format(schema_name=schema_name, table_name=table_name))
    commit(con)
    return
# def runstats
430
431
def commit(con):
    """Issue an explicit `COMMIT` on the given connection."""
    execute(con, 'COMMIT')
    return
# def commit
438
439
def convert_dataframe_dtypes_sql(df, sql_dtypes_mappings):
    """
    Converting data types in place using the required sqlalchemy
    dtypes map.  Use it to avoid `non-homogeneous` error.
    NB! integer type column does not support NULL values, this
    is Pandas restriction.
    :param df: dataframe to modify in place;
    :param sql_dtypes_mappings: dict {column -> sqlalchemy type instance}
    :return: None (`df` is mutated).
    """
    for _col in df.columns:  # unused enumerate index removed
        mapped_type = sql_dtypes_mappings.get(_col)
        if not mapped_type:
            continue  # No mapping for this column, skip.
        if isinstance(mapped_type, VARCHAR):
            df[_col] = df[_col].apply(str)  # Issue: `astype` does not convert numbers.
        elif isinstance(mapped_type, INT):
            df[_col] = df[_col].astype(int)
        elif isinstance(mapped_type, BIGINT):
            df[_col] = df[_col].astype(int64)
        elif isinstance(mapped_type, FLOAT):
            df[_col] = df[_col].astype(float64)
        # Any other mapped type leaves the column as-is.
    return
466
467
def reformat_column_name(column_names):
    """
    Reformat column names to database-friendly form: lowercase everything
    and replace spaces, `-`, `,`, `.`, `$`, `%` and other special symbols
    with underscores.
    :param column_names: single column name or list of names.
    :return: single reformatted name when one name results, else a list.
    """
    if type(column_names) is str:  # or type(column_names) is unicode:
        column_names = [column_names]  # Make list if single col_name passed.

    # Build ONE translation table (single C-level pass per name).
    intab = "\\\t -,./$%^&+=#@|:;<>(){}[]"
    outtab = "_" * len(intab)
    trantab = str.maketrans(intab, outtab)

    # BUG FIX: the previous code called `maketrans` per name (which returns
    # the translation TABLE, not a translated string) and kept lazy `map`
    # objects, which have no len() or indexing in Python 3.
    new_col_names = [str(name).translate(trantab).lower()
                     for name in column_names]

    if len(new_col_names) == 1:
        return new_col_names[0]

    return new_col_names
# def reformat_column_name
492