# Pasted source — Oct 10, 2019, 09:38 PM
1import pyodbc, gzip, shutil, datetime, pandas as pd, csv, numpy as np, os, sys, traceback
2from datetime import date, timedelta
3from flask import render_template
4
5
# Connect to Teradata as service account ab1234, which has CREATE TABLE
# permissions; the password lives in a network-share text file so it stays
# out of source control.
with open("\\\\server1\\d_dev\\py_etl\\_ab1234\\login.txt", "r") as file:
    login_pwd = file.read()
# NOTE(review): file.read() keeps a trailing newline if the login file ends
# with one — confirm the file has no terminating newline, or strip() it.
pwd = login_pwd
db_td = pyodbc.connect('DRIVER={Teradata ODBC};DBCNAME=td_server;UID=ab1234;PWD=%s;QUIETMODE=YES;' % (pwd))
# Module-global cursor shared by file_loader(); fast_executemany speeds up
# the bulk executemany() inserts done there.
cursor_td = db_td.cursor()
cursor_td.fast_executemany = True
13
14
def _read_source_file(file_nm, extension):
    """Load the uploaded file into a pandas DataFrame based on its extension.

    Supports .xlsx/.xls (Excel), .csv, and .txt (delimiter sniffed via
    sep=None). Raises ValueError for anything else — the original code left
    file_contents undefined for unknown extensions, producing an opaque
    NameError.

    Caveat (unchanged from original): the file MUST have a header row, and
    column names must not be Teradata reserved words, start with digits,
    or contain spaces (spaces are patched up later, the rest is not).
    """
    ext = extension.lower()
    if ext in (".xlsx", ".xls"):
        with open(file_nm, 'rb') as fh:
            return pd.read_excel(fh, encoding="latin1")
    if ext == ".csv":
        with open(file_nm, 'rb') as fh:
            return pd.read_csv(fh, encoding="latin1")
    if ext == ".txt":
        return pd.read_csv(file_nm, sep=None, encoding="latin1")
    raise ValueError(f"unsupported file extension: {extension}")


def _teradata_type(pd_dtype, max_len):
    """Map a pandas dtype name (lowercase string) to a Teradata column type.

    max_len is the longest observed string representation of the column's
    values; varchar columns are padded 10% beyond it for headroom.
    """
    if pd_dtype == 'int64':
        # values up to 5 digits fit an INT comfortably; wider ones get decimal
        return "int" if int(max_len) <= 5 else "decimal(15,0)"
    if pd_dtype == 'datetime64[ns]':
        return "timestamp"
    if pd_dtype == 'float64':
        return "float"
    if pd_dtype == 'bool':
        return "varchar(1)"
    # 'object', 'category', and any unexpected dtype all fall back to varchar
    varcharlen = int(max_len) + round(int(max_len) * 0.10)
    return f"varchar({varcharlen})"


def _next_sequence_number(user):
    """Return the next two-digit table sequence number ('01', '02', …) for user.

    Queries DBC.TablesV for the highest existing u_<user>_NN table. The LIKE
    pattern is bound as a pyodbc parameter rather than interpolated into the
    SQL text, and the substring start position is derived from the username
    length instead of the original hard-coded 10 (which only worked for
    6-character usernames).
    """
    prefix_len = len(f"u_{user}_")
    get_tbl_sql = f"""
        SELECT top 1
            substring(TableName,{prefix_len + 1},length(TableName))
        FROM
            DBC.TablesV
        WHERE
            TableName like ?
            and DatabaseName = 'db'
        ORDER BY
            TableName desc
        ;
        """
    cursor_td.execute(get_tbl_sql, (f"u_{user}%",))
    dset = cursor_td.fetchone()
    # NOTE: the original had `cursor_td.close` here with no parentheses — a
    # no-op; actually closing the shared cursor would break the later inserts.
    last = 0 if dset is None else int(dset[0])
    return f"{last + 1:02d}"


def _bulk_insert(ins_sql, str_dset, bulkrecordsize=5000):
    """Insert str_dset into the table in chunks of bulkrecordsize records.

    Chunking keeps each request under the Teradata ODBC 7 MB request limit
    (pyodbc.DataError 22001 otherwise); lower bulkrecordsize for very wide
    tables, raise it for narrow ones.

    Fixes the original off-by-one: `curs2 = bulkrecordsize - 1` combined with
    the exclusive slice `str_dset[curs:curs2]` silently dropped one record at
    every chunk boundary (indices 4999, 9999, …).
    """
    for start in range(0, len(str_dset), bulkrecordsize):
        chunk = str_dset[start:start + bulkrecordsize]
        cursor_td.executemany(ins_sql, chunk)
        print(str(start) + "-" + str(start + len(chunk) - 1))
        cursor_td.commit()


