# Dated Dec 07, 2018, 11:24 PM ("7 years ago" at the time this copy was captured).
import csv
import functools
import logging
import os
import random
import time
from datetime import datetime
from datetime import timedelta
from itertools import product
from multiprocessing.dummy import Pool as ThreadPool

import pandas
import psycopg2
import requests
from bs4 import BeautifulSoup
from dateutil.relativedelta import relativedelta

# --- concurrency / batching -------------------------------------------------
# Worker threads in the pool that runs thread_work() on each payload.
THREAD_NUMBER = 20
# main() splits the full payload list into this many chunks and saves to the
# database after each one.
CHUNKS_NUMBER = 1000

# --- HTTP ---------------------------------------------------------------------
# Browser-like headers sent with every request to the search page.
HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Content-Type": "application/x-www-form-urlencoded",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "en-GB,en;q=0.9,en-US;q=0.8,zh-TW;q=0.7,zh;q=0.6,zh-CN;q=0.5",
}

# --- files and formats ------------------------------------------------------
# Semicolon-delimited "key;value" CSV with DB settings, read by get_db_credentials().
CREDENTIALS_FILE_CLIENT = "db_credentials.csv"
# Semicolon-delimited CSV whose first column lists the stock codes to scrape.
INPUT_FILE = "scrapeme.csv"
# Date format of the txtShareholdingDate form field (both sent and parsed back).
DATE_FORMAT = "%Y/%m/%d"


# Module logger. INFO and above are written to scraper.log, which is opened in
# "w" mode and therefore truncated every time the module is loaded.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

formatter = logging.Formatter(
    "[%(levelname)s %(asctime)s] thread #%(thread)d - "
    "at function %(funcName)s - %(message)s",
    "%H:%M:%S",
)

handler = logging.FileHandler("scraper.log", mode="w")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

# One HTTP session object shared by main() (initial GET) and every
# thread_work() call (search POSTs).
session = requests.Session()


def time_it(pattern):
    """Return a decorator that prints how long each call of the wrapped function took.

    Args:
        pattern: ``str.format`` template with one positional field that
            receives the elapsed time in seconds (float), e.g. ``"Took {:.2f}s"``.

    The wrapped function's return value is passed through unchanged. Timing is
    printed only when the call returns normally.
    """
    def decorator(func):
        # wraps() keeps __name__/__doc__ of func, so logging (%(funcName)s),
        # introspection and stacked decorators still see the real function.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter() is monotonic and high-resolution; time.time()
            # follows the wall clock and can jump if the system clock changes.
            start_time = time.perf_counter()
            res = func(*args, **kwargs)
            print(pattern.format(time.perf_counter() - start_time))
            return res
        return wrapper
    return decorator


