# Paste metadata: posted Oct 18, 2018, 09:26 AM (7 years ago)
import os

import psycopg2

# --- SETUP ---
# When True, every query sent through run_query is printed to the console.
DEBUG = True

# Run configuration, in this order:
#   year to add, name of the table holding the previous years, category,
#   empty placeholder for the new year's table name (filled in by SQL later).
# Besides this tuple and the connection string, the `tables` list further
# down may also have to be adjusted for another year.
input_for_script = (2016, 'ongeval.csv', 'ongevallen', '')

# Connect to an existing database.
# The connection string is taken from the RWS_DSN environment variable when
# it is set, so credentials no longer have to live in this file.
# NOTE(review): the fallback below still embeds the database password in the
# source; remove the password from the literal once RWS_DSN is set everywhere
# the script runs.
conn = psycopg2.connect(
    os.environ.get(
        "RWS_DSN",
        "dbname=rws_2011_2017 user=postgres password=Prandall19s!",
    )
)
15
# --- EXECUTION ---
# This script adds a new year of RWS data to the old dataset.
# Requirements:
#   1. all tables are located in the same schema (old years, new year to
#      add, reference tables)
#   2. only the attributes that exist in the old table are joined
#   3. the names of the reference tables end in '.txt.csv'

# Cursor through which all database operations of this script run.
cur = conn.cursor()

# (Re)creates the settings table. It holds:
#   the year (to add),
#   the name of the old table (containing the previous years),
#   the category (partij/ongeval/slachtoffer/voertuig/etcetera)
#   and the table name of the new year, which is derived below.
create_settings_sql = """
DROP TABLE IF EXISTS _rws_settings;
CREATE TABLE _rws_settings (
    year_to_add INT,
    old_data text,
    category text,
    year_filename text);
"""

insert_settings_sql = """
INSERT INTO _rws_settings
    (year_to_add, old_data, category, year_filename)
VALUES
    (%s, %s, %s, %s);
    """

# year_filename becomes '<category>_<year_to_add>.csv'.
derive_year_filename_sql = """
UPDATE _rws_settings
SET year_filename = CONCAT(_rws_settings.category, '_', _rws_settings.year_to_add, '.csv');
"""

cur.execute(create_settings_sql)
cur.execute(insert_settings_sql, input_for_script)
cur.execute(derive_year_filename_sql)
50
51
# Creates lookup table _rws_datatypes, which states the column names and
# data types of the old table named in _rws_settings.old_data.
cur.execute("""
DROP TABLE IF EXISTS _rws_datatypes;
CREATE TABLE _rws_datatypes AS
SELECT column_name, data_type FROM INFORMATION_SCHEMA.columns
WHERE table_name = (SELECT old_data FROM _rws_settings);
""")

# Creates lookup table _rws_references: the columns ending in '_ID' or
# '_CODE' (or containing '_ID_') of every reference table ('*.txt.csv'),
# together with the name of that reference table. With this it is possible
# to look up which reference table converts a column of the new table into
# its '_OMS' counterpart.
# In LIKE, '_' matches any single character, so every literal underscore is
# escaped with a backslash. Raw strings keep that backslash intact.
cur.execute(r"""
DROP TABLE IF EXISTS _rws_references;
CREATE TABLE _rws_references AS
SELECT column_name, table_name FROM INFORMATION_SCHEMA.columns
WHERE table_name LIKE '%.txt.csv'
AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%\_ID\_%');
""")

# Selects all columns of the new table (year to add) that end in '_ID' or
# '_CODE' or contain '_ID_'. The table name is compared with '=' because the
# file name itself contains '_', which LIKE would treat as a wildcard.
cur.execute(r"""
DROP TABLE IF EXISTS _rws_columns_to_convert;
CREATE TABLE _rws_columns_to_convert AS
SELECT column_name FROM INFORMATION_SCHEMA.columns
WHERE table_name = (SELECT _rws_settings.year_filename FROM _rws_settings)
AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%\_ID\_%');
""")
78
# Builds the Python list of columns that have to be converted with a
# reference table.
cur.execute("""
SELECT _rws_columns_to_convert."column_name" FROM _rws_columns_to_convert;
""")
# fetchall() yields one-element tuples; keep only the column name itself.
list_to_convert = [row[0] for row in cur.fetchall()]

