· 6 years ago · Dec 17, 2019, 04:52 PM
1
2import requests
3from bs4 import BeautifulSoup
4import re
5from prettytable import PrettyTable
6import os.path
7import pickle
8from datetime import datetime, timedelta
9
# Browser-like User-Agent header sent with every request to thebeerstore.ca.
turbo_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}

# Package description: group 1 is the item count, group 2 the size number,
# e.g. "24 X Can 355 ml" -> ("24", "355"). There is deliberately no leading
# ".*": a greedy prefix swallows all but the last digit of the item count.
size_regex = re.compile(r"(\d+) \D*(\d+) \w+", re.MULTILINE)
# Price text: group 1 is the number following the last "$" on a line.
price_regex = re.compile(r".*\$(\d*\.?\d*).*", re.MULTILINE)
# Beer page URL: group 1 is the beer id (the path segment after /beers/).
beerid_regex = re.compile(r".*https://www\.thebeerstore\.ca/beers/([\w|\-|\_]*)/")
15
def this_or_given(thing, given=None):
    """Return ``thing`` unless it is None, in which case return ``given``.

    Only None triggers the fallback; other falsy values (0, "", []) are
    returned unchanged.
    """
    if thing is None:
        return given
    return thing
17
class Beer:
    """Details of a single beer scraped from its page on thebeerstore.ca.

    Attributes set by the constructor:
        key: beer id used in the page URL.
        company, name: text of the <h3> / <h2> inside the "desc_mob" block.
        cans, bottles, kegs: list of offerings, or None when the page has no
            such list. Each offering is
            [num_items, size_ml, regular_price, sale_price, inventory] where
            num_items and size_ml are digit strings, regular_price is a float
            and sale_price is a float or None.
        cheapest_cans, cheapest_bottles, cheapest_kegs: lowest per-item price
            among the corresponding offerings, or None.
        sale: True when at least one offering has a sale price.
        type, category, abv, country: lower-cased detail strings, "" when the
            page does not list them.
    """

    # Seconds to wait for the web server before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, key):
        """Download and parse the page of the beer identified by ``key``.

        Raises:
            ValueError: if the page has no "desc_mob" name block, which is
                what happens when no beer page exists for ``key``.
        """
        self.key = key
        r = requests.get("https://www.thebeerstore.ca/beers/{0}/".format(key),
                         headers=turbo_headers, timeout=self.REQUEST_TIMEOUT)
        soup = BeautifulSoup(r.text, features="html.parser")
        detail_div = soup.find("div", {"class": "single_beer_dt_sec"})
        name_div = soup.find("div", {"class": "desc_mob"})
        if name_div is None:
            raise ValueError("No beer page found for \"{0}\"".format(key))

        self.cans = self._extract_offerings(soup.find("ul", {"class": "total_cans"}))
        self.bottles = self._extract_offerings(soup.find("ul", {"class": "total_bottles"}))
        self.kegs = self._extract_offerings(soup.find("ul", {"class": "total_kegs"}))

        self.cheapest_cans = self._find_cheapest(self.cans)
        self.cheapest_bottles = self._find_cheapest(self.bottles)
        self.cheapest_kegs = self._find_cheapest(self.kegs)

        # On sale as soon as any offering carries a sale price (index 3).
        all_offerings = (self.cans or []) + (self.bottles or []) + (self.kegs or [])
        self.sale = any(offering[3] is not None for offering in all_offerings)

        self.company = name_div.find("h3").get_text(strip=True)
        self.name = name_div.find("h2").get_text(strip=True)

        # Default every detail so __str__ and table printing never hit a
        # missing attribute when the page omits one of the boxes.
        self.type = ""
        self.category = ""
        self.abv = ""
        self.country = ""
        if detail_div is not None:
            for box in detail_div.find_all("div", {"class": "inner_box"}):
                heading = box.find("h3")
                value = box.find("p")
                if heading is None or value is None:
                    continue
                k = heading.get_text(strip=True).lower()
                if k in ("type", "category", "abv", "country"):
                    setattr(self, k, value.get_text(strip=True).lower())

    @staticmethod
    def _extract_offerings(item_list):
        """Parse one <ul> of package offerings into a list of rows.

        Returns None when ``item_list`` is None, otherwise a list of
        [num_items, size_ml, regular_price, sale_price, inventory].
        """
        if item_list is None:
            return None
        results = []
        for item in item_list.find_all('li', {"class": "single_beer_details"}):
            size_match = size_regex.search(item.find("div", {"class": "col_1"}).text)
            num_items = size_match.group(1)
            size_ml = size_match.group(2)
            price_div = item.find("div", {"class": "col_2"})
            # A <del> element holds the struck-through regular price when the
            # beer is on sale; the sale price is then the second <span>.
            struck = price_div.find("del")
            if struck is not None:
                price_text = struck.text
                sale_text = price_div.find_all("span")[1].text.strip().lstrip("$")
                sale_price = float(sale_text) if sale_text else None
            else:
                price_text = price_div.text
                sale_price = None
            regular_price = float(price_regex.search(price_text).group(1))
            inventory = item.find("div", {"class": "col_3"}).find("span").text
            results.append([num_items, size_ml, regular_price, sale_price, inventory])
        return results

    @staticmethod
    def _find_cheapest(item_list):
        """Return the lowest price per item over all offerings.

        The price of an offering is the lower of its regular and sale price,
        divided by its item count. Offerings with a zero item count or no
        price are ignored. Returns None when ``item_list`` is None or yields
        no usable offering.
        """
        if item_list is None:
            return None
        cheapest = None
        for item in item_list:
            count = float(item[0])
            if count == 0:
                continue
            prices = [p for p in (item[2], item[3]) if p is not None]
            if not prices:
                continue
            per_item = float(min(prices)) / count
            if cheapest is None or per_item < cheapest:
                cheapest = per_item
        return cheapest

    def __str__(self):
        """Return a header for the beer followed by one price table per package kind."""

        def prices_table(title, offerings):
            # Only the first four columns are shown; inventory is left out.
            t = PrettyTable([title, "Size ml", "$$", "Sale"])
            for offering in offerings:
                t.add_row(offering[:4])
            return str(t)

        lines = [
            "{0} - {1}{2}".format(self.company, self.name, " [[SALE]]" if self.sale else ""),
            "{0} {1} {2} from {3}".format(self.abv, self.type.capitalize(), self.category.capitalize(), self.country.capitalize()),
        ]
        for title, offerings in (("Cans", self.cans), ("Bottles", self.bottles), ("Kegs", self.kegs)):
            if offerings:
                lines.append(prices_table(title, offerings))
        return "\n".join(lines) + "\n"
