# Paste metadata: 6 years ago · Jan 06, 2020, 04:42 PM
import contextlib
import json
import logging
import sqlite3.dbapi2 as sqlite3
import sys
import uuid
from typing import Optional

import trio


class UCSAPIServer:
    """
    Asynchronous TCP server answering JSON API calls ("ping" and "check_license").

    Every message, in both directions, is framed as a 2-byte big-endian length
    header followed by that many bytes of UTF-8 encoded JSON.
    """

    # Size, in bytes, of the big-endian length header that prefixes every message
    HEADER_SIZE = 2
    # Seconds allowed for a single send/receive operation and for a whole client session
    TIMEOUT_SECONDS = 60

    def __init__(self, addr: Optional[str] = "127.0.0.1", port: Optional[int] = 8081, buf: Optional[int] = 1024,
                 database: Optional[str] = "", logging_level: int = logging.INFO,
                 console_format: Optional[str] = "[%(levelname)s] %(asctime)s %(message)s",
                 datefmt: Optional[str] = "%d/%m/%Y %H:%M:%S %p"):
        """
        Stores the server configuration. Nothing is opened here: the database
        and the listener are set up by serve_forever().

        :param addr: Host the TCP listener binds to
        :param port: Port the TCP listener binds to
        :param buf: Maximum number of bytes requested from the socket per read
        :param database: Path of the SQLite3 database file
        :param logging_level: Level passed to logging.basicConfig()
        :param console_format: Log record format passed to logging.basicConfig()
        :param datefmt: Date format passed to logging.basicConfig()
        """

        self.addr = addr
        self.port = port
        self.buf = buf
        self.logging_level = logging_level
        self.console_format = console_format
        self.datefmt = datefmt
        self.database_path = database
        self.database = None

    ### API RESPONSE HANDLERS ###

    @staticmethod
    def _encode_response(payload: dict) -> bytes:
        """Serializes payload to UTF-8 JSON and prepends the big-endian length header"""

        body = json.dumps(payload).encode("utf-8")
        return len(body).to_bytes(UCSAPIServer.HEADER_SIZE, "big") + body

    async def _send_json(self, session_id: uuid.UUID, stream: "trio.SocketStream", payload: dict):
        """Frames payload and sends it to the client through send_response()"""

        await self.send_response(stream, self._encode_response(payload), session_id)

    async def _send_error(self, session_id: uuid.UUID, stream: "trio.SocketStream", error_code: str):
        """Sends a standard failure response carrying the given error code"""

        await self._send_json(session_id, stream, {"status": "failure", "error": error_code})

    async def malformed_request(self, session_id: uuid.UUID, stream: "trio.SocketStream"):
        """Tells the client that its request could not be parsed"""

        await self._send_error(session_id, stream, "ERR_REQUEST_MALFORMED")

    async def invalid_json_request(self, session_id: uuid.UUID, stream: "trio.SocketStream"):
        """Tells the client that its request was parsed but is not a valid API call"""

        await self._send_error(session_id, stream, "ERR_REQUEST_INVALID")

    async def missing_json_field(self, session_id: uuid.UUID, stream: "trio.SocketStream", missing_field: str):
        """
        Tells the client that a mandatory field is missing from its request.
        Surrounding single quotes are stripped because callers may pass str(KeyError)
        """

        missing_field = missing_field.upper().strip("'")
        await self._send_error(session_id, stream, f"ERR_MISSING_{missing_field}_FIELD")

    async def server_busy(self, session_id: uuid.UUID, stream: "trio.SocketStream"):
        """Tells the client that the server could not process the request right now"""

        await self._send_error(session_id, stream, "SERVER_BUSY")

    ### END OF RESPONSE HANDLERS SECTION ###

    async def send_response(self, stream: "trio.SocketStream", response_data: bytes, session_id):
        """
        Sends already-framed response bytes to the client.

        :return: True if the data was sent, False if the connection was closed
                 (the stream is closed in that case), None if the send timed out
        """

        with trio.move_on_after(self.TIMEOUT_SECONDS) as cancel_scope:
            try:
                logging.debug(f"({session_id}) {{Response handler}} Sending response to client")
                await stream.send_all(response_data)
            except (trio.BrokenResourceError, trio.ClosedResourceError):
                logging.info(f"({session_id}) {{Response Handler}} The connection was closed")
                await stream.aclose()
                return False
        if cancel_scope.cancelled_caught:
            return None
        logging.debug(f"({session_id}) {{Response Handler}} Response sent")
        return True

    async def rebuild_incomplete_stream(self, session_id: uuid.UUID, stream: "trio.SocketStream", raw_data):
        """
        This function gets called when a stream's length is smaller than 2 bytes, that
        is the minimum amount of data needed to parse an API call (The length header)

        :return: The data extended to the header size, or None if the header could
                 not be completed (timeout, disconnection or end of stream)
        """

        with trio.move_on_after(self.TIMEOUT_SECONDS) as cancel_scope:
            while len(raw_data) < self.HEADER_SIZE:
                missing = self.HEADER_SIZE - len(raw_data)
                try:
                    logging.debug(f"({session_id}) {{Stream rebuilder}} Requesting {missing} more byte(s)")
                    chunk = await stream.receive_some(missing)
                except (trio.BrokenResourceError, trio.ClosedResourceError):
                    logging.info(f"({session_id}) {{Stream rebuilder}} The connection was closed")
                    await stream.aclose()
                    return None
                if not chunk:
                    # An empty read means end of stream: without this check the loop would
                    # spin on empty reads until the timeout expires
                    logging.error(f"({session_id}) {{Stream rebuilder}} Stream has ended")
                    await stream.aclose()
                    return None
                raw_data += chunk
        if cancel_scope.cancelled_caught:
            return None
        logging.debug(f"({session_id}) {{Stream rebuilder}} Stream is now {self.HEADER_SIZE} bytes long")
        return raw_data

    async def complete_stream(self, header, stream: "trio.SocketStream", session_id: uuid.UUID):
        """
        This function reads from the stream until exactly the specified length is reached

        :param header: Number of bytes that still have to be received
        :return: The received bytes, or None if they could not all be read
                 (timeout, disconnection or end of stream)
        """

        stream_data = bytearray()
        with trio.move_on_after(self.TIMEOUT_SECONDS) as cancel_scope:
            while len(stream_data) < header:
                remaining = header - len(stream_data)
                # Never ask for more than what is missing, so bytes belonging to the
                # next request are left in the socket
                wanted = remaining if self.buf is None else min(self.buf, remaining)
                try:
                    logging.debug(
                        f"({session_id}) {{Stream completer}} Requesting {wanted} more bytes until length {header}")
                    chunk = await stream.receive_some(max_bytes=wanted)
                except (trio.BrokenResourceError, trio.ClosedResourceError):
                    logging.info(f"({session_id}) {{Stream completer}} The connection was closed")
                    await stream.aclose()
                    return None
                if not chunk:
                    # Check the received chunk (not the stream object) to detect end of stream
                    logging.error(f"({session_id}) {{Stream completer}} Stream has ended")
                    await stream.aclose()
                    return None
                stream_data += chunk
        if cancel_scope.cancelled_caught:
            return None
        return bytes(stream_data)

    def query_manager(self, query, *args):
        """
        Runs a query against the database on a fresh connection, which is always
        closed before returning. Nothing is committed, so this is only suitable
        for read queries.

        :param query: SQL statement, with "?" placeholders for the parameters
        :param args: Values bound to the placeholders
        :return: (True, rows) on success, (None, error) if SQLite raised an error
        """

        try:
            with contextlib.closing(sqlite3.connect(self.database_path)) as connection:
                with contextlib.closing(connection.cursor()) as cursor:
                    return True, cursor.execute(query, args).fetchall()
        except sqlite3.Error as sqlite3_error:
            return None, sqlite3_error

    async def check_license(self, session_id: uuid.UUID, data: dict, stream: "trio.SocketStream"):
        """
        This function checks whether a license-device_id couple is valid or not
        and sends exactly one response to the client
        """

        try:
            client_license = str(data["license"])
            device_id = str(data["device_id"])
        except KeyError as missing_field:
            logging.error(
                f"({session_id}) {{License checker}} Missing {missing_field} field in JSON check_license request")
            await self.missing_json_field(session_id, stream, str(missing_field))
            return

        logging.debug(f"({session_id}) {{License checker}} License is {client_license}")
        logging.debug(f"({session_id}) {{License checker}} Device ID is {device_id}")
        # sqlite3 calls block, so the query runs in a worker thread
        succeeded, query_result = await trio.to_thread.run_sync(
            self.query_manager, "SELECT authorized_devices FROM customers WHERE license = ?", client_license)
        if succeeded is None:
            logging.error(
                f"({session_id}) {{License checker}} Something went wrong while dealing with database: {query_result}")
            await self.server_busy(session_id, stream)
            # Return here so the client does not get a second response on top of SERVER_BUSY
            return

        status = "failure"
        license_exists = "invalid"
        authorized = False
        error = None
        if not query_result:
            error = "ERR_LICENSE_INVALID"
            logging.debug(f"({session_id}) {{License checker}} License {client_license} does not exist")
        else:
            logging.debug(
                f"({session_id}) {{License checker}} License {client_license} is valid, checking device ID")
            # Authorized devices are stored as a single dash-separated string
            authorized_devices = query_result[0][0].split("-")
            if device_id in authorized_devices:
                authorized = True
                license_exists = "valid"
                status = "success"
                logging.debug(
                    f"({session_id}) {{License checker}} Device {device_id} is authorized, verification complete")
            else:
                error = "ERR_DEVICE_UNAUTHORIZED"
                license_exists = "unchecked"
                logging.warning(
                    f"({session_id}) {{License checker}} Device {device_id} isn't an authorized device")
        await self._send_json(
            session_id, stream,
            {"status": status, "license": license_exists, "authorized": authorized, "error": error})

    async def ping_request(self, session_id: uuid.UUID, stream: "trio.SocketStream"):
        """Just a test function: always answers with a success response"""

        await self._send_json(session_id, stream, {"status": "success", "error": None})

    async def parse_call(self, session_id: uuid.UUID, json_request: bytes, stream: "trio.SocketStream"):
        """
        This function parses the JSON API request and acts accordingly.
        Anything that is not a JSON object is answered with ERR_REQUEST_MALFORMED
        """

        try:
            # No "encoding" argument: json.loads() detects the encoding of bytes
            # input on its own, and the argument is rejected by Python 3.9+
            data = json.loads(json_request)
        except (json.JSONDecodeError, UnicodeDecodeError, RecursionError) as json_error:
            # RecursionError is included because deeply nested input could otherwise
            # escape this handler
            logging.error(f"({session_id}) {{API Parser}} Invalid JSON data, full exception -> {json_error}")
            await self.malformed_request(session_id, stream)
            return
        if not isinstance(data, dict):
            # Valid JSON that is not an object (e.g. a list or a number) cannot carry a request_type
            logging.error(f"({session_id}) {{API Parser}} JSON request is not an object")
            await self.malformed_request(session_id, stream)
            return

        try:
            request_type = data["request_type"]
        except KeyError:
            logging.error(f"({session_id}) {{API Parser}} Missing request_type field in JSON request")
            await self.missing_json_field(session_id, stream, "request_type")
            return

        if request_type == "ping":
            logging.debug(f"({session_id}) {{API Parser}} Request Type: ping")
            logging.debug(f"({session_id}) {{API Parser}} PONG!")
            await self.ping_request(stream=stream, session_id=session_id)
        elif request_type == "check_license":
            logging.debug(f"({session_id}) {{API Parser}} Request Type: check_license")
            await self.check_license(session_id, data, stream)
        else:
            logging.error(f"({session_id}) {{API Parser}} Invalid request_type: '{request_type}'")
            await self.invalid_json_request(session_id, stream)

    async def handle_client(self, stream: "trio.SocketStream"):
        """
        This function handles a single client connection:

        - It assigns a unique ID to each client session
        - It listens on the asynchronous socket and acts accordingly
          e.g. incomplete streams or abrupt disconnection
        - It handles timeouts if the client hangs for some reason
        - It keeps bytes received past the end of a request and treats
          them as the beginning of the next one
        """

        session_id = uuid.uuid4()
        logging.info(f" {{Client handler}} New session started, UUID is {session_id}")
        # Bytes already received that belong to the next request
        leftover = b""
        with trio.move_on_after(self.TIMEOUT_SECONDS) as cancel_scope:
            while True:
                if leftover:
                    raw_data, leftover = leftover, b""
                else:
                    try:
                        raw_data = await stream.receive_some(max_bytes=self.buf)
                    except (trio.BrokenResourceError, trio.ClosedResourceError):
                        logging.info(f"({session_id}) {{Client handler}} The connection was closed")
                        await stream.aclose()
                        break
                    if not raw_data:
                        logging.error(f"({session_id}) {{Client handler}} Stream has ended")
                        await stream.aclose()
                        break

                if len(raw_data) < self.HEADER_SIZE:
                    logging.debug(f"({session_id}) {{Client handler}} Stream is shorter than 2 bytes, rebuilding")
                    raw_data = await self.rebuild_incomplete_stream(session_id, stream, raw_data)
                    if raw_data is None:
                        logging.error(
                            f"({session_id}) {{Client handler}} Could not read the length header "
                            f"(timeout or disconnection)")
                        await stream.aclose()
                        break

                header = int.from_bytes(raw_data[:self.HEADER_SIZE], "big")
                # The header itself is never part of the JSON payload
                payload = raw_data[self.HEADER_SIZE:]
                logging.debug(f"({session_id}) {{Client handler}} Expected stream length is {header}")

                if len(payload) > header:
                    # More than one request arrived in a single read
                    payload, leftover = payload[:header], payload[header:]
                elif len(payload) < header:
                    logging.debug(f"({session_id}) {{Client handler}} Fragmented stream detected, rebuilding")
                    remainder = await self.complete_stream(header - len(payload), stream, session_id)
                    if remainder is None:
                        logging.error(
                            f"({session_id}) {{Client handler}} Could not complete the stream "
                            f"(timeout or disconnection)")
                        await stream.aclose()
                        break
                    payload += remainder

                logging.debug(f"({session_id}) {{Client handler}} Stream complete, processing API call")
                await self.parse_call(session_id, payload, stream)
        if cancel_scope.cancelled_caught:
            logging.error(f"({session_id}) {{Client handler}} The operation has timed out")

    async def load_database(self):
        """
        This function loads the SQLite3 database on startup.
        If the database file is not present, or if the customers table is
        missing, a fresh database will be created and loaded.
        The process exits if the database cannot be opened or prepared.
        """

        try:
            self.database = sqlite3.connect(self.database_path)
        except sqlite3.Error as db_error:
            logging.error(f"{{API main}} Something went wrong while attempting to connect to the database: {db_error}")
            sys.exit(db_error)

        # closing() releases the cursor on every path, including sys.exit()
        with contextlib.closing(self.database.cursor()) as cursor:
            try:
                cursor.execute("SELECT * from customers")
            except sqlite3.OperationalError as sqlite3_error:
                if str(sqlite3_error) != "no such table: customers":
                    logging.error(f"{{API main}} Error while dealing with database: {sqlite3_error}")
                    sys.exit(sqlite3_error)
                logging.debug("{API main} Database is empty, creating table 'customers'")
                query = """CREATE TABLE customers(
                            customer_id INTEGER NULL PRIMARY KEY AUTOINCREMENT,
                            license TEXT NOT NULL UNIQUE,
                            authorized_devices TEXT NOT NULL
                            );"""
                try:
                    cursor.execute(query)
                    self.database.commit()
                except sqlite3.OperationalError as creation_error:
                    logging.error(f"{{API main}} Error while dealing with database: {creation_error}")
                    sys.exit(creation_error)
                logging.debug("{API main} Database created")

    async def serve_forever(self):
        """
        This function is the server's main loop, what it does is:
        - Configure logging
        - Call the database loader function
        - Starts to serve the asynchronous TCP socket
        - Waits forever for clients to connect and redirect them to
          the handle_client() function defined above
        """

        logging.basicConfig(datefmt=self.datefmt, format=self.console_format, level=self.logging_level)
        logging.info(" {API main} UCS API Server is starting up")
        logging.info(" {API main} Preparing to load SQLIte3 database")
        await self.load_database()
        logging.info(" {API main} Database loaded")
        logging.info(f" {{API main}} Now serving at {self.addr} on port {self.port}")
        logging.debug(f"{{API main}} The buffer is set to {self.buf} bytes, logging is set to {self.logging_level}")
        try:
            logging.debug(f"{{API main}} Opening an async listener at {self.addr}:{self.port}")
            await trio.serve_tcp(self.handle_client, host=self.addr, port=self.port)
        except KeyboardInterrupt:
            logging.info(" {API main} Ctrl + C detected, exiting")
            sys.exit(0)
        except PermissionError as perms_error:
            # PermissionError is an OSError subclass, so it has to be handled first
            logging.error(f"{{API main}} Could not bind to chosen port, full error: {perms_error}")
            sys.exit("PORT_UNAVAILABLE")
        except OSError as os_error:
            logging.error(f"{{API main}} An error occurred while preparing to serve: {os_error}")
            sys.exit(os_error)


if __name__ == "__main__":
    # The listening port is the first command-line argument. Exit with a short
    # message instead of a traceback when it is missing or not an integer.
    try:
        listen_port = int(sys.argv[1])
    except IndexError:
        sys.exit(f"usage: {sys.argv[0]} PORT")
    except ValueError:
        sys.exit(f"invalid port {sys.argv[1]!r}: expected an integer")
    server = UCSAPIServer(port=listen_port, database="database1.db", logging_level=logging.INFO)
    trio.run(server.serve_forever)