#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Originally pasted on Feb 09, 2020, 05:56 PM.
#
# dependencies:
# - mysql/mariadb server
# - python 3.8+
# - pip install BeautifulSoup4 lxml mysql-connector requests python-dateutil

from cProfile import run as Profiling
from multiprocessing import Pool
from threading import Thread
from queue import Queue, Empty as QueueEmpty
from enum import Enum
import datetime
from json import loads
import traceback

from bs4 import BeautifulSoup
import requests
import mysql.connector
from dateutil.relativedelta import relativedelta

# Database connection settings, passed to mysql.connector.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or from a config file kept out of version control.
DB_HOST = "localhost"
DB_USER = "sad"
DB_PASS = "reGGied0gg0!"
DB_NAME = "sad"

# Number of consecutive user ids handled by one worker-process task.
USER_IDS_PER_PROCESS = 1000

# First user id to fetch.
START_ID = 0
# Running everything at once is a bad idea: better to split the work into
# several sessions, because killing a running session is annoying.
wanted_users = 3600000
BATCH_COUNT = wanted_users // USER_IDS_PER_PROCESS
# Size of the process pool, i.e. how many batches are worked on concurrently.
BATCHES_AT_ONCE = 12

NUM_PARSING_THREADS = 2
NUM_RETRIEVING_THREADS = 4  # has to be a divisor of USER_IDS_PER_PROCESS
# Arbitrary multiplier, just to make sure the retrievers don't get stuck on a
# full queue. Shouldn't happen anyway.
MAX_QUEUED_ITEMS = NUM_RETRIEVING_THREADS * NUM_PARSING_THREADS * 4

# When True, the run is executed under cProfile.
DO_PROFILING = False

