# Archived snapshot: Jul 03, 2020, 07:50 AM
1# Any copyright is dedicated to the Public Domain.
2# https://creativecommons.org/publicdomain/zero/1.0/
3
4import json
5import urllib.request
6import time
7import sqlite3
8import multiprocessing
9import os
10import sys
11
def GetPosts(_before_date, _after_date):
    """Scrape all r/FULLCOMMUNISM submissions in (_after_date, _before_date).

    Pages through the Pushshift submission search API in ascending
    created_utc order and stores each post in the per-range SQLite file
    'fullcommunism_<_before_date>.db', table 'posts'.

    Args:
        _before_date: exclusive upper bound, Unix epoch seconds.
        _after_date: exclusive lower bound, Unix epoch seconds.
    """
    conn = sqlite3.connect('fullcommunism_' + str(_before_date) + '.db')
    try:
        c = conn.cursor()
        # "PRIMARY KEY" (the original said "PRIMARY_KEY", which SQLite
        # silently folds into the column *type*) makes id a real key so
        # re-runs and overlapping pages cannot create duplicate rows.
        c.execute(
            "CREATE TABLE IF NOT EXISTS posts ("
            "id text PRIMARY KEY, author text NOT NULL, title text NOT NULL, "
            "selftext text NOT NULL, url text NOT NULL, imgurl text NOT NULL);"
        )
        conn.commit()

        after_date = _after_date
        while True:
            url = ("https://api.pushshift.io/reddit/submission/search/"
                   "?after={}&before={}&sort=asc&sort_type=created_utc"
                   "&subreddit=FULLCOMMUNISM&size=1000").format(after_date, _before_date)
            print(url)
            data = json.loads(urllib.request.urlopen(url).read().decode())

            last_timestamp = after_date
            rows = []
            for postinfo in data["data"]:
                last_timestamp = postinfo["created_utc"]
                rows.append((
                    postinfo["id"],
                    postinfo["author"],
                    postinfo["title"],
                    postinfo.get("selftext", ""),  # selftext absent on link posts
                    postinfo["permalink"],
                    postinfo["url"],
                ))

            # OR IGNORE: pages are resumed from the last seen timestamp, so the
            # boundary post is fetched twice; the real primary key would
            # otherwise abort the whole run on that duplicate.
            c.executemany(
                "INSERT OR IGNORE INTO posts (id,author,title,selftext,url,imgurl) "
                "VALUES (?,?,?,?,?,?)",
                rows,
            )
            conn.commit()

            if after_date == last_timestamp:
                break  # no newer posts returned -> range exhausted
            after_date = last_timestamp
    finally:
        conn.close()  # original leaked the connection
54
def GetComments(_before_date, _after_date):
    """Scrape all r/FULLCOMMUNISM comments in (_after_date, _before_date).

    Pages through the Pushshift comment search API in ascending created_utc
    order and stores each comment in the per-range SQLite file
    'fullcommunism_<_before_date>.db', table 'comments'.

    Args:
        _before_date: exclusive upper bound, Unix epoch seconds.
        _after_date: exclusive lower bound, Unix epoch seconds.
    """
    conn = sqlite3.connect('fullcommunism_' + str(_before_date) + '.db')
    try:
        c = conn.cursor()
        # "PRIMARY KEY" (the original said "PRIMARY_KEY", which SQLite
        # silently folds into the column *type*) makes id a real key so
        # re-runs and overlapping pages cannot create duplicate rows.
        c.execute(
            "CREATE TABLE IF NOT EXISTS comments ("
            "id text PRIMARY KEY, author text NOT NULL, body text NOT NULL, "
            "link_id text NOT NULL, parent_id text NOT NULL);"
        )
        conn.commit()

        after_date = _after_date
        while True:
            url = ("https://api.pushshift.io/reddit/comment/search/"
                   "?after={}&before={}&sort=asc&sort_type=created_utc"
                   "&subreddit=FULLCOMMUNISM&size=1000").format(after_date, _before_date)
            print(url)
            data = json.loads(urllib.request.urlopen(url).read().decode())

            last_timestamp = after_date
            rows = []
            for comment_data in data["data"]:
                last_timestamp = comment_data["created_utc"]
                rows.append((
                    comment_data["id"],
                    comment_data["author"],
                    comment_data["body"],
                    comment_data["link_id"],
                    comment_data["parent_id"],
                ))

            # OR IGNORE: the boundary comment is re-fetched when resuming from
            # the last seen timestamp; skip it instead of aborting the run.
            c.executemany(
                "INSERT OR IGNORE INTO comments (id,author,body,link_id,parent_id) "
                "VALUES (?,?,?,?,?)",
                rows,
            )
            conn.commit()

            if after_date == last_timestamp:
                break  # no newer comments returned -> range exhausted
            after_date = last_timestamp
    finally:
        conn.close()  # original leaked the connection
