· 6 years ago · Mar 25, 2020, 04:26 PM
1import configparser
2import operator
3import os
4import webbrowser
5from datetime import datetime, timedelta
6from time import time
7
8import pushbullet
9import pyperclip
10import requests
11import win32api
12import win32con
13import win32gui
14from PIL import ImageGrab
15from playsound import playsound
16
17
def Avg(lst):
    """Return the arithmetic mean of the values in ``lst``.

    Raises ``ZeroDivisionError`` when ``lst`` is empty.
    """
    total = sum(lst)
    count = len(lst)
    return total / count
20
21
22# noinspection PyShadowingNames
def enum_cb(hwnd, results):
    """Callback for ``win32gui.EnumWindows``: record one window.

    Appends ``(hwnd, window title)`` to the module-level ``winlist``.
    ``results`` is the extra argument handed over by ``EnumWindows``; it is
    not used here.
    """
    title = win32gui.GetWindowText(hwnd)
    winlist.append((hwnd, title))
25
26
def write(message, add_time=True, push=0, push_now=False):
    """Print a message and optionally queue/send it as a push notification.

    Args:
        message: Text to output. Falsy values (``None``, ``""``) print nothing,
            which lets callers flush the queued note without adding a line.
        add_time: Prefix the message with the current ``HH:MM:SS`` time.
        push: Urgency level. Only values ``>= 3`` touch the push queue.
        push_now: With ``push >= 3``, send the queued note through the
            module-level ``device`` and clear the queue.

    The queue lives in the module-level string ``note``.
    """
    global note

    text = None
    if message:
        # Always coerce to str so a non-string message can be appended to
        # ``note`` below even when no timestamp is added.
        text = str(message)
        if add_time:
            text = datetime.now().strftime("%H:%M:%S") + ": " + text
        print(text)

    if push >= 3:
        if text is not None:
            note = note + text + "\n"
        if push_now:
            device.push_note("CSGO AUTO ACCEPT", note)
            note = ""
42
43
def click(x, y):
    """Move the cursor to ``(x, y)`` and send a left button press and release."""
    position = (x, y)
    win32api.SetCursorPos(position)
    for event in (win32con.MOUSEEVENTF_LEFTDOWN, win32con.MOUSEEVENTF_LEFTUP):
        win32api.mouse_event(event, x, y, 0, 0)
48
49
50# noinspection PyShadowingNames,PyShadowingNames
def relate_list(l_org, l1, l2=None, relate=operator.le):
    """Check whether ``l_org`` satisfies at least one group of limits.

    Args:
        l_org: Measured values. ``l_org[0:]`` is compared element-wise against
            ``l1`` and ``l_org[3:]`` against ``l2``.
        l1: First group of limits.
        l2: Optional second group of limits; skipped when empty or ``None``.
        relate: Binary predicate called as ``relate(measured, limit)``;
            defaults to ``operator.le``.

    Returns:
        ``False`` when ``l_org`` is empty; otherwise ``True`` if every
        comparison within any single group holds.

    Raises:
        IndexError: if ``l_org`` is shorter than a group requires.
    """
    if not l_org:
        return False

    # The second group always starts at index 3 of ``l_org`` (one RGB triple
    # after the first), as in the original implementation.
    second_group_offset = 3

    group_results = [
        all([relate(l_org[idx], limit) for idx, limit in enumerate(l1)])
    ]
    if l2:
        group_results.append(
            all([
                relate(l_org[idx], limit)
                for idx, limit in enumerate(l2, start=second_group_offset)
            ])
        )
    return any(group_results)
64
65
66# noinspection PyShadowingNames,PyShadowingNames
def color_average(image, compare_list):
    """Return how far the image's mean colour is from the reference values.

    The per-channel means of ``image`` (taken from the first three components
    of every item of ``image.getdata()``) are repeated once for every three
    entries in ``compare_list``; the result is the absolute difference between
    each reference value and the matching channel mean.

    Args:
        image: Object exposing ``getdata()`` that yields indexable pixels.
        compare_list: Flat list of reference values, in groups of three.

    Returns:
        A list with one absolute difference per entry of ``compare_list``.
    """
    reds, greens, blues = [], [], []
    for pixel in image.getdata():
        reds.append(pixel[0])
        greens.append(pixel[1])
        blues.append(pixel[2])

    channel_means = [Avg(reds), Avg(greens), Avg(blues)]
    repeats = int(len(compare_list) / 3)
    reference_means = channel_means * repeats

    return [
        abs(expected - reference_means[idx])
        for idx, expected in enumerate(compare_list)
    ]
