# Paste-site header (not part of the module): 4 years ago · Feb 15, 2021, 02:24 PM
1import asyncio
2import functools
3import re
4import textwrap
5from contextlib import suppress
6from types import SimpleNamespace
7from typing import Optional, Tuple
8
9import discord
10from bs4 import BeautifulSoup
11from bs4.element import PageElement, Tag
12from discord.errors import NotFound
13from discord.ext import commands
14from markdownify import MarkdownConverter
15from requests import ConnectTimeout, ConnectionError, HTTPError
16from sphinx.ext import intersphinx
17from urllib3.exceptions import ProtocolError
18from utils.converters import ValidPythonIdentifier, ValidURL
19from utils.paginator import LinePaginator
20from utils.cache import AsyncCache
21
# `intersphinx.fetch_inventory` expects a Sphinx application object; this
# namespace supplies the `config` attribute and settings passed in its place.
SPHINX_MOCK_APP = SimpleNamespace(
    config=SimpleNamespace(
        intersphinx_timeout=3, tls_verify=True, user_agent="python3:python-discord/bot:1.0.0"
    ),
)

# Inventory groups whose clashing symbols are stored under a `<group>.<symbol>`
# name instead of replacing the symbol that is already registered.
NO_OVERRIDE_GROUPS = ("2to3fixer", "token", "label", "pdbcommand", "term")
# Substrings matched against the host of an already-registered symbol's URL;
# a match means the registered symbol is kept and the new one is renamed.
NO_OVERRIDE_PACKAGES = ("python",)

# CSS classes that mark the end of a module description when scraping a page.
SEARCH_END_TAG_ATTRS = (
    "data", "function", "class", "exception",
    "seealso", "section", "rubric", "sphinxsidebar",
)
# Fragments stripped from scraped signature text.
UNWANTED_SIGNATURE_SYMBOLS_RE = re.compile(r"\[source]|\\\\|¶")
# Whitespace that directly follows a blank line.
WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)")

# Number of attempts made when fetching an inventory before giving up.
FAILED_REQUEST_RETRY_AMOUNT = 3
55
# Module-level cache used as the `@symbol_cache(arg_offset=1)` decorator on
# `Doc.get_symbol_embed`; `Doc.refresh_inventory` clears it on every refresh.
symbol_cache = AsyncCache()

# NOTE(review): presumably Discord role IDs. Nothing in this file reads this
# list - confirm it is used elsewhere before relying on it or removing it.
allowed_roles = [
    790221089786822657,
    795937707263000596,
    790219985229709342,
    794402650569310239,
]
64
65
class DocMarkdownConverter(MarkdownConverter):
    """Subclass markdownify's MarkdownConverter to provide custom conversion methods."""

    # NOTE(review): newer markdownify releases appear to pass an extra positional
    # argument to the `convert_*` hooks; `*_extra` accepts and ignores it so the
    # overrides work with both calling conventions - confirm against the pinned version.

    def convert_code(self, el: PageElement, text: str, *_extra) -> str:
        """Wrap inline code in backticks, removing every backslash from the result."""
        return f"`{text}`".replace("\\", "")

    def convert_pre(self, el: PageElement, text: str, *_extra) -> str:
        """Wrap any codeblocks in `py` for syntax highlighting."""
        # Use the element's raw strings rather than `text` so the code is emitted
        # without any Markdown conversion applied to it.
        code = "".join(el.strings)
        return f"```py\n{code}```"
77
78
def markdownify(html: str) -> str:
    """Convert `html` to Markdown with a `DocMarkdownConverter` that uses `•` as the list bullet."""
    return DocMarkdownConverter(bullets="•").convert(html)
82
83
class InventoryURL(commands.Converter):
    """
    Represents an Intersphinx inventory URL.

    This converter checks whether intersphinx accepts the given inventory URL, and raises
    `BadArgument` if that is not the case.
    Otherwise, it simply passes through the given URL.
    """

    @staticmethod
    async def convert(ctx: commands.Context, url: str) -> str:
        """
        Validate `url` by fetching it as an Intersphinx inventory, then return it unchanged.

        Raises `commands.BadArgument` when the fetch raises `AttributeError`,
        `ConnectionError` (the `requests` one imported by this module) or `ValueError`.
        """
        fetch_func = functools.partial(
            intersphinx.fetch_inventory, SPHINX_MOCK_APP, "", url
        )
        try:
            # `fetch_inventory` performs a blocking HTTP request, so run it in the
            # executor rather than stalling the event loop while it waits.
            await ctx.bot.loop.run_in_executor(None, fetch_func)
        except AttributeError as err:
            raise commands.BadArgument(
                f"Failed to fetch Intersphinx inventory from URL `{url}`."
            ) from err
        except ConnectionError as err:
            if url.startswith("https"):
                raise commands.BadArgument(
                    f"Cannot establish a connection to `{url}`. Does it support HTTPS?"
                ) from err
            raise commands.BadArgument(
                f"Cannot connect to host with URL `{url}`."
            ) from err
        except ValueError as err:
            raise commands.BadArgument(
                f"Failed to read Intersphinx inventory from URL `{url}`. "
                "Are you sure that it's a valid inventory file?"
            ) from err
        return url