def chunk_by_number(seq, num):
    """Split ``seq`` into ``num`` contiguous chunks of nearly equal size.

    Args:
        seq: any sliceable sequence (list, str, ...).
        num: number of chunks (positive int).

    Returns:
        A list of exactly ``num`` slices of ``seq`` whose sizes differ by at
        most one and whose concatenation is ``seq``. When ``seq`` has fewer
        than ``num`` items, some chunks are empty. An empty ``seq`` yields
        ``[]``.

    Raises:
        ValueError: if ``num`` < 1. (Previously 0 raised ZeroDivisionError
            and negative values looped forever.)
    """
    if num < 1:
        raise ValueError("num must be a positive integer, got {!r}".format(num))
    size = len(seq)
    if not size:
        return []
    # Exact integer boundaries. The old float accumulation (last += len/num)
    # could fall just short of len(seq) and emit an extra chunk,
    # e.g. chunk_by_number([1], 10) returned 11 chunks.
    return [seq[size * i // num:size * (i + 1) // num] for i in range(num)]


73def get_db_credentials(credentials_file):
74 """Use db_credentials.csv file to connect to the DB"""
75 cd = os.path.dirname(os.path.abspath(__file__))
76 csv_file = os.path.join(cd, credentials_file)
77
78 with open(csv_file) as file:
79 read_csv = csv.reader(file, delimiter=";")
80 db__credentials = {row[0]: row[1] for row in read_csv}
81
82 conn = psycopg2.connect(host=db__credentials["hostname_db"],
83 user=db__credentials["username_db"],
84 password=db__credentials["password_db"],
85 dbname=db__credentials["database_db"],
86 port=db__credentials["port_db"])
87 cur = conn.cursor()
88
89 return conn, cur
90
91
def create_tables():
    """Create, if missing, every table the scraper reads from or writes to.

    All statements run in one transaction that is committed at the end. The
    connection is closed even if a statement fails; uncommitted DDL is then
    discarded.
    """
    ddl_statements = (
        """create table if not exists dbo.hkexnews_log(
            stock_id varchar,
            date date,
            constraint hkexnews_log_pkey primary key (stock_id, date)
        )""",
        """create table if not exists dbo.hkexnews_header(
            parse_date date,
            data_date date,
            result_date date,
            stock_id varchar,
            company_name varchar,
            element varchar,
            shareholding_in_ccass varchar,
            number_of_participants varchar,
            percent_of_total varchar,
            constraint hkexnews_header_pkey primary key (data_date, result_date, stock_id, element)
        )""",
        # main() inserts ownership rows into dbo.hkexnews_ownership. This
        # statement used to create dbo.hkexnews_ownership_new2, so the insert
        # target was never created by the script.
        """create table if not exists dbo.hkexnews_ownership(
            stock_id varchar,
            participant_id varchar,
            shareholding bigint,
            percent_of_share real,
            result_date date,
            constraint hkexnews_ownership_pkey primary key (stock_id, participant_id, result_date)
        )""",
        """create table if not exists dbo.hkexnews_participants(
            update_date date,
            participant_id varchar,
            name varchar,
            address varchar,
            constraint hkexnews_participants_pkey primary key (participant_id, name, address)
        )""",
        """create table if not exists dbo.hkex_holidays(
            date date,
            holiday text,
            constraint hkex_holidays_pkey primary key (date, holiday)
        )""",
    )

    connection, cursor = get_db_credentials(CREDENTIALS_FILE_CLIENT)
    try:
        for statement in ddl_statements:
            cursor.execute(statement)
        connection.commit()
    finally:
        connection.close()


def save_to_db(values, conn, cur, table_name):
    """Bulk-insert ``values`` into ``table_name``, silently skipping conflicting rows.

    Args:
        values: list of rows (sequences), all of the same length and in the
            table's column order.
        conn: open psycopg2 connection; committed on success, rolled back on failure.
        cur: cursor of ``conn``.
        table_name: (schema-qualified) table name. It is interpolated into the
            SQL verbatim, so it must come from trusted code, never from input.

    Returns:
        True if the rows were inserted, or if ``values`` is empty (nothing to
        do). False if the insert failed; the error is logged and the
        transaction is rolled back.
    """
    if not values:
        # Previously values[0] raised IndexError here, outside the try block.
        return True

    placeholders = "(" + ",".join(["%s"] * len(values[0])) + ")"
    try:
        # mogrify() escapes each row client-side, giving one multi-row INSERT
        # instead of one round-trip per row.
        rows_sql = b",".join(cur.mogrify(placeholders, row) for row in values).decode()
        cur.execute("INSERT INTO {} VALUES{} ON CONFLICT DO NOTHING".format(table_name, rows_sql))
        conn.commit()
    except Exception as e:
        logger.error("Error during saving to DB")
        logger.error(e)
        conn.rollback()
        return False

    logger.info("Chunk was successfully saved to DB")
    return True


def get_parsed():
    """Return the (stock_id, date) pairs already recorded in dbo.hkexnews_log.

    Returns:
        A list of tuples, one per log row, in the table's column order
        (``select *``). The DB connection is closed even if the query fails.
    """
    connection, _ = get_db_credentials(CREDENTIALS_FILE_CLIENT)
    try:
        df = pandas.read_sql("select * from dbo.hkexnews_log", connection)
    finally:
        connection.close()

    # Transpose per-column value lists into per-row tuples.
    return list(zip(*[df[column].values.tolist() for column in df]))


def get_to_parse(source_file):
    """Return the stock codes listed in the first column of ``source_file``.

    Args:
        source_file: path to a semicolon-delimited CSV.

    Returns:
        List of codes in file order. Surrounding whitespace is stripped, and
        blank lines or rows with an empty first field are skipped (a blank line
        used to raise IndexError).
    """
    source_ids = []
    # newline="" is what the csv module expects for files it parses.
    with open(source_file, newline="") as file:
        for row in csv.reader(file, delimiter=";"):
            stock_id = row[0].strip() if row else ""
            if stock_id:
                source_ids.append(stock_id)
    return source_ids


186def gen_dates():
187 """Generate days in past year, exclude holidays and sundays"""
188 dates_list = []
189
190 connection, _ = get_db_credentials(CREDENTIALS_FILE_CLIENT)
191 query = "select date from dbo.hkex_holidays"
192 holidays_list = pandas.read_sql(query, connection)["date"].tolist()
193 connection.close()
194
195 start_date = datetime.now() - relativedelta(years=1)
196 end_date = datetime.now()
197 days_number = (end_date - start_date).days
198 for i in range(days_number + 1):
199 date = (start_date + timedelta(days=i)).date()
200 if date.weekday() != 6 and date not in holidays_list:
201 dates_list.append((start_date + timedelta(days=i)).date())
202
203 return dates_list
204
205
def _post_search(payload, timeout=60):
    """POST one search form to hkexnews.hk; return the response HTML, or None if the request failed."""
    try:
        # Without a timeout a stalled connection would block this worker forever.
        response = session.post("http://www.hkexnews.hk/sdw/search/searchsdw.aspx",
                                 headers=HEADERS, data=payload, timeout=timeout)
        # Treat HTTP error statuses as request failures instead of parsing an error page.
        response.raise_for_status()
    except Exception as e:
        logger.error("Cannot make request to site")
        logger.error(e)
        return None
    return response.text


def _cell_text(row, column_class):
    """Return the text of the "mobile-list-body" div inside the <td> with class ``column_class``."""
    cell = row.find("td", attrs={"class": column_class})
    return cell.find("div", attrs={"class": "mobile-list-body"}).text


def _summary_value(row, column_class):
    """Return the text of the "value" div inside the summary <div> with class ``column_class``."""
    cell = row.find("div", attrs={"class": column_class})
    return cell.find("div", attrs={"class": "value"}).text


def _parse_search_page(html, payload):
    """Parse one search-result page into (header, ownership, participants, log) row lists.

    Returns None, after logging the error, if any expected element is missing
    or a value fails to convert.
    """
    soup = BeautifulSoup(html, "lxml")

    header_table = []
    ownership_table = []
    participants_table = []

    parse_date = datetime.now().date()
    stock_id = payload["txtStockCode"].lstrip("0")
    data_date = datetime.strptime(payload["txtShareholdingDate"], DATE_FORMAT).date()

    try:
        # The page echoes back the date it actually reports on, which may
        # differ from the requested data_date.
        result_date = datetime.strptime(soup.find("input", attrs={"name": "txtShareholdingDate"})["value"],
                                        DATE_FORMAT).date()
        company_name = soup.find("input", attrs={"name": "txtStockName"})["value"]
        header = soup.find("div", attrs={"class": "ccass-search-summary-table"})
        ownership = soup.find("table", attrs={"class": "table"}).find("tbody")

        if header:
            for row in header.find_all("div", attrs={"class": "ccass-search-datarow"}):
                header_table.append([
                    parse_date,
                    data_date,
                    result_date,
                    stock_id,
                    company_name,
                    row.find("div", attrs={"class": "summary-category"}).text,
                    _summary_value(row, "shareholding"),
                    _summary_value(row, "number-of-participants"),
                    _summary_value(row, "percent-of-participants"),
                ])

        if ownership:
            for row in ownership.find_all("tr"):
                participant_id = _cell_text(row, "col-participant-id")
                ownership_table.append([
                    stock_id,
                    participant_id,
                    int(_cell_text(row, "col-shareholding").replace(",", "")),
                    float(_cell_text(row, "col-shareholding-percent").rstrip("%")),
                    result_date,
                ])
                participants_table.append([
                    parse_date,
                    participant_id,
                    _cell_text(row, "col-participant-name"),
                    _cell_text(row, "col-address"),
                ])
    except Exception as e:
        logger.error("Error during parsing, stock code = {}, requested date = {}".format(stock_id, data_date))
        logger.error(e)
        return None

    # Parsing succeeded: record the (stock_id, result_date) pair in the log.
    logger.info("Successfully end thread work")
    return header_table, ownership_table, participants_table, [[stock_id, result_date]]


def thread_work(payload):
    """Fetch and parse one (stock code, date) search. Run by each pool worker.

    Args:
        payload: form-data dict built in main(); reads "txtStockCode" and
            "txtShareholdingDate".

    Returns:
        ``(header_rows, ownership_rows, participant_rows, log_rows)`` on
        success, or None if the request or parsing failed.
    """
    logger.info("Start thread work")
    try:
        html = _post_search(payload)
        return None if html is None else _parse_search_page(html, payload)
    finally:
        # Random delay after every request so we do not hammer www.hkexnews.hk.
        # It used to sit after the success-path return and so only ran on errors.
        time.sleep(random.uniform(0.5, 2))


def _fetch_form_state():
    """GET the search page once and return its (__VIEWSTATE, __VIEWSTATEGENERATOR) hidden-input values.

    These values are included in every search payload that main() builds.
    """
    init_response = session.get("http://www.hkexnews.hk/sdw/search/searchsdw.aspx",
                                headers=HEADERS, timeout=60)
    init_response.raise_for_status()
    soup = BeautifulSoup(init_response.text, "lxml")
    viewstate = soup.find("input", {"name": "__VIEWSTATE"}).get("value")
    viewstategenerator = soup.find("input", {"name": "__VIEWSTATEGENERATOR"}).get("value")
    return viewstate, viewstategenerator


def _pending_pairs():
    """Return (stock id, date) pairs from INPUT_FILE x gen_dates() that are not yet in dbo.hkexnews_log."""
    source_ids = get_to_parse(INPUT_FILE)
    parse_dates = gen_dates()
    # thread_work logs stock ids with leading zeros stripped, so compare on the
    # stripped form. A set makes each membership test O(1) instead of a scan
    # of the whole log.
    parsed = {(str(stock_id).lstrip("0"), log_date) for stock_id, log_date in get_parsed()}
    return [(stock_id, parse_date)
            for stock_id, parse_date in product(source_ids, parse_dates)
            if (stock_id.lstrip("0"), parse_date) not in parsed]


def _save_chunk(results, chunk_size, connection, cursor):
    """Merge thread_work results for one chunk and write them to the database.

    Data tables are written first. The chunk's pairs are added to
    dbo.hkexnews_log only if every data insert succeeded, so failed pairs are
    retried on the next run.

    Returns:
        True if the log rows were stored.
    """
    header, ownership, participants, log = [], [], [], []
    for thread_res in results:
        if thread_res:  # None means the request or parsing failed for that payload
            header.extend(thread_res[0])
            ownership.extend(thread_res[1])
            participants.extend(thread_res[2])
            log.extend(thread_res[3])

    if not log:
        logger.info("Nothing to save")
        return False

    logger.info("Start saving chunk to DB, chunk size: {}".format(chunk_size))
    data_saved = True
    for rows, table_name in ((header, "dbo.hkexnews_header"),
                             (ownership, "dbo.hkexnews_ownership"),
                             (participants, "dbo.hkexnews_participants")):
        # A successfully parsed page can have no rows for some table. Skip the
        # empty list instead of discarding the whole chunk.
        if rows and not save_to_db(rows, connection, cursor, table_name=table_name):
            data_saved = False
    return data_saved and save_to_db(log, connection, cursor, table_name="dbo.hkexnews_log")


def main():
    """Scrape every pending (stock id, date) pair and store the results.

    Steps:
    1. Ensure the tables exist.
    2. Work out which pairs are not yet logged as parsed.
    3. Fetch them concurrently, chunk by chunk.
    4. Save each chunk's rows.

    One thread pool and one DB connection are reused for the whole run and
    released at the end.
    """
    logger.info("Start main")
    saved_count = 0

    create_tables()

    connection, cursor = get_db_credentials(CREDENTIALS_FILE_CLIENT)
    try:
        viewstate, viewstategenerator = _fetch_form_state()

        pairs_to_parse = _pending_pairs()
        logger.info("To parse {} pairs (id, date)".format(len(pairs_to_parse)))

        # One form payload per pair. Stock codes are zero-padded to 5 digits.
        payloads = [{
            "__EVENTTARGET": "btnSearch",
            "__VIEWSTATE": viewstate,
            "__VIEWSTATEGENERATOR": viewstategenerator,
            "txtShareholdingDate": parse_date.strftime(DATE_FORMAT),
            "txtStockCode": stock_id.zfill(5),
        } for stock_id, parse_date in pairs_to_parse]

        # A single pool for all chunks. Previously a new, never-closed pool of
        # THREAD_NUMBER threads was created for every chunk.
        with ThreadPool(THREAD_NUMBER) as pool:
            for chunk in chunk_by_number(payloads, CHUNKS_NUMBER):
                if not chunk:
                    continue
                print("Left to save:", len(pairs_to_parse) - saved_count)
                results = pool.map(thread_work, chunk)
                if _save_chunk(results, len(chunk), connection, cursor):
                    saved_count += len(chunk)
    finally:
        connection.close()

    logger.info("End main")


# Run only when executed as a script, so importing this module (for tests or
# reuse) does not start a full scrape.
if __name__ == "__main__":
    main()