· 7 years ago · Dec 04, 2018, 01:12 PM
1import multiprocessing
2from multiprocessing.pool import Pool
3from mailbox import mbox, Message
4import logging
5import os
6import argparse
7import sqlite3
8import json
9import quopri
10from pathlib import Path
11import sys
12
# Module-level logger, named after this module, shared by the parser classes below.
logger = logging.getLogger(__name__)
14
15
class BodyParseFailedException(Exception):
    """Raised when neither a plain text nor an html body can be extracted from a mail."""
18
19
class MBOXParser(object):
    """
    Parse a single mbox file into a list of dicts, one dict per mail.

    Each dict has the keys ``is_multipart``, ``body_plain_text``, ``body_html``,
    ``has_attachment``, ``headers`` (JSON text), ``in_reply_to`` and ``references``.
    """

    def __init__(self, mbox_path):
        """
        :param mbox_path: path of the mbox file to parse
        """
        self.__mbox_path = mbox_path

    def parse_mbox(self):
        """
        Parse every mail stored in the mbox file.

        :return: list of parsed mails; mails whose body could not be parsed are left out
        """
        mailbox = mbox(self.__mbox_path, factory=Message)
        try:
            parsed_mails = (self.parse_mail(mail) for mail in mailbox)
            return [parsed_mail for parsed_mail in parsed_mails if parsed_mail]
        finally:
            # the mailbox keeps the file open until it is closed explicitly
            mailbox.close()

    def parse_mail(self, mail):
        """
        Parse one mail: body, headers and threading information.

        :param mail:
        :type: Message
        :return: dict describing the mail, or None when no body could be extracted
        """
        try:
            parsed_mail = self.parse_mail_payload(mail)
        except BodyParseFailedException:
            return None

        # A header value may be an email.header.Header object rather than a str (e.g. when it
        # holds raw non-ASCII bytes). json cannot serialise those, so stringify every value
        # instead of losing all headers of the mail because of a single one.
        headers = [(name, str(value)) for name, value in mail.items()]
        try:
            parsed_mail['headers'] = json.dumps(headers)
        except TypeError:
            # safety net only; every value is a str at this point
            logger.critical('failed to parse headers', extra=dict(headers=headers))
            parsed_mail['headers'] = json.dumps([])

        # str() for the same reason as above: sqlite3 cannot bind Header objects
        parsed_mail['in_reply_to'] = str(mail.get('in-reply-to', ''))
        parsed_mail['references'] = str(mail.get('references', ''))
        return parsed_mail

    def parse_mail_payload(self, mail):
        """
        Parse payload of a mail. Payload can be multipart or non-multipart.
        non-multipart will only have plain text body
        multipart may have html body or attachments, possibly nested inside further multipart
        containers. html body is handled, attachments are only flagged and otherwise ignored for now

        :param mail:
        :type: Message
        :return: dict with the keys is_multipart, body_plain_text, body_html and has_attachment
        :raises BodyParseFailedException: when a multipart mail has neither a plain text nor an html body
        """
        obj = dict(
            is_multipart=mail.is_multipart(),
            body_plain_text="",
            body_html="",
            has_attachment=False
        )
        if not mail.is_multipart():
            obj["body_plain_text"] = self.decode_quoted_printable(mail)
            return obj

        for mime_part in mail.get_payload():
            self._collect_mime_part(mime_part, obj)

        if not obj["body_plain_text"] and not obj["body_html"]:
            try:
                raw_mime = str(mail)
            except UnicodeEncodeError:
                raw_mime = ''

            logger.info('body parsing failed or was an empty message', extra=dict(mail=raw_mime))
            raise BodyParseFailedException('body parsing failed')

        return obj

    def _collect_mime_part(self, mime_part, obj):
        """
        Record what one MIME part contributes to the parsed mail, descending into nested containers.

        :param mime_part: a part of a multipart mail
        :param obj: result dict of parse_mail_payload, updated in place
        """
        # Disposition is checked first so that a text attachment is never mistaken for the body,
        # and so that the content of an attached mail is not searched for bodies.
        if mime_part.get_content_disposition() == 'attachment':
            obj["has_attachment"] = True
            return

        if mime_part.is_multipart():
            # e.g. multipart/alternative wrapped in multipart/mixed: the bodies live one level down
            for nested_part in mime_part.get_payload():
                self._collect_mime_part(nested_part, obj)
            return

        content_type = mime_part.get_content_type()
        if content_type == 'text/plain':
            obj["body_plain_text"] = self.decode_quoted_printable(mime_part)
        elif content_type == 'text/html':
            obj["body_html"] = self.decode_quoted_printable(mime_part)
        else:
            logger.debug('unknown content type',
                         extra=dict(content_type=content_type, headers=mime_part.items()))

    def decode_quoted_printable(self, mime_part):
        """
        Return the payload of a part as text, undoing quoted-printable encoding when it is declared.

        :param mime_part: a non-multipart mail or MIME part
        :return: payload; a str on every path, so that SQLite always stores it as TEXT
        """
        payload = mime_part.get_payload()
        # header values are case-insensitive and may be Header objects, hence str() and lower()
        content_transfer_encoding = str(mime_part.get('Content-Transfer-Encoding', '')).strip().lower()
        if content_transfer_encoding != 'quoted-printable':
            return payload

        try:
            decoded = quopri.decodestring(payload)
        except ValueError:
            # non ascii characters cannot be handled. don't try to decode quoted printable
            return payload

        # quopri yields bytes; turn them into text with the charset the part declares
        charset = mime_part.get_content_charset() or 'utf-8'
        try:
            return decoded.decode(charset, errors='replace')
        except LookupError:
            # charset name unknown to Python
            return decoded.decode('utf-8', errors='replace')
