· 2 years ago · Aug 21, 2023, 06:25 PM
1# -*- coding: utf-8 -*-
2
3# Used to capture data from TRN, and works around the severe rate limiting by using a key pool
4# and some clever timing to get a full sample in a two hour window, with 40 members being done
5# in < 20 minutes
6
7# This script is resumable, and doesn't wait when rate limited, instead just exiting.
8# When the sample is complete, a semaphore file is written out so the calling shell
9# script knows it can continue processing
10
11# This script should be called from cron with these timings:
12# cron *must* be configured to use UTC!
13# https://unix.stackexchange.com/questions/710815/how-do-i-make-cron-use-utc
# (The above method works on a Raspberry Pi running Raspbian.)
15
16# 00:05
17# 01:05 --delay
18# 02:05 --delay --complete
19# 03:50 (Sample Hour 03)
20# 04:05
21# 05:05 --delay
22# 06:05 --delay --complete
23# 07:50 (Sample Hour 07)
24# 08:05
25# 09:05 --delay
26# 10:05 --delay --complete
27# 11:50 (Sample Hour 11)
28# 12:05
29# 13:05 --delay
30# 14:05 --delay --complete
31# 15:50 (Sample Hour 15)
32# 16:05
33# 17:05 --delay
34# 18:05 --delay --complete
35# 19:50 (Sample Hour 19)
36# 20:05
37# 21:05 --delay
38# 22:05 --delay --complete
39# 23:50 (Sample Hour 23 (EOD))
40
41# Python 3
42
43import sys
44import os
45import json
46import requests
47from datetime import datetime, timedelta
48import pytz
49import argparse
50import random
51import time
52import logging
53
# Configuration

# Every path is anchored at the directory containing this script, so the
# working directory cron starts us in does not matter.
script_dir = os.path.dirname(os.path.abspath(__file__))
data_dir, rejected_dir, inactive_src, players_src = (
    os.path.join(script_dir, leaf)
    for leaf in ("data", "rejected", "inactive.txt", "players.json")
)
# Semaphore file written into the data directory once a sample is complete.
complete_flag = os.path.join(data_dir, "capture.complete")

# API secrets come from the environment; a missing variable raises KeyError
# here, before any work is done. The list order is the order keys are used.
api_keys = [
    {'secret': os.environ[env_name], 'name': label}
    for env_name, label in (
        ('TRN_SECRET', 'Primary'),
        ('TRN_SECRET_LEGACY', 'Legacy'),
    )
]
trn_api_url = "https://public-api.tracker.gg/v2/division-2/standard/profile/psn/"
local_timezone = 'Europe/Berlin'
69
70
# Parameter parsing

parser = argparse.ArgumentParser(
    prog="TRN Fetch",
    description="Captures JSON files of player stats from TRN.",
    # Both secrets are read from the environment by the configuration section,
    # so both are named here.
    epilog="Requires the TRN_SECRET and TRN_SECRET_LEGACY environment variables to be defined.",
)
parser.add_argument('-v', '--verbose', help="Show detailed messages.", action="store_true")
parser.add_argument('-u', '--complete', help="Force complete flag.", action="store_true")
parser.add_argument('-q', '--quiet', help="Only log warnings and errors.", action="store_true")
parser.add_argument('-d', '--delay', help="Start capture with a random delay of 1-3 minutes.", action='store_true')
options = parser.parse_args()

# Verbose mode overrides quiet: when both are given, quiet is switched off.
if options.verbose:
    options.quiet = False
    log_level = logging.DEBUG
elif options.quiet:
    log_level = logging.WARNING
else:
    log_level = logging.INFO
logging.basicConfig(level=log_level)