class UserInfo:
    """Result of fetching one forum user page.

    Attributes:
        user_id: numeric id of the user the page was requested for.
        status: a ``UserInfo.Status`` member describing the outcome.
        data: response body of the page, or ``None`` until it is attached.
    """

    class Status(Enum):
        """Outcome of a user page request; the value is what gets stored."""

        EXISTS = 0
        BANNED = 1
        REMOVED = 2

    # Maps the country name shown on a profile page to a two-letter code.
    # The empty name (no country found on the page) maps to a blank code.
    COUNTRY_CODE = {
        "": " ",
        "Afghanistan": "AF",
        "Åland Islands": "AX",
        "Albania": "AL",
        "Algeria": "DZ",
        "American Samoa": "AS",
        "Andorra": "AD",
        "Angola": "AO",
        "Anguilla": "AI",
        "Antarctica": "AQ",
        "Antigua and Barbuda": "AG",
        "Argentina": "AR",
        "Armenia": "AM",
        "Aruba": "AW",
        "Australia": "AU",
        "Austria": "AT",
        "Azerbaijan": "AZ",
        "Bahamas": "BS",
        "Bahrain": "BH",
        "Bangladesh": "BD",
        "Barbados": "BB",
        "Belarus": "BY",
        "Belgium": "BE",
        "Belize": "BZ",
        "Benin": "BJ",
        "Bermuda": "BM",
        "Bhutan": "BT",
        "Bolivia, Plurinational State of": "BO",
        "Bonaire, Sint Eustatius and Saba": "BQ",
        "Bosnia and Herzegovina": "BA",
        "Botswana": "BW",
        "Bouvet Island": "BV",
        "Brazil": "BR",
        "British Indian Ocean Territory": "IO",
        "Brunei Darussalam": "BN",
        "Bulgaria": "BG",
        "Burkina Faso": "BF",
        "Burundi": "BI",
        "Cambodia": "KH",
        "Cameroon": "CM",
        "Canada": "CA",
        "Cape Verde": "CV",
        "Cayman Islands": "KY",
        "Central African Republic": "CF",
        "Chad": "TD",
        "Chile": "CL",
        "China": "CN",
        "Christmas Island": "CX",
        "Cocos (Keeling) Islands": "CC",
        "Colombia": "CO",
        "Comoros": "KM",
        "Congo": "CG",
        "Congo, The Democratic Republic of the": "CD",
        "Cook Islands": "CK",
        "Costa Rica": "CR",
        "Côte d'Ivoire": "CI",
        "Croatia": "HR",
        "Cuba": "CU",
        "Curaçao": "CW",
        "Cyprus": "CY",
        "Czech Republic": "CZ",
        "Denmark": "DK",
        "Djibouti": "DJ",
        "Dominica": "DM",
        "Dominican Republic": "DO",
        "Ecuador": "EC",
        "Egypt": "EG",
        "El Salvador": "SV",
        "Equatorial Guinea": "GQ",
        "Eritrea": "ER",
        "Estonia": "EE",
        "Ethiopia": "ET",
        "Falkland Islands (Malvinas)": "FK",
        "Faroe Islands": "FO",
        "Fiji": "FJ",
        "Finland": "FI",
        "France": "FR",
        "French Guiana": "GF",
        "French Polynesia": "PF",
        "French Southern Territories": "TF",
        "Gabon": "GA",
        "Gambia": "GM",
        "Georgia": "GE",
        "Germany": "DE",
        "Ghana": "GH",
        "Gibraltar": "GI",
        "Greece": "GR",
        "Greenland": "GL",
        "Grenada": "GD",
        "Guadeloupe": "GP",
        "Guam": "GU",
        "Guatemala": "GT",
        "Guernsey": "GG",
        "Guinea": "GN",
        "Guinea-Bissau": "GW",
        "Guyana": "GY",
        "Haiti": "HT",
        "Heard Island and McDonald Islands": "HM",
        "Holy See (Vatican City State)": "VA",
        "Honduras": "HN",
        "Hong Kong": "HK",
        "Hungary": "HU",
        "Iceland": "IS",
        "India": "IN",
        "Indonesia": "ID",
        "Iran, Islamic Republic of": "IR",
        "Iraq": "IQ",
        "Ireland": "IE",
        "Isle of Man": "IM",
        "Israel": "IL",
        "Italy": "IT",
        "Jamaica": "JM",
        "Japan": "JP",
        "Jersey": "JE",
        "Jordan": "JO",
        "Kazakhstan": "KZ",
        "Kenya": "KE",
        "Kiribati": "KI",
        "Korea, Democratic People's Republic of": "KP",
        "Korea, Republic of": "KR",
        "Kuwait": "KW",
        "Kyrgyzstan": "KG",
        "Lao People's Democratic Republic": "LA",
        "Latvia": "LV",
        "Lebanon": "LB",
        "Lesotho": "LS",
        "Liberia": "LR",
        "Libya": "LY",
        "Liechtenstein": "LI",
        "Lithuania": "LT",
        "Luxembourg": "LU",
        "Macao": "MO",
        "Macedonia, The Former Yugoslav Republic of": "MK",
        "Madagascar": "MG",
        "Malawi": "MW",
        "Malaysia": "MY",
        "Maldives": "MV",
        "Mali": "ML",
        "Malta": "MT",
        "Marshall Islands": "MH",
        "Martinique": "MQ",
        "Mauritania": "MR",
        "Mauritius": "MU",
        "Mayotte": "YT",
        "Mexico": "MX",
        "Micronesia, Federated States of": "FM",
        "Moldova, Republic of": "MD",
        "Monaco": "MC",
        "Mongolia": "MN",
        "Montenegro": "ME",
        "Montserrat": "MS",
        "Morocco": "MA",
        "Mozambique": "MZ",
        "Myanmar": "MM",
        "Namibia": "NA",
        "Nauru": "NR",
        "Nepal": "NP",
        "Netherlands": "NL",
        "New Caledonia": "NC",
        "New Zealand": "NZ",
        "Nicaragua": "NI",
        "Niger": "NE",
        "Nigeria": "NG",
        "Niue": "NU",
        "Norfolk Island": "NF",
        "Northern Mariana Islands": "MP",
        "Norway": "NO",
        "Oman": "OM",
        "Pakistan": "PK",
        "Palau": "PW",
        "Palestine, State of": "PS",
        "Panama": "PA",
        "Papua New Guinea": "PG",
        "Paraguay": "PY",
        "Peru": "PE",
        "Philippines": "PH",
        "Pitcairn": "PN",
        "Poland": "PL",
        "Portugal": "PT",
        "Puerto Rico": "PR",
        "Qatar": "QA",
        "Réunion": "RE",
        "Romania": "RO",
        "Russian Federation": "RU",
        "Rwanda": "RW",
        "Saint Barthélemy": "BL",
        "Saint Helena, Ascension and Tristan da Cunha": "SH",
        "Saint Kitts and Nevis": "KN",
        "Saint Lucia": "LC",
        "Saint Martin (French part)": "MF",
        "Saint Pierre and Miquelon": "PM",
        "Saint Vincent and the Grenadines": "VC",
        "Samoa": "WS",
        "San Marino": "SM",
        "São Tomé and Príncipe": "ST",
        "Saudi Arabia": "SA",
        "Senegal": "SN",
        "Serbia": "RS",
        "Seychelles": "SC",
        "Sierra Leone": "SL",
        "Singapore": "SG",
        "Sint Maarten (Dutch part)": "SX",
        "Slovakia": "SK",
        "Slovenia": "SI",
        "Solomon Islands": "SB",
        "Somalia": "SO",
        "South Africa": "ZA",
        "South Georgia and the South Sandwich Islands": "GS",
        "South Sudan": "SS",
        "Spain": "ES",
        "Sri Lanka": "LK",
        "Sudan": "SD",
        "Suriname": "SR",
        "Svalbard and Jan Mayen": "SJ",
        "Swaziland": "SZ",
        "Sweden": "SE",
        "Switzerland": "CH",
        "Syrian Arab Republic": "SY",
        "Taiwan, Province of China": "TW",
        "Tajikistan": "TJ",
        "Tanzania, United Republic of": "TZ",
        "Thailand": "TH",
        "Timor-Leste": "TL",
        "Togo": "TG",
        "Tokelau": "TK",
        "Tonga": "TO",
        "Trinidad and Tobago": "TT",
        "Tunisia": "TN",
        "Turkey": "TR",
        "Turkmenistan": "TM",
        "Turks and Caicos Islands": "TC",
        "Tuvalu": "TV",
        "Uganda": "UG",
        "Ukraine": "UA",
        "United Arab Emirates": "AE",
        "United Kingdom": "GB",
        "United States": "US",
        "United States Minor Outlying Islands": "UM",
        "Uruguay": "UY",
        "Uzbekistan": "UZ",
        "Vanuatu": "VU",
        "Venezuela, Bolivarian Republic of": "VE",
        "Viet Nam": "VN",
        "Virgin Islands, British": "VG",
        "Virgin Islands, U.S.": "VI",
        "Wallis and Futuna": "WF",
        "Western Sahara": "EH",
        "Yemen": "YE",
        "Zambia": "ZM",
        "Zimbabwe": "ZW",
    }

    def __init__(self, user_id, status, data=None):
        """Store the user id, its status and, optionally, the page body.

        Args:
            user_id: id of the user the page was requested for.
            status: ``UserInfo.Status`` member for this user.
            data: response body; defaults to ``None`` so the attribute always
                exists even when the caller attaches the body afterwards.
        """
        self.user_id = user_id
        self.status = status
        self.data = data

