· 5 years ago · Jul 17, 2020, 10:40 PM
1def get_code_from_mail(self, student, mode):
2 ImapConfig = namedtuple('ImapConfig', ['host', 'port', 'domains', ])
3
4 IMAP_CONFIGS = [
5 ImapConfig('imap.mail.yahoo.com', imaplib.IMAP4_SSL_PORT, ['yahoo.com', ]),
6 ImapConfig('imap.gmail.com', imaplib.IMAP4_SSL_PORT, ['gmail.com', ]),
7 ImapConfig('imap.yandex.ru', imaplib.IMAP4_SSL_PORT, ['yandex.ru', ]),
8 ImapConfig('imap.mail.ru', imaplib.IMAP4_SSL_PORT, ['mail.ru', 'inbox.ru', 'list.ru', 'bk.ru']),
9 ]
10
11 def get_imap_config(email):
12 # type: (str) -> typing.Optional[ImapConfig]
13 host = email.lower().rsplit('@', 1)[-1]
14 for config in IMAP_CONFIGS:
15 if host in config.domains:
16 return config
17 return None
18
19 def get_subject(raw_subject):
20 # type: (str) -> str
21 try:
22 email_subject, email_subject_encoding = decode_header(raw_subject)[0]
23 if email_subject_encoding is not None:
24 return email_subject.decode(email_subject_encoding)
25 except (ValueError):
26 pass
27 return raw_subject
28
29 class EmailDetails(object):
30 def __init__(self, email_msg):
31 # type: (email.message.Message) -> None
32 self._email = email_msg
33
34 @cached_property
35 def subject(self):
36 # type: () -> str
37 return get_subject(str(self._email.get('Subject', '')))
38
39 @cached_property
40 def date(self):
41 # type: () -> datetime.datetime
42 return email.utils.parsedate_to_datetime(self._email.get('date'))
43
44 @property
45 def body(self, mimetype='text/html'):
46 # type: (str) -> typing.Generator[typing.Any]
47 decode = True
48 if self._email.is_multipart():
49 for part in self._email.get_payload(): # type: email.message.Message
50 content_disposition = str(part.get('Content-Disposition'))
51 if 'attachment' in content_disposition:
52 continue
53
54 if part.get_content_type() == mimetype:
55 yield part.get_payload(decode=decode)
56 else:
57 if self._email.get_content_type() == mimetype:
58 yield self._email.get_payload(decode=decode)
59
60 class BaseMailClient(object):
61 def __init__(self, email, password):
62 # type: (str, str) -> None
63 self._email = email
64 self._password = password
65 self._config = get_imap_config(self._email) # type: typing.Optional[ImapConfig]
66 self._client = None # type: typing.Optional[imaplib.IMAP4_SSL]
67
68 @property
69 def client(self):
70 if self._config is None:
71 raise Exception('Unknown mail provider')
72 if self._client is None:
73 self._client = imaplib.IMAP4_SSL(self._config.host, self._config.port)
74 return self._client
75
76 def connect(self):
77 # type: () -> None
78 logger.debug('Using %s:%s for %s', self._config.host, self._config.port, self._email)
79 logger.debug(self.client.login(self._email, self._password))
80
81 def __enter__(self):
82 # type: () -> BaseMailClient
83 self.connect()
84 logger.debug(' > open connection')
85 return self
86
87 def __exit__(self, exc_type, exc_val, exc_tb):
88 # type: (typing.Optional[typing.Any], typing.Optional[typing.Any], typing.Optional[typing.Any]) -> None
89 logger.debug(' > closing connection')
90 self.client.close()
91
92 def close(self):
93 # type: () -> None
94 self.client.logout()
95 self.client.close()
96
97 def fetch_mails(self, mailbox='INBOX', max_attempts=3):
98 # type: (str, int) -> typing.Generator[EmailDetails]
99 result, _ = self.client.select(mailbox, readonly=True)
100 result, data = self.client.uid('search', None, 'ALL')
101 for mail_id in reversed(data[0].split()): # type: bytes
102 attempts = max_attempts
103 while attempts >= 0:
104 result, data = self.client.uid('fetch', mail_id, '(RFC822)')
105 if result != 'OK':
106 logger.debug('Unable to fetch mail %s; status: %s', mail_id, result)
107 attempts -= 1
108 continue
109 yield EmailDetails(email.message_from_bytes(data[0][1]))
110 break
111
112 def extract_otp(self, text):
113 # type: (str) -> typing.Optional[str]
114 if mode == 'reg':
115 code = re.findall(r'\"otp\">(.*?)<\/p>', text, re.DOTALL | re.UNICODE | re.MULTILINE)
116 logger.debug(text)
117 try:
118 otp_code = code[0]
119 self.otp_code = otp_code
120 except (IndexError, ValueError):
121 otp_code = None
122 return otp_code
123
124 def find_otp_code(client, subject, max_age):
125 # type: (BaseMailClient, str, int) -> typing.Optional[str]
126 current_time = datetime.datetime.now()
127 logger.debug('Current time: %s', current_time)
128
129 for mail in client.fetch_mails(): # type: EmailDetails
130 time_delta = current_time - mail.date.replace(tzinfo=None)
131 ## if received less then N sec
132 if time_delta.total_seconds() <= max_age:
133 ## skip if mail subject mismatch
134 if mail.subject != subject:
135 continue
136 logger.debug(' * %s', mail.subject)
137 ## find first OTP code
138 for part in mail.body:
139 otp_code = extract_otp(self, part.decode())
140 if otp_code is not None:
141 return otp_code
142 else:
143 break
144
145 def login_code():
146 # type: () -> None
147 EMAIL = student.login
148 PASSWORD = student.password
149
150 ## Settings
151 if mode == 'reg':
152 MAIL_SUBJECT = 'Verify your new student account'
153 MAIL_SEARCH_THRESHOLD = 20000 * 60 ## 10 min
154
155 with BaseMailClient(EMAIL, PASSWORD) as client:
156 otp_code = find_otp_code(client, subject=MAIL_SUBJECT, max_age=MAIL_SEARCH_THRESHOLD)
157 if otp_code is not None:
158 logger.info('OTP code: %s', otp_code)
159 print(otp_code)
160 else:
161 logger.info('Unable to find OTP code')
162
163 login_code()