# Snapshot dated Jun 14, 2021, 11:02 AM.
1import asyncio
2import collections
3import struct
4
5from . import authenticator
6from ..extensions.messagepacker import MessagePacker
7from .mtprotoplainsender import MTProtoPlainSender
8from .requeststate import RequestState
9from .mtprotostate import MTProtoState
10from ..tl.tlobject import TLRequest
11from .. import helpers, utils
12from ..errors import (
13 BadMessageError, InvalidBufferError, SecurityError,
14 TypeNotFoundError, rpc_message_to_error
15)
16from ..extensions import BinaryReader
17from ..tl.core import RpcResult, MessageContainer, GzipPacked
18from ..tl.functions.auth import LogOutRequest
19from ..tl.functions import PingRequest, DestroySessionRequest
20from ..tl.types import (
21 MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts,
22 MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, MsgsStateReq,
23 MsgsStateInfo, MsgsAllInfo, MsgResendReq, upload, DestroySessionOk, DestroySessionNone,
24)
25from ..crypto import AuthKey
26from ..helpers import retry_range
27
28
class MTProtoSender:
    """
    MTProto Mobile Protocol sender
    (https://core.telegram.org/mtproto/description).

    This class is responsible for wrapping requests into `TLMessage`'s,
    sending them over the network and receiving them in a safe manner.

    Automatic reconnection due to temporary network issues is a concern
    for this class as well, including retry of messages that could not
    be sent successfully.

    A new authorization key will be generated on connection if no other
    key exists yet.
    """
    def __init__(self, auth_key, *, loggers,
                 retries=5, delay=1, auto_reconnect=True, connect_timeout=15,
                 auth_key_callback=None,
                 update_callback=None, auto_reconnect_callback=None):
        """
        :param auth_key: existing authorization key, or a falsy value to
            have a new one generated on connect.
        :param loggers: mapping from logger name to the logger to use.
        :param retries: how many times connecting or generating the auth
            key is attempted before giving up. A small finite default is
            used (the previous ``99999**99999`` default effectively meant
            "retry forever" and produced unreadable failure messages).
        :param delay: seconds slept between retry attempts.
        :param auto_reconnect: whether to reconnect in the background
            after an unexpected disconnection.
        :param connect_timeout: timeout in seconds per connect attempt.
        :param auth_key_callback: invoked whenever the auth key changes.
        :param update_callback: invoked with every received update object.
        :param auto_reconnect_callback: coroutine function scheduled after
            a successful automatic reconnection.
        """
        self._connection = None
        self._loggers = loggers
        self._log = loggers[__name__]
        self._retries = retries
        self._delay = delay
        self._auto_reconnect = auto_reconnect
        self._connect_timeout = connect_timeout
        self._auth_key_callback = auth_key_callback
        self._update_callback = update_callback
        self._auto_reconnect_callback = auto_reconnect_callback
        self._connect_lock = asyncio.Lock()
        self._ping = None

        # Whether the user has explicitly connected or disconnected.
        #
        # If a disconnection happens for any other reason and it
        # was *not* user action then the pending messages won't
        # be cleared but on explicit user disconnection all the
        # pending futures should be cancelled.
        self._user_connected = False
        self._reconnecting = False
        self._disconnected = asyncio.get_event_loop().create_future()
        self._disconnected.set_result(None)

        # We need to join the loops upon disconnection
        self._send_loop_handle = None
        self._recv_loop_handle = None

        # Preserving the references of the AuthKey and state is important
        self.auth_key = auth_key or AuthKey(None)
        self._state = MTProtoState(self.auth_key, loggers=self._loggers)

        # Outgoing messages are put in a queue and sent in a batch.
        # Note that here we're also storing their ``_RequestState``.
        self._send_queue = MessagePacker(self._state, loggers=self._loggers)

        # Sent states are remembered until a response is received.
        self._pending_state = {}

        # Responses must be acknowledged, and we can also batch these.
        self._pending_ack = set()

        # Similar to pending_messages but only for the last acknowledges.
        # These can't go in pending_messages because no acknowledge for them
        # is received, but we may still need to resend their state on bad salts.
        self._last_acks = collections.deque(maxlen=10)

        # Jump table from response ID to method that handles it
        self._handlers = {
            RpcResult.CONSTRUCTOR_ID: self._handle_rpc_result,
            MessageContainer.CONSTRUCTOR_ID: self._handle_container,
            GzipPacked.CONSTRUCTOR_ID: self._handle_gzip_packed,
            Pong.CONSTRUCTOR_ID: self._handle_pong,
            BadServerSalt.CONSTRUCTOR_ID: self._handle_bad_server_salt,
            BadMsgNotification.CONSTRUCTOR_ID: self._handle_bad_notification,
            MsgDetailedInfo.CONSTRUCTOR_ID: self._handle_detailed_info,
            MsgNewDetailedInfo.CONSTRUCTOR_ID: self._handle_new_detailed_info,
            NewSessionCreated.CONSTRUCTOR_ID: self._handle_new_session_created,
            MsgsAck.CONSTRUCTOR_ID: self._handle_ack,
            FutureSalts.CONSTRUCTOR_ID: self._handle_future_salts,
            MsgsStateReq.CONSTRUCTOR_ID: self._handle_state_forgotten,
            MsgResendReq.CONSTRUCTOR_ID: self._handle_state_forgotten,
            MsgsAllInfo.CONSTRUCTOR_ID: self._handle_msg_all,
            # These two previously used the class objects themselves as
            # keys, so they could never match an incoming constructor ID.
            DestroySessionOk.CONSTRUCTOR_ID: self._handle_destroy_session,
            DestroySessionNone.CONSTRUCTOR_ID: self._handle_destroy_session,
        }
