· 7 years ago · Oct 25, 2018, 07:52 AM
1# This code adds a new year of RWS data to the old dataset.
2# --- IMPORT ---
3import psycopg2
4
5
6# --- FUNCTIONS ---
7# VKL_NUMMER <--- ID for rows
8# Function to print some messages for easy debug
def run_query(query):
    """Execute ``query`` on the module-level cursor ``cur``.

    When the module-level ``DEBUG`` flag is truthy the statement is printed
    before it is executed.
    """
    if DEBUG:
        print(query)
    cur.execute(query)
13
14# Function to make a copy of the table to edit in
def copy_table(table_name):
    """(Re)create ``"<table_name>_copy"`` as a full copy of ``"<table_name>"``.

    An existing copy is dropped first, so the edits made later in the script
    always start from the untouched source table.
    """
    run_query("""
        DROP TABLE IF EXISTS "{0}_copy";
        CREATE TABLE "{0}_copy" AS
        SELECT * FROM "{0}"
    """.format(table_name))
23
24# Function to add a column to a table
def add_column(table_name, column_name, datatype):
    """Add column ``column_name`` of SQL type ``datatype`` to ``table_name``.

    The table and column names are double-quoted in the statement; the
    datatype is inserted verbatim.
    """
    run_query("""
        ALTER TABLE "{0}"
        ADD COLUMN "{1}" {2};
    """.format(table_name, column_name, datatype))
32
# Function that adds the _OMS (description) column belonging to an _ID or _CODE column.
# The numbers are the positions used in the query templates:
# 0 = the data table (2016 for example) - table_to_convert
# 1 = the reference table
# 2 = the item name that is iterated over (ends in _ID or _CODE)
# 3 = the item name in its _OMS version (see function turn_id_or_code_into_oms)
# HARDCODED: VKL_NUMMER (PTJ_ID for partij, SOR_ID for slachtoffer, PTJ_ID for voertuig)
# IDEA for slachtoffer and voertuig: since the original data is here, maybe it should be split per year and then put on Ixiwa; that would solve the conversion problem
def run_queries(table_to_convert, from_table, item_name, item_oms):
    """Add a description (_OMS) column for ``item_name`` to ``table_to_convert``.

    Args:
        table_to_convert: name of the data table that gets the new column.
        from_table: name of the reference table that is joined in.
        item_name: name of the _ID/_CODE column in ``table_to_convert``.
        item_oms: name of the description column; it is added to
            ``table_to_convert`` and read from ``from_table``.

    Reads the module-level ``link_id`` (the column used to match rows of the
    data table with the temporary table) and executes its statements through
    ``add_column`` and ``run_query``.

    For columns whose name contains BZD_, AGT_, BWG_ or TDT_ only an empty
    VARCHAR column is added: the statements that would fill it are still a
    draft and are built but not executed.
    """
    special_columns = ['BZD_', 'AGT_', 'BWG_', 'TDT_']

    if any(special_item in item_name for special_item in special_columns):
        print(item_name, ' is in special columns: TRUE')
        # e.g. BZD_ID_IF1 -> BZD_IF1_OMS
        newcolname = item_name.replace("_ID", "") + "_OMS"
        # Use the parameter here: this used to read the global loop variable
        # `item`, which only exists while the main loop is running.
        newname = turn_id_or_code_into_oms(item_name)

        add_column(table_to_convert, newcolname, "VARCHAR")

        # NOTE(review): draft statements, intentionally not executed yet.
        grab_oms_query = """
            DROP TABLE IF EXISTS "{3}_temp";
            CREATE TABLE "{3}_temp" AS
            SELECT "{0}"."{4}", "{0}"."{5}", oms_table."{3}"
            FROM "{0}"
            LEFT JOIN "{1}" as oms_table
            ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."{3}" AS VARCHAR);
            ALTER TABLE "{3}_temp" RENAME COLUMN "{3}" TO "{5}";
        """.format(table_to_convert, from_table, item_name, newname, link_id, newcolname)

        fill_oms_column_query = """
            UPDATE "{0}"
            SET "{1}" = "{1}_temp"."{1}"
            FROM "{1}_temp"
            WHERE "{0}"."{2}" = "{1}_temp"."{2}"
        """.format(table_to_convert, newname, link_id)

        drop_temp_table_query = """
            DROP TABLE IF EXISTS "{0}";
        """.format(newname + "_temp")

        #run_query(grab_oms_query)
        #run_query(fill_oms_column_query)
        #run_query(drop_temp_table_query)
        return

    print(item_name, ' is NOT in special columns: FALSE')
    add_column(table_to_convert, item_oms, "VARCHAR")

    # Temporary table: link id, the id/code and the description from the reference table.
    grab_oms_query = """
        DROP TABLE IF EXISTS "{3}_temp";
        CREATE TABLE "{3}_temp" AS
        SELECT "{0}"."{4}", "{0}"."{2}", oms_table."{3}"
        FROM "{0}"
        LEFT JOIN "{1}" as oms_table
        ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."{2}" AS VARCHAR);
    """.format(table_to_convert, from_table, item_name, item_oms, link_id)

    # Copy the description into the new column, matching rows on the link id.
    fill_oms_column_query = """
        UPDATE "{0}"
        SET "{1}" = "{1}_temp"."{1}"
        FROM "{1}_temp"
        WHERE "{0}"."{2}" = "{1}_temp"."{2}"
    """.format(table_to_convert, item_oms, link_id)

    drop_temp_table_query = """
        DROP TABLE IF EXISTS "{0}";
    """.format(item_oms + "_temp")

    run_query(grab_oms_query)
    run_query(fill_oms_column_query)
    run_query(drop_temp_table_query)
