// Paste metadata: 6 years ago · Dec 18, 2019, 06:50 PM
const mysql = require('mysql');
const Lblr = require('line-by-line');
3
4
// Other monthly dumps that were used with this script:
// 'RC_2011-07.redditjson', 'RC_2012-12.redditjson'.

// Path of the line-delimited JSON comment dump to import.
const importfile = 'RC_2007-10.redditjson';

// Shared MySQL connection; start() assigns it before any query is issued.
let connection = null;

// true  -> tables with primary/foreign keys, database 'reddit2restricted'.
// false -> key-less tables, database 'reddit1'.
const constrained = true;
10
11
/**
 * DDL for the constrained schema (primary keys, NOT NULL columns and foreign
 * keys). The statements are listed in dependency order: posts references
 * subreddits, and comments references posts.
 */
const constrainedTablesQueries = [`CREATE TABLE IF NOT EXISTS subreddits (
  id VARCHAR(15) PRIMARY KEY,
  name VARCHAR(255) NOT NULL
)`, `CREATE TABLE IF NOT EXISTS posts (
  id VARCHAR(15) PRIMARY KEY,
  subr_id VARCHAR(15),
  CONSTRAINT FOREIGN KEY(subr_id) REFERENCES subreddits(id)
)`, `CREATE TABLE IF NOT EXISTS comments (
  id VARCHAR(15) PRIMARY KEY,
  parent_id VARCHAR(15) NOT NULL,
  body TEXT NOT NULL,
  score INT NOT NULL,
  created_time INT(11) NOT NULL,
  author VARCHAR(100) NOT NULL,
  post_id VARCHAR(15),
  CONSTRAINT FOREIGN KEY(post_id) REFERENCES posts(id)
)`];
29
/**
 * DDL for the unconstrained schema: the same three tables and columns as the
 * constrained variant, but without primary keys, NOT NULL or foreign keys.
 */
const unconstrainedTablesQueries = [`CREATE TABLE IF NOT EXISTS subreddits (
  id VARCHAR(15),
  name VARCHAR(255)
)`, `CREATE TABLE IF NOT EXISTS posts (
  id VARCHAR(15),
  subr_id VARCHAR(15)
)`, `CREATE TABLE IF NOT EXISTS comments (
  id VARCHAR(15),
  parent_id VARCHAR(15),
  body TEXT,
  score INT,
  created_time INT(11),
  author VARCHAR(100),
  post_id VARCHAR(15)
)`];
45
46
/**
 * Creates the MySQL connection object for the schema selected by
 * `constrained`.
 *
 * Host, user and password can be overridden with the MYSQL_HOST, MYSQL_USER
 * and MYSQL_PASSWORD environment variables; the literals below are only
 * development fallbacks and should not be relied on outside a local setup.
 *
 * @returns {object} Whatever `mysql.createConnection` returns for the options.
 */
const getConnection = () => {
  const options = {
    host: process.env.MYSQL_HOST || 'localhost',
    user: process.env.MYSQL_USER || 'albert',
    password: process.env.MYSQL_PASSWORD || 'password1',
    insecureAuth: false,
    // Each schema variant lives in its own database.
    database: constrained ? 'reddit2restricted' : 'reddit1',
  };

  return mysql.createConnection(options);
};
62
/**
 * Creates the three tables for the schema selected by `constrained`, one
 * statement after another, then invokes `callback` with no arguments.
 *
 * If any statement fails the error is logged, the connection is destroyed and
 * `callback` is NOT invoked: loading data into tables that could not be
 * created would only produce further failures.
 *
 * @param {Function} callback Called once after all tables have been created.
 */