def user_id_range_getter_creator(start_id, start_offset):
    """Build a generator function over one retriever thread's share of a batch.

    A batch of USER_IDS_PER_PROCESS ids is cut into NUM_RETRIEVING_THREADS
    consecutive slices; slice number ``start_offset`` is the one returned here.

    Args:
        start_id: first user id of the batch.
        start_offset: zero-based index of the slice within the batch.

    Returns:
        A zero-argument function that, when called, yields the user ids of the
        slice in ascending order.
    """
    start_i = (start_offset * USER_IDS_PER_PROCESS) // NUM_RETRIEVING_THREADS
    end_i = ((start_offset + 1) * USER_IDS_PER_PROCESS) // NUM_RETRIEVING_THREADS

    def gen():
        yield from range(start_id + start_i, start_id + end_i)

    return gen

def user_id_range_process(start, width):
    """Yield the first user id of each of ``width`` consecutive batches.

    Args:
        start: first user id of the first batch.
        width: number of batches to produce.

    Yields:
        ``start``, ``start + USER_IDS_PER_PROCESS``, ... one value per batch.
    """
    for batch_index in range(width):
        yield start + batch_index * USER_IDS_PER_PROCESS

def _member_for_offsets(text):
    """Convert a "Member for" value into negative relativedelta keyword arguments."""
    offsets = {
        "years": 0,
        "months": 0,
        "weeks": 0,
    }
    if not text:
        return offsets

    tokens = text.split()
    # The value is read as alternating "<amount> <unit>" tokens, so walk it in
    # pairs (the even positions are amounts, the odd positions are units).
    for amount, unit in zip(tokens[0::2], tokens[1::2]):
        if "year" in unit:
            offsets["years"] = -int(amount)
        elif "month" in unit:
            offsets["months"] = -int(amount)
        elif "week" in unit:
            offsets["weeks"] = -int(amount)
    return offsets