92
def subdivide(after, before, target_processes):
    """Split the timestamp range [after, before] into equal-sized chunks.

    Returns a list of target_processes + 1 integer boundaries; chunk i is
    (ret[i], ret[i+1]).  Uses integer arithmetic instead of the original's
    repeated float accumulation (r += 1/n), which can drift by a unit on
    large epoch timestamps.

    Args:
        after: lower bound of the range.
        before: upper bound of the range.
        target_processes: number of chunks to produce (must be >= 1).
    """
    span = before - after
    boundaries = [
        int(after + span * i // target_processes)
        for i in range(target_processes)
    ]
    boundaries.append(int(before))
    return boundaries
102
if __name__ == "__main__":
    # usage: script.py <before> <after>   (both Unix epoch seconds)
    # slice to [1:3] so stray extra arguments can't break the unpack
    before, after = (int(x) for x in sys.argv[1:3])

    # just use 4 processes for now
    # this is kinda bad, since the first days in a sub's life is less active
    # than present, meaning the first processes get quite a light task load
    values = subdivide(after, before, 4)

    # cpython's GIL limits all threads to one core, so child processes are
    # used for parallelism; posts and comments phases share the same loop.
    for worker in (GetPosts, GetComments):
        processes = []
        for idx in range(len(values) - 1):
            process = multiprocessing.Process(
                target=worker, args=(values[idx + 1], values[idx]))
            processes.append(process)
            process.start()
            # stagger starts: pushshift rate-limits simultaneous first requests
            time.sleep(3)
        for process in processes:
            process.join()

    # merge the per-range dbs into the single fullcommunism.db
    conn_primary = sqlite3.connect('fullcommunism.db')
    c_primary = conn_primary.cursor()
    c_primary.execute(
        "CREATE TABLE IF NOT EXISTS posts ("
        "id text PRIMARY KEY, author text NOT NULL, title text NOT NULL, "
        "selftext text NOT NULL, url text NOT NULL, imgurl text NOT NULL);")
    c_primary.execute(
        "CREATE TABLE IF NOT EXISTS comments ("
        "id text PRIMARY KEY, author text NOT NULL, body text NOT NULL, "
        "link_id text NOT NULL, parent_id text NOT NULL);")
    conn_primary.commit()

    # merge only the temp dbs this run created (workers name their db after
    # their _before value), not every *.db lying around in the directory
    temp_files = ['fullcommunism_' + str(v) + '.db' for v in values[1:]]
    for file in temp_files:
        if not os.path.isfile(file):
            continue  # a sub-range worker may have died before creating it
        conn_temp = sqlite3.connect(file)
        c_temp = conn_temp.cursor()

        # explicit column lists keep the copy correct even if column order
        # differs; OR IGNORE skips duplicates instead of aborting the merge
        for row in c_temp.execute(
                "SELECT id,author,title,selftext,url,imgurl FROM posts"):
            c_primary.execute(
                "INSERT OR IGNORE INTO posts (id,author,title,selftext,url,imgurl) "
                "VALUES (?,?,?,?,?,?)", row)

        for row in c_temp.execute(
                "SELECT id,author,body,link_id,parent_id FROM comments"):
            c_primary.execute(
                "INSERT OR IGNORE INTO comments (id,author,body,link_id,parent_id) "
                "VALUES (?,?,?,?,?)", row)

        c_temp.close()
        conn_temp.close()
        os.remove(file)

    conn_primary.commit()
    c_primary.close()
    conn_primary.close()  # original leaked the primary connection