import asyncio
import os
import sqlite3

import aiohttp
import aiosqlite

# SQLITE_PATH, get_url(), fetch_proxy(), and check_existing_query() are
# defined elsewhere; a sketch of possible implementations follows below.

async def fetch_urls(urls, proxies=None, responses=None):
    """Check if a URL is cached first via check_existing_query().
    If no cached response is found, fetch and store the response;
    otherwise, return the cached response.
    """
    if responses is None:  # avoid a shared mutable default argument
        responses = []
    if not isinstance(urls, list):
        urls = [urls]
    if not os.path.isfile(SQLITE_PATH):
        # Create the SQLite database and the cache table on first run.
        conn = sqlite3.connect(SQLITE_PATH)
        conn.execute("CREATE TABLE query_cache(query VARCHAR, date DATETIME, response VARCHAR);")
        conn.commit()
        conn.close()
    # One concurrent request per proxy; fall back to 3 without proxies.
    chunk_size = len(proxies) if proxies else 3
    chunks = [urls[i:i + chunk_size] for i in range(0, len(urls), chunk_size)]
    errors = []
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with aiohttp.ClientSession() as session:
            arr = []
            for i, chunk in enumerate(chunks):
                print(f"Processing chunk {i + 1} out of {len(chunks)}... chunk size {len(chunk)}")
                if proxies:
                    # Rotate proxies across the requests in this chunk.
                    response = await asyncio.gather(
                        *[get_url(url, session, arr, db, proxy=fetch_proxy(proxies, j), errors=errors)
                          for j, url in enumerate(chunk)])
                else:
                    response = await asyncio.gather(
                        *[get_url(url, session, arr, db, errors=errors) for url in chunk])
                print(f"Received {len(response)} responses")
                responses += response
                print("Sleeping for three seconds.")
                await asyncio.sleep(3)
    print(f"{len(errors)} errors occurred.")
    if errors:
        # Retry failed URLs, dropping any proxies that caused errors.
        err_urls = [err[0] for err in errors]
        try:
            err_proxies = {err[1] for err in errors}
            proxies = [p for p in proxies if p not in err_proxies]
            await fetch_urls(err_urls, proxies=proxies, responses=responses)
        except TypeError:
            # No proxies were in use; retry without them.
            await fetch_urls(err_urls, responses=responses)
    return [r for r in responses if r is not None]
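
The snippet calls several helpers that aren't shown: check_existing_query(), get_url(), and fetch_proxy(). Here is a minimal sketch of what they might look like, inferred purely from the call sites above; the signatures, the (url, proxy) error tuples, and the cache-lookup query are all assumptions, not the original implementations:

import datetime

def fetch_proxy(proxies, i):
    # Assumed behavior: rotate through the proxy list round-robin.
    return proxies[i % len(proxies)]

async def check_existing_query(db, url):
    # Assumed behavior: return the most recent cached response, or None.
    async with db.execute(
            "SELECT response FROM query_cache WHERE query = ? ORDER BY date DESC LIMIT 1",
            (url,)) as cursor:
        row = await cursor.fetchone()
    return row[0] if row else None

async def get_url(url, session, arr, db, proxy=None, errors=None):
    # Serve from the cache when possible; otherwise fetch and store.
    cached = await check_existing_query(db, url)
    if cached is not None:
        return cached
    try:
        async with session.get(url, proxy=proxy) as resp:
            text = await resp.text()
    except Exception:
        # Record the failing URL (and the proxy used) for the retry pass.
        errors.append((url, proxy))
        return None
    await db.execute(
        "INSERT INTO query_cache(query, date, response) VALUES (?, ?, ?)",
        (url, datetime.datetime.now(), text))
    await db.commit()
    arr.append(text)  # arr appears to collect successful responses
    return text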
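
Assuming those pieces are in place, a call site could be as simple as the following (the URLs and proxy address are placeholders):

urls = ["https://example.com/a", "https://example.com/b"]
results = asyncio.run(fetch_urls(urls, proxies=["http://proxy1:8080"]))
print(f"Fetched {len(results)} responses")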