· 7 years ago · Dec 04, 2018, 01:28 PM
import contextlib
import logging

import mysql.connector

import reddit_constants as rc
4
5
class MysqlManager:
    """Context manager owning a MySQL connection for the reddit scraper.

    ``__enter__`` creates the configured database if it is missing and
    connects to it; ``__exit__`` closes that connection. The ``insert_*``
    methods write scraped rows and store the generated (or pre-existing) row
    ids back into the dicts they were given, under the ``reddit_constants``
    id keys.
    """

    # Logged (and the call becomes a no-op) when a method needs a connection
    # but none is open.
    _NOT_CONNECTED_MSG = ('Error in MySqlManager: You must initialize the '
                          'connection to MySql')

    # DDL executed by create_tables, in this order: each table only
    # references tables created before it via FOREIGN KEY.
    _TABLE_DDL = (
        """
        CREATE TABLE IF NOT EXISTS subreddit
        (
            subreddit_id INT AUTO_INCREMENT,
            subreddit_name VARCHAR(30) NOT NULL UNIQUE,
            subscriptions INT,
            PRIMARY KEY (subreddit_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS post (
            post_id INT AUTO_INCREMENT,
            subreddit_id INT NOT NULL,
            post_title VARCHAR(500) NOT NULL,
            post_ref VARCHAR(2084),
            comments_ref VARCHAR(2084),
            username VARCHAR(30),
            created_time DATETIME NOT NULL,
            PRIMARY KEY (post_id),
            FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
            CONSTRAINT UC_post UNIQUE(subreddit_id, post_title, username, created_time)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS post_history (
            post_history_id INT AUTO_INCREMENT,
            post_id INT,
            votes INT,
            ranks INT,
            updated_time DATETIME,
            PRIMARY KEY (post_history_id),
            FOREIGN KEY (post_id) references post(post_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS comment
        (
            comment_id INT AUTO_INCREMENT,
            post_id INT NOT NULL,
            subreddit_id INT NOT NULL,
            username VARCHAR(30) NOT NULL,
            created_time DATETIME NOT NULL,
            PRIMARY KEY (comment_id),
            FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
            FOREIGN KEY (post_id) REFERENCES post(post_id),
            CONSTRAINT UC_comment UNIQUE(subreddit_id, post_id, username, created_time)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS comment_history
        (
            comment_history_id INT AUTO_INCREMENT,
            comment_id INT,
            message TEXT,
            votes INT,
            updated_time DATETIME,
            PRIMARY KEY (comment_history_id),
            FOREIGN KEY (comment_id) references comment(comment_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS youtube_info
        (
            youtube_info_id INT AUTO_INCREMENT,
            post_id INT,
            video_title TEXT,
            publish_date DATETIME,
            view_count INT,
            like_count INT,
            dislike_count INT,
            comment_count INT,
            PRIMARY KEY (youtube_info_id),
            FOREIGN KEY (post_id) REFERENCES post(post_id),
            CONSTRAINT UC_youtube_info UNIQUE(post_id)
        )
        """,
    )

    def __init__(self, database, username, password, logger):
        """Store connection settings; no connection is opened here.

        :param database: name of the database to create (if needed) and use.
        :param username: MySQL user name.
        :param password: MySQL password.
        :param logger: logger-like object; ``info`` and ``error`` are used.
        """
        self.username = username
        self.password = password
        self.database = database
        self.logger = logger
        # Initialised eagerly so methods called outside a ``with`` block (or
        # after a failed __enter__) report "not connected" instead of raising
        # AttributeError.
        self.conn = None

    def __enter__(self):
        """Create the database if missing, connect to it, and return ``self``.

        :raises Exception: whatever ``create_db``/``mysql.connector.connect``
            raised, or ``mysql.connector.Error`` if the new connection is not
            open. The failure is logged and any half-open connection is
            closed before re-raising, so a caller never receives a manager
            without a usable connection.
        """
        try:
            self.create_db()
            self.conn = mysql.connector.connect(
                user=self.username,
                passwd=self.password,
                auth_plugin='mysql_native_password',
                database=self.database
            )
            if not self.conn.is_connected():
                raise mysql.connector.Error(
                    msg='Connection to reddit database is not open')
        except Exception:
            self.logger.error('Error: Could not connect to reddit database')
            self._close()
            raise
        self.logger.info('Connected to reddit database')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; exceptions from the ``with`` body propagate."""
        self._close()

    def _close(self):
        """Close the connection if one is held, and forget it."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def _is_connected(self):
        """Return True if a live connection is held; otherwise log and return False."""
        if self.conn is None or not self.conn.is_connected():
            self.logger.error(self._NOT_CONNECTED_MSG)
            return False
        return True

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled (MySQL's escape inside quoted
        identifiers), so the name cannot terminate the quoting early.
        """
        return '`{}`'.format(str(name).replace('`', '``'))

    @staticmethod
    def _fetch_single_id(cursor, query, values):
        """Execute *query* and return the first column of its first row.

        The whole result set is drained so the cursor can be reused.

        :raises LookupError: if the query returns no rows.
        """
        cursor.execute(query, values)
        rows = cursor.fetchall()
        if not rows:
            raise LookupError('No existing row matched {!r}'.format(values))
        return rows[0][0]

    def create_db(self):
        """Create ``self.database`` on the server if it does not exist yet.

        Uses a short-lived connection that is closed even if the statement
        fails.
        """
        mydb = mysql.connector.connect(
            user=self.username,
            passwd=self.password,
            auth_plugin='mysql_native_password'
        )
        try:
            with contextlib.closing(mydb.cursor()) as cursor:
                # Identifiers cannot be bound as query parameters, so the
                # database name is quoted instead of concatenated raw.
                cursor.execute('CREATE DATABASE IF NOT EXISTS '
                               + self._quote_identifier(self.database))
        finally:
            mydb.close()

    def create_tables(self):
        """Create all scraper tables that do not exist yet.

        Does nothing (after logging an error) when not connected.
        """
        if not self._is_connected():
            return
        with contextlib.closing(self.conn.cursor()) as cursor:
            for statement in self._TABLE_DDL:
                cursor.execute(statement)

    def insert_subreddits(self, posts):
        """Ensure each post's subreddit row exists and record its id on the post.

        For every dict in *posts*, ``post[rc.SUBREDDIT_KEY]`` is inserted into
        ``subreddit`` (ignored if already present). The row's id is stored in
        ``post[rc.SUBREDDIT_ID]``.

        :raises LookupError: if an ignored insert has no matching row.
        """
        if not self._is_connected():
            return
        insert_query = """
            INSERT IGNORE INTO subreddit (subreddit_name, subscriptions)
            VALUES(%s, %s)
        """
        id_query = "SELECT subreddit_id FROM subreddit WHERE subreddit_name = %s"
        # Ids already resolved during this call. Posts that share a subreddit
        # reuse them instead of repeating the insert/select round trips.
        known_ids = {}
        with contextlib.closing(self.conn.cursor()) as cursor:
            for post in posts:
                name = post[rc.SUBREDDIT_KEY]
                if name not in known_ids:
                    cursor.execute(insert_query, (name, None))
                    self.conn.commit()
                    new_id = cursor.lastrowid
                    if not new_id:
                        # INSERT IGNORE skipped a duplicate (no new
                        # AUTO_INCREMENT id), so fetch the existing row's id.
                        new_id = self._fetch_single_id(cursor, id_query, (name,))
                    known_ids[name] = new_id
                post[rc.SUBREDDIT_ID] = known_ids[name]
        self.logger.info(' - Inserted subreddits from page successfully')

    def insert_posts(self, posts):
        """Insert posts plus one ``post_history`` snapshot per post.

        Each dict must already carry ``rc.SUBREDDIT_ID`` (see
        ``insert_subreddits``). The post's row id is stored in
        ``post[rc.POST_ID]``. The history row records the post's votes and
        rank with ``NOW()`` as the update time.

        :raises LookupError: if an ignored insert has no matching row.
        """
        if not self._is_connected():
            return
        post_query = """
            INSERT IGNORE INTO post (subreddit_id, post_title, post_ref,
                                     comments_ref, username, created_time)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        # username is nullable, so use MySQL's NULL-safe equality (<=>).
        id_query = """
            SELECT post_id
            FROM post
            WHERE subreddit_id = %s
              AND post_title = %s
              AND username <=> %s
              AND created_time = %s
        """
        post_history_query = """
            INSERT INTO post_history (post_id, votes, ranks, updated_time)
            (SELECT %s, %s, %s, NOW())
        """
        with contextlib.closing(self.conn.cursor()) as cursor:
            for post in posts:
                cursor.execute(post_query, (post[rc.SUBREDDIT_ID],
                                            post[rc.POST_TITLE_KEY],
                                            post[rc.POST_REF_KEY],
                                            post[rc.COMMENTS_REF_KEY],
                                            post[rc.USER_KEY],
                                            post[rc.CREATED_TIMESTAMP_KEY]))
                self.conn.commit()
                new_id = cursor.lastrowid
                if not new_id:
                    # Duplicate post: reuse the id of the existing row.
                    new_id = self._fetch_single_id(
                        cursor, id_query, (post[rc.SUBREDDIT_ID],
                                           post[rc.POST_TITLE_KEY],
                                           post[rc.USER_KEY],
                                           post[rc.CREATED_TIMESTAMP_KEY]))
                post[rc.POST_ID] = new_id

                cursor.execute(post_history_query, (post[rc.POST_ID],
                                                    post[rc.VOTE_KEY],
                                                    post[rc.RANK_KEY]))
                self.conn.commit()
        self.logger.info(' - Inserted posts from page successfully')

    def insert_video_info(self, video_info):
        """Insert YouTube statistics for one post (ignored if already stored).

        *video_info* must carry ``rc.POST_ID`` and the ``rc.YOUTUBE_*`` keys.
        """
        if not self._is_connected():
            return
        video_info_query = """
            INSERT IGNORE INTO youtube_info (post_id,
                                             video_title,
                                             publish_date,
                                             view_count,
                                             like_count,
                                             dislike_count,
                                             comment_count)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        video_info_values = (video_info[rc.POST_ID],
                             video_info[rc.YOUTUBE_TITLE_KEY],
                             video_info[rc.YOUTUBE_PUBLISHED_KEY],
                             video_info[rc.YOUTUBE_VIEW_COUNT_KEY],
                             video_info[rc.YOUTUBE_LIKE_KEY],
                             video_info[rc.YOUTUBE_DISLIKE_KEY],
                             video_info[rc.YOUTUBE_COMMENT_KEY])
        with contextlib.closing(self.conn.cursor()) as cursor:
            cursor.execute(video_info_query, video_info_values)
            self.conn.commit()

    def insert_comments(self, comments, post):
        """Insert a post's comments plus one ``comment_history`` row each.

        *post* must carry ``rc.POST_ID`` and ``rc.SUBREDDIT_ID``. Each
        comment's row id is stored in ``comment[rc.COMMENT_ID]``. The history
        row records the comment's votes and message with ``NOW()`` as the
        update time.

        :raises LookupError: if an ignored insert has no matching row.
        """
        if not self._is_connected():
            return
        comment_query = """
            INSERT IGNORE INTO comment (post_id, subreddit_id, username, created_time)
            VALUES (%s, %s, %s, %s)
        """
        id_query = """
            SELECT comment_id
            FROM comment
            WHERE post_id = %s
              AND username = %s
              AND created_time = %s
        """
        comment_history_query = """
            INSERT INTO comment_history (comment_id, votes, message, updated_time)
            (SELECT %s, %s, %s, NOW())
        """
        with contextlib.closing(self.conn.cursor()) as cursor:
            for comment in comments:
                cursor.execute(comment_query, (post[rc.POST_ID],
                                               post[rc.SUBREDDIT_ID],
                                               comment[rc.USER_KEY],
                                               comment[rc.CREATED_TIMESTAMP_KEY]))
                self.conn.commit()
                new_id = cursor.lastrowid
                if not new_id:
                    # Duplicate comment: reuse the id of the existing row.
                    new_id = self._fetch_single_id(
                        cursor, id_query, (post[rc.POST_ID],
                                           comment[rc.USER_KEY],
                                           comment[rc.CREATED_TIMESTAMP_KEY]))
                comment[rc.COMMENT_ID] = new_id

                cursor.execute(comment_history_query, (comment[rc.COMMENT_ID],
                                                       comment[rc.VOTE_KEY],
                                                       comment[rc.MESSAGE_KEY]))
                self.conn.commit()
        self.logger.info('Inserted comments from {} post successfully'.format(post[rc.POST_REF_KEY]))
282
283
def mysql_test_suite():
    """Integration test: load CSV fixtures and insert them into a live MySQL DB.

    Needs a reachable MySQL server using the credentials in
    ``reddit_constants``, plus ``testfiles/posts.csv`` and
    ``testfiles/comments.csv`` relative to the working directory.

    :raises AssertionError: if the posts fixture is empty or any inserted row
        did not receive a positive id.
    """
    import csv

    with MysqlManager(rc.DATABASE_NAME,
                      rc.DB_USERNAME,
                      rc.DB_PASSWORD,
                      logging.getLogger('test')) as mysql_test:

        # __enter__ may have just created an empty database; the DDL is
        # IF NOT EXISTS, so this is harmless when the tables already exist.
        mysql_test.create_tables()

        # The csv module requires newline='' so quoted fields containing line
        # breaks parse correctly. Fixtures are assumed to be UTF-8.
        with open("testfiles/posts.csv", "r", encoding="utf-8", newline="") as f:
            posts = list(csv.DictReader(f))
        assert posts, "testfiles/posts.csv contains no posts"

        mysql_test.insert_subreddits(posts)
        mysql_test.insert_posts(posts)

        for post_test in posts:
            assert post_test[rc.SUBREDDIT_ID] > 0
            assert post_test[rc.POST_ID] > 0

        with open("testfiles/comments.csv", "r", encoding="utf-8", newline="") as f:
            comments = list(csv.DictReader(f))

        mysql_test.insert_comments(comments, posts[0])

        for comment in comments:
            assert comment[rc.COMMENT_ID] > 0
310
311
if __name__ == "__main__":
    # A failed check raises out of mysql_test_suite(), so the success line
    # is printed only when every check passed.
    mysql_test_suite()
    print("All mysql tests ran successfully")