# Column names that do end in _ID or _CODE, but should not be converted
# (checked manually).
columns_not_to_convert = ['JTE_ID', 'WVK_ID', 'GME_ID', 'PVE_CODE', 'WSE_ID']

# Take the excluded names out of the Python list that will be iterated
# through (the table in the database is left untouched).
for excluded_name in columns_not_to_convert:
    try:
        list_to_convert.remove(excluded_name)
    except ValueError:
        # The new table simply does not have this column.
        pass
print(list_to_convert)
94
# Query template for the conversion loop: selects the reference table that
# holds the column of the current iteration. Fill it with str.format.
get_table_name_query = """
SELECT table_name from _rws_references WHERE column_name = '{0}'
"""

# List of tables that have ID/CODE columns to convert to OMS,
# e.g. ["ongevallen_2016.csv", "ongevallen_2017.csv"].
tables = ["ongevallen_2016.csv"]

# Create a temporary data table and fill it with the old data.
# The old table's name is substituted here because SQL cannot take a table
# name from a subquery: selecting from
# (SELECT old_data FROM _rws_settings) would only copy the one settings row.
# cur.execute() returns None, so its result is not stored.
cur.execute("""
DROP TABLE IF EXISTS _rws_all_temp;
CREATE TABLE _rws_all_temp AS SELECT * FROM "{0}";
""".format(input_for_script[1]))
107
108# VKL_NUMMER <--- ID for rows
def run_query(query, params=None):
    """Execute *query* on the module-level cursor, printing it first in debug mode.

    Args:
        query: SQL text to execute.
        params: Optional sequence or mapping with the values for the
            placeholders in *query*. Defaults to None, meaning the query is
            executed without bound parameters, as all existing callers do.
    """
    if DEBUG:
        print(query)
    cur.execute(query, params)
114
def copy_table(table_name):
    """Create "<table_name>_copy" as a full copy of *table_name* to edit in.

    A copy left over from an earlier run is dropped first.
    """
    sql_template = """
    DROP TABLE IF EXISTS "{0}_copy";
    CREATE TABLE "{0}_copy" AS
    SELECT * FROM "{0}"
    """
    run_query(sql_template.format(table_name))
124
def add_column(table_name, column_name, datatype):
    """Add column *column_name* of SQL type *datatype* to *table_name*.

    The table and column names are double-quoted in the statement; the data
    type is inserted as given.
    """
    sql_template = """
    ALTER TABLE "{0}"
    ADD COLUMN "{1}" {2};
    """
    run_query(sql_template.format(table_name, column_name, datatype))
133
# Make an editable "<name>_copy" of every table listed in `tables`.
for source_table in tables:
    copy_table(source_table)