83
84
85# noinspection PyShadowingNames,PyShadowingNames
def getScreenShot(window_id, area=(0, 0, 0, 0)):
    """Maximize a window and grab a scaled region of the screen.

    ``area`` is given in coordinates of a 2560x1440 screen. The last two
    coordinates are increased by one, then every coordinate is scaled to the
    current ``screen_width`` / ``screen_height`` and truncated to ``int``.

    Args:
        window_id: Window handle passed to ``win32gui.ShowWindow``.
        area: Four coordinates of the region to capture.

    Returns:
        The image returned by ``ImageGrab.grab`` for the scaled region.
    """
    x_factor = screen_width / 2560
    y_factor = screen_height / 1440
    scaled = [x_factor, y_factor, x_factor, y_factor]

    coords = list(area)
    # Extend the last two coordinates by one pixel before scaling.
    coords[-2:] = [edge + 1 for edge in coords[-2:]]

    for idx, coord in enumerate(coords):
        scaled[idx] = scaled[idx] * coord
    bbox = [int(value) for value in scaled]

    win32gui.ShowWindow(window_id, win32con.SW_MAXIMIZE)
    return ImageGrab.grab(bbox)
98
99
100# noinspection PyShadowingNames
def getOldSharecodes(num=-1):
    """Load, normalise and re-save the stored share codes of the current account.

    The codes live in ``last_game_<steam_id>.txt``, one per line. If the file
    does not exist, the account's configured match token is used as the only
    entry. Only the last 200 lines are kept, each is rewritten as
    ``"CSGO" + <text after the first "CSGO">``, and the file is overwritten
    with the result.

    Args:
        num: Slice start applied to the normalised list (``games[num:]``);
            the default ``-1`` returns only the newest code.

    Returns:
        A list of share-code strings.

    Raises:
        IndexError: if a non-blank line does not contain ``"CSGO"``.
    """
    account = accounts[current_account]
    path = "last_game_" + account["steam_id"] + ".txt"

    try:
        with open(path, "r") as last_game:
            lines = last_game.readlines()
    except FileNotFoundError:
        lines = [account["match_token"]]

    # Normalise BEFORE opening the file for writing: opening with "w"
    # truncates it, so a malformed line must not be able to raise while the
    # file is already empty. Blank lines are skipped instead of crashing.
    games = [
        "CSGO" + line.strip("\n").split("CSGO")[1]
        for line in lines[-200:]
        if line.strip()
    ]

    with open(path, "w") as last_game:
        last_game.writelines(code + "\n" for code in games)
    return games[num:]
118
119
def getNewCSGOMatches(game_id):
    """Fetch all share codes newer than ``game_id`` from the Steam Web API.

    Repeatedly calls ``GetNextMatchSharingCode`` for the current account,
    starting from ``game_id``, until the API answers ``"n/a"``. Every new code
    is appended to ``last_game_<steam_id>.txt``.

    Args:
        game_id: The newest share code already known.

    Returns:
        The list of newly found share codes, ``[game_id]`` when there are
        none, or ``0`` when the API response has no ``result``/``nextcode``
        entry (a message is written in that case).
    """
    account = accounts[current_account]
    sharecodes = []
    next_code = game_id

    # The context manager closes (and flushes) the file on every exit path,
    # including the early ``return 0``.
    with open("last_game_" + account["steam_id"] + ".txt", "a") as last_game:
        while next_code != "n/a":
            steam_url = (
                "https://api.steampowered.com/ICSGOPlayers_730/GetNextMatchSharingCode/v1?key="
                + steam_api_key
                + "&steamid=" + account["steam_id"]
                + "&steamidkey=" + account["auth_code"]
                + "&knowncode=" + game_id
            )
            try:
                next_code = requests.get(steam_url).json()["result"]["nextcode"]
            except KeyError:
                write("WRONG GAME_CODE, GAME_ID or STEAM_ID ")
                return 0

            if next_code and next_code != "n/a":
                sharecodes.append(next_code)
                game_id = next_code
                last_game.write(next_code + "\n")

    return sharecodes or [game_id]