114
115 # Public API
116
117 async def connect(self, connection):
118 """
119 Connects to the specified given connection using the given auth key.
120 """
121 async with self._connect_lock:
122 if self._user_connected:
123 self._log.info('User is already connected!')
124 return False
125
126 self._connection = connection
127 await self._connect()
128 self._user_connected = True
129 return True
130
131 def is_connected(self):
132 return self._user_connected
133
134 def _transport_connected(self):
135 return (
136 not self._reconnecting
137 and self._connection is not None
138 and self._connection._connected
139 )
140
    async def disconnect(self):
        """
        Cleanly disconnects the instance from the network, cancels
        all pending requests, and closes the send and receive loops.
        """
        # Delegates to the private variant, which can also carry an error.
        await self._disconnect()
147
    def send(self, request, ordered=False):
        """
        This method enqueues the given request to be sent. Its send
        state will be saved until a response arrives, and a ``Future``
        that will be resolved when the response arrives will be returned:

        .. code-block:: python

            async def method():
                # Sending (enqueued for the send loop)
                future = sender.send(request)
                # Receiving (waits for the receive loop to read the result)
                result = await future

        Designed like this because Telegram may send the response at
        any point, and it can send other items while one waits for it.
        Once the response for this future arrives, it is set with the
        received result, quite similar to how a ``receive()`` call
        would otherwise work.

        Since the receiving part is "built in" the future, it's
        impossible to await receive a result that was never sent.

        When ``request`` is list-like, a list of futures is returned
        instead; with ``ordered=True`` each request is chained to run
        after the previous one.
        """
        if not self._user_connected:
            raise ConnectionError('Cannot send requests while disconnected')

        if not utils.is_list_like(request):
            try:
                state = RequestState(request)
            except struct.error as e:
                # "struct.error: required argument is not an integer" is not
                # very helpful; log the request to find out what wasn't int.
                self._log.error('Request caused struct.error: %s: %s', e, request)
                raise

            self._send_queue.append(state)
            return state.future
        else:
            states = []
            futures = []
            state = None
            for req in request:
                try:
                    # ``ordered and state`` chains this request after the
                    # previous one (or no chaining for the first request).
                    state = RequestState(req, after=ordered and state)
                except struct.error as e:
                    self._log.error('Request caused struct.error: %s: %s', e, request)
                    raise

                states.append(state)
                futures.append(state.future)

            self._send_queue.extend(states)
            return futures
