# Paste metadata: 6 years ago · Apr 06, 2019, 03:16 PM
1import asyncio
2import socket
3import logging
4import struct
5import threading
6
7from . import opus
8from .backoff import ExponentialBackoff
9from .gateway import *
10from .errors import ClientException, ConnectionClosed
11from .player import AudioPlayer, AudioSource
12
# NOTE(review): no visible import line binds the name `discord`; unless the
# star import from `.gateway` happens to provide it, this line raises
# NameError as soon as the module is imported. Creating a client at import
# time of a library module also looks unintended -- confirm before relying
# on it.
client = discord.Client()

# PyNaCl is treated as an optional dependency at import time: the flag below
# records whether it could be imported, and VoiceClient.__init__ raises
# RuntimeError when it is False.
try:
    import nacl.secret
    has_nacl = True
except ImportError:
    has_nacl = False


# Module-level logger, named after this module.
log = logging.getLogger(__name__)
23
class VoiceClient:
    """Represents a Discord voice connection.

    You do not create these, you typically get them from
    e.g. :meth:`VoiceChannel.connect`.

    Warning
    --------
    In order to play audio, you must have loaded the opus library
    through :func:`opus.load_opus`.

    If you don't do this then the library will not be able to
    transmit audio.

    Attributes
    -----------
    session_id: :class:`str`
        The voice connection session ID.
    token: :class:`str`
        The voice connection token.
    endpoint: :class:`str`
        The endpoint we are connecting to.
    channel: :class:`abc.Connectable`
        The voice channel connected to.
    loop
        The event loop that the voice client is running on.
    """

    def __init__(self, state, timeout, channel):
        """Set up an unconnected voice client.

        Parameters
        -----------
        state
            Connection state object; its ``loop`` and ``user`` attributes and
            its ``_get_websocket`` / ``_remove_voice_client`` methods are used
            by this class.
        timeout
            Seconds to wait for the voice handshake in :meth:`start_handshake`.
        channel
            The channel this client is (or will be) connected to.

        Raises
        -------
        RuntimeError
            PyNaCl could not be imported.
        """
        if not has_nacl:
            raise RuntimeError("PyNaCl library needed in order to use voice")

        self.channel = channel
        self.main_ws = None
        self.timeout = timeout
        self.ws = None
        self.socket = None
        self.loop = state.loop
        self._state = state
        # this will be used in the AudioPlayer thread
        self._connected = threading.Event()
        try:
            self._handshake_complete = asyncio.Event(loop=self.loop)
        except TypeError:
            # Python 3.10+ removed the ``loop`` parameter; the event binds to
            # the running loop on first use instead.
            self._handshake_complete = asyncio.Event()

        self.mode = None
        self._connections = 0
        self.sequence = 0
        self.timestamp = 0
        self._runner = None
        self._player = None
        self.encoder = opus.Encoder()

    # True when PyNaCl is unavailable (evaluated once, at class creation).
    warn_nacl = not has_nacl
    # Encryption mode names; each one has a matching ``_encrypt_<mode>`` method.
    supported_modes = (
        'xsalsa20_poly1305_suffix',
        'xsalsa20_poly1305',
    )

    @property
    def guild(self):
        """Optional[:class:`Guild`]: The guild we're connected to, if applicable."""
        return getattr(self.channel, 'guild', None)

    @property
    def user(self):
        """:class:`ClientUser`: The user connected to voice (i.e. ourselves)."""
        return self._state.user

    def checked_add(self, attr, value, limit):
        """Add ``value`` to the attribute named ``attr``, wrapping to 0.

        If the sum would exceed ``limit`` the attribute is reset to ``0``
        rather than being set to the sum.
        """
        val = getattr(self, attr)
        if val + value > limit:
            setattr(self, attr, 0)
        else:
            setattr(self, attr, val + value)

    # connection related

    async def start_handshake(self):
        """Request to join the channel and wait for the handshake to finish.

        Sends a voice state update over the main websocket and then waits up
        to ``self.timeout`` seconds for ``_handshake_complete`` to be set
        (which :meth:`_create_socket` does).

        Raises
        -------
        asyncio.TimeoutError
            The handshake did not complete in time. The handshake is
            terminated and the voice client removed before re-raising.
        """
        log.info('Starting voice handshake...')

        guild_id, channel_id = self.channel._get_voice_state_pair()
        state = self._state
        self.main_ws = ws = state._get_websocket(guild_id)
        self._connections += 1

        # request joining
        await ws.voice_state(guild_id, channel_id)

        try:
            # No explicit ``loop=``: that parameter was removed in Python 3.10
            # and the running loop is used automatically inside a coroutine.
            await asyncio.wait_for(self._handshake_complete.wait(), timeout=self.timeout)
        except asyncio.TimeoutError:
            await self.terminate_handshake(remove=True)
            raise

        log.info('Voice handshake complete. Endpoint found %s (IP: %s)', self.endpoint, self.endpoint_ip)

    async def terminate_handshake(self, *, remove=False):
        """Clear the handshake flag and send a voice state with no channel.

        Parameters
        -----------
        remove: bool
            When true, also remove this voice client from the connection
            state via ``_remove_voice_client``.
        """
        guild_id, channel_id = self.channel._get_voice_state_pair()
        self._handshake_complete.clear()
        await self.main_ws.voice_state(guild_id, None, self_mute=True)

        log.info('The voice handshake is being terminated for Channel ID %s (Guild ID %s)', channel_id, guild_id)
        if remove:
            log.info('The voice client has been removed for Channel ID %s (Guild ID %s)', channel_id, guild_id)
            key_id, _ = self.channel._get_voice_client_key()
            self._state._remove_voice_client(key_id)

    async def _create_socket(self, server_id, data):
        """Record the voice server details and (re)create the UDP socket.

        Parameters
        -----------
        server_id
            Stored as ``self.server_id``.
        data: dict
            Payload read for its ``'token'`` and ``'endpoint'`` keys. If
            either is missing the method logs a warning and returns early.
        """
        self._connected.clear()
        self.session_id = self.main_ws.session_id
        self.server_id = server_id
        self.token = data.get('token')
        endpoint = data.get('endpoint')

        if endpoint is None or self.token is None:
            log.warning(
                'Awaiting endpoint... This requires waiting. '
                'If timeout occurred considering raising the timeout and reconnecting.'
            )
            return

        # Remove any ':80' from the endpoint before resolving it.
        self.endpoint = endpoint.replace(':80', '')
        # NOTE(review): gethostbyname() is a blocking call made from a
        # coroutine; consider resolving through the event loop instead.
        self.endpoint_ip = socket.gethostbyname(self.endpoint)

        if self.socket:
            # Best-effort close of the previous socket; failures are ignored
            # because a fresh socket is created right after.
            try:
                self.socket.close()
            except Exception:
                pass

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setblocking(False)

        if self._handshake_complete.is_set():
            # terminate the websocket and handle the reconnect loop if necessary.
            self._handshake_complete.clear()
            await self.ws.close(4000)
            return

        self._handshake_complete.set()

    async def connect(self, *, reconnect=True, _tries=0, do_handshake=True):
        """Open the voice websocket and wait until ``secret_key`` is set.

        Parameters
        -----------
        reconnect: bool
            Retry (up to 5 times, with a growing delay) when the connection
            closes or times out; also passed on to :meth:`poll_voice_ws`.
        _tries: int
            Internal retry counter used by the recursive retry.
        do_handshake: bool
            Whether to run :meth:`start_handshake` first.

        Raises
        -------
        ConnectionClosed, asyncio.TimeoutError
            Connecting failed and no retry was permitted or left.
        """
        log.info('Connecting to voice...')
        # Drop any stale key so the wait loop below only ends on a fresh one.
        try:
            del self.secret_key
        except AttributeError:
            pass

        if do_handshake:
            await self.start_handshake()

        try:
            self.ws = await DiscordVoiceWebSocket.from_client(self)
            self._connected.clear()
            # ``secret_key`` is not assigned anywhere in this class;
            # presumably the websocket sets it while handling events.
            while not hasattr(self, 'secret_key'):
                await self.ws.poll_event()
            self._connected.set()
        except (ConnectionClosed, asyncio.TimeoutError):
            if reconnect and _tries < 5:
                log.exception('Failed to connect to voice... Retrying...')
                # ``loop=`` omitted: removed in Python 3.10.
                await asyncio.sleep(1 + _tries * 2.0)
                await self.terminate_handshake()
                await self.connect(reconnect=reconnect, _tries=_tries + 1)
            else:
                raise

        # Start the background poller only once per client.
        if self._runner is None:
            self._runner = self.loop.create_task(self.poll_voice_ws(reconnect))

    async def poll_voice_ws(self, reconnect):
        """Poll the voice websocket forever, reconnecting on failure.

        Parameters
        -----------
        reconnect: bool
            When false, a closed or timed-out connection disconnects the
            client and the exception is re-raised instead of retrying.
        """
        backoff = ExponentialBackoff()
        while True:
            try:
                await self.ws.poll_event()
            except (ConnectionClosed, asyncio.TimeoutError) as exc:
                if isinstance(exc, ConnectionClosed):
                    # The following close codes are undocumented so I will document them here.
                    # 1000 - normal closure (obviously)
                    # 4014 - voice channel has been deleted.
                    # 4015 - voice server has crashed
                    if exc.code in (1000, 4014, 4015):
                        await self.disconnect()
                        break

                if not reconnect:
                    await self.disconnect()
                    raise

                retry = backoff.delay()
                log.exception('Disconnected from voice... Reconnecting in %.2fs.', retry)
                self._connected.clear()
                # ``loop=`` omitted: removed in Python 3.10.
                await asyncio.sleep(retry)
                await self.terminate_handshake()
                try:
                    await self.connect(reconnect=True)
                except asyncio.TimeoutError:
                    # at this point we've retried 5 times... let's continue the loop.
                    log.warning('Could not connect to voice... Retrying...')
                    continue

    async def disconnect(self, *, force=False):
        """|coro|

        Disconnects this voice client from voice.

        Does nothing when the client is not connected, unless ``force`` is
        true. The UDP socket is closed even if closing the websocket or
        terminating the handshake raises.
        """
        if not force and not self._connected.is_set():
            return

        self.stop()
        self._connected.clear()

        try:
            if self.ws:
                await self.ws.close()

            await self.terminate_handshake(remove=True)
        finally:
            if self.socket:
                self.socket.close()

    async def move_to(self, channel):
        """|coro|

        Moves you to a different voice channel.

        Parameters
        -----------
        channel: :class:`abc.Snowflake`
            The channel to move to. Must be a voice channel.
        """
        guild_id, _ = self.channel._get_voice_state_pair()
        await self.main_ws.voice_state(guild_id, channel.id)

    def is_connected(self):
        """:class:`bool`: Indicates if the voice client is connected to voice."""
        return self._connected.is_set()

    # audio related

    def _get_voice_packet(self, data):
        """Build a 12-byte header for ``data`` and encrypt with the current mode.

        ``self.ssrc`` is not assigned in this class; presumably it is set by
        the voice websocket before audio is sent.
        """
        header = bytearray(12)

        # Formulate rtp header
        header[0] = 0x80
        header[1] = 0x78
        struct.pack_into('>H', header, 2, self.sequence)
        struct.pack_into('>I', header, 4, self.timestamp)
        struct.pack_into('>I', header, 8, self.ssrc)

        # Dispatch to the ``_encrypt_<mode>`` method matching ``self.mode``.
        encrypt_packet = getattr(self, '_encrypt_' + self.mode)
        return encrypt_packet(header, data)

    def _encrypt_xsalsa20_poly1305(self, header, data):
        """Encrypt ``data`` using the header, zero-padded to 24 bytes, as nonce.

        Returns the header followed by the ciphertext.
        """
        box = nacl.secret.SecretBox(bytes(self.secret_key))
        nonce = bytearray(24)
        nonce[:12] = header

        return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext

    def _encrypt_xsalsa20_poly1305_suffix(self, header, data):
        """Encrypt ``data`` with a random nonce appended after the ciphertext.

        Returns the header, the ciphertext and then the nonce.
        """
        box = nacl.secret.SecretBox(bytes(self.secret_key))
        # NOTE(review): ``nacl.utils`` is only reachable because importing
        # ``nacl.secret`` loads it; an explicit import would be more robust.
        nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)

        return header + box.encrypt(bytes(data), nonce).ciphertext + nonce

    def play(self, source, *, after=None):
        """Plays an :class:`AudioSource`.

        The finalizer, ``after`` is called after the source has been exhausted
        or an error occurred.

        If an error happens while the audio player is running, the exception is
        caught and the audio player is then stopped.

        Parameters
        -----------
        source: :class:`AudioSource`
            The audio source we're reading from.
        after
            The finalizer that is called after the stream is exhausted.
            All exceptions it throws are silently discarded. This function
            must have a single parameter, ``error``, that denotes an
            optional exception that was raised during playing.

        Raises
        -------
        ClientException
            Already playing audio or not connected.
        TypeError
            source is not a :class:`AudioSource` or after is not a callable.
        """

        # ``self._connected`` is a threading.Event and therefore always
        # truthy; its flag has to be queried to detect "not connected".
        if not self.is_connected():
            raise ClientException('Not connected to voice.')

        if self.is_playing():
            raise ClientException('Already playing audio.')

        if not isinstance(source, AudioSource):
            raise TypeError('source must an AudioSource not {0.__class__.__name__}'.format(source))

        self._player = AudioPlayer(source, self, after=after)
        self._player.start()

    def is_playing(self):
        """Indicates if we're currently playing audio."""
        return self._player is not None and self._player.is_playing()

    def is_paused(self):
        """Indicates if we're playing audio, but if we're paused."""
        return self._player is not None and self._player.is_paused()

    def stop(self):
        """Stops playing audio."""
        if self._player:
            self._player.stop()
            self._player = None

    def pause(self):
        """Pauses the audio playing."""
        if self._player:
            self._player.pause()

    def resume(self):
        """Resumes the audio playing."""
        if self._player:
            self._player.resume()

    @property
    def source(self):
        """Optional[:class:`AudioSource`]: The audio source being played, if playing.

        This property can also be used to change the audio source currently being played.
        """
        return self._player.source if self._player else None

    @source.setter
    def source(self, value):
        if not isinstance(value, AudioSource):
            raise TypeError('expected AudioSource not {0.__class__.__name__}.'.format(value))

        if self._player is None:
            raise ValueError('Not playing anything.')

        self._player._set_source(value)

    def send_audio_packet(self, data, *, encode=True):
        """Sends an audio packet composed of the data.

        You must be connected to play audio.

        Parameters
        ----------
        data: bytes
            The :term:`py:bytes-like object` denoting PCM or Opus voice data.
        encode: bool
            Indicates if ``data`` should be encoded into Opus.

        Raises
        -------
        ClientException
            You are not connected.
        OpusError
            Encoding the data failed.

        NOTE(review): this method performs no connection check of its own, so
        the ``ClientException`` above is not raised from here -- confirm
        whether callers are expected to check first.
        """

        # Advance the 16-bit sequence number, wrapping to 0 past 65535.
        self.checked_add('sequence', 1, 65535)
        if encode:
            encoded_data = self.encoder.encode(data, self.encoder.SAMPLES_PER_FRAME)
        else:
            encoded_data = data
        packet = self._get_voice_packet(encoded_data)
        try:
            self.socket.sendto(packet, (self.endpoint_ip, self.voice_port))
        except BlockingIOError:
            # The socket is non-blocking; a send that would block is dropped.
            log.warning('A packet has been dropped (seq: %s, timestamp: %s)', self.sequence, self.timestamp)

        # Advance the 32-bit timestamp by one frame, wrapping to 0 on overflow.
        self.checked_add('timestamp', self.encoder.SAMPLES_PER_FRAME, 4294967295)
383
# SECURITY: a bot token was hard-coded on this line. It has been published
# with this file and must be treated as compromised: revoke/regenerate it.
# The token is now read from the DISCORD_BOT_TOKEN environment variable; a
# missing variable fails loudly with KeyError instead of running unauthenticated.
# NOTE(review): the original indentation of this statement was lost, so it is
# unclear whether it belongs at module level; starting a bot from a library
# module is likely unintended -- confirm and relocate if so.
import os

client.run(os.environ["DISCORD_BOT_TOKEN"])