95
96
class BacthMBOXParser(object):
    """
    Parse every mbox file of a directory in parallel and store the mails in SQLite.

    The database is the file ``parsed_emails.db`` in the home directory of the current user;
    its ``email`` table is created on first use.
    """

    def __init__(self, mbox_dir_path):
        """
        :param mbox_dir_path: directory containing the mbox files to parse
        """
        # str() of a joined Path instead of os.path.join(Path, str): the latter raises
        # TypeError on Python 3.5, which this script still accepts
        sqlite_path = str(Path.home() / 'parsed_emails.db')
        self.__mbox_dir_path = mbox_dir_path
        self.__db = sqlite3.connect(sqlite_path)
        self.__db.execute(
            'CREATE TABLE IF NOT EXISTS `email` (`body_html` TEXT, `body_plain_text` TEXT, `headers` TEXT, `in-reply-to` TEXT, `references` TEXT, has_attachment INTEGER)')

    def start(self):
        """
        Parse the mbox files with one worker process per CPU and insert the results as they arrive.
        """
        candidates = (os.path.join(self.__mbox_dir_path, name) for name in os.listdir(self.__mbox_dir_path))
        # sub-directories cannot be opened as mbox files, leave them out
        files = [path for path in candidates if os.path.isfile(path)]
        # the with-block shuts the worker processes down once every result has been consumed
        with Pool(multiprocessing.cpu_count()) as process_pool:
            for parsed_mails in process_pool.imap_unordered(parse_mbox, files):
                self.insert_in_sqlite(parsed_mails)

    def insert_in_sqlite(self, parsed_mails):
        """
        Insert a batch of parsed mails and commit.

        :param parsed_mails: iterable of dicts with the keys body_html, body_plain_text, headers,
                             in_reply_to, references and has_attachment; further keys are ignored
        """
        self.__db.executemany(
            'INSERT INTO email(body_html, body_plain_text, headers, `in-reply-to`, `references`, has_attachment) VALUES (:body_html, :body_plain_text, :headers, :in_reply_to, :references, :has_attachment)',
            parsed_mails)
        self.__db.commit()

    def __del__(self):
        # The attribute is missing when __init__ failed before the connection was opened;
        # getattr needs the name-mangled spelling of self.__db.
        db = getattr(self, '_BacthMBOXParser__db', None)
        if db is not None:
            db.close()
119
120
def parse_mbox(mbox_path):
    """
    Worker entry point: announce the file on stdout, then parse it.

    Kept at module level so that it can be handed to a process pool.

    :param mbox_path: path of the mbox file to parse
    :return: whatever MBOXParser.parse_mbox returns for that file
    """
    print('processing %s' % mbox_path)
    single_file_parser = MBOXParser(mbox_path)
    return single_file_parser.parse_mbox()
124
125
if __name__ == '__main__':
    # Compare version tuples directly instead of folding major/minor into a float,
    # which only orders versions correctly by accident.
    if sys.version_info < (3, 5):
        raise Exception("Python 3.5 or a more recent version is required.")

    # Use the following to split a bigger mbox file into multiple files of 100 emails each
    # awk 'BEGIN{chunk=0} /^From /{msgs++;if(msgs==100){msgs=0;chunk++}}{print > "chunk_" chunk ".txt"}' ~/email_dump/podesta-emails.mbox-2016-11-06
    parser = argparse.ArgumentParser(description='Parse mbox files into SQLite')
    parser.add_argument('--mbox_dir_path',
                        help='Path to the directory containing mbox files. Please ensure that this directory is only having valid mbox files',
                        required=True)
    args = parser.parse_args()
    BacthMBOXParser(args.mbox_dir_path).start()
137 BacthMBOXParser(args.mbox_dir_path).start()