"""
Author: Leonid Romanovsky
Date: 2017-10-18
Description: Module aggregating functions to work with
    an SQLAlchemy connection.
"""

import re

import pandas as pd
import sqlalchemy
from numpy import int64, float64
from sqlalchemy import VARCHAR, INT, BIGINT, FLOAT
from sqlalchemy.sql import text

from . import stuff as stf

# Setups.
marker_description = '-- DESCRIPTION:'

def execute(sql_engine, query, pass_errs=False):
    """
    Execute a single SQL query.
    :param sql_engine: SQLAlchemy engine or connection;
    :param query: SQL statement text;
    :param pass_errs: if True, swallow execution errors and return None;
    :return: result proxy, or None if the query failed and `pass_errs` is set.
    """
    query = text(query)
    res = None
    try:
        res = sql_engine.execute(query.execution_options(autocommit=True))
    except Exception:
        if not pass_errs:
            raise
    return res

def get_query_descr(query):
    """
    Try to extract the query description from a `-- DESCRIPTION:` marker line.
    :param query: SQL statement text;
    :return: description string, or None if no marker line is found.
    """
    _q = query.splitlines()
    _q = [v.strip() for v in _q]
    _q = [v for v in _q if v.startswith(marker_description)]
    if len(_q) > 0:
        _q = _q[0]
        _d = _q[len(marker_description):]
        _d = _d.strip()
    else:
        _d = None
    return _d
# def get_query_descr(query):

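# Illustrative sketch (not part of the original module): shows how a
# `-- DESCRIPTION:` marker line is picked up by `get_query_descr`. The query
# text below is made up for demonstration only.
def _example_get_query_descr():
    _demo_query = (
        "-- DESCRIPTION: Count rows in the staging table.\n"
        "SELECT COUNT(*) AS cnt FROM stg.some_table"
    )
    return get_query_descr(_demo_query)  # -> 'Count rows in the staging table.'
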
def execute_many(sql_engine,
                 _queries_text,
                 sep=';',
                 pass_errs=False,
                 log=None):
    """
    Execute multiple SQL queries divided by the `sep` separator.
    :param sql_engine: SQLAlchemy engine or connection;
    :param _queries_text: text containing one or more SQL statements;
    :param sep: statement separator, ';' by default;
    :param pass_errs: if True, swallow per-query execution errors;
    :param log: optional logging callable, e.g. `print` or `logger.info`;
    :return: list of result proxies, one per executed query.
    """
    # Remove unwanted comment lines.
    qlines = _queries_text.splitlines()
    qlines = [l.strip() for l in qlines]
    # Remove comment lines ('--' at the beginning) and markers.
    qlines = [
        l for l in qlines
        if not l.startswith('--')  # Remove comment lines.
        # or l.strip().startswith(marker_description)  # Keep description marker lines.
    ]
    # Cut off inline comment text at the end of code lines (the `--` marker itself is kept).
    qlines = [l.strip() for l in qlines]
    qlines = [re.sub(r'([^%]+)(--)(.*)', r'\1\2', l) for l in qlines]
    qlines = [l.replace(sep, '') if l.startswith('--') else l  # Remove separator from comment lines.
              for l in qlines]
    queries_text = '\n'.join(qlines)

    # Split on separate queries.
    queries = filter(len, queries_text.split(sep))
    queries = list(queries)
    results = []
    for i, q in enumerate(queries):  # For each query.
        q = str(q).strip()
        if len(q) == 0:
            continue
        if log:  # Logging message.
            msg = 'execute_many says: ' \
                  'Processing query %d of %d' \
                  % (i + 1, len(queries))
            _descr = get_query_descr(q)  # Get query description.
            if _descr:  # Add description message.
                msg += '; description: ' + _descr
            else:
                msg += '.'
            log(msg)

        res = execute(sql_engine,  # Execute query.
                      q,
                      pass_errs=pass_errs)
        results.append(res)
    return results

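# Illustrative sketch (not part of the original module): run a small
# multi-statement script through `execute_many`. The engine and the SQL text
# are assumptions made for the example only.
def _example_execute_many(sql_engine):
    _script = (
        "-- A comment line that will be stripped.\n"
        "CREATE TABLE tmp_demo (id INT);\n"
        "INSERT INTO tmp_demo VALUES (1);\n"
        "INSERT INTO tmp_demo VALUES (2)"
    )
    return execute_many(sql_engine, _script, sep=';', pass_errs=True, log=print)
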
def query_result(sql_engine, query, tupled=False):
    """
    Execute SQL query and return the fetched rows.
    :param sql_engine: SQLAlchemy engine or connection;
    :param query: SQL statement text (explicit column list required);
    :param tupled: if True, return only row values as tuples; otherwise return
        dicts with lowercased column names;
    :return: list of rows.
    """
    if 'SELECT *' in query.upper():
        raise ValueError('`SELECT *` not allowed in sql statements.')
    res = sql_engine.execute(query)
    fres = []
    for row in res:
        row = dict(row)
        if tupled:
            row = tuple(row.values())
        else:
            row = {k.lower(): v for k, v in row.items()}
        fres.append(row)
    return fres

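# Illustrative sketch (not part of the original module): fetch rows as dicts
# and as tuples. The schema, table, and column names are made up.
def _example_query_result(sql_engine):
    rows_as_dicts = query_result(sql_engine, 'SELECT id, name FROM demo_schema.demo_table')
    rows_as_tuples = query_result(sql_engine, 'SELECT id, name FROM demo_schema.demo_table', tupled=True)
    return rows_as_dicts, rows_as_tuples
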
def join_quoted(arr):
    """
    Wrap each value in single quotes and join with commas.
    :param arr: iterable of values;
    :return: joined string, e.g. "'a', 'b', 'c'".
    """
    return ", ".join("'" + str(v) + "'" for v in arr)

def comma_join_parenthesis(arr):
    """
    Wrap each value in parentheses and join with commas.
    :param arr: iterable of values;
    :return: joined string, e.g. "(1), (2), (3)".
    """
    return ", ".join("(" + str(v) + ")" for v in arr)

def create_engine_db2(con_str):
    """
    Create an SQLAlchemy engine from a standard db2 (ibm-db package)
    connection string.
    :param con_str: db2 connection string, e.g.
        'DATABASE=...;HOSTNAME=...;PORT=...;PROTOCOL=...;UID=...;PWD=...;'
    :return: SQLAlchemy engine.
    """
    acon_str_tmplt = 'ibm_db_sa://{4}:{5}@{1}:{2}/{0}'
    _tmp_creds = con_str.strip().split(';')
    _creds = [v.split('=')[1] for v in _tmp_creds if len(v) > 0]
    acon_str = acon_str_tmplt.format(*_creds)
    try:
        sql_engine = sqlalchemy.create_engine(
            acon_str,
            max_overflow=30,
            echo=False,
            echo_pool=False
        )
    except Exception as e:
        msg = 'Cannot connect to db2 ' \
              'due to: %s' % str(e)
        raise Exception(msg)
    return sql_engine
# def create_engine_db2(con_str):

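# Illustrative sketch (not part of the original module): the connection string
# below uses placeholder host and credential values only.
def _example_create_engine_db2():
    _con_str = ('DATABASE=SAMPLE;HOSTNAME=db2.example.com;PORT=50000;'
                'PROTOCOL=TCPIP;UID=db2user;PWD=secret;')
    return create_engine_db2(_con_str)
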
def result_chunked(
    db_engine,
    query_template,
    ids_list,
    other_args=None,
    chunk_size=10000,
    tupled=False,
    log_func=None,
    max_chunks=-1,
):
    """
    Load data in chunks, substituting id chunks into the query template.
    :param db_engine: SQLAlchemy engine or connection;
    :param query_template: query with an `{IDs}` placeholder (and optionally
        placeholders for `other_args`);
    :param ids_list: list of ids -- each chunk is joined and put into the query
        under the `IDs` tag;
    :param other_args: dict of additional template arguments;
    :param chunk_size: number of ids per chunk;
    :param tupled: passed through to `query_result`;
    :param log_func: optional logging callable;
    :param max_chunks: process at most this many chunks (-1 means no limit);
    :return: list of fetched rows.
    """
    if other_args is None:
        other_args = {}
    param_str_builder = stf.comma_join_parenthesis_quoted

    # Splitting on chunks.
    chunks = stf.get_chunks(ids_list, chunk_size)

    res = []
    cnt = len(ids_list) // chunk_size + 1
    if max_chunks > 0:
        cnt = max_chunks
    cntr = 0
    start_t = stf.now()
    for chunk in chunks:
        cntr += 1
        if 0 < max_chunks < cntr:
            break
        if log_func:
            tl = stf.time_left(cnt, cntr, start_t)
            log_func('Processing %s of %s; '
                     '%s left.' % (cntr, cnt, tl))
        ids_param = param_str_builder(chunk)
        other_args['IDs'] = ids_param
        stmt = query_template.format(**other_args)
        chunk_res = query_result(db_engine,
                                 stmt,
                                 tupled)
        res.extend(chunk_res)
    return res
# def result_chunked(ids_list, other_args):

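# Illustrative sketch (not part of the original module): query a table in id
# chunks. The table/column names and the exact SQL around `{IDs}` are
# assumptions -- the real shape depends on what `stf.comma_join_parenthesis_quoted`
# produces, which is defined outside this module.
def _example_result_chunked(db_engine):
    _template = ('SELECT member_id, visit_date '
                 'FROM demo_schema.visits '
                 'WHERE member_id IN (VALUES {IDs})')
    _ids = list(range(1, 25001))
    return result_chunked(db_engine, _template, _ids, chunk_size=10000, log_func=print)
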
def drop_table(con,
               table_name,
               schema_name
               ):
    """
    Drop one table or a list of tables; errors (e.g. table does not exist)
    are silently ignored.
    :param con: SQLAlchemy engine or connection;
    :param table_name: table name or list of table names;
    :param schema_name: schema name;
    :return:
    """
    if schema_name.lower() == 'mendata':
        raise ValueError('Cannot drop in `MENDATA` schema.')
    if not isinstance(table_name, list):
        table_name = [table_name]
    for _tbl in table_name:
        sql_args = {'SCHEMA': schema_name,
                    'TABLE': _tbl}
        qry = 'DROP TABLE {SCHEMA}.{TABLE}'
        qry = qry.format(**sql_args)  # Drop table.
        execute(con, qry, pass_errs=True)
    return
# def drop_table(table_name,

def check_table_exists(db_engine, table_name, schema=None):
    """
    Check whether a table exists in the database by probing it with a query.
    :param db_engine: SQLAlchemy engine or connection;
    :param table_name: table name;
    :param schema: optional schema name;
    :return: 1 if the table exists, 0 otherwise.
    """
    if schema:
        table_name = schema + '.' + table_name
    _q = 'SELECT 1 AS res FROM %s LIMIT 1' \
         % table_name
    try:
        execute(db_engine, _q)
        res = 1
    except Exception:
        res = 0
    return res
# def check_table_exists(db_engine

def get_count(con, schema, table, condition=None):
    """
    Get rows count from a database table.
    :param con: SQLAlchemy engine or connection;
    :param schema: schema name;
    :param table: table name;
    :param condition: condition for the `WHERE` statement;
    :return: rows count.
    """
    query = "SELECT COUNT(*) AS cnt FROM %s.%s"
    sql_args = [schema, table]

    if condition:  # Add WHERE statement condition, if required.
        query += '\n\t\tWHERE %s'
        sql_args.append(condition)

    query = query % tuple(sql_args)
    res = query_result(con, query)
    cnt = res[0]['cnt']
    return cnt
# def get_count(

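# Illustrative sketch (not part of the original module): count rows with and
# without a condition. Schema, table, and column names are assumptions.
def _example_get_count(con):
    total = get_count(con, 'demo_schema', 'demo_table')
    recent = get_count(con, 'demo_schema', 'demo_table',
                       condition="load_date >= '2017-01-01'")
    return total, recent
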
def get_sql_values(values_list):
    """
    Build a comma-joined, parenthesized list of values.
    :param values_list: list of values to be used with a `VALUES` statement;
    :return: joined string.
    """
    return stf.comma_join_parenthesis(values_list)
# def get_sql_values(values_list):

def get_distinct(con,
                 col_name,
                 schema_name,
                 table_name):
    """
    Get distinct values from a single column.
    :param con: SQLAlchemy connection;
    :param col_name: column to read;
    :param schema_name: schema name;
    :param table_name: table containing the required column;
    :return: list of distinct values.
    """
    col_name = col_name.lower()
    q = 'SELECT DISTINCT VARCHAR({}) AS {} FROM {}.{}'.format(
        col_name, col_name, schema_name, table_name
    )
    ret_val = pd.read_sql(q, con)[col_name].tolist()
    return ret_val

def dataframe_to_sql_old(df, con, tablename, schemaname, dtypes, chunksize=10000):
    """
    Upload dataframe to the database. This function was created to extend
    the standard pandas function `to_sql` with the `COMPRESS YES ADAPTIVE NOT
    LOGGED INITIALLY` clause to avoid database disk space and performance
    issues.
    NB! Not tested yet with string data types.
    :param df: dataframe to be uploaded;
    :param con: SQLAlchemy database connection;
    :param tablename: database table name;
    :param schemaname: database schema name;
    :param dtypes: list or dict of sqlalchemy data types;
    :param chunksize: `to_sql` function chunksize argument;
    """
    if isinstance(dtypes, dict):
        _cols = list(dtypes.keys())
        _dtypes = list(dtypes.values())
    elif isinstance(dtypes, list):
        _cols = df.columns
        _dtypes = dtypes
    else:
        raise ValueError('`dtypes` should be dict or list.')
    if len(_cols) != len(_dtypes):
        raise ValueError('`dtypes` len does not match columns names.')

    columns = [
        _c + ' ' + _t.__name__
        for _c, _t in zip(_cols, _dtypes)
    ]
    columns = ',\n'.join(columns)
    table_def = """
    CREATE TABLE {schemaname}.{tablename}(
    {columns}
    )
    COMPRESS YES ADAPTIVE
    NOT LOGGED INITIALLY
    """.format(**locals())
    drop_table(con, tablename, schemaname)
    execute(con, table_def)
    df.to_sql(tablename, con, schema=schemaname, if_exists='append',
              index=False, chunksize=chunksize)
    return
# def dataframe_to_sql_old(df)

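# Illustrative sketch (not part of the original module): upload a small frame
# with an explicit type map. The table, schema, and column names are assumptions.
def _example_dataframe_to_sql_old(con):
    _df = pd.DataFrame({'id': [1, 2, 3], 'amount': [0.5, 1.5, 2.5]})
    _dtypes = {'id': BIGINT, 'amount': FLOAT}
    dataframe_to_sql_old(_df, con, 'tmp_demo_upload', 'demo_schema', _dtypes)
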
def sqlachemy_type_to_str(sqlalchemy_type):
    """
    Convert an sqlalchemy type (type class, or a VARCHAR instance) to its
    DDL type string.
    """
    import sqlalchemy.types as _st
    if type(sqlalchemy_type) is _st.VARCHAR:
        ret_val = str(sqlalchemy_type)
    elif sqlalchemy_type == _st.INT:
        ret_val = 'INT'
    elif sqlalchemy_type == _st.BIGINT:
        ret_val = 'BIGINT'
    elif sqlalchemy_type == _st.DATE:
        ret_val = 'DATE'
    elif sqlalchemy_type == _st.TIMESTAMP:
        ret_val = 'TIMESTAMP'
    elif sqlalchemy_type == _st.FLOAT:
        ret_val = 'FLOAT'
    elif sqlalchemy_type == _st.SMALLINT:
        ret_val = 'SMALLINT'
    else:
        raise ValueError('No string parser for sqlalchemy type `%s`.' % str(sqlalchemy_type))
    return ret_val

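# Illustrative sketch (not part of the original module): convert a sized
# VARCHAR instance and a plain type class.
def _example_sqlachemy_type_to_str():
    import sqlalchemy.types as _st
    return [sqlachemy_type_to_str(_st.VARCHAR(32)),  # -> 'VARCHAR(32)'
            sqlachemy_type_to_str(_st.BIGINT)]       # -> 'BIGINT'
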
def create_index(con, schema_name, table_name, columns):
    """
    Create an index on each of the required columns, then refresh statistics.
    :param con: SQLAlchemy engine or connection;
    :param schema_name: schema name;
    :param table_name: table name;
    :param columns: column name or list of column names;
    :return:
    """
    if isinstance(columns, str):
        columns = [columns]
    random_marker = stf.get_session_id()
    for i, col in enumerate(columns):
        _query = 'CREATE INDEX {schema_name}.idx_{table_name}_{random_marker}_{i} ' \
                 'ON {schema_name}.{table_name}({col})'.format(**locals())
        execute(con, _query)
    runstats(con, schema_name, table_name)
    return
# def create_index

def runstats(con, schema_name, table_name):
    """
    Execute the `RUNSTATS` admin command, required by DB2 after creating indices.
    :param con: SQLAlchemy engine or connection;
    :param schema_name: schema name;
    :param table_name: table name;
    :return:
    """
    query = "CALL ADMIN_CMD('RUNSTATS ON TABLE {schema_name}.{table_name} AND INDEXES ALL')"
    query = query.format(schema_name=schema_name, table_name=table_name)
    execute(con, query)
    commit(con)
    return
# def runstats

def commit(con):
    # Execute `COMMIT` statement.
    query = 'COMMIT'
    execute(con, query)
    return
# def commit

def convert_dataframe_dtypes_sql(df, sql_dtypes_mappings):
    """
    Convert dataframe column dtypes in place using the required sqlalchemy
    dtypes map. Use it to avoid the `non-homogeneous` error.
    NB! An integer-typed column does not support NULL values; this
    is a Pandas restriction.
    :param df: dataframe to modify;
    :param sql_dtypes_mappings: dict {column -> sqlalchemy type}
    :return:
    """
    for _col in df.columns:
        mapped_type = sql_dtypes_mappings.get(_col)
        if not mapped_type:
            continue  # No mapping for this column, skip.
        if isinstance(mapped_type, VARCHAR):
            df[_col] = df[_col].apply(str)  # Issue: `astype` does not convert numbers.
        elif isinstance(mapped_type, INT):
            df[_col] = df[_col].astype(int)
        elif isinstance(mapped_type, BIGINT):
            df[_col] = df[_col].astype(int64)
        elif isinstance(mapped_type, FLOAT):
            df[_col] = df[_col].astype(float64)
        else:
            pass  # Leave column type as is.
    return

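# Illustrative sketch (not part of the original module): align frame dtypes
# with an intended SQL type map before uploading. Column names and the mapping
# are assumptions; note the map must hold type instances for `isinstance` to match.
def _example_convert_dataframe_dtypes_sql():
    _df = pd.DataFrame({'id': ['1', '2'], 'amount': ['0.5', '1.5'], 'code': [10, 20]})
    _mapping = {'id': BIGINT(), 'amount': FLOAT(), 'code': VARCHAR(10)}
    convert_dataframe_dtypes_sql(_df, _mapping)
    return _df.dtypes
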
def reformat_column_name(column_names):
    """
    Reformat column names: lowercase all, replace spaces, `-`, `,`, `.`, `$`, `%` and
    any other symbols that would make database column names look bad.
    :param column_names: single column name or list of names;
    :return: reformatted name (or list of names).
    """
    if isinstance(column_names, str):
        column_names = [column_names]  # Make list if single col_name passed.

    # Prepare translator table: every listed symbol maps to `_`.
    intab = "\\\t -,./$%^&+=#@|:;<>(){}[]"
    outtab = "_" * len(intab)
    trantab = str.maketrans(intab, outtab)

    # Translate each column name, then lowercase.
    new_col_names = [str(col_name).translate(trantab) for col_name in column_names]
    new_col_names = [col_name.lower() for col_name in new_col_names]

    if len(new_col_names) == 1:
        return new_col_names[0]

    return new_col_names
# def reformat_column_name

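# Illustrative sketch (not part of the original module): the input strings are
# made up to show the renaming behaviour.
def _example_reformat_column_name():
    return reformat_column_name(['Member ID', 'Claim Amount, $', 'visit.date'])
    # -> ['member_id', 'claim_amount___', 'visit_date']
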
475# *****************************************************************
476# *************************** Example 1 ***************************
477# *****************************************************************
def lstm_dataset_generator(yconf, filtered_members_list, chunk_size=10, target_y_col='target_value'):
    """
    Creates a generator which iterates through the train dataset, consisting of 3 files:
    preprocessed x features, raw <DELETED> x features and future target value y.
    :param yconf: project configuration object;
    :param filtered_members_list: list of member ids to include;
    :param chunk_size: number of members per yielded batch;
    :return: generator of (chunk_x_preproc, chunk_y) batches.
    """
    visits_count = yconf.history_visits_count
    filtered_members_list = sorted(filtered_members_list)
    gs = yconf.generated_settings
    fn_y_train = gs['fn_y_train']
    fn_x_train_preproc = gs['fn_x_train_preproc']
    fn_x_train_raw_diags = gs['fn_x_train_raw_diags']

    df_y_train = pd.read_csv(fn_y_train, sep=sep)
    build_first_step_y(df_y_train)
    df_y_train.set_index('MBR_DK', inplace=True)

    def split_mbr_x(_new_mbr_x):
        _new_mbr_x = [_v.split(big_sep) for _v in _new_mbr_x]
        try:
            _new_mbr_x = [(_id.split(sep), _vals.split(sep)) for _id, _vals in _new_mbr_x]
        except Exception:
            raise ValueError('Cannot split member block into (id, values) pairs.')
        mbr_id = set(v[0][0] for v in _new_mbr_x)
        if len(mbr_id) > 1:
            raise ValueError('Wrong structure, more than one member in a single `mbr_x` block.')
        return _new_mbr_x, mbr_id.pop()

    def read_multiple_lines(_fin, _lines_count):
        _lines = [None] * _lines_count
        for i in range(_lines_count):
            _l = _fin.readline()
            _lines[i] = _l
        return _lines

    while True:
        # Open x files for reading.
        fin_x_train_preproc = open(fn_x_train_preproc)
        fin_x_train_raw_diags = open(fn_x_train_raw_diags)

        # Skip header lines.
        header_x_train_preproc = fin_x_train_preproc.readline().split(big_sep)[1].split(sep)
        # No header in the raw diagnoses file.
        # header_x_train_raw_diags = fin_x_train_raw_diags.readline().split(big_sep)[1].split(sep)

        # Chunk through members list.
        mbrs_chunks = get_chunks(filtered_members_list, chunk_size=chunk_size)

        for chunk in mbrs_chunks:
            chunk = set(chunk)
            collected_mbrs = set()
            chunk_x_preproc, chunk_x_raw_diags = [], []
            while collected_mbrs != chunk:  # Read until all members from this chunk are collected.
                if len(collected_mbrs) > len(chunk):
                    raise ValueError('Incorrect members set/order.')

                # Read next member from x data files.
                new_mbr_x_preproc = read_multiple_lines(fin_x_train_preproc, visits_count)
                new_mbr_x_raw_diags = read_multiple_lines(fin_x_train_raw_diags, visits_count)
                new_mbr_x_preproc, mbr_id_p = split_mbr_x(new_mbr_x_preproc)
                new_mbr_x_raw_diags, mbr_id_r = split_mbr_x(new_mbr_x_raw_diags)
                if mbr_id_p != mbr_id_r:
                    raise ValueError('Preproc and raw_diags files provided different members.')

                mbr_id = mbr_id_r
                if mbr_id not in chunk:
                    continue  # Member not in the list of filtered members, skip.

                collected_mbrs.add(mbr_id)
                chunk_x_preproc.append(new_mbr_x_preproc)
                chunk_x_raw_diags.append(new_mbr_x_raw_diags)
            # while collected_mbrs != chunk

            chunk_y = df_y_train.loc[list(chunk)]  # Select y rows for the collected members.

            yield chunk_x_preproc, chunk_y
        # for chunk in mbrs_chunks
    return
# def lstm_dataset_generator
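# Illustrative sketch (not part of the original code): consume a few batches from
# the generator. `yconf` and `members` stand in for whatever configuration object
# and member list the surrounding project provides.
def _example_lstm_dataset_generator(yconf, members):
    gen = lstm_dataset_generator(yconf, members, chunk_size=10)
    first_batches = [next(gen) for _ in range(3)]
    return first_batches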
559
560
561
562
563# *****************************************************************
564# *************************** Example 2 ***************************
565# *****************************************************************
class Marker(object):
    def __init__(self, config):
        self.config = config
        self.dir_debug_data = join(config.dir_out, 'debug_data')
        self.dir_frames = join(self.dir_debug_data, 'frames')
        create_dirs((self.dir_debug_data, self.dir_frames))

        self.painter = Painter(config['painter'])
        self.player_detector = Frcnn(config['player_detector']['faster_rcnn'])
        self.player_tracker = Tracker(config['player_tracker'])
        # self.num_recognizer = DummyNumRecognizer(self.player_tracker)
        self.num_recognizer = NumRecSv(self.dir_debug_data)
        self.player_label = int(config['player_detector']['player_label'])
    # def __init__

    def processing(self, reader, recorder):
        info = self.config.logger.info
        success, frame = reader.read()
        frames_counter = -1
        start_time = time.time()
        capture_duration = 10
        players = []

        while success:
            # while success and (int(time.time() - start_time) < capture_duration):
            frames_counter += 1

            if frames_counter > self.config.frames_limit:
                info('Frames count limit reached.')
                break

            if frames_counter % self.config.each_th_frame == 0:
                replaced_print('Processing frame %d ...' % frames_counter)

            frame_name = 'frame_%05d.png' % frames_counter

            # Skip some frames at the beginning.
            if frames_counter < self.config.frames_skip_at_start:
                # recorder.write(frame, frame_name)
                success, frame = reader.read()
                continue

            if frames_counter % self.config.each_th_frame != 0:
                # Only each `self.config.each_th_frame`-th frame is processed.
                marked_frame = self.painter.drawing_players(frame, players)
                recorder.write(marked_frame, frame_name)
                success, frame = reader.read()
                continue

            # Recognise players on frame.
            players = []
            detector_result = self.player_detector.processing(frame)
            if self.player_label in detector_result:
                bboxes = detector_result[self.player_label]
                for bbox_num, bbox in enumerate(bboxes):
                    # Recognize numbers on player bboxes.
                    x, y, w, h = map(int, bbox[:4])
                    x, y = max(0, x), max(0, y)
                    player_image = frame[y:h, x:w]
                    frame_player_name = frame_name + '.' + str(bbox_num)
                    label, confidence = self.num_recognizer.recognize_numbers(player_image, frame_player_name)
                    players.append((label, [x, y, w, h], confidence))

            # Process players tracking.
            players = self.player_tracker.processing(players)

            # Draw markers on frame.
            marked_frame = self.painter.drawing_players(frame, players)

            recorder.write(marked_frame, frame_name)

            success, frame = reader.read()
        print('Total frames: ', frames_counter)
    # def processing
# class Marker
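# Illustrative sketch (not part of the original code): drive the marker over a
# video source. `config`, `reader`, and `recorder` are whatever objects the
# surrounding project constructs (e.g. an OpenCV-based reader/writer pair).
def _example_marker_processing(config, reader, recorder):
    marker = Marker(config)
    marker.processing(reader, recorder)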
641
642
643
644
645# *****************************************************************
646# *************************** Example 3 ***************************
647# *****************************************************************
def process_files_spark(
    input_file
    , ref_file
    , files
    , input_folder
    , output_folder
    , spark_context
    , zone_weights, zone_palette
    , <DELETED>=30, learning_rate=0.01, iterations=2000
    , termination_criteria=0.05
    , debug_<DELETED>_process=False, <DELETED>=<DELETED>
):
    """
    Process the <DELETED> algorithm for all <DELETED> files (with Spark).
    """

    num_threads = 1  # Multithreading disabled on Spark.

    # Prepare <DELETED>, zone colors, weights, etc.
    lptsc, rx, ry, rz, tx, ty, tz, wa, <DELETED> = process_files_setups(
        input_file, ref_file, files, output_folder
        , zone_weights, zone_palette
        , logger, num_threads
    )

    logger.info(f"Preparing list of input arguments...")
    logger.info(f"files: {files}")
    logger.info(f"input_folder: {input_folder}")
    logger.info(f"output_folder: {output_folder}")

    process_file_args = [
        (
            os.path.join(input_folder, _f)  # Input filename.
            , os.path.join(  # Output filename.
                output_folder,
                filename_to_<DELETED>_name(_f))
            , idx
        )
        for idx, _f in enumerate(files)
    ]
    logger.info(f"RDD args: {process_file_args}")

    # Save all input and output files to a separate dataframe for later use.
    <DELETED>_files = [dict(input_file=v[0], output_file=v[1]) for v in process_file_args]

    logger.info(f"Converting list of input arguments to a Spark RDD dataset...")
    rdd_process_file_args = spark_context.parallelize(list(process_file_args), len(process_file_args))

    logger.info(f"Applying `process_file_spark_wrapper` function to the RDD input arguments list...")
    result = rdd_process_file_args.map(lambda rdd_args: <DELETED>_worker(
        rdd_args[0]  # input filename
        , rdd_args[1]  # output filename
        , lptsc, rx, ry, rz, tx, ty, tz, wa
        , rdd_args[2]  # idx
        , <DELETED>, learning_rate, iterations
        , termination_criteria, debug_<DELETED>_process, <DELETED>=<DELETED>
    ))

    logger.info(f"Collecting all Spark jobs results...")
    result_filenames = result.collect()
    logger.info(f"All Spark jobs are completed.")

    logger.info(f"Result files of Spark <DELETED> are:")
    logger.info(result_filenames)

    return <DELETED>, <DELETED>_files
714