# (archive header from the original paste: Jul 02, 2020, 08:42 PM)
# Any copyright is dedicated to the Public Domain.
# https://creativecommons.org/publicdomain/zero/1.0/

import json
import multiprocessing
import os
import sqlite3
import sys
import time
import urllib.request

def GetPosts(_before_date, _after_date):
    """Scrape every r/FULLCOMMUNISM submission created in (_after_date, _before_date)
    from the Pushshift API into a per-range SQLite database.

    Args:
        _before_date: upper bound (unix timestamp, exclusive); also names the db file
            so each worker process writes to its own database.
        _after_date: lower bound (unix timestamp, exclusive).
    """
    conn = sqlite3.connect('fullcommunism_' + str(_before_date) + '.db')
    cursor = conn.cursor()

    # "PRIMARY KEY" — the original said "PRIMARY_KEY", which SQLite silently
    # parses as part of the column *type*, leaving the table without a real key.
    cursor.execute(
        "CREATE TABLE IF NOT EXISTS posts ("
        "id text PRIMARY KEY, author text NOT NULL, title text NOT NULL, "
        "selftext text NOT NULL, url text NOT NULL, imgurl text NOT NULL);"
    )
    conn.commit()

    before_date = _before_date
    after_date = _after_date

    while True:
        url = ("https://api.pushshift.io/reddit/submission/search/"
               "?after={}&before={}&sort=asc&sort_type=created_utc"
               "&subreddit=FULLCOMMUNISM&size=1000").format(after_date, before_date)
        print(url)
        data = json.loads(urllib.request.urlopen(url).read().decode())

        last_timestamp = after_date
        for postinfo in data["data"]:
            last_timestamp = postinfo["created_utc"]
            # Link posts carry no "selftext" field at all.
            selftext = postinfo.get("selftext", "")
            # OR IGNORE: consecutive pages can resend the boundary post.
            cursor.execute(
                "INSERT OR IGNORE INTO posts (id,author,title,selftext,url,imgurl) "
                "VALUES (?,?,?,?,?,?)",
                (postinfo["id"], postinfo["author"], postinfo["title"],
                 selftext, postinfo["permalink"], postinfo["url"]))
        conn.commit()

        # No timestamp progress means the range is drained.
        if after_date == last_timestamp:
            break
        after_date = last_timestamp

    conn.close()  # the original leaked the connection
54
def GetComments(_before_date, _after_date):
    """Scrape every r/FULLCOMMUNISM comment created in (_after_date, _before_date)
    from the Pushshift API into a per-range SQLite database.

    Args:
        _before_date: upper bound (unix timestamp, exclusive); also names the db file.
        _after_date: lower bound (unix timestamp, exclusive).
    """
    conn = sqlite3.connect('fullcommunism_' + str(_before_date) + '.db')
    cursor = conn.cursor()

    # "PRIMARY KEY" — the original's "PRIMARY_KEY" was parsed by SQLite as part
    # of the column type, so the table had no real key.
    cursor.execute(
        "CREATE TABLE IF NOT EXISTS comments ("
        "id text PRIMARY KEY, author text NOT NULL, body text NOT NULL, "
        "link_id text NOT NULL, parent_id text NOT NULL);"
    )
    conn.commit()

    before_date = _before_date
    after_date = _after_date

    while True:
        url = ("https://api.pushshift.io/reddit/comment/search/"
               "?after={}&before={}&sort=asc&sort_type=created_utc"
               "&subreddit=FULLCOMMUNISM&size=1000").format(after_date, before_date)
        print(url)
        data = json.loads(urllib.request.urlopen(url).read().decode())

        last_timestamp = after_date
        for comment in data["data"]:
            # BUG FIX: the original never advanced last_timestamp in this loop,
            # so the `after_date == last_timestamp` check below always broke out
            # after the first page — at most 1000 comments were ever fetched.
            last_timestamp = comment["created_utc"]
            # OR IGNORE: consecutive pages can resend the boundary comment.
            cursor.execute(
                "INSERT OR IGNORE INTO comments (id,author,body,link_id,parent_id) "
                "VALUES (?,?,?,?,?)",
                (comment["id"], comment["author"], comment["body"],
                 comment["link_id"], comment["parent_id"]))
        conn.commit()

        # No timestamp progress means the range is drained.
        if after_date == last_timestamp:
            break
        after_date = last_timestamp

    conn.close()  # the original leaked the connection