201
202 @property
203 def disconnected(self):
204 """
205 Future that resolves when the connection to Telegram
206 ends, either by user action or in the background.
207
208 Note that it may resolve in either a ``ConnectionError``
209 or any other unexpected error that could not be handled.
210 """
211 return asyncio.shield(self._disconnected)
212
213 # Private methods
214
215 async def _connect(self):
216 """
217 Performs the actual connection, retrying, generating the
218 authorization key if necessary, and starting the send and
219 receive loops.
220 """
221 self._log.info('Connecting to %s...', self._connection)
222
223 connected = False
224
225 while True:
226 attempt = 1
227 if not connected:
228 connected = await self._try_connect(attempt)
229 if not connected:
230 continue # skip auth key generation until we're connected
231
232 if not self.auth_key:
233 try:
234 if not await self._try_gen_auth_key(attempt):
235 continue # keep retrying until we have the auth key
236 except (IOError, asyncio.TimeoutError) as e:
237 # Sometimes, specially during user-DC migrations,
238 # Telegram may close the connection during auth_key
239 # generation. If that's the case, we will need to
240 # connect again.
241 self._log.warning('Connection error %d during auth_key gen: %s: %s',
242 attempt, type(e).__name__, e)
243
244 # Whatever the IOError was, make sure to disconnect so we can
245 # reconnect cleanly after.
246 await self._connection.disconnect()
247 connected = False
248 await asyncio.sleep(self._delay)
249 continue # next iteration we will try to reconnect
250
251 break # all steps done, break retry loop
252 else:
253 if not connected:
254 raise ConnectionError('Connection to Telegram failed {} time(s)'.format(self._retries))
255
256 e = ConnectionError('auth_key generation failed {} time(s)'.format(self._retries))
257 await self._disconnect(error=e)
258 raise e
259
260 loop = asyncio.get_event_loop()
261 self._log.debug('Starting send loop')
262 self._send_loop_handle = loop.create_task(self._send_loop())
263
264 self._log.debug('Starting receive loop')
265 self._recv_loop_handle = loop.create_task(self._recv_loop())
266
267 # _disconnected only completes after manual disconnection
268 # or errors after which the sender cannot continue such
269 # as failing to reconnect or any unexpected error.
270 if self._disconnected.done():
271 self._disconnected = loop.create_future()
272
273 self._log.info('Connection to %s complete!', self._connection)
274
275 async def _try_connect(self, attempt):
276 try:
277 self._log.debug('Connection attempt %d...', attempt)
278 await self._connection.connect(timeout=self._connect_timeout)
279 self._log.debug('Connection success!')
280 return True
281 except (IOError, asyncio.TimeoutError) as e:
282 self._log.warning('Attempt %d at connecting failed: %s: %s',
283 attempt, type(e).__name__, e)
284 await asyncio.sleep(self._delay)
285 return False
286
    async def _try_gen_auth_key(self, attempt):
        """
        Perform one attempt at generating a new authorization key over a
        plain (unencrypted) sender; return ``True`` on success.

        ``IOError``/``asyncio.TimeoutError`` may propagate if the server
        closes the connection mid-generation (handled by `_connect`).
        """
        plain = MTProtoPlainSender(self._connection, loggers=self._loggers)
        try:
            self._log.debug('New auth_key attempt %d...', attempt)
            self.auth_key.key, self._state.time_offset = \
                await authenticator.do_authentication(plain)

            # This is *EXTREMELY* important since we don't control
            # external references to the authorization key, we must
            # notify whenever we change it. This is crucial when we
            # switch to different data centers.
            if self._auth_key_callback:
                self._auth_key_callback(self.auth_key)

            self._log.debug('auth_key generation success!')
            return True
        except (SecurityError, AssertionError) as e:
            self._log.warning('Attempt %d at new auth_key failed: %s', attempt, e)
            await asyncio.sleep(self._delay)
            return False
307
    async def _disconnect(self, error=None):
        """
        Tear down the connection and both loops.

        When ``error`` is given, it is set as the exception on every
        pending request future and on the public ``disconnected`` future;
        otherwise pending futures are cancelled and ``disconnected``
        resolves to ``None``.
        """
        if self._connection is None:
            self._log.info('Not disconnecting (already have no connection)')
            return

        self._log.info('Disconnecting from %s...', self._connection)
        self._user_connected = False
        try:
            self._log.debug('Closing current connection...')
            await self._connection.disconnect()
        finally:
            # Even if closing the transport fails, the pending futures and
            # the loop tasks must still be cleaned up.
            self._log.debug('Cancelling %d pending message(s)...', len(self._pending_state))
            for state in self._pending_state.values():
                if error and not state.future.done():
                    state.future.set_exception(error)
                else:
                    state.future.cancel()

            self._pending_state.clear()
            await helpers._cancel(
                self._log,
                send_loop_handle=self._send_loop_handle,
                recv_loop_handle=self._recv_loop_handle
            )

            self._log.info('Disconnection from %s complete!', self._connection)
            self._connection = None

        if self._disconnected and not self._disconnected.done():
            if error:
                self._disconnected.set_exception(error)
            else:
                self._disconnected.set_result(None)