141
142
143# noinspection PyShadowingNames
def UpdateCSGOstats(sharecodes, num_completed=1):
    """Submit share codes to csgostats.gg and report each match's status.

    Every share code is posted to the csgostats.gg upload endpoint. Responses
    whose ``status`` is ``"complete"`` are collected separately from the rest.
    The last ``num_completed`` complete matches and all other matches are
    reported through ``write`` (share code, URL, status text), the queued push
    note is flushed, and the URL of the newest complete match is copied to the
    clipboard.

    Args:
        sharecodes: Iterable of share-code strings. A falsy value (such as the
            ``0`` that ``getNewCSGOMatches`` returns on error) is treated as
            "no share codes".
        num_completed: How many of the most recent complete matches to report.

    Returns:
        The URL of the last reported match, or ``""`` if nothing was reported.
    """
    # Sample response, taken from an earlier test entry in this file:
    # {'status': 'complete',
    #  'data': {'msg': 'Complete - <a href="/match/7584322">View</a>',
    #           'index': '1', 'sharecode': 'CSGO-...', 'queue_id': 8081108,
    #           'demo_id': 7584322, 'url': 'https://csgostats.gg/match/7584322'},
    #  'error': 0}
    completed_games, analyze_games = [], []
    for val in sharecodes or []:
        response = requests.post("https://csgostats.gg/match/upload/ajax", data={'sharecode': val, 'index': '1'})
        result = response.json()  # parse the body once, reuse below
        if result["status"] == "complete":
            completed_games.append(result)
        else:
            analyze_games.append(result)

    # NOTE(review): ``num_completed == 0`` slices as ``[0:]`` and therefore
    # reports ALL complete matches, not none - confirm this is intended.
    output = [completed_games[num_completed * -1:], analyze_games]

    game_url = ""  # stays empty when there is nothing to report
    for group in output:
        for json_dict in group:
            sharecode = json_dict["data"]["sharecode"]
            game_url = json_dict["data"]["url"]
            # Keep only the text before the HTML link and drop the dashes.
            info = json_dict["data"]["msg"].split("<")[0].replace('-', '').rstrip(" ")
            write('Sharecode: %s' % sharecode, add_time=False, push=push_urgency)
            write("URL: %s" % game_url, add_time=False, push=push_urgency)
            write("Status: %s." % info, add_time=False, push=push_urgency)

    # An empty message with push_now=True only flushes the queued push note.
    write(None, add_time=False, push=push_urgency, push_now=True)

    if completed_games:
        pyperclip.copy(completed_games[-1]["data"]["url"])
    return game_url
166
167
def getHotKeys():
    """Read the hot-key codes from the ``[HotKeys]`` section of the config.

    Each option value is parsed as a hexadecimal integer.

    Returns:
        A list of seven ints, in this order: activate script, activate push
        notification, info on newest match, info on multiple matches, live
        tab, switch accounts, end script.
    """
    option_names = (
        "Activate Script",
        "Activate Push Notification",
        "Get Info on newest Match",
        "Get Info on multiple Matches",
        "Live Tab Key",
        "Switch accounts for csgostats.gg",
        "End Script",
    )
    return [int(config.get("HotKeys", option), 16) for option in option_names]
173
174
# --- Configuration ----------------------------------------------------------
config = configparser.ConfigParser()
config.read("config.ini")
steam_api_key = config.get("csgostats.gg", "API Key")
screenshot_interval = config.getint("Screenshot", "Interval")
keys = getHotKeys()
device = 0  # falsy placeholder until a Pushbullet device has been looked up


# --- Accounts ---------------------------------------------------------------
# One dict per config section whose name starts with "Account".
accounts, current_account = [], 0
for section in config.sections():
    if section.startswith("Account"):
        # display_name = config.get(section, "Display Name")
        steam_id = config.get(section, "Steam ID")
        auth_code = config.get(section, "Authentication Code")
        match_token = config.get(section, "Match Token")
        accounts.append({"steam_id": steam_id, "auth_code": auth_code, "match_token": match_token})

steam_ids = ",".join(account["steam_id"] for account in accounts)

# Look up the display names of all accounts in a single request. HTTPS is
# used (as for the other Steam API call in this file) because the URL
# carries the API key.
profiles = requests.get("https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key=" + steam_api_key + "&steamids=" + steam_ids).json()["response"]["players"]
for profile in profiles:
    for account in accounts:
        if account["steam_id"] == profile["steamid"]:
            account["name"] = profile["personaname"]
            break