def file_loader(usr_nm, file_name):
    """Load a user-supplied data file into a new sequenced Teradata table.

    Reads <file_name> from the m_site network share, infers a Teradata DDL
    from the pandas dtypes, creates db.u_<usr_nm>_<NN> (NN = next free
    sequence number) with an identity primary index, bulk-inserts every row
    as strings, then creates and comments a matching view.

    Returns (view_name, record_count, deletion_date_str) on success, or the
    string "error" on failure (the traceback is appended to tbl_m_log.txt).
    """
    log = open("\\\\server1\\d_dev\\m_site\\tbl_m_log.txt", "a+")
    user = usr_nm  # bound before try so the except handler can always log it
    try:
        path = "\\\\server1\\d_dev\\m_site\\"  # change this based on your local machine
        file = file_name
        extension = file[file.rfind('.'):]
        file_nm = path + file

        file_contents = _read_source_file(file_nm, extension)

        # columns / dtypes / per-column max stringified length / row values
        file_columns = file_contents.columns.tolist()
        file_dtypes = file_contents.dtypes.tolist()
        measurer = np.vectorize(len)
        file_column_length = measurer(file_contents.values.astype(str)).max(axis=0)
        file_values = file_contents.values.tolist()

        # Build (quoted_column_name, teradata_type) pairs. Names are quoted,
        # lowercased, and have spaces replaced with underscores — reserved
        # words and digit-leading names are still NOT handled (see caveat).
        fnl_tbl_ddl = []
        for col, dtype, max_len in zip(file_columns, file_dtypes, file_column_length):
            name = ('"' + str(col) + '"').lower().replace(" ", "_")
            fnl_tbl_ddl.append((name, _teradata_type(str(dtype).lower(), str(max_len))))

        # one DB round trip (the original called its sequence helper twice)
        seq = _next_sequence_number(user)
        new_tbl_nm = f"db.u_{user}_{seq}"
        new_view_nm = f"views.u_{user}_{seq}"

        col_defs = ", ".join(f"{name} {ttype}" for name, ttype in fnl_tbl_ddl)
        col_list = ", ".join(name for name, _ in fnl_tbl_ddl)
        placeholders = ", ".join("?" for _ in fnl_tbl_ddl)

        create_tbl_sql = f"""create table {new_tbl_nm} ( {col_defs} ,kip_id BIGINT GENERATED ALWAYS AS IDENTITY
        (START WITH 1 INCREMENT BY 1 MINVALUE 1 MAXVALUE 922337203685477580 NO CYCLE) )
        PRIMARY INDEX ( kip_id ) ;"""
        ins_sql = f"insert into {new_tbl_nm} ( {col_list} ) values ( {placeholders} );"
        comment_tbl_sql = f"""COMMENT ON {new_tbl_nm} AS '{file}';"""

        cursor_td.execute(create_tbl_sql)
        cursor_td.commit()
        cursor_td.execute(comment_tbl_sql)
        cursor_td.commit()

        # pyodbc happily binds strings into whatever the target column type
        # is, so convert every value to str before inserting (KISS).
        str_dset = [[str(val) for val in rec] for rec in file_values]
        rec_cnt = len(str_dset)

        _bulk_insert(ins_sql, str_dset)

        createview = f"""
        REPLACE VIEW {new_view_nm}
        AS
        LOCK ROW FOR ACCESS
        SELECT *
        FROM {new_tbl_nm};
        """
        cursor_td.execute(createview)
        cursor_td.commit()

        comment_view = f"""COMMENT ON {new_view_nm} AS '{file}';"""
        cursor_td.execute(comment_view)
        cursor_td.commit()

        # callers display the table's scheduled deletion date, 30 days out
        del_dt = (date.today() + timedelta(30)).strftime("%Y-%m-%d")
        print(new_tbl_nm, rec_cnt)
        return (new_view_nm, rec_cnt, str(del_dt))

    except Exception:
        err = traceback.format_exc()
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log.write(f"[{user}] {now}:\n{err}\n--------------------\n\n")
        return "error"
    finally:
        # original leaked the handle if log.write itself raised
        log.close()
260 return e