341
342 async def _reconnect(self, last_error):
343 """
344 Cleanly disconnects and then reconnects.
345 """
346 self._log.info('Closing current connection to begin reconnect...')
347 await self._connection.disconnect()
348
349 await helpers._cancel(
350 self._log,
351 send_loop_handle=self._send_loop_handle,
352 recv_loop_handle=self._recv_loop_handle
353 )
354
355 # TODO See comment in `_start_reconnect`
356 # Perhaps this should be the last thing to do?
357 # But _connect() creates tasks which may run and,
358 # if they see that reconnecting is True, they will end.
359 # Perhaps that task creation should not belong in connect?
360 self._reconnecting = False
361
362 # Start with a clean state (and thus session ID) to avoid old msgs
363 self._state.reset()
364
365 retries = self._retries if self._auto_reconnect else 0
366
367 attempt = 0
368 ok = True
369 # We're already "retrying" to connect, so we don't want to force retries
370 while True:
371 attempt = 1
372 try:
373 await self._connect()
374 except (IOError, asyncio.TimeoutError) as e:
375 last_error = e
376 self._log.info('Failed reconnection attempt %d with %s',
377 attempt, e.__class__.__name__)
378 await asyncio.sleep(self._delay)
379 except BufferError as e:
380 # TODO there should probably only be one place to except all these errors
381 if isinstance(e, InvalidBufferError) and e.code == 404:
382 self._log.info('Broken authorization key; resetting')
383 self.auth_key.key = None
384 if self._auth_key_callback:
385 self._auth_key_callback(None)
386
387 ok = False
388 break
389 else:
390 self._log.warning('Invalid buffer %s', e)
391
392 except Exception as e:
393 last_error = e
394 self._log.exception('Unexpected exception reconnecting on '
395 'attempt %d', attempt)
396
397 await asyncio.sleep(self._delay)
398 else:
399 self._send_queue.extend(self._pending_state.values())
400 self._pending_state.clear()
401
402 if self._auto_reconnect_callback:
403 asyncio.get_event_loop().create_task(self._auto_reconnect_callback())
404
405 break
406 else:
407 ok = False
408
409 if not ok:
410 self._log.error('Automatic reconnection failed %d time(s)', attempt)
411 # There may be no error (e.g. automatic reconnection was turned off).
412 error = last_error.with_traceback(None) if last_error else None
413 await self._disconnect(error=error)
414
415 def _start_reconnect(self, error):
416 """Starts a reconnection in the background."""
417 if self._user_connected and not self._reconnecting:
418 # We set reconnecting to True here and not inside the new task
419 # because it may happen that send/recv loop calls this again
420 # while the new task hasn't had a chance to run yet. This race
421 # condition puts `self.connection` in a bad state with two calls
422 # to its `connect` without disconnecting, so it creates a second
423 # receive loop. There can't be two tasks receiving data from
424 # the reader, since that causes an error, and the library just
425 # gets stuck.
426 # TODO It still gets stuck? Investigate where and why.
427 self._reconnecting = True
428 asyncio.get_event_loop().create_task(self._reconnect(error))
429
430 def _keepalive_ping(self, rnd_id):
431 """
432 Send a keep-alive ping. If a pong for the last ping was not received
433 yet, this means we're probably not connected.
434 """
435 # TODO this is ugly, update loop shouldn't worry about this, sender should
436 if self._ping is None:
437 self._ping = rnd_id
438 self.send(PingRequest(rnd_id))
439 else:
440 self._start_reconnect(None)
441
442 # Loops
443
    async def _send_loop(self):
        """
        This loop is responsible for popping items off the send
        queue, encrypting them, and sending them over the network.

        Besides `connect`, only this method ever sends data.
        """
        while self._user_connected and not self._reconnecting:
            # Flush pending acknowledges first, batched into one MsgsAck.
            if self._pending_ack:
                ack = RequestState(MsgsAck(list(self._pending_ack)))
                self._send_queue.append(ack)
                self._last_acks.append(ack)
                self._pending_ack.clear()

            self._log.debug('Waiting for messages to send...')
            # TODO Wait for the connection send queue to be empty?
            # This means that while it's not empty we can wait for
            # more messages to be added to the send queue.
            batch, data = await self._send_queue.get()

            if not data:
                continue

            self._log.debug('Encrypting %d message(s) in %d bytes for sending',
                            len(batch), len(data))

            data = self._state.encrypt_message_data(data)

            # Whether sending succeeds or not, the popped requests are now
            # pending because they're removed from the queue. If a reconnect
            # occurs, they will be removed from pending state and re-enqueued
            # so even if the network fails they won't be lost. If they were
            # never re-enqueued, the future waiting for a response "locks".
            for state in batch:
                if not isinstance(state, list):
                    # Only TLRequest instances are tracked for a response.
                    if isinstance(state.request, TLRequest):
                        self._pending_state[state.msg_id] = state
                else:
                    for s in state:
                        if isinstance(s.request, TLRequest):
                            self._pending_state[s.msg_id] = s

            try:
                await self._connection.send(data)
            except IOError as e:
                self._log.info('Connection closed while sending data')
                self._start_reconnect(e)
                return

            self._log.debug('Encrypted messages put in a queue to be sent')