const insertTables = (callback) => {
  const queries = constrained
    ? constrainedTablesQueries
    : unconstrainedTablesQueries;

  // Statements must run in order because later tables reference earlier ones.
  const runFrom = (index) => {
    if (index >= queries.length) {
      callback();
      return;
    }

    connection.query(queries[index], (err) => {
      if (err) {
        console.error(`FAILED TO CREATE TABLE (statement ${index + 1}): ${err.message}`);
        // destroy() rather than end(): the connection may already be unusable.
        connection.destroy();
        return;
      }
      runFrom(index + 1);
    });
  };

  runFrom(0);
};
82
/**
 * Streams the dump named by `importfile` line by line and bulk-loads it into
 * the subreddits, posts and comments tables through the shared `connection`.
 *
 * Every non-blank line is parsed as one JSON comment object. Rows are buffered
 * and flushed each time `batchSize` comments have accumulated, and once more
 * when the reader reports the end of the file. The reader is paused while a
 * flush is running and the connection is closed after the final flush.
 *
 * Failures are logged and the import carries on: a failed bulk insert does not
 * stop the following ones, and an unparsable line is skipped.
 *
 * @param {number} [batchSize=500000] Number of comments buffered per flush.
 */
const insertData = (batchSize = 500000) => {
  const insertStatements = {
    SUBREDDITS: 'INSERT INTO subreddits(id, name) VALUES ?',
    POSTS: 'INSERT INTO posts (id, subr_id) VALUES ?',
    COMMENTS: `INSERT INTO comments
      (id, parent_id, body, score, created_time, author, post_id)
      VALUES ?`,
  };

  const lineReader = new Lblr(importfile);
  lineReader.setMaxListeners(0);

  let comments = [];
  let posts = [];
  let subreddits = [];
  // Kept for the whole run (not per batch) so a post or subreddit that shows
  // up again in a later batch is not inserted twice, which would violate the
  // primary keys of the constrained schema and fail the whole bulk statement.
  const seenPosts = new Set();
  const seenSubreddits = new Set();

  // Inserts `rows` into one table, then continues with `next` in every case.
  const bulkInsert = (table, rows, next) => {
    // An empty row list would render as invalid SQL, so skip the round trip.
    if (rows.length === 0) {
      next();
      return;
    }

    connection.query(insertStatements[table], [rows], (err) => {
      if (err) {
        console.error(`FAILED BULK ENTRY TO ${table} TABLE: ${err.message}`);
      } else {
        console.log(`ADDED BULK ENTRY TO ${table} TABLE`);
      }
      next();
    });
  };

  // Writes the buffered rows (parents before children) and calls `done` once
  // the last insert has completed.
  const flush = (done) => {
    lineReader.pause();

    const batch = { subreddits, posts, comments };
    subreddits = [];
    posts = [];
    comments = [];

    bulkInsert('SUBREDDITS', batch.subreddits, () => {
      bulkInsert('POSTS', batch.posts, () => {
        bulkInsert('COMMENTS', batch.comments, done);
      });
    });
  };

  lineReader.on('line', (line) => {
    if (line.trim() === '') {
      return;
    }

    let entry;
    try {
      entry = JSON.parse(line);
    } catch (err) {
      console.error(`SKIPPING UNPARSABLE LINE: ${err.message}`);
      return;
    }

    comments.push([
      entry.id,
      entry.parent_id,
      entry.body,
      entry.score,
      entry.created_utc,
      entry.author,
      entry.link_id,
    ]);

    if (!seenPosts.has(entry.link_id)) {
      seenPosts.add(entry.link_id);
      posts.push([entry.link_id, entry.subreddit_id]);
    }

    if (!seenSubreddits.has(entry.subreddit_id)) {
      seenSubreddits.add(entry.subreddit_id);
      subreddits.push([entry.subreddit_id, entry.subreddit]);
    }

    // Flush after buffering the current line so that no line is dropped.
    if (comments.length >= batchSize) {
      flush(() => lineReader.resume());
    }
  });

  lineReader.on('error', (err) => {
    console.error(`FAILED TO READ ${importfile}: ${err.message}`);
    connection.destroy();
  });

  lineReader.on('end', () => {
    // Write whatever is still buffered, then close the connection gracefully.
    flush(() => {
      connection.end((err) => {
        if (err) {
          console.error(`FAILED TO CLOSE CONNECTION: ${err.message}`);
        }
      });
    });
  });
};
156
157
/**
 * Entry point: stores a fresh connection in the shared `connection` variable,
 * makes sure the tables exist and then starts the import.
 */
const start = () => {
  connection = getConnection();
  // Call insertData through a wrapper so it always runs with its own
  // defaults, whatever arguments insertTables hands to its callback.
  insertTables(() => insertData());
};

start();