104
105
class BeerDatabase:
    """Locally cached catalogue of the beers listed on thebeerstore.ca.

    Two lists are kept, each pickled to its own file together with the time
    it was last synchronised:
      * basic: dicts with the keys "address", "company", "name", "on_sale".
      * advanced: one Beer object per entry of the basic list.
    A list is downloaded again when it is missing or older than
    ``max_time_delta``.
    """

    # Seconds to wait for the web server before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, basic="basic.bdb", advanced="advanced.bdb"):
        """Load whatever cached data exists in the files ``basic`` and ``advanced``."""
        self.advanced = None  # Detailed beer database
        self.advanced_updated = None  # Last time advanced database was synchronised
        self.basic = None  # Basic beer database
        self.basic_updated = None  # Last time basic database was synchronised
        self.max_time_delta = timedelta(hours=24)

        self.basic_file = basic
        self.advanced_file = advanced
        # A cache is only adopted when both its data and its timestamp loaded.
        basic_d, basic_u = self.loadBeerData(basic)
        if basic_d is not None and basic_u is not None:
            self.basic = basic_d
            self.basic_updated = basic_u
        advanced_d, advanced_u = self.loadBeerData(advanced)
        if advanced_d is not None and advanced_u is not None:
            self.advanced = advanced_d
            self.advanced_updated = advanced_u

    def saveBeerData(self, basic=True, advanced=True):
        """Pickle the selected list(s) and their timestamps to their files."""
        if basic:
            with open(self.basic_file, "wb") as fout:
                pickle.dump({"beers": self.basic, "updated": self.basic_updated}, fout)
        if advanced:
            with open(self.advanced_file, "wb") as fout:
                pickle.dump({"beers": self.advanced, "updated": self.advanced_updated}, fout)

    @staticmethod
    def loadBeerData(savefile):
        """Read a pickled database file.

        Returns:
            (beers, updated); either value is None when it could not be read.
            A missing or unreadable file is reported on stdout, not raised.
        """
        beers = None
        updated = None
        if not os.path.exists(savefile):
            print("Beer database {0} not found".format(savefile))
            return beers, updated
        try:
            # NOTE: unpickling can execute arbitrary code; only load database
            # files that were written by this program.
            with open(savefile, "rb") as fin:
                stored = pickle.load(fin)
            beers = stored["beers"]
            updated = stored["updated"]
        except Exception as ex:
            # Best effort: a corrupt cache just means the data is fetched again.
            print("Error loading beer database {0}: {1}".format(savefile, ex))
        return beers, updated

    def getBasicBeerList(self):
        """Return the basic list, synchronising first if it is missing or stale."""
        if (self.basic is None or self.basic_updated is None
                or self.basic_updated + self.max_time_delta < datetime.now()):
            self.syncBasicBeerList()
        return self.basic

    def syncBasicBeerList(self, save=True):
        """Download the basic list page by page and optionally save it."""
        print("Synchronishing Basic Beer List... This could take a few moments")
        # Built locally so a failed download leaves the previous list intact.
        beers = []
        page_num = 0
        while True:
            # Grab a page of results from the api
            page = requests.post(url="https://www.thebeerstore.ca/wp-admin/admin-ajax.php", headers=turbo_headers,
                                 data={'action': "beer_ajax_load_more", 'page': page_num, 'is_new': 0, 'query': ""},
                                 timeout=self.REQUEST_TIMEOUT).text
            # Stop if we reached the end of pages (an empty or blank response)
            if not page.strip():
                break
            page_num += 1
            soup = BeautifulSoup(page, features="html.parser")
            for item in soup.find_all('div', {"class": "result_box"}):
                name_div = item.find('div', {"class": "bottom_res_cont"})
                beers.append({
                    "address": item.find('a', {"class": "result_box_link"})["href"],
                    "company": name_div.find("p").text,
                    "name": name_div.find("h4").text,
                    "on_sale": item.find('span', {"class": "beer_badge_sale"}) is not None,
                })
        self.basic = beers
        print("Basic Beer list synchronised")
        self.basic_updated = datetime.now()
        if save:
            self.saveBeerData(basic=True, advanced=False)

    def printBasicBeerList(self):
        """Print the basic list as a numbered table."""
        beerlist = self.getBasicBeerList()
        t = PrettyTable(["#", "Beer", "Company", "Sale", "beerid"])
        for current, beer in enumerate(beerlist, start=1):
            t.add_row([current, beer["name"], beer["company"], "SALE" if beer["on_sale"] else "",
                       beerid_regex.search(beer["address"]).group(1)])
        print(t)

    def getAdvancedBeerList(self):
        """Return the advanced list, synchronising first if it is missing or stale."""
        if (self.advanced is None or self.advanced_updated is None
                or self.advanced_updated + self.max_time_delta < datetime.now()):
            self.syncAdvancedBeerList()
        return self.advanced

    def syncAdvancedBeerList(self, numthreads=4, save=True):
        """Build a Beer object for every entry of the basic list.

        Args:
            numthreads: number of worker threads; below 2 the pages are
                fetched sequentially.
            save: write the refreshed list to the advanced file when True.

        Returns:
            The refreshed advanced list, in the order of the basic list.
        """
        basiclist = self.getBasicBeerList()
        print("Synchronishing Advanced Beer List... This could take a few minutes")

        def fetch(entry):
            return Beer(beerid_regex.search(entry["address"]).group(1))

        # If single threaded (or nothing to parallelise) dont bother with multithreading
        if numthreads < 2 or len(basiclist) < 2:
            beers = [fetch(entry) for entry in basiclist]
        else:
            from multiprocessing.pool import ThreadPool
            # map() blocks until every page is processed and keeps input
            # order; the with-block shuts the pool down afterwards.
            with ThreadPool(processes=numthreads) as tpool:
                beers = tpool.map(fetch, basiclist)

        self.advanced = beers
        print("Advanced Beer list synchronised")
        self.advanced_updated = datetime.now()
        if save:
            self.saveBeerData(basic=False, advanced=True)
        return self.advanced

    def printAdvancedBeerList(self, sorting="name"):
        """Print the advanced list as a table ordered by ``sorting``.

        Returns:
            True after printing, False when ``sorting`` is not supported
            (only "name" is). The check happens before any list is fetched.
        """
        if sorting != "name":
            return False
        beerlist = sorted(self.getAdvancedBeerList(), key=lambda beer: beer.name)
        t = PrettyTable(["#", "Beer", "CAN", "BTL", "ABV", "Company", "Sale", "Type", "Category", "Country", "beerid"])
        for current, item in enumerate(beerlist, start=1):
            # getattr: a page without a detail box may leave these unset.
            t.add_row([current, item.name, this_or_given(item.cheapest_cans, ""), this_or_given(item.cheapest_bottles, ""),
                       getattr(item, "abv", ""), item.company, "SALE" if item.sale else "",
                       getattr(item, "type", ""), getattr(item, "category", ""), getattr(item, "country", ""), item.key])
        print(t)
        return True