494
    async def _recv_loop(self):
        """
        This loop is responsible for reading all incoming responses
        from the network, decrypting and handling or dispatching them.

        Besides `connect`, only this method ever receives data.
        """
        while self._user_connected and not self._reconnecting:
            self._log.debug('Receiving items from the network...')
            try:
                body = await self._connection.recv()
            except IOError as e:
                self._log.info('Connection closed while receiving data')
                self._start_reconnect(e)
                return

            try:
                message = self._state.decrypt_message_data(body)
            except TypeNotFoundError as e:
                # Received object which we don't know how to deserialize
                self._log.info('Type %08x not found, remaining data %r',
                               e.invalid_constructor_id, e.remaining)
                continue
            except SecurityError as e:
                # A step while decoding had the incorrect data. This message
                # should not be considered safe and it should be ignored.
                self._log.warning('Security error while unpacking a '
                                  'received message: %s', e)
                continue
            except BufferError as e:
                if isinstance(e, InvalidBufferError) and e.code == 404:
                    # Drop the key so a new one is generated on reconnection.
                    self._log.info('Broken authorization key; resetting')
                    self.auth_key.key = None
                    if self._auth_key_callback:
                        self._auth_key_callback(None)

                    await self._disconnect(error=e)
                else:
                    self._log.warning('Invalid buffer %s', e)
                    self._start_reconnect(e)
                return
            except Exception as e:
                self._log.exception('Unhandled error while receiving data')
                self._start_reconnect(e)
                return

            try:
                await self._process_message(message)
            except Exception:
                # Handler bugs must not kill the receive loop.
                self._log.exception('Unhandled error while processing msgs')
545
546 # Response Handlers
547
    async def _process_message(self, message):
        """
        Adds the given message to the list of messages that must be
        acknowledged and dispatches control to different ``_handle_*``
        method based on its type.
        """
        self._pending_ack.add(message.msg_id)
        # Anything without a dedicated handler is treated as an update.
        handler = self._handlers.get(message.obj.CONSTRUCTOR_ID,
                                     self._handle_update)
        await handler(message)
