· 6 years ago · Jul 19, 2019, 04:46 PM
1ftp = FTP('server.com','user_name','password')
2
3import psycopg2
4import time
5import os
6#import MySQLdb
7#import paramiko
8from ftplib import FTP
9#from utils.config import Configuration as Config
10#from utils.utils import get_global_config
11
12start_time = time.perf_counter()
13
14cnx_psql = psycopg2.connect(host="localhost", database="postgres", user="postgres",
15 password="postgres", port="5432")
16
17# Cursors initializations
18cur_psql = cnx_psql.cursor()
19
20def getFile(ftp, filename):
21 try:
22 local_filename = os.path.join(r"/Users/linu/Downloads/", filename)
23 lf = open(local_filename, "wb")
24 ftp.retrbinary("RETR " + filename ,lf.write)
25 print("file copied")
26 except:
27 print ("Error")
28try:
29
30 filePath='''/Users/linu/Downloads/log'''
31 #filePath='''/cmd/log/stk/log.txt'''
32 table='staging.stock_dump'
33
34 SQL="""DROP TABLE IF EXISTS """+ table + """;CREATE TABLE IF NOT EXISTS """+ table + """
35 (created_date TEXT, product_sku TEXT, previous_stock TEXT, current_stock TEXT );"""
36
37 cur_psql.execute(SQL)
38 cnx_psql.commit()
39
40
41 ftp = FTP('server.com','user_name','password')
42 print("FTP connection succesful")
43 data = []
44 ftp.cwd('/stockitem/')
45 getFile(ftp,'log')
46
47 read_file = open(filePath, "r")
48 my_file_data = read_file.readlines()
49
50 for line in my_file_data:
51 if 'Stock:' in line:
52 fields=line.split(" ")
53 date_part1=fields[0]
54 date_part2=fields[1][:-1]
55 sku=fields[3]
56 prev_stock=fields[5]
57 current_stock=fields[7]
58 if prev_stock.strip()==current_stock.strip():
59 continue
60 else:
61 cur_psql.execute("insert into " + table+"(created_date, product_sku, previous_stock , current_stock)" + " select CAST('" + date_part1+ " "+ date_part2 + "' AS TEXT)" +", CAST('"+sku+"' AS TEXT),CAST('" + prev_stock +"' AS TEXT),CAST('" +current_stock + "' AS TEXT);")
62 cnx_psql.commit()
63 cur_psql.close()
64 cnx_psql.close()
65 print("Data loaded to DWH from text file")
66 print("Data porting took %s seconds to finish---" % (time.perf_counter() - start_time))
67except (Exception, psycopg2.Error) as error:
68 print ("Error while fetching data from PostgreSQL", error)
69 print("Error adding information.")
70 quit()
71finally:
72 ftp.close()
73
74SERVER = 'server.com'
75USER_NAME = 'Linu'
76PASSWORD = 'CR.SEisGreat'
77
78ftp = FTP(SERVER, USER_NAME, PASSWORD)
79
80parser = argparse.ArgumentParser(description='Reading a file from a distant FTP server.')
81parser.add_argument('--passwd', dest='password', type=str,
82 help='password required for FTP connection')
83
84FILE_PATH='''/path/to/file/'''
85
86import psycopg2
87import time
88import os
89
90from psycopg2 import connect, sql
91from time import perf_counter
92
93try:
94 # ...
95except:
96 print ("Error")
97
98try:
99 # ...
100except Exception as e:
101 print ("Error: {}".format(e))
102
103SQL="""DROP TABLE IF EXISTS """+ table + """;CREATE TABLE IF NOT EXISTS """+ table + """
104(created_date TEXT, product_sku TEXT, previous_stock TEXT, current_stock TEXT );"""
105
106sql.SQL="""DROP TABLE IF EXISTS {table_name} ; CREATE TABLE IF NOT EXISTS {table_name}
107 (created_date TEXT, product_sku TEXT, previous_stock TEXT,
108 current_stock TEXT );""".format(
109 table_name = sql.Literal(table)
110)
111
112keys = ['created_date', 'product_sku', 'previous_stock', 'current_stock']
113sql.SQL("""CREATE TABLE IF NOT EXISTS {} ({});""").format(
114 sql.Identifier(table),
115 sql.SQL('{} TEXT,').join(map(sql.Identifier, keys))
116)
117
118with open(filePath) as read_file:
119 for line in read_file:
120 ...
121
122cur_psql.execute(f"INSERT INTO {table}"
123 " (created_date, product_sku, previous_stock , current_stock)"
124 " VALUES (%s, %s, %s, %s)",
125 (date_part1+ " "+ date_part2, sku, prev_stock, current_stock))