137
138
def run_queries(table_to_convert, from_table, item_name, item_oms):
    """Add the _OMS column that belongs to one _ID/_CODE column and fill it.

    The value is looked up in the reference table through a LEFT JOIN on the
    text form of the id/code, stored in a temporary "<column>_temp" table,
    copied into the data table by matching on "VKL_NUMMER", after which the
    temporary table is dropped again.

    Args:
        table_to_convert: The data table to extend (2016 for example).
        from_table: The reference table to look the values up in.
        item_name: The _ID or _CODE column of *table_to_convert*.
        item_oms: The _OMS version of *item_name* (see
            turn_id_or_code_into_oms). Ignored when *item_name* contains
            'BZD_'; the target name is then derived from *item_name*.
    """
    if 'BZD_' in item_name:
        # e.g. BZD_ID_IF1 -> BZD_IF1_OMS; these columns are joined on the
        # reference table's BZD_ID.
        oms_column = item_name.replace("_ID", "") + "_OMS"
        join_column = "BZD_ID"
        # NOTE(review): this copies the reference table's BZD_ID (the id
        # itself) into the new _OMS column, exactly as before. Presumably the
        # description column was intended - confirm against the reference
        # table before changing it.
        source_column = "BZD_ID"
    else:
        oms_column = item_oms
        join_column = item_name
        source_column = item_oms
    temp_table = oms_column + "_temp"

    add_column(table_to_convert, oms_column, "VARCHAR")

    # The looked-up column is aliased straight to its final name. This
    # replaces the former CREATE + RENAME pair and avoids a duplicate column
    # name when the item itself is called BZD_ID.
    grab_oms_query = """
    DROP TABLE IF EXISTS "{temp}";
    CREATE TABLE "{temp}" AS
    SELECT "{data}"."VKL_NUMMER", "{data}"."{item}", oms_table."{source}" AS "{oms}"
    FROM "{data}"
    LEFT JOIN "{ref}" as oms_table
    ON CAST("{data}"."{item}" AS VARCHAR) = CAST(oms_table."{join}" AS VARCHAR);
    """.format(
        temp=temp_table,
        data=table_to_convert,
        item=item_name,
        source=source_column,
        oms=oms_column,
        ref=from_table,
        join=join_column,
    )

    fill_oms_column_query = """
    UPDATE "{data}"
    SET "{oms}" = "{temp}"."{oms}"
    FROM "{temp}"
    WHERE "{data}"."VKL_NUMMER" = "{temp}"."VKL_NUMMER"
    """.format(data=table_to_convert, oms=oms_column, temp=temp_table)

    drop_temp_table_query = """
    DROP TABLE IF EXISTS "{0}";
    """.format(temp_table)

    run_query(grab_oms_query)
    run_query(fill_oms_column_query)
    run_query(drop_temp_table_query)
198
199
def turn_id_or_code_into_oms(name):
    """Return the description-column name for an _ID or _CODE column name.

    Used for easy lookup in the reference table:
      * "<X>_CODE" becomes "<X>_OMS", except DAG_CODE and PVE_CODE, which
        become "<X>_NAAM";
      * "BZD_ID_<suffix>" becomes "BZD_<suffix>_OMS" (BZD_ID_IF1 ->
        BZD_IF1_OMS), the same name run_queries builds for these columns;
      * any other name has "_ID" replaced by "_OMS".

    Args:
        name: Column name containing "_ID" or "_CODE".

    Returns:
        The converted column name as a string.
    """
    if "_CODE" in name:
        suffix = "_NAAM" if name in ("DAG_CODE", "PVE_CODE") else "_OMS"
        return name.replace("_CODE", suffix)
    # A comparison with the literal "BZD_ID_%" can never match a real column
    # name, so the prefix is tested instead.
    if name.startswith("BZD_ID_"):
        return name.replace("_ID", "") + "_OMS"
    return name.replace("_ID", "_OMS")
215
# Column names of the original (old) data table, taken from _rws_datatypes.
cur.execute("""
SELECT _rws_datatypes."column_name" FROM _rws_datatypes;
""")
# Each fetched row is a one-element tuple; unpack it to the bare name.
columns_for_union = [row[0] for row in cur.fetchall()]
223
def concat_column_names(column_names):
    """Return *column_names* as one comma-separated string of double-quoted names.

    An empty sequence gives an empty string.
    """
    return ", ".join('"' + column_name + '"' for column_name in column_names)


# Wanted data type for every column of the old table. Fetched once up front:
# _rws_datatypes does not change while the tables are processed.
cur.execute("""SELECT column_name, data_type FROM _rws_datatypes""")
wanted_datatypes = {}
for known_column, known_datatype in cur.fetchall():
    # Keep the first row per column name, like a single fetchone() would.
    wanted_datatypes.setdefault(known_column, known_datatype)
all_columns = list(wanted_datatypes)

if not all_columns:
    # Without the old table's columns every column of the new table would be
    # dropped below, so stop before anything is changed.
    raise LookupError(
        "_rws_datatypes is empty: old table '{0}' was not found".format(input_for_script[1])
    )