105
106# changes extension _ID and _CODE into _OMS (for easy lookup in reference table)
def turn_id_or_code_into_oms(name):
    """Return the name of the description column that belongs to ``name``.

    * names containing ``_CODE``: ``_CODE`` becomes ``_NAAM`` for DAG_CODE and
      PVE_CODE, and ``_OMS`` for every other name;
    * names containing ``BZD_ID_``, ``AGT_ID_``, ``BWG_ID_`` or ``TDT_ID_``
      (checked in that order) map to the fixed names BZD_OMS, AGT_OMS,
      BWG_OMS and TDT_OMS;
    * anything else: ``_ID`` becomes ``_OMS``.
    """
    if "_CODE" in name:
        suffix = "_NAAM" if name in ("DAG_CODE", "PVE_CODE") else "_OMS"
        return name.replace("_CODE", suffix)

    for prefix in ("BZD", "AGT", "BWG", "TDT"):
        if prefix + "_ID_" in name:
            return prefix + "_OMS"

    return name.replace("_ID", "_OMS")
130
131# Concat column names for create_union_tables_query
def concat_column_names(column_names):
    """Return the column names double-quoted and separated by ``", "``.

    Args:
        column_names: iterable of column name strings.

    Returns:
        A string such as ``'"A", "B"'``; an empty string for empty input
        (the hand-written loop raised IndexError in that case).
    """
    return ", ".join('"' + name + '"' for name in column_names)
139
140# create union query
def create_union_tables_query(tables):
    """Build, print and return the statement that (re)creates ``_all_data``.

    The statement selects the columns listed in the module-level
    ``all_columns`` from the copy of the old table
    (``input_for_script[0] + '_copy'``) and appends one ``UNION SELECT`` of
    the same columns for every table name in ``tables``.
    """
    selected_columns = concat_column_names(all_columns)
    parts = [
        'DROP TABLE IF EXISTS _all_data; CREATE TABLE _all_data AS SELECT {0} FROM "{1}" '.format(
            selected_columns, input_for_script[0] + '_copy'
        )
    ]
    parts.extend(
        'UNION SELECT {0} FROM "{1}" '.format(selected_columns, table_name)
        for table_name in tables
    )
    union_query = ''.join(parts)
    print(union_query)
    return union_query
150
151
152# --- QUERIES ---
153#space for queries, will restructure it later
154# Query for the loop, selects the correct reference table based on the iteration
# Template: fill in a column name with .format() to find the reference table
# registered for that column in _rws_references.
get_table_name_query = (
    "\n"
    "SELECT table_name from _rws_references WHERE column_name = '{0}'\n"
)
158
159# --- SETUP ---
160# for printing some messages in the console
DEBUG = True