def _build_profile_row(item, start_date):
    """Parse an existing user's page into the value tuple for the full INSERT."""
    soup = BeautifulSoup(item.data, "lxml")
    # Keep the part of the page title that precedes the first "|".
    name = soup.title.string.split("|")[0].strip()

    country = ""
    country_divs = soup.find_all("div", {"class": "field-item even"})
    if country_divs:
        country = country_divs[0].contents[0]
    country_code = UserInfo.COUNTRY_CODE.get(country)
    if country_code is None:
        # An unrecognised value must not kill the whole parser thread; store
        # the same blank code that is used when no country is present.
        print(f"Unknown country for user {item.user_id}: {country!r}")
        country_code = UserInfo.COUNTRY_CODE[""]

    premium = False
    active = False
    versus_score = None
    ffa_score = None
    team_score = None
    time_info = _member_for_offsets(None)

    data_list = soup.find_all("dl")[0]
    for title, data in zip(data_list.find_all("dt"), data_list.find_all("dd")):
        label = title.string or ""
        if "Member for" in label:
            time_info = _member_for_offsets(data.string)
        elif "Premium" in label:
            premium = True
        elif "1v1 Europe" in label:
            versus_score = int(data.string.split()[0])
        elif "Team Europe" in label:
            team_score = int(data.string.split()[0])
        elif "FFA Europe" in label:
            ffa_score = int(data.string.split()[0])
        elif "Match History" in label:
            # The user counts as active when the entry contains any <tbody>.
            active = bool(data.find_all("tbody"))

    # The offsets are negative, so this moves back from the reference date.
    join_date = start_date + relativedelta(**time_info)

    return (
        item.user_id,
        item.status.value,
        int(premium),
        int(active),
        country_code,
        name,
        # score, max and min columns all start out with the current score
        versus_score, versus_score, versus_score,
        ffa_score, ffa_score, ffa_score,
        team_score, team_score, team_score,
        str(join_date),
    )