# Upper bound, in seconds, of the random start-up stagger applied before the
# first API call (a fixed 20 seconds is added on top when the delay is drawn).
options.pause_seconds = 160 if options.delay else 15
98
99# Define classes
100
101# Class to manage keys and rate limiting
class KeyPool:
    """Pool of API keys, each with a per-hour call budget, used in order.

    Keys are consumed in the order they were added: a key is used until its
    budget reaches zero (or it is reported as rate limited), then the pool
    moves on to the next key. Once the last key is spent, ``next_key`` is
    ``None`` and ``key_available()`` returns False.
    """

    def __init__(self, use_cap: int = 10):
        """Create an empty pool.

        Args:
            use_cap: Number of calls each key may make per UTC hour.
        """
        # One dict per key: name, secret, hour_used, available, uses, errors.
        self.keys = []
        self.key_count = 0
        self.limited_keys = 0
        # Index of the key the next get_secret() call will use (None = none left).
        self.next_key = 0
        # Index of the key handed out by the most recent get_secret() call.
        self.last_key = 0
        self.use_cap = use_cap
        # Sum of the remaining budgets across the keys still usable.
        self.available_calls = 0

    def add_key(self, name, secret):
        """Register a key and add its full budget to the pool total."""
        self.keys.append({
            'name': name,
            'secret': secret,
            'hour_used': None,
            'available': self.use_cap,
            'uses': 0,
            'errors': 0,
        })
        self.key_count += 1
        self.available_calls += self.use_cap
        logging.debug(f"Added key #{self.key_count-1}, available calls: {self.available_calls}")

    def key_available(self):
        """Return True while at least one call is left in the pool."""
        return self.available_calls > 0

    def key_limited(self):
        """Record that the most recently used key was rate limited.

        The key's remaining budget is written off and the pool advances to
        the following key, or to ``None`` if it was the last one.
        """
        limited = self.keys[self.last_key]

        logging.debug(f"Key {self.last_key} was limited. Reducing available calls.")
        logging.debug(f"Calls available was: {self.available_calls}")
        logging.debug(f"Calls attached to limited key is: {limited['available']}")
        self.available_calls -= limited['available']
        limited['available'] = 0
        logging.debug(f"Calls available now: {self.available_calls}")

        if self.last_key == self.key_count - 1:
            self.next_key = None
        else:
            self.next_key = self.last_key + 1

        logging.debug(f"Next key to use is {self.next_key}")

    def get_secret(self):
        """Consume one call from the current key and return its secret.

        Returns:
            The secret string of the key that was charged for this call.

        Raises:
            RuntimeError: If no key is left; check ``key_available()`` first.
        """
        if self.next_key is None or self.next_key >= self.key_count:
            # Previously this surfaced as an opaque TypeError/IndexError from
            # indexing the key list.
            raise RuntimeError("No API key available; check key_available() before get_secret().")

        key = self.keys[self.next_key]
        # Current UTC hour; time.gmtime() avoids the deprecated datetime.utcnow().
        this_hour = time.gmtime().tm_hour

        if key['hour_used'] != this_hour:
            key['hour_used'] = this_hour
            # Drop whatever was left from the previous hour and grant a fresh
            # budget, keeping the pool total in step.
            self.available_calls += self.use_cap - key['available']
            key['available'] = self.use_cap

        # Charge this call against both the pool total and the key.
        self.available_calls -= 1
        key['available'] -= 1
        key['uses'] += 1

        self.last_key = self.next_key

        if key['available'] == 0:
            self.next_key += 1
            if self.next_key > self.key_count - 1:
                # The last key is spent: nothing more can be called.
                self.available_calls = 0
                self.next_key = None

        logging.debug(f"Using key #{self.last_key}, calls remaining: {self.available_calls}, next key is #{self.next_key}")

        return key['secret']
174
175
176# Class used to ensure we don't go over the rate limit with TRN
class Throttle:
    """Blocking rate limiter: at most N transactions per sliding time window."""

    def __init__(self, transactions, secs=1.0):
        """Set up the limiter.

        Args:
            transactions: Maximum number of transactions allowed per window.
            secs: Length of the window in seconds.
        """
        self.max_transactions = transactions
        self.duration = secs
        # Monotonic timestamps of the transactions still inside the window.
        self.queue = []
        # Extra seconds added to the window as a safety margin.
        self.tolerance = 1

    def next(self):
        """Block until another transaction is allowed, then record it."""
        while True:
            # Read the clock once per pass so the pruning and the pause are
            # computed from the same instant. With two separate readings the
            # pause could come out negative, and time.sleep() rejects that.
            now = time.monotonic()
            window = self.duration + self.tolerance
            self.queue = [stamp for stamp in self.queue if stamp + window > now]

            if len(self.queue) < self.max_transactions:
                self.queue.append(now)
                return

            # Sleep until the oldest recorded transaction leaves the window.
            pause = self.queue[0] + window - now
            time.sleep(max(0.0, pause))