91
def subdivide(after, before, target_processes):
    """Split [after, before] into target_processes contiguous sub-ranges.

    Returns a list of target_processes + 1 boundary timestamps; consecutive
    pairs form the (after, before) range for one worker process.

    Each boundary is computed directly as after + span*i/target_processes
    using integer arithmetic, instead of the original's accumulating float
    step (r += 1/target_processes), which drifts over many additions.
    """
    span = before - after
    boundaries = [int(after + span * i // target_processes)
                  for i in range(target_processes)]
    boundaries.append(int(before))
    return boundaries
101
if __name__ == "__main__":
    # Usage: script.py <before_utc> <after_utc>
    # Slice [1:3] so trailing extra arguments don't break the 2-tuple unpack.
    before, after = (int(x) for x in sys.argv[1:3])

    # Just use 4 processes for now.
    # This is somewhat unbalanced: a subreddit's earliest days are quieter than
    # the present, so processes handling early ranges get a lighter task load.
    values = subdivide(after, before, 4)

    def run_scrapers(target):
        """Launch one child process of `target` per sub-range and wait for all.

        Child processes rather than threads because CPython's GIL would
        serialize the work onto one core.  The 3 s stagger between starts
        keeps Pushshift from treating the simultaneous first requests as a
        DoS attempt.
        """
        workers = []
        for lo, hi in zip(values, values[1:]):
            proc = multiprocessing.Process(target=target, args=(hi, lo))
            workers.append(proc)
            proc.start()
            time.sleep(3)
        for proc in workers:
            proc.join()

    run_scrapers(GetPosts)
    run_scrapers(GetComments)

    # Merge the per-range databases into one primary database.
    # "PRIMARY KEY" fixed here too — the original "PRIMARY_KEY" typo left the
    # merged tables without a real key.
    conn_primary = sqlite3.connect('fullcommunism.db')
    c_primary = conn_primary.cursor()
    c_primary.execute(
        "CREATE TABLE IF NOT EXISTS posts ("
        "id text PRIMARY KEY, author text NOT NULL, title text NOT NULL, "
        "selftext text NOT NULL, url text NOT NULL, imgurl text NOT NULL);")
    c_primary.execute(
        "CREATE TABLE IF NOT EXISTS comments ("
        "id text PRIMARY KEY, author text NOT NULL, body text NOT NULL, "
        "link_id text NOT NULL, parent_id text NOT NULL);")
    conn_primary.commit()

    db_files = [f for f in os.listdir('.')
                if os.path.isfile(f) and f.endswith(".db") and f != "fullcommunism.db"]
    for db_file in db_files:
        conn_temp = sqlite3.connect(db_file)
        c_temp = conn_temp.cursor()

        # OR IGNORE: adjacent ranges may both have stored the boundary item.
        for row in c_temp.execute("SELECT * FROM posts").fetchall():
            c_primary.execute(
                "INSERT OR IGNORE INTO posts (id,author,title,selftext,url,imgurl) "
                "VALUES (?,?,?,?,?,?)", row)
        for row in c_temp.execute("SELECT * FROM comments").fetchall():
            c_primary.execute(
                "INSERT OR IGNORE INTO comments (id,author,body,link_id,parent_id) "
                "VALUES (?,?,?,?,?)", row)

        c_temp.close()
        conn_temp.close()
        os.remove(db_file)  # the per-range db is now fully merged

    conn_primary.commit()
    c_primary.close()
    conn_primary.close()  # the original leaked the primary connection