# Snippet pasted from a web viewer — captured Nov 12, 2020, 02:04 PM.
import asyncio
import copy
import json
import struct
import time
import typing

import pyfgc
import pyfgc_name
import pyfgc_rbac
import pyfgclog
import requests
11
12
# OAuth bearer token used for the FortLogs REST calls.
# NOTE(review): placeholder value — supply a real token (e.g. via an
# environment variable) before running; never commit real credentials.
OAUTH_TOKEN = "xxx"
14
async def acquire_log_polling(
    fgc_sessions: typing.List[pyfgc.FgcAsyncSession],
    devices: typing.List[str],
    log_property: str,
    total_duration: float,
    acquisitions_duration: float = 10,
    polling_delay: float = 5,
    log_name: typing.Optional[str] = None,
):
    """Repeatedly poll ``log_property`` on every device and stitch the
    successive acquisitions into one continuous log per device.

    Args:
        fgc_sessions: open async sessions, one per entry in ``devices``.
        devices: device names, in the same order as ``fgc_sessions``.
        log_property: FGC property to read (e.g. 'LOG.SPY.DATA[0]').
        total_duration: stop once this many seconds of samples are stitched.
        acquisitions_duration: length of each single acquisition, in seconds
            (forwarded to ``acquire_log`` in milliseconds).
        polling_delay: target spacing, in seconds, between the start of two
            consecutive acquisitions.
        log_name: optional log name forwarded to ``acquire_log``.

    Returns:
        A pair of lists, both ordered like ``devices``:
        the concatenated log object per device, and the list of every raw
        acquisition per device (useful for debugging/validation).

    Raises:
        RuntimeError: when two consecutive acquisitions do not overlap,
            i.e. samples were lost between polls.
    """
    # Whole (stitched) log for each device.
    concatenated_logs = {}

    # Every individual acquisition object, kept verbatim per device.
    all_logs = {}

    acquired_duration = 0
    while acquired_duration < total_duration:

        # Acquire one log per device, timing the round-trip.
        # time.monotonic() is immune to wall-clock adjustments, which
        # matters because this value feeds the sleep below.
        started = time.monotonic()
        raw_logs = await acquire_log(
            fgc_sessions,
            devices,
            log_property,
            log_name,
            duration=acquisitions_duration * 1000,
        )
        logs = [json.loads(raw) for raw in raw_logs]
        elapsed = time.monotonic() - started

        # Space acquisition starts `polling_delay` seconds apart; clamp to
        # zero in case the acquisition itself took longer than the delay.
        await asyncio.sleep(max(0.0, polling_delay - elapsed))

        for device, log in zip(devices, logs):

            # First acquisition for this device: it *is* the stitched log.
            if device not in concatenated_logs:
                concatenated_logs[device] = log
                all_logs[device] = [copy.deepcopy(log)]
                acquired_duration = len(log['signals'][0]['samples']) * log['period']
                continue

            # Otherwise, append to the previously stitched log.
            concatenated_log = concatenated_logs[device]
            all_logs[device].append(copy.deepcopy(log))

            # Timestamp where the sample *after* the stitched log would fall;
            # samples of the new acquisition before that point are overlap
            # with what we already have and must be dropped.
            next_sample_time = concatenated_log['firstSampleTime'] + concatenated_log['period'] * len(concatenated_log['signals'][0]['samples'])
            skip_samples_count = round((next_sample_time - log['firstSampleTime']) / log['period'])
            if skip_samples_count < 0:
                # No overlap at all: there is a gap, samples were lost.
                raise RuntimeError('Acquisition took too long. Try reducing the polling cooldown')

            # Merge: append the non-overlapping tail of every signal.
            for merged_signal, new_signal in zip(concatenated_log['signals'], log['signals']):
                merged_signal['samples'] += new_signal['samples'][skip_samples_count:]

            # Update total stitched duration.
            # NOTE(review): this tracks the duration of the *last* device in
            # the list; all devices are assumed to advance in lockstep —
            # confirm, or take the min across devices.
            acquired_duration = len(concatenated_log['signals'][0]['samples']) * concatenated_log['period']

    # Return the stitched logs and the raw acquisitions, in device order.
    return (
        [concatenated_logs[device] for device in devices],
        [all_logs[device] for device in devices],
    )
76
async def main():
    """Acquire spy logs from two RF FGCs and upload everything to FortLogs."""

    devices = ['RFNA.866.07.ETH1', 'RFNA.866.08.ETH1']
    log_property = 'LOG.SPY.DATA[0]'
    log_name = 'ACQ'

    # Authenticate & create one async session per device.
    rbac_token = pyfgc_rbac.get_token_location()
    fgc_sessions = await asyncio.gather(*[pyfgc.FgcAsyncSession.instance(device, 'async', rbac_token=rbac_token) for device in devices])

    # Acquire 30 s of log: one 10 s acquisition started every 6 s.
    # Fix: `log_name` was assigned above but never forwarded — pass it on.
    concatenated_logs, all_logs = await acquire_log_polling(
        fgc_sessions,
        devices,
        log_property,
        30,
        10,
        6,
        log_name=log_name,
    )

    # Disconnect sessions.
    await asyncio.gather(*[session.disconnect() for session in fgc_sessions])

    # Push everything to FortLogs, one acquisition entry per device.
    for concatenated_log, device_logs in zip(concatenated_logs, all_logs):

        # Create the acquisition container.
        acquisition_id = fortlogs_create_acquisition(OAUTH_TOKEN, comment='debug: automatic log polling and concatenation')

        # Push every raw log, then the concatenated one.
        for log in device_logs + [concatenated_log]:
            fortlogs_add_log(acquisition_id, 'analog', json.dumps(log), OAUTH_TOKEN)
111
112
113
# Script entry point: guard so importing this module has no side effects.
if __name__ == "__main__":
    # Load the FGC name/group tables pyfgc needs before opening sessions.
    pyfgc_name.read_name_file()
    pyfgc_name.read_group_file()

    # asyncio.run() creates, runs and *closes* the event loop; the previous
    # asyncio.new_event_loop().run_until_complete(...) leaked an open loop.
    asyncio.run(main())
118