# HARDCODED: (name of the table with the previous years, category)
input_for_script = ('partij.csv', 'partij')

# Column used to match rows of a data table, per category.
link_id_by_category = {'partij': 'PTJ_ID', 'ongeval': 'VKL_NUMMER'}
if input_for_script[1] not in link_id_by_category:
    # Fail before touching the database; link_id used to stay undefined here,
    # which only surfaced as a NameError halfway through the run.
    raise ValueError(
        "unsupported category {0!r}; expected one of {1}".format(
            input_for_script[1], sorted(link_id_by_category)
        )
    )
link_id = link_id_by_category[input_for_script[1]]

# The only things that need to be filled in to run the script are the variables 'tables' and 'input_for_script':
# List of tables that have ID/CODE to convert to OMS ["ongevallen_2016.csv", "ongevallen_2017.csv"]
tables = ["partij_2016.csv"]
tables_copy = []
for table in tables:
    tables_copy.append(table + '_copy')
    print('tables copy: ', tables_copy)

# Connect to an existing database
# NOTE(review): the credentials are hard-coded in the source; move them to an
# environment variable or a ~/.pgpass file before sharing this script.
conn = psycopg2.connect("dbname=rws_2011_2017 user=postgres password=Prandall19s!")

# Open a cursor to perform database operations
cur = conn.cursor()

# Work on a copy of the old table so the original stays untouched.
copy_table(input_for_script[0])
186
187
188#IDEA old table TIJDSTIP = (H)HMM, new table it's UUR = (H)H
189#ADD COLUMN "TIJDSTIP_MODULO" integer
190#SET "TIJDSTIP_MODULO" = (("TIJDSTIP"%100)/60.0+"TIJDSTIP"/100);
191
192
193
194# --- EXECUTION ---
# Requirements: 1. all tables are located in the same schema (old years, new year to add, reference tables)
# 2. the table will only join the attributes from the old table
# 3. the names of the reference tables end in '.txt.csv'

# Creates the settings table. The input it takes:
# the name of the old table (containing the previous years)
# and the category (partij/ongeval/slachtoffer/voertuig/etcetera).
# The year to add is not stored at the moment (that column is commented out).
cur.execute("""
    DROP TABLE IF EXISTS _rws_settings;
    CREATE TABLE _rws_settings (
        old_data text,
        category text);
""")
    #year_to_add INT,

cur.execute(
    """
    INSERT INTO _rws_settings
        (old_data, category)
    VALUES
        (%s, %s);
    """,
    input_for_script,
)

# Creates lookup table _rws_datatypes, which states the column names and datatypes of the copy of the old table
cur.execute(
    """
    DROP TABLE IF EXISTS _rws_datatypes;
    CREATE TABLE _rws_datatypes AS
    SELECT column_name, data_type FROM INFORMATION_SCHEMA.columns
    WHERE table_name = %s;
    """,
    (input_for_script[0] + '_copy',),
)

# DAGTYPE is not a column of the old ongeval table; register it by hand.
if input_for_script[0] == 'ongeval.csv':
    cur.execute("""INSERT INTO "_rws_datatypes" VALUES ('DAGTYPE', 'varchar');""")

# Creates lookup table _rws_references: It takes the columns ending in '_ID' or '_CODE' and the name of the reference table.
# With this, it'll be possible to look up whether columns in the new table (year to add) have to be transformed into '_OMS'.
# Raw string: the backslashes belong to the SQL LIKE patterns ('\_' is a literal underscore),
# and '\_' is not a valid escape sequence in a normal Python string.
cur.execute(r"""
    DROP TABLE IF EXISTS _rws_references;
    CREATE TABLE _rws_references AS
    SELECT column_name, table_name FROM INFORMATION_SCHEMA.columns
    WHERE table_name LIKE '%.txt.csv'
    AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%_ID_%');
""")
239
240
241# Loop through all the tables and convertable items, find all reference tables and start function run_queries
242 # Runs the function to copy the table mentioned in the variable tables (list)
# For every new-year table: copy it, add the description (_OMS) columns for its
# _ID/_CODE columns, then give the copy the same columns and column types as the
# copy of the old table.

