· last year · May 22, 2024, 11:25 AM
1import collections # Stackexchange code for list utilities requires this
2import datetime # get current time, convert time string representations
3import logging # warning messages etc.
4import re # regular expressions, used to match new section edit summaries
5import sys # allows command line arguments
6
7# Pywikibot and associated imports
8import pywikibot
9
10
11# Commands that directly call the API using PWB
12def manual_API_call(site, format='json', formatversion=2, **kwargs): # noqa: D301
13 """Make API request by giving parameters 'by hand'.
14
15 Workaround to make direct API calls, because PWB does not (yet?) support
16 all API syntax, including some stuff we need.
17
18 Input:
19 - site is an APISite, e.g. obtained by pywikibot.Site(); PWB should be able
20 to read pages there (i.e. be logged with the appropriate permissions if
21 needed)
22 - keyword arguments: will be passed unmodified to the API for Site.
23 Defaults include:
24 format=json&formatversion=2
25
26
27 Doctests:
28 >>> manual_API_call(pywikibot.Site(), action='parse', prop='sections',\
29 oldid=837538913)['parse']['sections'][:2] ==\
30 [{'index': '1', 'anchor': 'Interesting_facts', 'toclevel': 1,\
31 'line': 'Interesting facts','byteoffset': 3282,'level': '2',\
32 'number': '1', 'fromtitle': 'Wikipedia:Teahouse'},\
33 {'index': '2', 'anchor': 'oclc', 'toclevel': 1,\
34 'line': 'oclc', 'byteoffset': 9831, 'number': '2',\
35 'fromtitle': 'Wikipedia:Teahouse', 'level': '2'}]
36 True
37 """
38 request = pywikibot.data.api.Request.create_simple(site, **kwargs)
39 return request.submit()
40
41
42def whoami(site=pywikibot.Site()):
43 """Check the currently logged-in user via the API."""
44 return site.userinfo['name']
45
46
47def get_user_info(userlist, site=pywikibot.Site()): # noqa:D301
48 """Query the API for user info.
49
50 Input:
51 - userlist is a list of strings, each string being a username
52
53 Output:
54 - dict whose keys match the provided userlist; each entry contains user
55 information as given by the API
56
57 Doctests:
58 >>> get_user_info(['Jimbo Wales','Sandbox for user warnings']
59 ... ).keys() == {'Jimbo Wales','Sandbox for user warnings'}
60 True
61 >>> get_user_info(['Jimbo Wales'])['Jimbo Wales']['registration']
62 '2001-03-27T20:47:31Z'
63 >>> get_user_info(['Nonexisting username'])==\
64 {'Nonexisting username': {'missing': '', 'name': 'Nonexisting username'}}
65 True
66 """
67
68 usersgen = site.users(userlist)
69
70 # transform into a dictionary whose keys are the usernames
71 resultdict = dict()
72 for entry in usersgen:
73 resultdict[entry['name']] = entry
74 return resultdict
75
76
77def get_block_info(userlist, site=pywikibot.Site()):
78 """Query the API for block info.
79
80 Input: a list of strings, each string being a username.
81 Output: a dictionary of bool such that dict[user] is True if the user
82 currently (1) exists and (2) is blocked; dict keys match the input.
83
84 Although get_user_info could be used to check for a current block on logged
85 accounts, it is not possible on IP accounts, hence the need for this other
86 subfunction. See also
87 - https://www.mediawiki.org/wiki/API:Users
88 - https://www.mediawiki.org/w/index.php?title=Topic:Tspl9p7oiyzzm19w
89
90 Doctests:
91 >>> get_block_info(['Tigraan', 'Blocked user', 'Nonexisting username']
92 ... ) == {'Tigraan': False,
93 ... 'Blocked user': True,
94 ... 'Nonexisting username': False}
95 True
96 """
97 blockgen = site.blocks(users=userlist)
98
99 # transform result into a dict of bool
100 resultdict = dict()
101 for user in userlist:
102 resultdict[user] = False
103 for block in blockgen:
104 resultdict[block['user']] = True
105
106 return resultdict
107
108
109def get_sections_from_revid(pageindicator, site=pywikibot.Site()): # noqa: D301
110 """Get list of sections from specific page revision.
111
112 Input:
113 - site:
114 - pageindicator: indicates which page to use.
115 - if a str, uses the current revision of the page with that title
116 - if an int, treated as a revision number via 'oldid' in
117 https://www.mediawiki.org/wiki/API:Parsing_wikitext
118
119 Doctests:
120 >>> get_sections_from_revid(783718598)[:2]==\
121 [{'anchor': 'Request:_World_Cafe',
122 ... 'byteoffset': 3329,
123 ... 'fromtitle': 'Wikipedia:Teahouse',
124 ... 'index': '1',
125 ... 'level': '2',
126 ... 'line': 'Request: World Cafe',
127 ... 'number': '1',
128 ... 'toclevel': 1},
129 ... {'anchor': 'How_to_publish_my_page',
130 ... 'byteoffset': 8292,
131 ... 'fromtitle': 'Wikipedia:Teahouse',
132 ... 'index': '2',
133 ... 'level': '2',
134 ... 'line': 'How to publish my page',
135 ... 'number': '2',
136 ... 'toclevel': 1}
137 ... ]
138 True
139 """
140 params = {'action': 'parse',
141 'prop': 'sections',
142 }
143 if isinstance(pageindicator, int):
144 params['oldid'] = pageindicator
145 else:
146 params['page'] = pageindicator
147
148 api_call_result = manual_API_call(site, **params)
149
150 # Traverse two levels of the dictionary and return
151 return api_call_result['parse']['sections']
152
153
154def get_revisions_from_api(pagename, oldtimestamp, newtimestamp,
155 maxcontinuenumber=0, continuestring=None,
156 site=pywikibot.Site()): # noqa: D301
157 """Get all revisions to specific page since a given timestamp.
158
159 Input:
160 - pagename: string, title of the page for which to pull revisions
161 - oldtimestamp, newtimestamp: strings, representing timestamps in Mediawiki
162 format, between which to lookup the revisions
163 Output: a list of dict, each corresponding to a single revision
164
165 That function can also pull multiple pages with the rvcontinue API key.
166 To do so, the function is called recursively with a continuenumber (counter
167 describing the maximum number of page pulls left, to avoid infinite looping
168 while requesting API resources) and a continuestring, cf. rvcontinue in
169 https://www.mediawiki.org/wiki/API:Revisions
170
171 Doctests:
172 >>> get_revisions_from_api('Tiger','2018-03-01T00:00:00Z',
173 ... '2018-03-05T00:00:00Z') ==\
174 [{'timestamp': '2018-03-04T15:30:31Z',
175 ... 'parentid': 828307448,
176 ... 'comment': '/* Size */Journal cites: format page range,',
177 ... 'user': 'Rjwilmsi',
178 ... 'revid': 828751877},
179 ... {'timestamp': '2018-03-01T20:11:02Z',
180 ... 'parentid': 828233956,
181 ... 'comment': '/* Reproduction */ hatnote',
182 ... 'user': 'BDD',
183 ... 'revid': 828307448},
184 ... {'timestamp': '2018-03-01T10:08:52Z',
185 ... 'parentid': 828032712,
186 ... 'comment': '/* Taxonomy */ edited ref',
187 ... 'user': 'BhagyaMani',
188 ... 'revid': 828233956}]
189 True
190 """
191 params = {'action': 'query',
192 'prop': 'revisions',
193 'titles': pagename,
194 'rvprop': 'timestamp|user|comment|ids',
195 'rvdir': 'older',
196 'rvend': oldtimestamp,
197 'rvstart': newtimestamp,
198 'rvlimit': 'max'
199 }
200
201 # Previous call may require to continue a call
202 if continuestring:
203 params['rvcontinue'] = continuestring
204
205 api_call_result = manual_API_call(site, **params)
206
207 tmp = api_call_result['query']['pages']
208 tmp2 = list(tmp.keys()) # single-element list e.g. ['36896']
209 revlist = tmp[tmp2[0]]['revisions']
210
211 # Check if we need to pull more revisions
212 # If so, recursively call itself and merge results
213 if maxcontinuenumber > 0 and 'query-continue' in api_call_result:
214 # 'batchcomplete' key present = no continue needed
215 # maxcontinuenumber<=0 = we have reached the maximum of continues
216
217 # toprint = api_call_result.copy()
218 # toprint['query']['pages'] = '...some stuff...'
219 # print(toprint)
220 cs = api_call_result['query-continue']['revisions']['rvcontinue']
221 rcl = get_revisions_from_api(pagename, oldtimestamp, newtimestamp,
222 maxcontinuenumber=maxcontinuenumber - 1,
223 continuestring=cs)
224 full_list = revlist + rcl
225 return full_list
226 else:
227 return revlist
228
229
230# Other commands
231def isnotifiable(users):
232 """Check if specified users can be notified.
233
234 Input: list of strings (usernames).
235 Output is a dict of booleans, keys match input (True = can be notified).
236
237 This takes care of the policy aspect (who gets notified, in general)
238 but NOT of bot exclusion compliance, which must be handled elsewhere.
239 For instance pywikibot's scripts should take care of it, per
240 https://en.wikipedia.org/wiki/Template:Bots#Implementation
241
242 Current policy is to notify anyone regardless of 'age' (edit count) or
243 groups (autoconfirmed etc.) but to not notify blocked users.
244
245 Doctests:
246 >>> isnotifiable(['Tigraan'])
247 {'Tigraan': True}
248
249 >>> isnotifiable(['Blocked user'])
250 {'Blocked user': False}
251
252 >>> isnotifiable(['Nonexisting username'])
253 {'Nonexisting username': False}
254 """
255 # Block information
256 isblocked = get_block_info(users)
257
258 # Other general user information
259 # WARNING! For IP editors, all we get is the 'invalid' key.
260 # Do not rely on this to get (e.g.) the edit count of an IP editor!
261 userinfo = get_user_info(users)
262
263 is_notifiable = dict()
264 no_notif_str = 'No notification will be sent.'
265 unknown_user_str = 'User "{un}" does not seem to exist. ' + no_notif_str
266 blocked_user_str = 'User "{un}" is currently blocked. ' + no_notif_str
267 for u in users:
268 info = userinfo[u]
269 # NOTIFICATION POLICY APPLIES HERE
270
271 # If username does not exist (renamed user?) do not notify
272 if 'missing' in info:
273 is_notifiable[u] = False
274 logging.info(unknown_user_str.format(un=u))
275 continue
276
277 # Do not notify currently-blocked users
278 if isblocked[u]:
279 is_notifiable[u] = False
280 logging.info(blocked_user_str.format(un=u))
281 continue
282
283 # # Further policy options, inactive as of 2018-03-18
284 # # Do not notify users with more than x edits
285 # maxedits = 1000
286 # if info['editcount']>maxedits:
287 # is_notifiable[u] = False
288 # logging.info('User "{un}" performed more than {nedits} edits and will not be notified.'.format(un=u,nedits=maxedits)) # noqa: E501
289 #
290 # # Do not notify users with the ECP flag
291 # if 'extendedconfirmed' in info['groups']:
292 # is_notifiable[u] = False
293 # logging.info('User "{un}" is extended confirmed and will not be notified.'.format(un=u)) # noqa: E501
294
295 # By default, we should notify
296 is_notifiable[u] = True
297
298 return is_notifiable
299
300
301def UTC_timestamp_x_days_ago(days_offset=0):
302 """Timestamp x days ago in Mediawiki format.
303
304 Input is the number of days that will be substracted from the
305 current timestamp.
306 Format: cf. https://www.mediawiki.org/wiki/Manual:Timestamp
307 """
308 current_time = datetime.datetime.utcnow() # MediaWiki servers use UTC time
309 offset = datetime.timedelta(days=-days_offset)
310 UTC_time_then = current_time + offset
311
312 timestamp = UTC_time_then.strftime("%Y%m%d%H%M%S") # MW format
313 return timestamp
314
315
316def safe_list_diff(listbefore, listafter):
317 """Find elements that were removed from one list to another.
318
319 Compared to a basic set diff, this takes care of the edge case
320 where an element is present multiple times in the larger list
321 by removing it altogether (and logging this fact).
322 Also, it will raise an AssertionError if the second list is not
323 included in the first one (which is expected for an archival diff).
324
325 Warning: because a set diff is used, no order is guaranteed in the output
326 list.
327
328 Inputs: lists of strings (names of the threads from page history)
329 Output: list of strings
330
331 Doctests:
332
333 Standard use:
334 >>> safe_list_diff(['Hello','See you later','Bye'],['Hello'])
335 ['See you later', 'Bye']
336
337 Duplicate name: will be scrapped from output and log a warning
338 >>> safe_list_diff(['Duplicate','Duplicate','Hello', 'Later'],['Hello'])
339 ['Later']
340
341 Erroneous input: listafter contains a thread name not in listbefore
342 >>> safe_list_diff(['Hello','See you later'],['Hello', 'Abnormal name'])
343 Traceback (most recent call last):
344 (some traceback)
345 AssertionError
346
347 """
348 setbefore = set(listbefore)
349 setafter = set(listafter)
350 # Sanity check that listafter <= listbefore (less threads after archiving)
351 assert(not bool(setafter - setbefore)) # True iff. set diff is empty
352
353 # Identify duplicate elements in listbefore and remove them. See
354 # https://stackoverflow.com/questions/11236006/identify-duplicate-values-in-a-list-in-python
355 duplicate_values = [k for k, v in collections.Counter(listbefore).items()
356 if v > 1]
357
358 for val in duplicate_values:
359 logging.warning('Multiple threads that share the same name will be '
360 + 'ignored. The name was '
361 + '"{nameofthread}".'.format(nameofthread=val))
362
363 setdupes = set(duplicate_values)
364
365 # Return threads that were removed and which are not duplicates
366 # Ensure we return them in the original order!
367 final_list = []
368 set_to_return = setbefore - setafter - setdupes
369 for tn in listbefore:
370 if tn in set_to_return:
371 final_list.append(tn)
372
373 return final_list
374
375
376def list_matching(ta, threadscreated):
377 """Match string elements from two lists.
378
379 We have on the one hand a list of threads that underwent the last
380 archival, and on the other hand a list of created new sections.
381 We want to match each of the archived threads to its creation.
382 If a thread is matched multiple times or not at all, it must not be
383 passed later on, but the event should be logged.
384
385 ta is a list (it has been sanitized upstream to deal
386 with name collisions). threadscreated is a list of dict; each dict
387 contains at least 'name', the thread title to match.
388 The output is a list of dict, the subset of threadscreated
389 that have been matched exactly once in threadsarchived.
390
391 Leading and trailing white spaces are discarded during the comparison
392 because of some obscure false positive cases identified at test run.
393
394 Inputs: list of strings and list of dict
395 Output: list of dict
396
397 Doctests:
398
399 >>> list_matching(['Thread#1','Thread#3'],
400 ... [{'revid' : 1, 'name' : 'Thread#1','user' : 'User#1'},
401 ... {'revid' : 2, 'name' : 'Thread#2','user' : 'User#2'},
402 ... {'revid' : 3, 'name' : 'Thread#3','user' : 'User#3'},
403 ... {'revid' : 4, 'name' : 'Thread#4','user' : 'User#4'}
404 ... ]
405 ... ) == [{'revid': 1, 'name': 'Thread#1','user': 'User#1'},
406 ... {'revid': 3, 'name': 'Thread#3','user': 'User#3'}]
407 True
408 """
409 output = []
410
411 for i in range(len(ta)):
412 cur_str = ta[i].strip()
413 matching_indices = [j for j, k in enumerate(threadscreated)
414 if k['name'].strip() == cur_str]
415
416 if len(matching_indices) == 1: # normal case, one single match
417 output.append(threadscreated[matching_indices[0]])
418 continue
419
420 # exceptional cases
421 if len(matching_indices) == 0: # no matches
422 logging.warning('No matches for the creation of the following'
423 + 'thread: "{tn}"'.format(tn=cur_str))
424 else: # more than one match
425 logging.warning('Multiple matches (all will be ignored) for the'
426 + 'creation of the following thread: '
427 + '"{tn}"'.format(tn=cur_str))
428
429 return output
430
431
432def traverse_list_of_sections(inputlistofdict):
433 """Get list of sections from the API output.
434
435 Remove the fluff (data offset etc.) from get_sections_from_revid to get
436 only thread names (i.e. the 'line' key).
437 """
438 output_list = []
439
440 for item in inputlistofdict:
441 output_list.append(item['line'])
442
443 return output_list
444
445
446def find_section_anchor(inputlistofdict, sectionname):
447 """Match a section name to the output of get_sections_from_revid.
448
449 Input: inputlistofdict comes from get_sections_from_revid (list of dict),
450 sectionname is a string (name of a thread).
451
452 Output: a list of section anchors, corresponding to all unique
453 sections that have the name sectionname. The normal case is for the
454 list to have a single element, but returning a list allows easier
455 testing for edge cases later.
456
457 Leading and trailing spaces are removed for the comparison.
458
459 Doctests:
460 >>> find_section_anchor([{'anchor': 'Request:_World_Cafe',
461 ... 'byteoffset': 3329,
462 ... 'fromtitle': 'Wikipedia:Teahouse',
463 ... 'index': '1',
464 ... 'level': '2',
465 ... 'line': 'Request: World Cafe',
466 ... 'number': '1',
467 ... 'toclevel': 1},
468 ... {'anchor': 'How_to_publish_my_page',
469 ... 'byteoffset': 8292,
470 ... 'fromtitle': 'Wikipedia:Teahouse',
471 ... 'index': '2',
472 ... 'level': '2',
473 ... 'line': 'How to publish my page',
474 ... 'number': '2',
475 ... 'toclevel': 1}
476 ... ],
477 ... 'How to publish my page')
478 ['How_to_publish_my_page']
479 """
480 outlist = []
481
482 for item in inputlistofdict:
483 if sectionname.strip() == item['line'].strip():
484 outlist.append(item['anchor'])
485
486 return outlist
487
488
489def search_archives_for_section(links_to_search, sectionnames):
490 """Find links to archived threads.
491
492 This checks the current content of multiple archive links for the
493 desired section names, and ensure only a unique match is accepted
494 for each. Otherwise, failure to find a unique match is logged.
495
496 Input: links_to_search is a list of strings, the names (shortened URL) of
497 archive pages to search; sectionnames is a list of strings, the 'anchor's
498 to match.
499
500 Doctests:
501 >>> search_archives_for_section(['Wikipedia:Teahouse/Questions/Archive_98',
502 ... 'Wikipedia:Teahouse/Questions/Archive_99'
503 ... ],['Picture problem', 'Blog as reference?']) # noqa: E501
504 ['Wikipedia:Teahouse/Questions/Archive_98#Picture_problem', 'Wikipedia:Teahouse/Questions/Archive_99#Blog_as_reference?']
505 """
506 # First, query the API for the content of the archive links
507 archive_contents = dict()
508 for archivelink in links_to_search:
509 linkcontent = get_sections_from_revid(archivelink)
510 archive_contents[archivelink] = linkcontent # links as keys, why not
511
512 # print(linkcontent)
513 # Loop over the queried section names
514 out_links = []
515
516 for sn in sectionnames:
517 matches = [] # will hold the matched section(s)
518
519 for arlink in links_to_search:
520 linkmatches = find_section_anchor(archive_contents[arlink], sn)
521 if linkmatches: # found (at least) one good thread there
522 candidatelink = arlink
523
524 matches += linkmatches # append current matches to old ones
525
526 if len(matches) == 1: # the good case: we found exactly one match
527 fullarchivelink = candidatelink + "#" + matches[0]
528 out_links.append(fullarchivelink)
529 continue
530
531 # If we did not continue, we are in the bad case, so we default
532 # the link to an empty string
533 out_links.append('')
534
535 # Log the problem
536 nomatch = 'No thread "{tn}" found in the links "{l}"'
537 morematches = 'Multiple matches for thread "{tn}" in the links "{l}"'
538 if len(matches) == 0:
539 logging.warning(nomatch.format(tn=sn, l=links_to_search))
540 else: # len(matches)>1
541 logging.warning(morematches.format(tn=sn, l=links_to_search))
542
543 return out_links
544
545
546def sections_removed_by_diff(revid1, revid2):
547 """Get sections removed between two edits.
548
549 Inputs: two revision IDs (integers). You should ensure that both revids
550 refer to consecutive edits on the same page; this is not directly checked.
551 That function makes a call to safe_list_diff, which will probably throw an
552 exception if a different page is used or if the diff is too far apart, but
553 you should not rely on that.
554
555 Output: a list of strings, the names of removed threads.
556
557 Doctests:
558 (Cf. https://en.wikipedia.org/w/index.php?oldid=783715718&diff=783718598)
559 >>> sections_removed_by_diff(783715718,783718598)[:2]
560 ['Red links', 'how to undo a merge made 6 yrs ago']
561 """
562 json1 = get_sections_from_revid(revid1)
563 sec_list_1 = traverse_list_of_sections(json1)
564
565 json2 = get_sections_from_revid(revid2)
566 sec_list_2 = traverse_list_of_sections(json2)
567
568 set_of_sections_removed = safe_list_diff(sec_list_1, sec_list_2)
569 return set_of_sections_removed
570
571
572def revisions_since_x_days(pagename, ndays, maxcontinuenumber=0):
573 """Get revision data for a given page for the last n days.
574
575 Input:
576 - pagename (string), the name of the page
577 - ndays (int or float): lookup revisions of the last ndays days
578 - maxcontinuenumber (int): recursion limit for API calls
579 Output: a list of dict (cf. get_revisions_from_api).
580 """
581 # Per https://www.mediawiki.org/wiki/API:Revisions, rvstart is newer
582 # than rvend if we list in reverse chronological order
583 # (newer revisions first), i.e. "end" and "start" refer to the list.
584 oldtimestamp = UTC_timestamp_x_days_ago(days_offset=ndays)
585 currenttimestamp = UTC_timestamp_x_days_ago(days_offset=0)
586 revs = get_revisions_from_api(pagename, oldtimestamp, currenttimestamp,
587 maxcontinuenumber=maxcontinuenumber)
588
589 return revs
590
591
592def rev_created_newsection(revision): # noqa: D301
593 """Parse the given edit summary to see if a new section was created.
594
595 Input: a dic with key 'comment' which is the string of edit summary, or the
596 key 'commenthidden' if the revision was revdelled
597 Output: a dict whose key 'flag' is True if a section was created and False
598 otherwise; additionally, if 'flag' is True, the dict has the key 'name',
599 containing the name of the thread.
600
601 The given string is matched to "/* %s */ new section"; if matched,
602 we assume the corresponding edit created a section named %s.
603
604 Doctests:
605 >>> rev_created_newsection({'commenthidden': ''})
606 {'flag': False}
607
608 rev_created_newsection({'comment': r'/* Waiting for Godot */ new section'}) ==\
609 True
610 {'flag': True, 'name': 'Waiting for Godot'}
611 """
612 if 'commenthidden' in revision:
613 # Revision was revdelled and the edit summary cannot be known. Skip it.
614 return {'flag': False}
615 pattern = re.compile(r'(\/\* )(.*)( \*\/ new section)')
616 match = pattern.match(revision['comment'])
617 # Note: using pattern.search will pick up e.g. Sinebot's edit summaries of
618 # "Signing comment by Foo - "/* Bar */: new section""
619 # Instead, pattern.match enforces a match at the start of the string
620 if match:
621 return {'flag': True, 'name': match.group(2)}
622 else:
623 return {'flag': False}
624
625
626def newsections_at_teahouse(ndays=10, thname='Wikipedia:Teahouse',
627 maxcontinuenumber=0):
628 """Get 'new section' creations at Teahouse in the last few days.
629
630 Optional arguments:
631 - ndays (10): (int or float) timeframe in days of revision to pull
632 - thname: (string) name of the page whose revisions to pull
633 - maxcontinuenumber: (int) recursion limit for API calls
634 """
635 rev_table = revisions_since_x_days(thname, ndays,
636 maxcontinuenumber=maxcontinuenumber)
637 output = []
638 for rev in rev_table:
639 newsection_created = rev_created_newsection(rev)
640 if newsection_created['flag']:
641 tosave = {'revid': rev['revid'],
642 'name': newsection_created['name'],
643 'user': rev['user'],
644 }
645 output.append(tosave)
646
647 return output
648
649
650def last_archival_edit(maxdays=1, thname='Wikipedia:Teahouse',
651 archiver='Lowercase sigmabot III'):
652 """Parse page history for last archival edit.
653
654 Input:
655 - maxdays (int) the timeframe in days to look for an archival edit
656 - thname (string) title of the page to look at
657 - archiver (string) username of the archival bot
658
659 Output: dict describing the last archival edit.
660 """
661 rev_table = revisions_since_x_days(thname, maxdays)
662 found_flag = False
663 for rev in rev_table:
664 if rev['user'] == archiver: # we found an archival edit
665 es = rev['comment'] # extract edit summary
666 # Determine archive locations from edit summary.
667 # Beware! The edit summary may contain multiple wikilinks.
668 # See for instance
669 # https://en.wikipedia.org/w/index.php?title=Wikipedia%3ATeahouse&type=revision&diff=783570477&oldid=783564581
670 # We need to match non-greedily and find all such links.
671 pattern = r'(\[\[.*?\]\])'
672 links = re.findall(pattern, es)
673
674 if not links: # sanity check that at least one match was found
675 raise ValueError('Archival edit summary does not contain'
676 + 'any wikilink.', es)
677
678 # strip brackets in links
679 strippedlinks = [l[2:-2] for l in links]
680
681 # save relevant edit information
682 output = {'after': rev['revid'],
683 'before': rev['parentid'],
684 'links': strippedlinks,
685 'es': es, # for debugging purposes
686 'archiver': archiver, # same (not used as of 2018-03-18)
687 }
688 found_flag = True
689 break
690 if not found_flag:
691 raise ValueError('No edit by {arc} '.format(arc=archiver)
692 + 'found in the last {n} days'.format(n=maxdays),
693 rev_table)
694 return output
695
696
697def generate_notification_list():
698 """Make list of notifications to make.
699
700 This function makes all the API read calls necessary to determine which
701 threads have been last archived, which users started them, and whether
702 those users are eligible to receive a notification.
703
704 The output is a list of dict, each containing the keys:
705 - 'user' - username of thread started
706 - 'tn' - thread name
707 - 'invalid' - whether a notification can be sent
708 Additionally, it can also contain:
709 - 'archivelink' - a link to the archived thread (with anchor), if found
710 - 'reason' - if 'invalid' is True, explains why
711 """
712 # Get last archival edit
713 lae = last_archival_edit()
714 idbefore = lae['before']
715 idafter = lae['after']
716 # Sections from last archival edit
717 archived_sections = sections_removed_by_diff(idbefore, idafter)
718
719 # New section creations in recent days from page history
720 maxpagestopull = 5
721 nscreated = newsections_at_teahouse(maxcontinuenumber=maxpagestopull)
722
723 # List of threads that were archived in last archival edit, which
724 # could be matched to their creation in the last few days
725 thread_matched = list_matching(archived_sections, nscreated)
726 thread_matched_names = [thread['name'] for thread in thread_matched]
727 thread_matched_users = [thread['user'] for thread in thread_matched]
728
729 # For those, try and recover the corresponding archival link
730 # (including anchor)
731 possible_archive_links = lae['links']
732 list_of_archive_links = search_archives_for_section(possible_archive_links,
733 thread_matched_names)
734
735 # Check if user can be notified
736 is_notifiable = isnotifiable(thread_matched_users)
737
738 # Generate notification list
739 N = len(list_of_archive_links)
740 notification_list = list()
741 for i in range(N):
742 username = thread_matched_users[i]
743 tn = thread_matched_names[i]
744 al = list_of_archive_links[i]
745
746 notif = {'user': username,
747 'thread': tn,
748 'invalid': False,
749 }
750
751 if al:
752 notif['archivelink'] = al
753 else:
754 # skip if the archive link is empty, i.e. it was not found
755 # previously (such an event was logged)
756 notif['invalid'] = True
757 notif['reason'] = 'archive link not found'
758
759 if not is_notifiable[username]:
760 notif['invalid'] = True
761 notif['reason'] = 'user is not notifiable'
762
763 notification_list.append(notif)
764
765 return notification_list
766
767
768def notify(user, argstr, testlvl):
769 """Post archival notification.
770
771 Input:
772 - user: (string) username, will post to User talk:<user>
773 - argstr: (string) contains arguments to pass to template
774 - testlvl: (int) 0 for production, >=1 for various test levels
775
776 No output to stdout, since this will cause posts on WP.
777 """
778 if testlvl == 1:
779 raise ValueError('Test level 1 no longer works.')
780 site = pywikibot.Site('test', 'test')
781 page = pywikibot.Page(site, 'User talk:Muninnbot/THA log')
782 sn = 'Notification intended for [[:en:User talk:' + user + ']]'
783 es = 'Notification intended for [[:en:User talk:' + user + ']]'
784
785 elif testlvl == 2:
786 site = pywikibot.Site('en', 'wikipedia')
787 page = pywikibot.Page(site, 'User talk:Muninnbot/THA log')
788 sn = 'Notification intended for [[:en:User talk:' + user + ']]'
789 es = 'Notification intended for [[:en:User talk:' + user + ']]'
790
791 elif testlvl == 3:
792 site = pywikibot.Site('en', 'wikipedia')
793 page = pywikibot.Page(site, 'User talk:' + user)
794 sn = 'Your thread has been archived'
795 es = 'Automated notification of thread archival (test run)'
796
797 elif testlvl == 0:
798 # Production code goes here
799 site = pywikibot.Site('en', 'wikipedia')
800 page = pywikibot.Page(site, 'User talk:' + user)
801 sn = 'Your thread has been archived'
802
803 # 0 for production, all the rest creates a "this is in test phase" comment
804 if testlvl > 0:
805 test_comment = "</br><small>This functionality is currently under "\
806 + "test. If you received this notification by error, "\
807 + "please [[User talk:Tigraan|notify the bot's"\
808 + " maintainer]].</small>"
809 text = '{{subst:User:Muninnbot/Teahouse archival notification|'\
810 + argstr + '|additionaltext=' + test_comment + '}}'
811 else:
812 text = '{{subst:User:Muninnbot/Teahouse archival notification|'\
813 + argstr + '}}'
814
815 page.save(text=text, summary=sn, section='new', minor=False, botflag=True)
816
817
818def notify_all(notification_list, status,
819 archive_from='[[Wikipedia:Teahouse]]',
820 botname='Muninnbot'):
821 """Execute notification list.
822
823 Input:
824 - notification_list: cf. generate_notification_list for format
825 - status: 'offlinetest' for printing to stdout, 'test-X' for various
826 testing levels, 'prod' for production use
827 - archive_from: original page of the thread (only for notification
828 formatting, not actually checked)
829 - botname: name of the bot who leaves the notification
830
831 No output to stdout, but this will cause posts on WP.
832 """
833 formatspec = 'pagelinked={pl}|threadname={tn}|archivelink={al}|'\
834 + 'botname={bn}|editorname={en}'
835 warnmsg = 'Thread "{thread}" by user {user} will not cause notification:'\
836 + ' {reason}.'
837 for item in notification_list:
838 user = item['user']
839 thread = item['thread']
840
841 if item['invalid']:
842 logging.warning(warnmsg.format(thread=thread, user=user,
843 reason=item['reason']))
844 continue
845 archivelink = item['archivelink']
846
847 argstr = formatspec.format(pl=archive_from, tn=thread, al=archivelink,
848 bn=botname, en=user)
849
850 if status == 'offlinetest':
851 print('[[User talk:' + user + ']] -> {{subst:User:Tigraan-testbot/'
852 + 'Teahouse archival notification|' + argstr + '}}')
853 elif status == 'test-1':
854 notify(user, argstr, testlvl=1)
855 elif status == 'test-2':
856 notify(user, argstr, testlvl=2)
857 elif status == 'test-3':
858 notify(user, argstr, testlvl=3)
859 elif status == 'prod':
860 notify(user, argstr, testlvl=0)
861 else:
862 raise ValueError('Option was not understood.', status)
863
864
865def main(test=False):
866 """Run main procedure.
867
868 Run once the full procedure:
869 - find last archival edit and extract archived threads
870 - lookup in the page history who created those threads
871 - check for each user whether they can be sent a notification
872 - send notifications for whoever can receive them
873
874 With PWB/OAuth we should be logged in everytime.
875 """
876 # try to log in, fail if it does not work
877 s = pywikibot.Site()
878 s.login()
879 assert s.logged_in()
880
881 cur_user = whoami(site=s)
882 logging.info('Currently logged as:' + cur_user)
883 assert cur_user == 'Muninnbot'
884
885 # place the notifications
886 notiflist = generate_notification_list()
887 if test:
888 notify_all(notiflist, status='test-2')
889 else:
890 notify_all(notiflist, status='prod')
891
892if __name__ == "__main__":
893 # Unit test run. See
894 # https://docs.python.org/3/library/doctest.html#simple-usage-checking-examples-in-docstrings
895 import doctest
896 logging.basicConfig(level=logging.ERROR) # ignore logging warnings
897 (failure_count, test_count) = doctest.testmod()
898
899 if failure_count > 0:
900 logging.error("I failed at least one unit test, and will stop here.")
901 else:
902 if '-t' in sys.argv or '--test' in sys.argv:
903 print('Tests run successfully!')
904 else:
905 logging.basicConfig(level=logging.INFO)
906 logging.info("Unit tests passed. Executing the full procedure...")
907 if '-l' in sys.argv or '--tha-log' in sys.argv:
908 main(test=True)
909 else:
910 main(test=False)