198
199# Define functions
200
def player_list_load(player_file_path):
    """Load the player roster from a JSON file.

    Args:
        player_file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON content (the main program iterates it as a list of
        dicts with 'name' and 'id' keys).
    """
    # JSON is UTF-8; say so explicitly rather than depending on the locale
    # that cron happens to run the script under.
    with open(player_file_path, 'r', encoding='utf-8') as f:
        return json.load(f)
209
210
211
def fetch_from_api(url, id, keys: KeyPool, limiter, timeout=30):
    """Fetch one player's profile from the TRN API.

    Args:
        url: Base profile URL; the player id is appended to it.
        id: Player identifier to look up.
        keys: Key pool that supplies (and accounts for) the API secret.
        limiter: Throttle whose ``next()`` blocks until a call is allowed.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON object on success. On a network error, a body that
        is not JSON, or a JSON body that is not an object, a dict with a
        single 'message' key describing the failure, so the caller can treat
        it like any other unsuccessful response.
    """
    # A call is charged to the key pool before waiting on the throttle.
    secret = keys.get_secret()
    trn_api_header = {'TRN-Api-Key': secret}

    limiter.next()

    fetch_url = f"{url}{id}"

    try:
        # Without a timeout a stalled connection would hang the cron run forever.
        response = requests.get(fetch_url, headers=trn_api_header, timeout=timeout)
    except requests.RequestException as err:
        logging.warning(f"Request for {id} failed: {err}")
        return {'message': f'request failed: {err}'}

    try:
        data = response.json()
    except ValueError:
        # Response.json() raises a ValueError subclass when the body is not JSON.
        return {'message' : 'invalid non JSON response received.'}

    if not isinstance(data, dict):
        # The caller probes the result with `"data" in ...`, which needs a mapping.
        return {'message': 'unexpected non-object JSON response received.'}

    return data
237
238
def touch(fname):
    """Create fname as an empty file, or refresh its timestamps if it exists."""
    if not os.path.exists(fname):
        # Opening in append mode creates the file without truncating anything.
        with open(fname, 'a'):
            pass
        return
    # Passing None sets both access and modification time to "now".
    os.utime(fname, None)
244
### Start main program

# Build the key pool from the configured secrets (use_cap is the per-key budget).
keys = KeyPool(use_cap=10)

for k in api_keys:
    keys.add_key(k['name'], k['secret'])


start_utc = datetime.now(pytz.utc)
start_local = start_utc.astimezone(pytz.timezone(local_timezone))


if not options.quiet:
    print("\nTRN Fetch\n")
    print(f"Fetch run starting at {start_utc:%Y-%m-%d %H:%M:%S} UTC")
    print(f" {start_local:%Y-%m-%d %H:%M:%S} Local\n")


logging.debug(f"Delayed start: {options.delay}")
logging.debug(f"Complete mode: {options.complete}")
logging.debug(f" Verbose mode: {options.verbose}\n")
logging.debug("")

# before starting

# Determine our "run hour", which represents the UTC hour of the sample bucket.
# We sample at 03, 07, 11, 15, 19, 23
# (every four hours; revised from the old script, which was almost every 4).

# Put an hour into a bucket: index = current UTC hour, value = sample hour.
hour_buckets = ['23', '23', '23',
                '03', '03', '03', '03',
                '07', '07', '07', '07',
                '11', '11', '11', '11',
                '15', '15', '15', '15',
                '19', '19', '19', '19', '23']
# Runs at 00-02 UTC still belong to yesterday's 23 sample; every other hour
# is stamped with the current date.
day_adjust = [-1] * 3 + [0] * 21

# Zero-padded strings compare in numeric order, so the minute check below
# can stay a plain string comparison.
bucket_minute_cutoff = "44"

run_hour = f"{start_utc:%H}"
run_minute = f"{start_utc:%M}"

hour_bucket = hour_buckets[int(run_hour)]

logging.info(f"Run {run_hour}:{run_minute} for sample hour {hour_bucket}.")

if run_hour == hour_bucket and run_minute < bucket_minute_cutoff:
    logging.warning(f"Exiting. Too early to start for initial sample hour {hour_bucket}")
    sys.exit()