558
559 def _pop_states(self, msg_id):
560 """
561 Pops the states known to match the given ID from pending messages.
562
563 This method should be used when the response isn't specific.
564 """
565 state = self._pending_state.pop(msg_id, None)
566 if state:
567 return [state]
568
569 to_pop = []
570 for state in self._pending_state.values():
571 if state.container_id == msg_id:
572 to_pop.append(state.msg_id)
573
574 if to_pop:
575 return [self._pending_state.pop(x) for x in to_pop]
576
577 for ack in self._last_acks:
578 if ack.msg_id == msg_id:
579 return [ack]
580
581 return []
582
    async def _handle_rpc_result(self, message):
        """
        Handles the result for Remote Procedure Calls:

            rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;

        This is where the future results for sent requests are set.
        """
        rpc_result = message.obj
        state = self._pending_state.pop(rpc_result.req_msg_id, None)
        self._log.debug('Handling RPC result for message %d',
                        rpc_result.req_msg_id)

        if not state:
            # TODO We should not get responses to things we never sent
            # However receiving a File() with empty bytes is "common".
            # See #658, #759 and #958. They seem to happen in a container
            # which contain the real response right after.
            try:
                with BinaryReader(rpc_result.body) as reader:
                    if not isinstance(reader.tgread_object(), upload.File):
                        raise ValueError('Not an upload.File')
            except (TypeNotFoundError, ValueError):
                self._log.info('Received response without parent request: %s', rpc_result.body)
            return

        if rpc_result.error:
            # Telegram reported an RPC error: acknowledge receipt and
            # propagate the error through the request's future.
            error = rpc_message_to_error(rpc_result.error, state.request)
            self._send_queue.append(
                RequestState(MsgsAck([state.msg_id])))

            if not state.future.cancelled():
                state.future.set_exception(error)
        else:
            try:
                with BinaryReader(rpc_result.body) as reader:
                    result = state.request.read_result(reader)
            except Exception as e:
                # e.g. TypeNotFoundError, should be propagated to caller
                if not state.future.cancelled():
                    state.future.set_exception(e)
            else:
                if not state.future.cancelled():
                    state.future.set_result(result)
627
628 async def _handle_container(self, message):
629 """
630 Processes the inner messages of a container with many of them:
631
632 msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
633 """
634 self._log.debug('Handling container')
635 for inner_message in message.obj.messages:
636 await self._process_message(inner_message)
637
    async def _handle_gzip_packed(self, message):
        """
        Unpacks the data from a gzipped object and processes it:

            gzip_packed#3072cfa1 packed_data:bytes = Object;
        """
        self._log.debug('Handling gzipped data')
        with BinaryReader(message.obj.data) as reader:
            # Replace the packed payload with the real object and
            # dispatch it as if it had arrived unpacked.
            message.obj = reader.tgread_object()
            await self._process_message(message)