# HARDCODED: column names that do end in _ID or _CODE, but should not be converted (checked manually).
# Everything including and after AGT_ID_1 does need conversion:
# 'AGT_ID_1', 'AGT_ID_2', 'BWG_ID_1', 'BWG_ID_2', 'TDT_ID_1', 'TDT_ID_2', 'TDT_ID_3'
columns_not_to_convert = ['JTE_ID', 'WVK_ID', 'GME_ID', 'PVE_CODE', 'WSE_ID', 'PTJ_ID']

# A column whose name contains one of these prefixes is looked up in
# _rws_references under the fixed column name next to it.
shared_reference_columns = [
    ('BZD_', 'BZD_ID'),
    ('AGT_', 'AGT_ID'),
    ('BWG_', 'BWG_ID'),
    ('TDT_', 'TDT_ID'),
]

for table in tables:
    copy_table(table)

    # Selects all columns from the new table that end in '_ID' or '_CODE' (or contain '_ID_').
    # Raw string: '\_' belongs to the SQL LIKE pattern and is not a valid Python escape.
    # NOTE(review): LIKE also treats the '_' characters of the table name as wildcards.
    cur.execute(r"""
        DROP TABLE IF EXISTS _rws_columns_to_convert;
        CREATE TABLE _rws_columns_to_convert AS
        SELECT column_name FROM INFORMATION_SCHEMA.columns
        WHERE table_name LIKE (SELECT '{0}' FROM _rws_settings)
        AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%_ID_%');
    """.format(table))

    # Makes a list of the columns that need to be converted with a reference table
    cur.execute("""
        SELECT _rws_columns_to_convert."column_name" FROM _rws_columns_to_convert;
    """)
    list_to_convert = [
        row[0] for row in cur.fetchall()
        if row[0] not in columns_not_to_convert
    ]
    print(list_to_convert)

    table_copy = table + "_copy"
    for item in list_to_convert:
        print("PROCESSING: " + table + " - item: " + item)

        # Name under which the reference table is registered for this column.
        item_id = next(
            (reference_id for prefix, reference_id in shared_reference_columns if prefix in item),
            item,
        )
        cur.execute(get_table_name_query.format(item_id))
        reference_row = cur.fetchone()
        if reference_row is None:
            # Used to surface as "'NoneType' object is not subscriptable".
            raise LookupError(
                "no reference table in _rws_references has a column named "
                "{0!r} (needed for column {1!r} of table {2!r})".format(item_id, item, table)
            )
        from_table = reference_row[0]

        run_queries(table_copy, from_table, item, turn_id_or_code_into_oms(item))

    # HARDCODED Adding dagtype to older data (will have to rewrite for Ixiwa, but this'll do for now)
    if input_for_script[0] == "ongeval.csv":
        print('input for script is ongeval.csv')
        # IF NOT EXISTS: this runs once per new table, but the old-data copy is shared.
        cur.execute("""ALTER TABLE "ongeval.csv_copy" ADD COLUMN IF NOT EXISTS "DAGTYPE" varchar;""")
        cur.execute("""UPDATE "ongeval.csv_copy"
            SET "DAGTYPE" = 'MA-VR' WHERE ("DAG_CODE" = 'MA' OR "DAG_CODE" = 'DI' OR "DAG_CODE" = 'WO' OR "DAG_CODE" = 'DO' OR "DAG_CODE" = 'VR');
            UPDATE "ongeval.csv_copy"
            SET "DAGTYPE" = 'ZA-ZO' WHERE ("DAG_CODE" = 'ZA' OR "DAG_CODE" = 'ZO');""")

    # Get list with column names from new year datafile
    cur.execute(
        """
        SELECT column_name FROM INFORMATION_SCHEMA.columns
        WHERE table_name = %s;
        """,
        (table_copy,),
    )
    columns_in_new_data = [row[0] for row in cur.fetchall()]

    # Get list with column names from original datafile
    cur.execute("""
        SELECT _rws_datatypes."column_name" FROM _rws_datatypes;
    """)
    columns_for_union = [row[0] for row in cur.fetchall()]

    print('old: ', columns_for_union)
    print('2016:', columns_in_new_data)

    # If column exists in original datafile, but not in new datafile: add column with correct datatype
    for column_name in columns_for_union:
        if column_name in columns_in_new_data:
            continue
        cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = %s""", (column_name,))
        datatype_column = cur.fetchone()[0]
        print(datatype_column)
        cur.execute("""
            ALTER TABLE "{0}"
            ADD COLUMN "{1}" {2};
        """.format(table_copy, column_name, datatype_column))

    # If column name exists in new datafile, but not in original datafile: drop column
    for column_name in columns_in_new_data:
        if column_name not in columns_for_union:
            cur.execute("""
                ALTER TABLE "{0}"
                DROP COLUMN "{1}";
            """.format(table_copy, column_name))

    # get all column names (create_union_tables_query reads this module-level list)
    cur.execute("""SELECT column_name FROM _rws_datatypes""")
    all_columns = [row[0] for row in cur.fetchall()]

    # Check if column names are same datatype (old and new dataset)
    for column_name in all_columns:
        cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = %s""", (column_name,))
        wanted_datatype = cur.fetchone()[0]
        print("column name: ", column_name, "datatype: ", wanted_datatype)
        cur.execute(
            """SELECT data_type FROM INFORMATION_SCHEMA.columns WHERE table_name = %s AND column_name = %s;""",
            (table_copy, column_name),
        )
        current_datatype = cur.fetchone()[0]
        print('current: ', current_datatype, 'wanted: ', wanted_datatype)

        if current_datatype == wanted_datatype:
            continue

        # An empty string cannot be cast to another type: turn it into NULL first.
        if current_datatype == 'character varying':
            cur.execute("""UPDATE "{0}" SET "{1}" = NULL WHERE "{1}" = '';""".format(table_copy, column_name))

        # Changes the column to the datatype used in the old dataset
        cur.execute("""
            ALTER TABLE "{0}"
            ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
        """.format(table_copy, column_name, wanted_datatype))