# Make sure the data directory exists: a missing directory used to crash the
# continuation check below (os.listdir) and the first file write.
os.makedirs(data_dir, exist_ok=True)

# Check if we need to run. If this is a continuation run, we only want to do
# the capture if we have files existing from a previous run.

skip_capture = False
if run_hour != hour_bucket:
    # If there are no json files in the data directory, we don't need to run
    if not any(name.endswith(".json") for name in os.listdir(data_dir)):
        skip_capture = True


if skip_capture:
    logging.info("Exiting. Previous run was completed.")
    sys.exit()

# We need to adjust the time stamp for the 23 hour sample that spills over to tomorrow
adjust = day_adjust[int(run_hour)]

timestamp_date = start_utc + timedelta(days=adjust)
timestamp = f"{timestamp_date:%Y%m%d}_{hour_bucket}-"

logging.info(f"Timestamp tag is '{timestamp}'")

logging.debug(f"Loading player info from {players_src}")
players = player_list_load(players_src)

to_do = players.copy()
cycle = 0

limiter = Throttle(1, 22.0) # TRN has a 20 second rate limit per call
api_calls = 0

complete = False

delay = random.randint(0, options.pause_seconds) + 20

logging.debug(f"Holding for {delay} seconds to stagger start of API calls...")

time.sleep(delay)

# Keep cycling over the outstanding players until everyone has a data file
# or the key pool has no calls left.
while len(to_do) > 0 and keys.key_available():

    successful = []
    failed = []
    incomplete = []
    cycle = cycle + 1

    if options.verbose:
        print(f"\n> Cycle {cycle} - players to do: {len(to_do)}.\n")

    random.shuffle(to_do)

    for p in to_do:

        player_name = p['name']
        player_id = p['id']
        player_data = f"{timestamp}{player_name}.json"
        data_path = os.path.join(data_dir, player_data)

        # Progress output honours --quiet (it used to print unconditionally).
        if not options.quiet:
            print(f"\n {p}")

        # Resumability: a file from an earlier run means this player is done.
        if os.path.isfile(data_path):
            if options.verbose:
                print(f" Data exists for {player_id}. Not fetching.")
            successful.append(p)
            continue

        if not keys.key_available():
            if options.verbose:
                print(f" No available key to fetch for {player_id}. Not fetching.")
            incomplete.append(p)
            continue

        api_calls = api_calls + 1
        if options.verbose:
            print(f" API call #{api_calls}:")

        data = fetch_from_api(trn_api_url, player_id, keys, limiter)

        # A response counts as valid only when it carries a "data" member.
        if "data" in data:
            valid = True
        else:
            valid = False
            if "message" in data:
                message = data['message']
                if message == 'API rate limit exceeded':

                    keys.key_limited()

                    if options.verbose:
                        print(" Rate limit hit for key.")

                else:
                    if options.verbose:
                        print(f" Non valid message: {message}")

        if not options.quiet:
            print(f" data valid: {valid}")

        if valid:

            # Write to a temporary name and rename into place, so an
            # interrupted run cannot leave a truncated .json file that the
            # next run would mistake for a finished capture.
            partial_path = f"{data_path}.tmp"
            with open(partial_path, 'w') as f:
                json.dump(data, f, indent=4)
            os.replace(partial_path, data_path)

            successful.append(p)

        else:

            failed.append(p)

    if options.verbose:
        print(f"\n Successful data fetches: {len(successful)}")
        print(f"\n Failed data fetches: {len(failed)}")
        print(f"\n Incomplete data fetches: {len(incomplete)}")

    # Whatever failed or was skipped for lack of a key is retried next cycle.
    to_do = failed.copy() + incomplete.copy()

# If there's no members left to process OR this is a forced "complete" run
# (i.e. a run before the start of the next sample bucket) we write out the complete flag

if len(to_do) == 0 or options.complete:

    # `complete` was initialised above but only the misspelt `completed` was
    # ever set; both names are set so either spelling reads correctly.
    complete = completed = True

    touch(complete_flag)

    if options.verbose:
        print("\nCompleted.")

else:

    if options.verbose:
        print("\nDone for now, but more needs to done next hour.")
430