· 6 years ago · Oct 21, 2019, 08:34 AM
1#! /usr/bin/env python3
2
3"""This script captures F1 2019 telemetry packets (sent over UDP) and stores them into SQLite3 database files.
4
5One database file will contain all packets from one session.
6
7From UDP packet to database entry
8---------------------------------
9
10The data flow of UDP packets into the database is managed by 2 threads.
11
12PacketReceiver thread:
13
14 (1) The PacketReceiver thread does a select() to wait on incoming packets in the UDP socket.
15 (2) When woken up with the notification that a UDP packet is available for reading, it is actually read from the socket.
16 (3) The receiver thread calls the recorder_thread.record_packet() method with a TimedPacket containing
17 the reception timestamp and the packet just read.
18 (4) The recorder_thread.record_packet() method locks its packet queue, inserts the packet there,
19 then unlocks the queue. Note that this method is only called from within the receiver thread!
20 (5) repeat from (1).
21
22PacketRecorder thread:
23
24 (1) The PacketRecorder thread sleeps for a given period, then wakes up.
25 (2) It locks its packet queue, moves the queue's packets to a local variable, empties the packet queue,
26 then unlocks the packet queue.
27 (3) The packets just moved out of the queue are passed to the 'process_incoming_packets' method.
28 (4) The 'process_incoming_packets' method inspects the packet headers, and converts the packet data
29 into SessionPacket instances that are suitable for inserting into the database.
30 In the process, it collects packets from the same session. After collecting all
31 available packets from the same session, it passed them on to the
32 'process_incoming_same_session_packets' method.
33 (5) The 'process_incoming_same_session_packets' method makes sure that the appropriate SQLite database file
34 is opened (i.e., the one with matching sessionUID), then writes the packets into the 'packets' table.
35
36By decoupling the packet capture and database writing in different threads, we minimize the risk of
37dropping UDP packets. This risk is real because SQLite3 database commits can take a considerable time.
38"""
39
40import argparse
41import sys
42import time
43import socket
44import sqlite3
45import threading
46import logging
47import ctypes
48import selectors
49
50from collections import namedtuple
51
52from .threading_utils import WaitConsoleThread, Barrier
53from ..packets import PacketHeader, PacketID, HeaderFieldsToPacketType, unpack_udp_packet
54
55# The type used by the PacketReceiverThread to represent incoming telemetry packets, with timestamp.
56TimestampedPacket = namedtuple('TimestampedPacket', 'timestamp, packet')
57
58# The type used by the PacketRecorderThread to represent incoming telemetry packets for storage in the SQLite3 database.
59SessionPacket = namedtuple('SessionPacket', 'timestamp, packetFormat, gameMajorVersion, gameMinorVersion, packetVersion, packetId, sessionUID, sessionTime, frameIdentifier, playerCarIndex, packet')
60
61
62class PacketRecorder:
63 """The PacketRecorder records incoming packets to SQLite3 database files.
64
65 A single SQLite3 file stores packets from a single session.
66 Whenever a new session starts, any open file is closed, and a new database file is created.
67 """
68
69 # The SQLite3 query that creates the 'packets' table in the database file.
70 _create_packets_table_query = """
71 CREATE TABLE packets (
72 pkt_id INTEGER PRIMARY KEY, -- Alias for SQLite3's 'rowid'.
73 timestamp REAL NOT NULL, -- The POSIX time right after capturing the telemetry packet.
74 packetFormat INTEGER NOT NULL, -- Header field: packet format.
75 gameMajorVersion INTEGER NOT NULL, -- Header field: game major version.
76 gameMinorVersion INTEGER NOT NULL, -- Header field: game minor version.
77 packetVersion INTEGER NOT NULL, -- Header field: packet version.
78 packetId INTEGER NOT NULL, -- Header field: packet type ('packetId' is a bit of a misnomer).
79 sessionUID CHAR(16) NOT NULL, -- Header field: unique session id as hex string.
80 sessionTime REAL NOT NULL, -- Header field: session time.
81 frameIdentifier INTEGER NOT NULL, -- Header field: frame identifier.
82 playerCarIndex INTEGER NOT NULL, -- Header field: player car index.
83 packet BLOB NOT NULL -- The packet itself
84 );
85 """
86
87 # The SQLite3 query that inserts packet data into the 'packets' table of an open database file.
88 _insert_packets_query = """
89 INSERT INTO packets(
90 timestamp,
91 packetFormat, gameMajorVersion, gameMinorVersion, packetVersion, packetId, sessionUID,
92 sessionTime, frameIdentifier, playerCarIndex,
93 packet) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
94 """
95
96 def __init__(self):
97 self._conn = None
98 self._cursor = None
99 self._filename = None
100 self._sessionUID = None
101
102 def close(self):
103 """Make sure that no database remains open."""
104 if self._conn is not None:
105 self._close_database()
106
107 def _open_database(self, sessionUID: str):
108 """Open SQLite3 database file and make sure it has the correct schema."""
109 assert self._conn is None
110 filename = "F1_2019_{:s}.sqlite3".format(sessionUID)
111 logging.info("Opening file {!r}.".format(filename))
112 conn = sqlite3.connect(filename)
113 cursor = conn.cursor()
114
115 # Get rid of indentation and superfluous newlines in the 'CREATE TABLE' command.
116 query = "".join(line[8:] + "\n" for line in PacketRecorder._create_packets_table_query.split("\n")[1:-1])
117
118 # Try to execute the 'CREATE TABLE' statement. If it already exists, this will raise an exception.
119 try:
120 cursor.execute(query)
121 except sqlite3.OperationalError:
122 logging.info(" (Appending to existing file.)")
123 else:
124 logging.info(" (Created new file.)")
125
126 self._conn = conn
127 self._cursor = cursor
128 self._filename = filename
129 self._sessionUID = sessionUID
130
131 def _close_database(self):
132 """Close SQLite3 database file."""
133 assert self._conn is not None
134 logging.info("Closing file {!r}.".format(self._filename))
135 self._cursor.close()
136 self._cursor = None
137 self._conn.close()
138 self._conn = None
139 self._filename = None
140 self._sessionUID = None
141
142 def _insert_and_commit_same_session_packets(self, same_session_packets):
143 """Insert session packets to database and commit."""
144 assert self._conn is not None
145 self._cursor.executemany(PacketRecorder._insert_packets_query, same_session_packets)
146 self._conn.commit()
147
148 def _process_same_session_packets(self, same_session_packets):
149 """Insert packets from the same session into the 'packets' table of the appropriate database file.
150
151 Precondition: all packets in 'same_session_packets' are from the same session (identical 'sessionUID' field).
152
153 We need to handle four different cases:
154
155 (1) 'same_session_packets' is empty:
156
157 --> return (no-op).
158
159 (2) A database file is currently open, but it stores packets with a different session UID:
160
161 --> Close database;
162 --> Open database with correct session UID;
163 --> Insert 'same_session_packets'.
164
165 (3) No database file is currently open:
166
167 --> Open database with correct session UID;
168 --> Insert 'same_session_packets'.
169
170 (4) A database is currently open, with correct session UID:
171
172 --> Insert 'same_session_packets'.
173 """
174
175 if not same_session_packets:
176 # Nothing to insert.
177 return
178
179 if self._conn is not None and self._sessionUID != same_session_packets[0].sessionUID:
180 # Close database if it's recording a different session.
181 self._close_database()
182
183 if self._conn is None:
184 # Open database with the correct sessionID.
185 self._open_database(same_session_packets[0].sessionUID)
186
187 # Write packets.
188 self._insert_and_commit_same_session_packets(same_session_packets)
189
190 def process_incoming_packets(self, timestamped_packets):
191 """Process incoming packets by recording them into the correct database file.
192
193 The incoming 'timestamped_packets' is a list of timestamped raw UDP packets.
194
195 We process them to a variable 'same_session_packets', which is a list of consecutive
196 packets having the same 'sessionUID' field. In this list, each packet is a 11-element tuple
197 that can be inserted into the 'packets' table of the database.
198
199 The 'same_session_packets' are then passed on to the '_process_same_session_packets'
200 method that writes them into the appropriate database file.
201 """
202
203 t1 = time.monotonic()
204
205 # Invariant to be guaranteed: all packets in 'same_session_packets' have the same 'sessionUID' field.
206 same_session_packets = []
207
208 for (timestamp, packet) in timestamped_packets:
209
210 if len(packet) < ctypes.sizeof(PacketHeader):
211 logging.error("Dropped bad packet of size {} (too short).".format(len(packet)))
212 continue
213
214 header = PacketHeader.from_buffer_copy(packet)
215
216 packet_type_tuple = (header.packetFormat, header.packetVersion, header.packetId)
217
218 packet_type = HeaderFieldsToPacketType.get(packet_type_tuple)
219 if packet_type is None:
220 logging.error("Dropped unrecognized packet (format, version, id) = {!r}.".format(packet_type_tuple))
221 continue
222
223 if len(packet) != ctypes.sizeof(packet_type):
224 logging.error("Dropped packet with unexpected size; "
225 "(format, version, id) = {!r} packet, size = {}, expected {}.".format(
226 packet_type_tuple, len(packet), ctypes.sizeof(packet_type)))
227 continue
228
229 if header.packetId == PacketID.EVENT: # Log Event packets
230 event_packet = unpack_udp_packet(packet)
231 logging.info("Recording event packet: {}".format(event_packet.eventStringCode.decode()))
232
233 # NOTE: the sessionUID is not reliable at the start of a session (in F1 2018, need to check for F1 2019).
234 # See: http://forums.codemasters.com/discussion/138130/bug-f1-2018-pc-v1-0-4-udp-telemetry-bad-session-uid-in-first-few-packets-of-a-session
235
236 # Create an INSERT-able tuple for the data in this packet.
237 #
238 # Note that we convert the sessionUID to a 16-digit hex string here.
239 # SQLite3 can store 64-bit numbers, but only signed ones.
240 # To prevent any issues, we represent the sessionUID as a 16-digit hex string instead.
241
242 session_packet = SessionPacket(
243 timestamp,
244 header.packetFormat, header.gameMajorVersion, header.gameMinorVersion,
245 header.packetVersion, header.packetId, "{:016x}".format(header.sessionUID),
246 header.sessionTime, header.frameIdentifier, header.playerCarIndex,
247 packet
248 )
249
250 if len(same_session_packets) > 0 and same_session_packets[0].sessionUID != session_packet.sessionUID:
251 # Write 'same_session_packets' collected so far to the correct session database, then forget about them.
252 self._process_same_session_packets(same_session_packets)
253 same_session_packets.clear()
254
255 same_session_packets.append(session_packet)
256
257 # Write 'same_session_packets' to the correct session database, then forget about them.
258 # The 'same_session_packets.clear()' is not strictly necessary here, because 'same_session_packets' is about to
259 # go out of scope; but we make it explicit for clarity.
260
261 self._process_same_session_packets(same_session_packets)
262 same_session_packets.clear()
263
264 t2 = time.monotonic()
265
266 duration = (t2 - t1)
267
268 logging.info("Recorded {} packets in {:.3f} ms.".format(len(timestamped_packets), duration * 1000.0))
269
270 def no_packets_received(self, age: float) -> None:
271 """No packets were received for a considerable time. If a database file is open, close it."""
272 if self._conn is None:
273 logging.info("No packets to record for {:.3f} seconds.".format(age))
274 else:
275 logging.info("No packets to record for {:.3f} seconds; closing file due to inactivity.".format(age))
276 self._close_database()
277
278
279class PacketRecorderThread(threading.Thread):
280 """The PacketRecorderThread writes telemetry data to SQLite3 files."""
281
282 def __init__(self, record_interval):
283 super().__init__(name='recorder')
284 self._record_interval = record_interval
285 self._packets = []
286 self._packets_lock = threading.Lock()
287 self._socketpair = socket.socketpair()
288
289 def close(self):
290 for sock in self._socketpair:
291 sock.close()
292
293 def run(self):
294 """Receive incoming packets and hand them over the the PacketRecorder.
295
296 This method runs in its own thread.
297 """
298
299 selector = selectors.DefaultSelector()
300 key_socketpair = selector.register(self._socketpair[0], selectors.EVENT_READ)
301
302 recorder = PacketRecorder()
303
304 packets = []
305
306 logging.info("Recorder thread started.")
307
308 quitflag = False
309 inactivity_timer = time.time()
310 while not quitflag:
311
312 # Calculate the timeout value that will bring us in sync with the next period.
313 timeout = (-time.time()) % self._record_interval
314 # If the timeout interval is too short, increase its length by 1 period.
315 if timeout < 0.5 * self._record_interval:
316 timeout += self._record_interval
317
318 for (key, events) in selector.select(timeout):
319 if key == key_socketpair:
320 quitflag = True
321
322 # Swap packets, so the 'record_packet' method can be called uninhibited as soon as possible.
323 with self._packets_lock:
324 (packets, self._packets) = (self._packets, packets)
325
326 if len(packets) != 0:
327 inactivity_timer = packets[-1].timestamp
328 recorder.process_incoming_packets(packets)
329 packets.clear()
330 else:
331 t_now = time.time()
332 age = t_now - inactivity_timer
333 recorder.no_packets_received(age)
334 inactivity_timer = t_now
335
336 recorder.close()
337
338 selector.close()
339
340 logging.info("Recorder thread stopped.")
341
342 def request_quit(self):
343 """Request termination of the PacketRecorderThread.
344
345 Called from the main thread to request that we quit.
346 """
347 self._socketpair[1].send(b'\x00')
348
349 def record_packet(self, timestamped_packet):
350 """Called from the receiver thread for every UDP packet received."""
351 with self._packets_lock:
352 self._packets.append(timestamped_packet)
353
354
355class PacketReceiverThread(threading.Thread):
356 """The PacketReceiverThread receives incoming telemetry packets via the network and passes them to the PacketRecorderThread for storage."""
357
358 def __init__(self, udp_port, recorder_thread):
359 super().__init__(name='receiver')
360 self._udp_port = udp_port
361 self._recorder_thread = recorder_thread
362 self._socketpair = socket.socketpair()
363
364 def close(self):
365 for sock in self._socketpair:
366 sock.close()
367
368 def run(self):
369 """Receive incoming packets and hand them over to the PacketRecorderThread.
370
371 This method runs in its own thread.
372 """
373
374 udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
375
376 # Allow multiple receiving endpoints.
377 if sys.platform in ['darwin']:
378 udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
379 elif sys.platform in ['linux', 'win32']:
380 udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
381
382 # Accept UDP packets from any host.
383 address = ('', self._udp_port)
384 udp_socket.bind(address)
385
386 selector = selectors.DefaultSelector()
387
388 key_udp_socket = selector.register(udp_socket, selectors.EVENT_READ)
389 key_socketpair = selector.register(self._socketpair[0], selectors.EVENT_READ)
390
391 logging.info("Receiver thread started, reading UDP packets from port {}.".format(self._udp_port))
392
393 quitflag = False
394 while not quitflag:
395 for (key, events) in selector.select():
396 timestamp = time.time()
397 if key == key_udp_socket:
398 # All telemetry UDP packets fit in 2048 bytes with room to spare.
399 packet = udp_socket.recv(2048)
400 timestamped_packet = TimestampedPacket(timestamp, packet)
401 self._recorder_thread.record_packet(timestamped_packet)
402 elif key == key_socketpair:
403 quitflag = True
404
405 selector.close()
406 udp_socket.close()
407 for sock in self._socketpair:
408 sock.close()
409
410 logging.info("Receiver thread stopped.")
411
412 def request_quit(self):
413 """Request termination of the PacketReceiverThread.
414
415 Called from the main thread to request that we quit.
416 """
417 self._socketpair[1].send(b'\x00')
418
419
420def main():
421 """Record incoming telemetry data until the user presses enter."""
422
423 # Configure logging.
424
425 logging.basicConfig(level=logging.DEBUG, format="%(asctime)-23s | %(threadName)-10s | %(levelname)-5s | %(message)s")
426 logging.Formatter.default_msec_format = '%s.%03d'
427
428 # Parse command line arguments.
429
430 parser = argparse.ArgumentParser(description="Record F1 2019 telemetry data to SQLite3 files.")
431
432 parser.add_argument("-p", "--port", default=20777, type=int, help="UDP port to listen to (default: 20777)", dest='port')
433 parser.add_argument("-i", "--interval", default=1.0, type=float, help="interval for writing incoming data to SQLite3 file, in seconds (default: 1.0)", dest='interval')
434
435 args = parser.parse_args()
436
437 # Start recorder thread first, then receiver thread.
438
439 quit_barrier = Barrier()
440
441 recorder_thread = PacketRecorderThread(args.interval)
442 recorder_thread.start()
443
444 receiver_thread = PacketReceiverThread(args.port, recorder_thread)
445 receiver_thread.start()
446
447 wait_console_thread = WaitConsoleThread(quit_barrier)
448 wait_console_thread.start()
449
450 # Recorder, receiver, and wait_console threads are now active. Run until we're asked to quit.
451
452 quit_barrier.wait()
453
454 # Stop threads.
455
456 wait_console_thread.request_quit()
457 wait_console_thread.join()
458 wait_console_thread.close()
459
460 receiver_thread.request_quit()
461 receiver_thread.join()
462 receiver_thread.close()
463
464 recorder_thread.request_quit()
465 recorder_thread.join()
466 recorder_thread.close()
467
468 # All done.
469
470 logging.info("All done.")
471
472
473if __name__ == "__main__":
474 main()