378
379 #ADD TIJDSTIP_MODULE (for nice graph on zonnestand)
380 #cur.execute("""UPDATE "{0}" SET "TIJDSTIP_MODULO" = ({"TIJDSTIP"}%100)/60.0+{"TIJDSTIP"}/100;""".format(table))
381
382#Union goes wrong because it sees a column with Null timestamps as varchar (even though it doesn't show..)
383#If you add all column names manually instead of putting a star (*), it's all good
384#Run union query
385#cur.execute("""{0}""".format(create_union_tables_query(tables_copy)))
386
387# --- TO DO
388#-MELDDATAPK, VERVDATAPK, GEBDAT, DATOVERL to_date
389#- """({ongeval.TIJDSTIP}%100)/60.0+{ongeval.TIJDSTIP}/100""" DOES NOT WORK! OLD TABLE uses TIJDSTIP (H)HMM, new uses UUR (H)H
390# New script: create linking tables
391# make it work with partij
392# make it work with voertuig
393# make it work with slachtoffer
394# create index? aliases?
395#rewrite for use with Mozart
396
397
398# TO DO in new scripts
399#-ADD DAGTYPE TO YEAR < 2016
400# new scripts - restructure for better readability
401# 1. functions and queries
402# 2. find and convert all txt to csv (reference tables) (input: main folder and year(s))
403# 3. prepare datasets rws specific (coordinate system mainly) (input: main folder and year(s)) <-- maybe put this in 5.
404# 4. import all data found in the folders to pgadmin (input: main folder, old data table(s) and year(s))
405# 5. add new year's data ongeval/partij/voertuig/slachtoffer (also: convert coordinates, add DAGTYPE to original dataset)
406# 6. make linking tables
407
408
409# --- FINISH UP RUNNING THE SCRIPT
410# Make the changes to the database persistent
# Make the changes persistent, then close the cursor and the connection (in that order).
conn.commit()
for resource in (cur, conn):
    resource.close()