228
229
230
231
def main(argv=None):
    """Run the command line interface.

    Modes:
        list [sorting]  print the advanced beer list (sorting defaults to "name")
        info <beerid>   print the details of one beer

    Args:
        argv: full argument vector including the program name; defaults to
            ``sys.argv``. It is never modified.

    Returns:
        0 on success, 1 on invalid usage or when the beer lookup fails.
    """
    import sys

    args = list(sys.argv if argv is None else argv)
    mode = args[1] if len(args) > 1 else "NOWORK"

    # List the beers
    if mode == "list":
        sorting = args[2] if len(args) > 2 else "name"
        if not BeerDatabase().printAdvancedBeerList(sorting):
            print("No sorting method \"{0}\"".format(sorting))
            return 1
        return 0

    # Info command
    if mode == "info":
        if len(args) < 3:
            # Stop here: there is no beer name to look up.
            print("Invalid usage. No beer specified for information.\n beer.py info heineken")
            return 1
        try:
            print(Beer(args[2]))
        except Exception as ex:
            # Top-level boundary: report the failure instead of a traceback.
            print("Invalid usage. \n Error finding beer with name \"{0}\"".format(args[2]))
            print(" Reason: {0}".format(ex))
            return 1
        return 0

    print("Invalid usage. Modes are list and info.\n beer.py list\n beer.py info heiniken")
    return 1


if __name__ == "__main__":
    raise SystemExit(main())