def user_parser_internal(user_queue):
    """Consume fetched users from ``user_queue`` and insert them into CFusers.

    Runs until a ``None`` sentinel is taken from the queue. Users whose status
    is EXISTS get a fully parsed row; BANNED and REMOVED users only get their
    id and status stored.

    Args:
        user_queue: queue of ``UserInfo`` items, terminated by ``None``.

    Raises:
        Whatever the database driver or the page parsing raises; the
        connection and cursors are closed before the exception propagates.
    """
    exists_query = "INSERT INTO CFusers VALUES (" + ("?, " * 16) + "CURRENT_TIMESTAMP())"
    doesnt_exist_query = "INSERT INTO CFusers (id, status) VALUES (?, ?)"
    # Reference date for the join-date computation, taken once per call.
    start_date = datetime.date.today()

    conn = mysql.connector.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, database=DB_NAME, autocommit=True)
    try:
        cursor_exists = conn.cursor(prepared=True)
        cursor_doesntexist = conn.cursor(prepared=True)
        try:
            while True:
                item = user_queue.get()
                if item is None:
                    break

                if item.status == UserInfo.Status.EXISTS:
                    cursor_exists.execute(exists_query, _build_profile_row(item, start_date))
                elif item.status in (UserInfo.Status.BANNED, UserInfo.Status.REMOVED):
                    cursor_doesntexist.execute(doesnt_exist_query, (item.user_id, item.status.value))
        finally:
            cursor_doesntexist.close()
            cursor_exists.close()
    finally:
        conn.close()

def user_parser(user_queue, error_queue):
    """Thread target: run the parser and tell the retrievers if it fails.

    Args:
        user_queue: queue of fetched users handed to ``user_parser_internal``.
        error_queue: queue on which one ``True`` per retriever thread is
            posted when the parser raises, so every retriever can stop.
    """
    print("Parser thread START")
    try:
        user_parser_internal(user_queue)
    except Exception:
        traceback.print_exc()
        # One flag per retriever thread: each retriever consumes a single
        # entry from error_queue before it stops.
        # NOTE(review): if every parser thread fails while user_queue is full,
        # a retriever blocked in user_queue.put() would never get to read its
        # flag - confirm whether that can happen in practice.
        for _ in range(NUM_RETRIEVING_THREADS):
            error_queue.put(True)
        print("Parser thread ERROR")
    else:
        print("Parser thread COMPLETED")

def user_retriever(gen, user_queue, error_queue):
    """Thread target: download user pages and queue them for parsing.

    Args:
        gen: zero-argument callable returning an iterable of user ids.
        user_queue: queue that receives one ``UserInfo`` per fetched page.
        error_queue: queue polled before each request; any entry on it means
            a parser thread failed and this retriever must stop.
    """
    status_from_code = {
        200: UserInfo.Status.EXISTS,
        403: UserInfo.Status.BANNED,
        404: UserInfo.Status.REMOVED,
    }
    # Seconds to wait for the server; without a timeout a stalled connection
    # would block this thread forever.
    request_timeout = 30

    print("User retriever thread START")
    for user_id in gen():
        try:
            error_queue.get_nowait()
        except QueueEmpty:
            pass  # no parser failure reported, keep going
        else:
            print(f"User retriever thread STOP because parser errored out at user {user_id}")
            break

        try:
            response = requests.get(f"http://forum.curvefever.com/user/{user_id}", timeout=request_timeout)
            try:
                status = status_from_code.get(response.status_code)
                if status is None:
                    # Unexpected HTTP status: report it and skip this user.
                    print(f"Unknown status code for user {user_id}: {response.status_code}")
                    continue
                info = UserInfo(user_id, status)
                info.data = response.text
            finally:
                response.close()
            user_queue.put(info)
        except Exception:
            traceback.print_exc()
            print(f"User retriever thread ERROR at user {user_id}")
            break

def process_entrypoint_retrieve_users(user_id_start):
    """Worker-process entry point: fetch and store one batch of users.

    Starts NUM_PARSING_THREADS parser threads and NUM_RETRIEVING_THREADS
    retriever threads that share one queue, waits for the retrievers, then
    shuts the parsers down with one ``None`` sentinel each.

    Args:
        user_id_start: first user id of the batch.
    """
    print("Process START for begin id:", user_id_start)

    error_queue = Queue(MAX_QUEUED_ITEMS)
    user_queue = Queue(MAX_QUEUED_ITEMS)

    retriever_threads = [
        Thread(
            target=user_retriever,
            args=(user_id_range_getter_creator(user_id_start, slice_index), user_queue, error_queue),
        )
        for slice_index in range(NUM_RETRIEVING_THREADS)
    ]
    parser_threads = [
        Thread(target=user_parser, args=(user_queue, error_queue))
        for _ in range(NUM_PARSING_THREADS)
    ]

    # Consumers are started first so the queue is drained from the beginning.
    for thread in parser_threads:
        thread.start()

    for thread in retriever_threads:
        thread.start()
    for thread in retriever_threads:
        thread.join()

    # All producers are done: one sentinel per parser thread ends its loop.
    for _ in range(NUM_PARSING_THREADS):
        user_queue.put(None)

    for thread in parser_threads:
        thread.join()

    print("Process END for begin id:", user_id_start)