# Loop through all the tables and convertible items, find the reference
# table for each item and run run_queries on it.
for table in tables:
    table_copy = table + "_copy"

    for item in list_to_convert:
        print("PROCESSING: " + table + " - item: " + item)
        # All BZD_* columns are looked up under the name BZD_ID.
        reference_column = 'BZD_ID' if 'BZD_' in item else item
        cur.execute(
            "SELECT table_name from _rws_references WHERE column_name = %s",
            (reference_column,),
        )
        reference_row = cur.fetchone()
        if reference_row is None:
            # Formerly surfaced as "NoneType object is not subscriptable".
            raise LookupError(
                "no reference table in _rws_references has column '{0}' "
                "(needed for '{1}')".format(reference_column, item)
            )
        run_queries(table_copy, reference_row[0], item, turn_id_or_code_into_oms(item))

    # Get the column names of the (converted) new year table.
    cur.execute(
        "SELECT column_name FROM INFORMATION_SCHEMA.columns WHERE table_name = %s;",
        (table_copy,),
    )
    columns_in_new_data = [row[0] for row in cur.fetchall()]
    print(table + ':', columns_in_new_data)
    existing_columns = set(columns_in_new_data)

    # If a column exists in the original data, but not in the new data:
    # add the column with the correct datatype.
    for column_name in all_columns:
        if column_name not in existing_columns:
            datatype_column = wanted_datatypes[column_name]
            print(datatype_column)
            add_column(table_copy, column_name, datatype_column)

    # If a column exists in the new data, but not in the original data:
    # drop the column.
    for column_name in columns_in_new_data:
        if column_name not in wanted_datatypes:
            cur.execute("""
            ALTER TABLE "{0}"
            DROP COLUMN "{1}";
            """.format(table_copy, column_name))

    # Check that every column has the same datatype in old and new dataset.
    for column_name in all_columns:
        wanted_datatype = wanted_datatypes[column_name]
        print("column name: ", column_name, "datatype: ", wanted_datatype)
        cur.execute(
            "SELECT data_type FROM INFORMATION_SCHEMA.columns "
            "WHERE table_name = %s AND column_name = %s;",
            (table_copy, column_name),
        )
        current_row = cur.fetchone()
        if current_row is None:
            raise LookupError(
                "column '{0}' is missing from '{1}'".format(column_name, table_copy)
            )
        current_datatype = current_row[0]
        print('current: ', current_datatype, 'wanted: ', wanted_datatype)

        if current_datatype == wanted_datatype:
            continue

        if current_datatype == 'character varying':
            # Empty strings are turned into NULL before a varchar column is
            # cast; other source types are cast as they are.
            cur.execute("""UPDATE "{0}" SET "{1}" = NULL WHERE "{1}" = '';""".format(table_copy, column_name))
        cur.execute("""
        ALTER TABLE "{0}"
        ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
        """.format(table_copy, column_name, wanted_datatype))

    # Union old and new table. The columns are listed explicitly: with a
    # star (*) the union goes wrong because a column with NULL timestamps is
    # seen as varchar.
    # _all_data is recreated on every pass, so with several tables only the
    # last one ends up in it (still needs to work with multiple years).
    union_query = (
        """DROP TABLE IF EXISTS _all_data; """
        """CREATE TABLE _all_data AS SELECT {0} FROM "{1}" UNION SELECT {0} FROM "{2}";"""
    ).format(concat_column_names(all_columns), input_for_script[1], table_copy)
    cur.execute(union_query)
311
312
# --- TO DO
# build in the BZD_ID_IF1 -> BZD_IF1_OMS conversion
# union old and new data: make sure that it can add multiple years
# create linking tables
# make it work with partij
# make it work with voertuig
# make it work with slachtoffer
# create index? aliases?

# new scripts - restructure for better readability
# 1. functions and queries
# 2. find and convert all txt to csv (reference tables) (input: main folder and year(s))
# 3. prepare datasets, RWS-specific (mainly the coordinate system) (input: main folder and year(s))
# 4. import all data found in the folders into pgAdmin (input: main folder, old data table(s) and year(s))
# 5. add new year's data ongeval/partij/voertuig/slachtoffer
# 6. make linking tables
329
330
# --- FINISH UP RUNNING THE SCRIPT
# Make the changes to the database persistent.
conn.commit()

# Close communication with the database: first the cursor, then the
# connection it belongs to.
for db_resource in (cur, conn):
    db_resource.close()