113
114
class Doc(commands.Cog):
    """A set of commands for querying & displaying documentation."""

    def __init__(self, bot):
        """Store the bot, create empty lookup tables and schedule the first inventory load."""
        self.bot = bot
        # Package name -> root documentation URL of that package.
        self.base_urls = {}
        # Symbol name -> absolute URL of the symbol's documentation.
        self.inventories = {}
        # Symbols that were stored under a prefixed name because of a name clash.
        self.renamed_symbols = set()

        self.bot.loop.create_task(self.init_refresh_inventory())

    @commands.Cog.listener()
    async def on_ready(self):
        """Print a confirmation message when the `on_ready` event fires."""
        print('Docs cog loaded successfully')

    async def init_refresh_inventory(self) -> None:
        """Refresh documentation inventory on cog initialization."""
        await self.refresh_inventory()

    async def update_single(
        self, package_name: str, base_url: str, inventory_url: str
    ) -> None:
        """
        Rebuild the inventory for a single package.

        Where:
            * `package_name` is the package name to use, stored as the key in `base_urls`
            * `base_url` is the root documentation URL for the specified package, used to build
              absolute paths that link to specific symbols
            * `inventory_url` is the absolute URL to the intersphinx inventory, fetched by running
              `intersphinx.fetch_inventory` in an executor on the bot's event loop

        Nothing is added to `inventories` when the inventory could not be fetched.
        """
        self.base_urls[package_name] = base_url

        package = await self._fetch_inventory(inventory_url)
        if not package:
            return None

        for group, value in package.items():
            # `project_name` comes from the inventory entry itself; it is kept separate
            # from the `package_name` parameter instead of shadowing it.
            for symbol, (project_name, _version, relative_doc_url, _) in value.items():
                absolute_doc_url = base_url + relative_doc_url

                if symbol in self.inventories:
                    group_name = group.split(":")[1]
                    # Third "/"-separated piece of the stored URL, i.e. its host part.
                    symbol_base_url = self.inventories[symbol].split("/", 3)[2]
                    if group_name in NO_OVERRIDE_GROUPS or any(
                        protected in symbol_base_url
                        for protected in NO_OVERRIDE_PACKAGES
                    ):
                        # Keep the existing entry and store this one under a prefixed name.
                        symbol = f"{group_name}.{symbol}"
                        # If renamed `symbol` already exists, add library name in front
                        # to differentiate between them.
                        if symbol in self.renamed_symbols:
                            # Split the name because of packages like Pillow that have spaces in them.
                            symbol = f"{project_name.split()[0]}.{symbol}"

                        self.inventories[symbol] = absolute_doc_url
                        self.renamed_symbols.add(symbol)
                        continue

                self.inventories[symbol] = absolute_doc_url

    async def refresh_inventory(self) -> None:
        """Refresh internal documentation inventory."""
        # Clear the old base URLS and inventories to ensure
        # that we start from a fresh local dataset.
        # Also, reset the cache used for fetching documentation.
        self.base_urls.clear()
        self.inventories.clear()
        self.renamed_symbols.clear()
        symbol_cache.clear()

        # Run all coroutines concurrently - since each of them performs a HTTP
        # request, this speeds up fetching the inventory data heavily.
        coros = [
            self.update_single(
                package["package"], package["base_url"], package["inventory_url"]
            )
            for package in await self.bot.api_client.get("bot/documentation-links")
        ]
        await asyncio.gather(*coros)

    async def get_symbol_html(self, symbol: str) -> Optional[Tuple[Optional[list], str]]:
        """
        Given a Python symbol, return its signature and description.

        The first tuple element is the list of signatures of the given symbol as markup-free
        strings, and the second tuple element is the description with HTML markup included.

        If the given symbol is a module, returns a tuple `(None, str)`; if the module page has
        no headerlink or no end tag, returns `([], "")`.
        If the symbol is unknown or its heading is not on the page, returns `None`.
        """
        url = self.inventories.get(symbol)
        if url is None:
            return None

        async with self.bot.http_session.get(url) as response:
            html = await response.text(encoding="utf-8")

        # Find the signature header and parse the relevant parts.
        symbol_id = url.split("#")[-1]
        soup = BeautifulSoup(html, "lxml")
        symbol_heading = soup.find(id=symbol_id)
        if symbol_heading is None:
            return None

        # Serialise the page only once we know there is something to extract from it.
        search_html = str(soup)

        if symbol_id == f"module-{symbol}":
            # Get page content from the module headerlink to the
            # first tag that has its class in `SEARCH_END_TAG_ATTRS`
            start_tag = symbol_heading.find("a", attrs={"class": "headerlink"})
            if start_tag is None:
                return [], ""

            end_tag = start_tag.find_next(self._match_end_tag)
            if end_tag is None:
                return [], ""

            heading_html = str(start_tag.parent)
            description_start_index = search_html.find(heading_html) + len(heading_html)
            description_end_index = search_html.find(str(end_tag))
            description = search_html[description_start_index:description_end_index]
            signatures = None

        else:
            signatures = []
            description = str(symbol_heading.find_next_sibling("dd"))
            description_pos = search_html.find(description)
            # Get text of up to 3 signatures, remove unwanted symbols
            for element in [symbol_heading] + symbol_heading.find_next_siblings(
                "dt", limit=2
            ):
                signature = UNWANTED_SIGNATURE_SYMBOLS_RE.sub("", element.text)
                # Only keep signatures that appear before the description in the page.
                if signature and search_html.find(str(element)) < description_pos:
                    signatures.append(signature)

        return signatures, description.replace("¶", "")

    @symbol_cache(arg_offset=1)
    async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]:
        """
        Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents.

        If the symbol is known, an Embed with documentation about it is returned;
        otherwise `None` is returned.
        """
        scraped_html = await self.get_symbol_html(symbol)
        if scraped_html is None:
            return None

        signatures = scraped_html[0]
        permalink = self.inventories[symbol]
        description = markdownify(scraped_html[1])

        # Truncate the description of the embed to the last occurrence
        # of a double newline (interpreted as a paragraph) before index 1000.
        if len(description) > 1000:
            shortened = description[:1000]
            description_cutoff = shortened.rfind("\n\n", 100)
            if description_cutoff == -1:
                # Search the shortened version for cutoff points in decreasing desirability,
                # cutoff at 1000 if none are found.
                for string in (". ", ", ", ",", " "):
                    description_cutoff = shortened.rfind(string)
                    if description_cutoff != -1:
                        break
                else:
                    description_cutoff = 1000
            description = description[:description_cutoff]

            # If there is an incomplete code block, cut it out
            if description.count("```") % 2:
                codeblock_start = description.rfind("```py")
                description = description[:codeblock_start].rstrip()
            description += f"... [read more]({permalink})"

        description = WHITESPACE_AFTER_NEWLINES_RE.sub("", description)
        if signatures is None:
            # If symbol is a module, don't show signature.
            embed_description = description

        elif not signatures:
            # It's some "meta-page", for example:
            # https://docs.djangoproject.com/en/dev/ref/views/#module-django.views
            embed_description = (
                "This appears to be a generic page not tied to a specific symbol."
            )

        else:
            embed_description = "".join(
                f"```py\n{textwrap.shorten(signature, 500)}```"
                for signature in signatures
            )
            embed_description += f"\n{description}"

        embed = discord.Embed(
            title=f"`{symbol}`", url=permalink, description=embed_description
        )
        # Show all symbols with the same name that were renamed in the footer.
        embed.set_footer(
            text=", ".join(
                renamed
                for renamed in self.renamed_symbols - {symbol}
                if renamed.endswith(f".{symbol}")
            )
        )
        return embed

    @commands.group(name="docs", aliases=("doc", "d"), invoke_without_command=True)
    async def docs_group(
        self, ctx: commands.Context, symbol: commands.clean_content = None
    ) -> None:
        """Lookup documentation for Python symbols."""
        await self.get_command(ctx, symbol)

    @docs_group.command(name="get", aliases=("g",))
    async def get_command(
        self, ctx: commands.Context, symbol: commands.clean_content = None
    ) -> None:
        """
        Return a documentation embed for a given symbol.
        If no symbol is given, return a list of all available inventories.
        Examples:
        !docs
        !docs aiohttp
        !docs aiohttp.ClientSession
        !docs get aiohttp.ClientSession
        """
        if symbol is None:
            inventory_embed = discord.Embed(
                title=f"All inventories (`{len(self.base_urls)}` total)",
                colour=discord.Colour.blue(),
            )

            lines = sorted(
                f"• [`{name}`]({url})" for name, url in self.base_urls.items()
            )
            if self.base_urls:
                await LinePaginator.paginate(
                    lines, ctx, inventory_embed, max_size=400, empty=False
                )

            else:
                inventory_embed.description = (
                    "Hmmm, seems like there's nothing here yet."
                )
                await ctx.send(embed=inventory_embed)

        else:
            # Fetching documentation for a symbol (at least for the first time, since
            # caching is used) takes quite some time, so let's send typing to indicate
            # that we got the command, but are still working on it.
            async with ctx.typing():
                doc_embed = await self.get_symbol_embed(symbol)

            if doc_embed is None:
                error_embed = discord.Embed(
                    description=f"Sorry, I could not find any documentation for `{symbol}`.",
                    colour=discord.Colour.red(),
                )
                await ctx.send(embed=error_embed)
            else:
                await ctx.send(embed=doc_embed)

    @docs_group.command(name="set", aliases=("s",))
    @commands.has_permissions(administrator=True)
    async def set_command(
        self,
        ctx: commands.Context,
        package_name: ValidPythonIdentifier,
        base_url: ValidURL,
        inventory_url: InventoryURL,
    ) -> None:
        """
        Adds a new documentation metadata object to the site's database.
        The database will update the object, should an existing item with the specified `package_name` already exist.
        Example:
        !docs set \
        python \
        https://docs.python.org/3/ \
        https://docs.python.org/3/objects.inv
        """
        body = {
            "package": package_name,
            "base_url": base_url,
            "inventory_url": inventory_url,
        }
        await self.bot.api_client.post("bot/documentation-links", json=body)

        # Rebuilding the inventory can take some time, so lets send out a
        # typing event to show that the Bot is still working.
        async with ctx.typing():
            await self.refresh_inventory()
        await ctx.send(
            f"Added package `{package_name}` to database and refreshed inventory."
        )

    @docs_group.command(name="delete", aliases=("remove", "rm", "d"))
    @commands.has_permissions(administrator=True)
    async def delete_command(
        self, ctx: commands.Context, package_name: ValidPythonIdentifier
    ) -> None:
        """
        Removes the specified package from the database.
        Examples:
        !docs delete aiohttp
        """
        await self.bot.api_client.delete(f"bot/documentation-links/{package_name}")

        async with ctx.typing():
            # Rebuild the inventory to ensure that everything
            # that was from this package is properly deleted.
            await self.refresh_inventory()
        await ctx.send(
            f"Successfully deleted `{package_name}` and refreshed inventory."
        )

    @docs_group.command(name="refresh", aliases=("rfsh", "r"))
    @commands.has_permissions(administrator=True)
    async def refresh_command(self, ctx: commands.Context) -> None:
        """Refresh inventories and send differences to channel."""
        old_inventories = set(self.base_urls)
        # `async with` matches the other commands in this cog.
        async with ctx.typing():
            await self.refresh_inventory()
        # Get differences of added and removed inventories
        added = ", ".join(inv for inv in self.base_urls if inv not in old_inventories)
        if added:
            added = f"+ {added}"

        removed = ", ".join(inv for inv in old_inventories if inv not in self.base_urls)
        if removed:
            removed = f"- {removed}"

        embed = discord.Embed(
            title="Inventories refreshed",
            description=f"```diff\n{added}\n{removed}```" if added or removed else "",
        )
        await ctx.send(embed=embed)

    async def _fetch_inventory(self, inventory_url: str) -> Optional[dict]:
        """
        Get and return inventory from `inventory_url`. If fetching fails, return None.

        `ConnectTimeout` and `ProtocolError` are retried, up to `FAILED_REQUEST_RETRY_AMOUNT`
        attempts in total; `HTTPError` and `ConnectionError` give up immediately.
        """
        fetch_func = functools.partial(
            intersphinx.fetch_inventory, SPHINX_MOCK_APP, "", inventory_url
        )
        for _attempt in range(FAILED_REQUEST_RETRY_AMOUNT):
            try:
                package = await self.bot.loop.run_in_executor(None, fetch_func)
            except (ConnectTimeout, ProtocolError):
                # Retry. This arm must stay above the `ConnectionError` one:
                # in `requests`, `ConnectTimeout` is a `ConnectionError` subclass.
                continue
            except (HTTPError, ConnectionError):
                return None
            else:
                return package
        return None

    @staticmethod
    def _match_end_tag(tag: Tag) -> bool:
        """Matches `tag` if its class value is in `SEARCH_END_TAG_ATTRS` or the tag is table."""
        tag_classes = tag.get("class", ())
        if any(attr in tag_classes for attr in SEARCH_END_TAG_ATTRS):
            return True

        return tag.name == "table"
481
482
def setup(bot) -> None:
    """Create the Doc cog for `bot` and register it with `bot.add_cog`."""
    doc_cog = Doc(bot)
    bot.add_cog(doc_cog)
486