# An account for which no profile came back would otherwise have no "name"
# key and crash the status messages below; fall back to its Steam ID.
for account in accounts:
    account.setdefault("name", account["steam_id"])

# --- Runtime state ----------------------------------------------------------
screen_width, screen_height = win32api.GetSystemMetrics(0), win32api.GetSystemMetrics(1)
toplist, winlist = [], []
hwnd = 0

test_for_live_game, test_for_success, testing = False, False, False
push_urgency = 0  # index into the push levels; incremented by the push hot key
# accept_avg = []

note = ""  # push-notification text queued by write()

start_time = time()
write("READY")
write("current account is: %s" % accounts[current_account]["name"], add_time=False)
print("\n")
215
# Main polling loop: handle hot keys, locate the CS:GO window, watch for the
# accept button and afterwards for the result of accepting.
# ``GetAsyncKeyState(...) & 1`` tests the low bit of the returned key state.
while True:
    if win32api.GetAsyncKeyState(keys[0]) & 1:  # F9 (ACTIVATE / DEACTIVATE SCRIPT)
        test_for_live_game = not test_for_live_game
        write("TESTING: %s" % test_for_live_game)
        if test_for_live_game:
            playsound('sounds/activated.mp3')
            time_searching = time()
        else:
            playsound('sounds/deactivated.mp3')

    if win32api.GetAsyncKeyState(keys[1]) & 1:  # F8 (ACTIVATE / DEACTIVATE PUSH NOTIFICATION)
        if not device:
            PushBulletDeviceName = config.get('Pushbullet', 'Device Name')
            PushBulletAPIKey = config.get('Pushbullet', 'API Key')
            try:
                device = pushbullet.PushBullet(PushBulletAPIKey).get_device(PushBulletDeviceName)
            # ``except A or B`` evaluates to ``except A`` only; a tuple is
            # required to name both exception types.
            except (pushbullet.errors.PushbulletError, pushbullet.errors.InvalidKeyError):
                write("Pushbullet is wrongly configured.\nWrong API Key or DeviceName in config.ini")
        if device:
            # Cycle through the four push levels 0..3.
            push_urgency += 1
            if push_urgency > 3:
                push_urgency = 0
            push_info = ["not active", "only if accepted", "all game status related information", "all information (game status/csgostats.gg information)"]
            write("Pushing: %s" % push_info[push_urgency])

    if win32api.GetAsyncKeyState(keys[2]) & 1:  # F7 Key (UPLOAD NEWEST MATCH)
        write("Uploading / Getting status on newest match")
        new_sharecodes = getNewCSGOMatches(getOldSharecodes()[0])
        # getNewCSGOMatches returns 0 (after writing a message) when the
        # Steam API rejects the request; there is nothing to upload then.
        if new_sharecodes:
            newest_url = UpdateCSGOstats(new_sharecodes)
            if newest_url:
                pyperclip.copy(newest_url)

    if win32api.GetAsyncKeyState(keys[3]) & 1:  # F6 Key (GET INFO ON LAST X MATCHES)
        last_x_matches = config.getint("csgostats.gg", "Number of Requests")
        completed_matches = config.getint("csgostats.gg", "Completed Matches")
        write("Getting Info from last %s matches" % last_x_matches)
        # write("Outputting %s completed match[es]" % completed_matches, add_time=False)
        # Called for its side effect: new share codes are appended to the
        # history file that getOldSharecodes reads next.
        getNewCSGOMatches(getOldSharecodes()[0])
        UpdateCSGOstats(getOldSharecodes(num=last_x_matches * -1), num_completed=completed_matches)

    if win32api.GetAsyncKeyState(keys[4]) & 1:  # F13 Key (OPEN WEB BROWSER ON LIVE GAME TAB)
        webbrowser.open_new_tab("https://csgostats.gg/player/" + accounts[current_account]["steam_id"] + "#/live")
        write("new tab opened", add_time=False)

    if win32api.GetAsyncKeyState(keys[5]) & 1:  # F15 (SWITCH ACCOUNTS)
        current_account += 1
        if current_account > len(accounts)-1:
            current_account = 0
        print(current_account)
        write("current account is: %s" % accounts[current_account]["name"], add_time=False)

    if win32api.GetAsyncKeyState(keys[6]) & 1:  # POS1/HOME Key
        write("Exiting Script")
        break

    # Find the CS:GO window; everything below needs its handle.
    winlist = []
    win32gui.EnumWindows(enum_cb, toplist)
    csgo = [(hwnd, title) for hwnd, title in winlist if 'counter-strike: global offensive' in title.lower()]
    if not csgo:
        continue
    hwnd = csgo[0][0]

    # TESTING HERE
    # NOTE(review): key code 0 is passed here although the original comment
    # says F5 - presumably the test hot key is disabled on purpose; confirm.
    if win32api.GetAsyncKeyState(0) & 1:  # F5, TEST CODE
        print("\n")
        write("Executing TestCode")
        print("\n")
        testing = not testing

    if testing:
        # start_time = time()
        img = getScreenShot(hwnd, (2435, 65, 2555, 100))
        not_searching_avg = color_average(img, [6, 10, 10])
        searching_avg = color_average(img, [6, 163, 97, 4, 63, 35])
        not_searching = relate_list(not_searching_avg, [2, 5, 5])
        searching = relate_list(searching_avg, [2.7, 55, 35], l2=[1, 50, 35])
        img = getScreenShot(hwnd, (467, 1409, 1300, 1417))
        success_avg = color_average(img, [21, 123, 169])
        success = relate_list(success_avg, [1, 8, 7])
        # print("Took: %s " % str(timedelta(milliseconds=int(time()*1000 - start_time*1000))))
    # TESTING ENDS HERE

    if test_for_live_game:
        # Take at most one screenshot per ``screenshot_interval`` seconds.
        if time() - start_time < screenshot_interval:
            continue
        start_time = time()
        img = getScreenShot(hwnd, (1265, 760, 1295, 785))
        if not img:
            continue
        accept_avg = color_average(img, [76, 176, 80, 90, 203, 95])

        if relate_list(accept_avg, [1, 2, 1], l2=[1, 1, 2]):
            write("Trying to Accept", push=push_urgency + 1)

            test_for_success = True
            test_for_live_game = False
            accept_avg = []

            for _ in range(5):
                click(int(screen_width / 2), int(screen_height / 1.78))
                # sleep(0.5)
                # pass

            write("Trying to catch a loading map")
            playsound('sounds/accept_found.mp3')
            # From here on ``start_time`` measures the time since accepting.
            start_time = time()

    if test_for_success:
        if time() - start_time < 40:
            img = getScreenShot(hwnd, (2435, 65, 2555, 100))
            not_searching_avg = color_average(img, [6, 10, 10])
            searching_avg = color_average(img, [6, 163, 97, 4, 63, 35])

            not_searching = relate_list(not_searching_avg, [2, 5, 5])
            searching = relate_list(searching_avg, [2.7, 55, 35], l2=[1, 50, 35])

            img = getScreenShot(hwnd, (467, 1409, 1300, 1417))
            success_avg = color_average(img, [21, 123, 169])
            success = relate_list(success_avg, [1, 8, 7])

            if success:
                write("Took %s since pressing accept." % str(timedelta(seconds=int(time() - start_time))), add_time=False, push=push_urgency + 1)
                write("Took %s since trying to find a game." % str(timedelta(seconds=int(time() - time_searching))), add_time=False, push=push_urgency + 1)
                write("Game should have started", push=push_urgency + 2, push_now=True)
                test_for_success = False
                playsound('sounds/done_testing.mp3')

            if any([searching, not_searching]):
                write("Took: %s " % str(timedelta(seconds=int(time() - start_time))), add_time=False, push=push_urgency + 1)
                write("Game doesnt seem to have started. Continuing to search for accept Button!", push=push_urgency + 1, push_now=True)
                playsound('sounds/back_to_testing.mp3')
                test_for_success = False
                test_for_live_game = True

        else:
            # Neither outcome was recognised within 40 seconds: dump the
            # last measurements and save the last screenshot for inspection.
            write("40 Seconds after accept, did not find loading map nor searching queue")
            test_for_success = False
            print(success_avg)
            print(searching_avg)
            print(not_searching_avg)
            playsound('sounds/fail.mp3')
            img.save(os.path.expanduser("~") + '\\Unknown Error.png')