def retrieve_users():
    """Create the CFusers table if needed and run every batch in a process pool.

    BATCH_COUNT batches starting at START_ID are distributed over a pool of
    BATCHES_AT_ONCE worker processes.
    """
    # NOTE(review): `id` is declared AUTO_INCREMENT although every insert
    # supplies the id explicitly. With MySQL's default SQL mode, inserting 0
    # into an AUTO_INCREMENT column generates a new value, and START_ID is 0 -
    # confirm this is intended.
    create_users_string = """CREATE TABLE IF NOT EXISTS `CFusers` (
        `id` INT NOT NULL AUTO_INCREMENT,
        `status` TINYINT NOT NULL,
        `premium` BOOL,
        `active` BOOL,
        `countryCode` CHAR(2),
        `name` VARCHAR(60),
        `versusScore` INT,
        `versusScoreMax` INT,
        `versusScoreMin` INT,
        `ffaScore` INT,
        `ffaScoreMax` INT,
        `ffaScoreMin` INT,
        `teamScore` INT,
        `teamScoreMax` INT,
        `teamScoreMin` INT,
        `joinDate` DATE,
        `insertTime` TIMESTAMP,
        PRIMARY KEY (`id`)
    )"""

    # these 2 are unused
    create_matches_string = """CREATE TABLE IF NOT EXISTS `CFmatches` (
        `id` INT NOT NULL,
        `scaled` BOOL NOT NULL,
        `duration` SMALLINT NOT NULL,
        `gameMode` TINYINT NOT NULL,
        `startTime` TIMESTAMP NOT NULL,
        `roomName` VARCHAR(60) NOT NULL,
        PRIMARY KEY (`id`)
    )"""

    create_match_players_string = """CREATE TABLE IF NOT EXISTS `CFmatchplayers` (
        `idMatch` INT NOT NULL,
        `idPlayer` INT NOT NULL,
        `newrank` INT NOT NULL,
        `gain` TINYINT NOT NULL,
        `place` TINYINT NOT NULL,
        CONSTRAINT PK_player PRIMARY KEY (`idMatch`, `idPlayer`),
        FOREIGN KEY (`idMatch`) REFERENCES `CFmatches`(`id`)
    )"""

    conn = mysql.connector.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, database=DB_NAME)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(create_users_string)
            # cursor.execute(create_matches_string)
            # cursor.execute(create_match_players_string)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Close the setup connection even when table creation fails.
        conn.close()

    with Pool(processes=BATCHES_AT_ONCE) as pool:
        try:
            pool.map(process_entrypoint_retrieve_users, user_id_range_process(START_ID, BATCH_COUNT))
        except KeyboardInterrupt:
            # Does not work reliably: avoid terminating with Ctrl+C, or kill
            # all the worker processes (or the console holding them) with the
            # task manager instead.
            pool.terminate()
            pool.join()

def with_profiling():
    """Run ``retrieve_users()`` under cProfile and write the stats to stats.bin."""
    Profiling("retrieve_users()", "stats.bin")
def without_profiling():
    """Run ``retrieve_users()`` directly, without any profiling."""
    retrieve_users()

if __name__ == "__main__":
    # DO_PROFILING selects between the profiled and the plain run.
    if DO_PROFILING:
        with_profiling()
    else:
        without_profiling()