· 6 years ago · Jun 25, 2019, 12:00 AM
1# Load and parse CSV logs from PostgreSQL
2
3import os
4import re
5import sys
6
7import postgres_copy
8import six
9import sqlalchemy
10
# DDL for ``postgres_log``, the raw table that _insert_raw_logs() copies the
# CSV log files into.  Rows are keyed by (session_id, session_line_num).
# NOTE(review): the column list presumably mirrors the field order of
# PostgreSQL's csvlog output -- confirm against the server version in use.
CREATE_LOG_TABLE = """
 CREATE TABLE if not exists postgres_log
 (
 log_time timestamp(3) with time zone,
 user_name text,
 database_name text,
 process_id integer,
 connection_from text,
 session_id text,
 session_line_num bigint,
 command_tag text,
 session_start_time timestamp with time zone,
 virtual_transaction_id text,
 transaction_id bigint,
 error_severity text,
 sql_state_code text,
 message text,
 detail text,
 hint text,
 internal_query text,
 internal_query_pos integer,
 context text,
 query text,
 query_pos integer,
 location text,
 application_name text,
 PRIMARY KEY (session_id, session_line_num)
 );
"""
40
# DDL for ``pg_log_parsed``, the table parse_logs() inserts into: for each
# selected log line it stores the identifying columns copied from
# ``postgres_log`` plus the extracted duration, query text and parameters.
# It uses the same (session_id, session_line_num) primary key as the raw table.
CREATE_PARSED_TABLE = """
create table if not exists pg_log_parsed(
 session_id text,
 session_line_num bigint,
 log_time timestamp(3) with time zone,
 database_name text,
 duration float,
 query text,
 params text,
 PRIMARY KEY (session_id, session_line_num)
);
"""
53
54
def _create_tables(engine):
    """Create the ``postgres_log`` and ``pg_log_parsed`` tables if missing.

    Executes CREATE_LOG_TABLE and CREATE_PARSED_TABLE on one connection
    obtained from ``engine.begin()``.  Both statements use
    ``if not exists``, so calling this repeatedly is harmless.

    Args:
        engine: SQLAlchemy engine for the target database.
    """
    # ``begin`` (not ``being``): the misspelled attribute does not exist on an
    # engine and made every call fail with AttributeError.
    with engine.begin() as conn:
        conn.execute(CREATE_LOG_TABLE)
        conn.execute(CREATE_PARSED_TABLE)
59
60
def _insert_raw_logs(log_path, engine):
    """Copy one CSV log file into the ``postgres_log`` table.

    Args:
        log_path: path of the CSV file to load.
        engine: SQLAlchemy engine used both to reflect the table and as the
            target handed to ``postgres_copy.copy_from``.

    Raises:
        KeyError: if reflection finds no ``postgres_log`` table.
    """
    # The copy flags are passed as text on Python 3 and as bytes on Python 2.
    if six.PY3:
        copy_format, null_marker = 'csv', ''
    else:
        copy_format, null_marker = b'csv', b''

    # Reflect the existing schema to obtain the Table object for the target.
    meta = sqlalchemy.MetaData(bind=engine)
    meta.reflect(bind=engine, extend_existing=True)
    log_table = meta.tables['postgres_log']

    with open(log_path, 'r') as csv_file:
        postgres_copy.copy_from(
            csv_file, log_table, engine,
            format=copy_format, null=null_marker
        )
71
72
def parse_logs(engine):
    """Extract duration, query text and parameters from the raw log rows.

    Selects every ``postgres_log`` row whose ``command_tag`` is ``SELECT`` or
    ``COPY`` and inserts exactly one row per selected row into
    ``pg_log_parsed``.  Any of duration / query / params that cannot be
    extracted from the row is stored as NULL.

    The select and all inserts are issued on the single connection obtained
    from ``engine.begin()``.

    Args:
        engine: SQLAlchemy engine for the database holding both tables.
    """
    # re.DOTALL lets the captured statement / parameter text span several
    # lines; without it a multi-line query is cut off at its first newline.
    # The lazy ``.*?`` stops at the first "statement:" / "execute <name>:"
    # marker, so the same words appearing inside the SQL text itself are not
    # mistaken for the marker.
    query_rx = re.compile(
        r'duration: ([\d.]+).*?(?:execute [\w<>]*|statement): (.*)',
        re.DOTALL,
    )
    params_rx = re.compile(r'parameters: (.*)', re.DOTALL)

    with engine.begin() as conn:
        res = conn.execute(
            "select session_id, session_line_num, log_time, database_name, message, detail"
            " from postgres_log where command_tag in ('SELECT', 'COPY')"
        )
        for row in res:
            duration, query, params = None, None, None

            # Guard the message the same way as the detail below: re.match()
            # raises TypeError when handed None (a NULL column value).
            match = query_rx.match(row.message) if row.message else None
            if match:
                duration, query = match.groups()

            if row.detail:
                match = params_rx.match(row.detail)
                if match:
                    params = match.group(1)

            conn.execute('insert into pg_log_parsed values(%s, %s, %s, %s, %s, %s, %s)', [
                row.session_id,
                row.session_line_num,
                row.log_time,
                row.database_name,
                # The duration is captured as text; store it as a number.
                float(duration) if duration else None,
                query,
                params
            ])
101
102
if __name__ == '__main__':
    # Command line: pg_log_loader <path to log dir> <db connection url>
    args = sys.argv[1:]
    if len(args) != 2:
        print('Usage: pg_log_loader [path to log dir] [db connection url]')
        # Stop here: falling through would fail with ValueError when ``args``
        # is unpacked into two names below.
        sys.exit(1)

    log_path, db_url = args
    engine = sqlalchemy.create_engine(db_url)

    # NOTE(review): _create_tables() is not invoked here, so both tables are
    # presumably expected to exist already -- confirm.

    # Load every regular ``*.csv`` file found directly inside the log dir.
    # (``fname`` rather than ``file`` to avoid shadowing the Python 2 builtin.)
    for fname in os.listdir(log_path):
        fpath = os.path.join(log_path, fname)
        if os.path.isfile(fpath) and fname.endswith('.csv'):
            _insert_raw_logs(fpath, engine)

    parse_logs(engine)
117 parse_logs(engine)