648
649 async def _handle_update(self, message):
650 try:
651 assert message.obj.SUBCLASS_OF_ID == 0x8af52aac # crc32(b'Updates')
652 except AssertionError:
653 self._log.warning('Note: %s is not an update, not dispatching it %s', message.obj)
654 return
655
656 self._log.debug('Handling update %s', message.obj.__class__.__name__)
657 if self._update_callback:
658 self._update_callback(message.obj)
659
660 async def _handle_pong(self, message):
661 """
662 Handles pong results, which don't come inside a ``rpc_result``
663 but are still sent through a request:
664
665 pong#347773c5 msg_id:long ping_id:long = Pong;
666 """
667 pong = message.obj
668 self._log.debug('Handling pong for message %d', pong.msg_id)
669 if self._ping == pong.ping_id:
670 self._ping = None
671
672 state = self._pending_state.pop(pong.msg_id, None)
673 if state:
674 state.future.set_result(pong)
675
676 async def _handle_bad_server_salt(self, message):
677 """
678 Corrects the currently used server salt to use the right value
679 before enqueuing the rejected message to be re-sent:
680
681 bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
682 error_code:int new_server_salt:long = BadMsgNotification;
683 """
684 bad_salt = message.obj
685 self._log.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
686 self._state.salt = bad_salt.new_server_salt
687 states = self._pop_states(bad_salt.bad_msg_id)
688 self._send_queue.extend(states)
689
690 self._log.debug('%d message(s) will be resent', len(states))
691
692 async def _handle_bad_notification(self, message):
693 """
694 Adjusts the current state to be correct based on the
695 received bad message notification whenever possible:
696
697 bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
698 error_code:int = BadMsgNotification;
699 """
700 bad_msg = message.obj
701 states = self._pop_states(bad_msg.bad_msg_id)
702
703 self._log.debug('Handling bad msg %s', bad_msg)
704 if bad_msg.error_code in (16, 17):
705 # Sent msg_id too low or too high (respectively).
706 # Use the current msg_id to determine the right time offset.
707 to = self._state.update_time_offset(
708 correct_msg_id=message.msg_id)
709 self._log.info('System clock is wrong, set time offset to %ds', to)
710 elif bad_msg.error_code == 32:
711 # msg_seqno too low, so just pump it up by some "large" amount
712 # TODO A better fix would be to start with a new fresh session ID
713 self._state._sequence += 64
714 elif bad_msg.error_code == 33:
715 # msg_seqno too high never seems to happen but just in case
716 self._state._sequence -= 16
717 else:
718 for state in states:
719 state.future.set_exception(
720 BadMessageError(state.request, bad_msg.error_code))
721 return
722
723 # Messages are to be re-sent once we've corrected the issue
724 self._send_queue.extend(states)
725 self._log.debug('%d messages will be resent due to bad msg',
726 len(states))
727
728 async def _handle_detailed_info(self, message):
729 """
730 Updates the current status with the received detailed information:
731
732 msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long
733 bytes:int status:int = MsgDetailedInfo;
734 """
735 # TODO https://goo.gl/VvpCC6
736 msg_id = message.obj.answer_msg_id
737 self._log.debug('Handling detailed info for message %d', msg_id)
738 self._pending_ack.add(msg_id)
739
740 async def _handle_new_detailed_info(self, message):
741 """
742 Updates the current status with the received detailed information:
743
744 msg_new_detailed_info#809db6df answer_msg_id:long
745 bytes:int status:int = MsgDetailedInfo;
746 """
747 # TODO https://goo.gl/G7DPsR
748 msg_id = message.obj.answer_msg_id
749 self._log.debug('Handling new detailed info for message %d', msg_id)
750 self._pending_ack.add(msg_id)
751
752 async def _handle_new_session_created(self, message):
753 """
754 Updates the current status with the received session information:
755
756 new_session_created#9ec20908 first_msg_id:long unique_id:long
757 server_salt:long = NewSession;
758 """
759 # TODO https://goo.gl/LMyN7A
760 self._log.debug('Handling new session created')
761 self._state.salt = message.obj.server_salt
762
763 async def _handle_ack(self, message):
764 """
765 Handles a server acknowledge about our messages. Normally
766 these can be ignored except in the case of ``auth.logOut``:
767
768 auth.logOut#5717da40 = Bool;
769
770 Telegram doesn't seem to send its result so we need to confirm
771 it manually. No other request is known to have this behaviour.
772
773 Since the ID of sent messages consisting of a container is
774 never returned (unless on a bad notification), this method
775 also removes containers messages when any of their inner
776 messages are acknowledged.
777 """
778 ack = message.obj
779 self._log.debug('Handling acknowledge for %s', str(ack.msg_ids))
780 for msg_id in ack.msg_ids:
781 state = self._pending_state.get(msg_id)
782 if state and isinstance(state.request, LogOutRequest):
783 del self._pending_state[msg_id]
784 if not state.future.cancelled():
785 state.future.set_result(True)
786
787 async def _handle_future_salts(self, message):
788 """
789 Handles future salt results, which don't come inside a
790 ``rpc_result`` but are still sent through a request:
791
792 future_salts#ae500895 req_msg_id:long now:int
793 salts:vector<future_salt> = FutureSalts;
794 """
795 # TODO save these salts and automatically adjust to the
796 # correct one whenever the salt in use expires.
797 self._log.debug('Handling future salts for message %d', message.msg_id)
798 state = self._pending_state.pop(message.msg_id, None)
799 if state:
800 state.future.set_result(message.obj)
801
802 async def _handle_state_forgotten(self, message):
803 """
804 Handles both :tl:`MsgsStateReq` and :tl:`MsgResendReq` by
805 enqueuing a :tl:`MsgsStateInfo` to be sent at a later point.
806 """
807 self._send_queue.append(RequestState(MsgsStateInfo(
808 req_msg_id=message.msg_id, info=chr(1) * len(message.obj.msg_ids)
809 )))
810
    async def _handle_msg_all(self, message):
        """
        Handles :tl:`MsgsAllInfo` by doing nothing (yet).
        """
        # Intentionally a no-op; the acknowledge was already queued by
        # `_process_message` before dispatching here.
815
816 async def _handle_destroy_session(self, message):
817 """
818 Handles both :tl:`DestroySessionOk` and :tl:`DestroySessionNone`.
819 It behaves pretty much like handling an RPC result.
820 """
821 for msg_id, state in self._pending_state.items():
822 if isinstance(state.request, DestroySessionRequest)\
823 and state.request.session_id == message.obj.session_id:
824 break
825 else:
826 return
827
828 del self._pending_state[msg_id]
829 if not state.future.cancelled():
830 state.future.set_result(message.obj)
831