· 5 years ago · Sep 15, 2020, 08:40 PM
1import urllib.request as urlreq
2from random import randint
3from typing import *
4import xml.etree.ElementTree as ET
5import asyncio
6from furl import furl
7
class DataContainer:
    '''Image container for results.

    Meant to be used with get_post_data: wraps the attribute dictionary of a
    single post and exposes its entries as instance attributes.

    The dimension, score and change entries are converted to ``int``; an
    entry that is absent (or empty) in the payload becomes ``None`` instead
    of raising. Every other entry is stored exactly as found in the payload.
    '''

    # Maps the one-letter rating stored on a post to the tag appended by
    # show_tags.
    _RATING_TAGS = {
        's': 'rating:safe',
        'q': 'rating:questionable',
        'e': 'rating:explicit',
    }

    def __init__(self, payload: dict):
        '''Store ``payload`` and unpack its known keys into attributes.'''
        as_int = self._as_int

        self.data = payload
        self.id = as_int(payload.get('id'))
        self.height = as_int(payload.get('height'))
        self.width = as_int(payload.get('width'))
        self.sample_height = as_int(payload.get('sample_height'))
        self.sample_width = as_int(payload.get('sample_width'))
        self.preview_width = as_int(payload.get('preview_width'))
        self.preview_height = as_int(payload.get('preview_height'))
        self.score = as_int(payload.get('score'))
        self.change = as_int(payload.get('change'))
        self.file_url = payload.get('file_url')
        self.parent_id = payload.get('parent_id')
        self.sample_url = payload.get('sample_url')
        self.preview_url = payload.get('preview_url')
        self.rating = payload.get('rating')
        self.tags = payload.get('tags')
        self.md5 = payload.get('md5')
        self.creator_id = payload.get('creator_id')
        self.has_children = payload.get('has_children')
        self.created_at = payload.get('created_at')
        self.status = payload.get('status')
        self.source = payload.get('source')
        self.has_notes = payload.get('has_notes')
        self.has_comments = payload.get('has_comments')

    @staticmethod
    def _as_int(value) -> Optional[int]:
        '''Convert a payload value to int; a missing or empty value gives None.'''
        if value is None or value == '':
            return None
        return int(value)

    async def show_all_data(self) -> dict:
        '''Get all data for post (the payload passed to the constructor).'''
        return self.data

    async def show_tags(self) -> tuple:
        '''Return the post's tags as a tuple.

        The tag string is split on whitespace, so a missing or empty tag
        string contributes no tags. When the rating is 's', 'q' or 'e' the
        matching ``rating:...`` tag is appended as the last element.
        '''
        tags = (self.tags or '').split()

        rating_tag = self._RATING_TAGS.get(self.rating)
        if rating_tag is not None:
            tags.append(rating_tag)

        return tuple(tags)

    async def show_comments(self) -> Optional[list]:
        '''Return the comments of this post, or None when there are none.

        No request is made when the payload says ``has_comments`` is
        ``'false'``. Otherwise the comments are looked up through a fresh
        Gelbooru client created without credentials.
        '''
        if self.has_comments == 'false':
            return None

        return await Gelbooru().get_comments(self.id)
59
class Gelbooru:
    '''Small client for the Gelbooru XML API (the ``page=dapi`` endpoints).

    Search methods return plain dictionaries with the keys ``post_url``,
    ``image_url`` and ``id``. Methods return ``None`` when nothing was found
    or when the response could not be parsed as XML.
    '''

    def __init__(self, api_key: Optional[str] = None,
                 user_id: Optional[str] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None):
        '''Create a client.

        api_key / user_id: optional credentials, added to every API request
            when set.
        loop: stored on the instance as ``_loop``; the methods of this class
            schedule their work on the loop they are awaited from.
        '''
        self.api_key = api_key
        self.user_id = user_id
        # Random starting page for tag searches; narrowed down by the search
        # helper when that page turns out to be empty.
        self.page_num = randint(0, 200)
        self.booru_url = 'https://gelbooru.com/'
        self._loop = loop

    def __endpoint(self, s) -> 'furl':
        '''Build the base API URL for section ``s`` (e.g. 'post', 'comment').'''
        endpoint = furl(self.booru_url)
        endpoint.args['page'] = 'dapi'
        endpoint.args['s'] = s
        endpoint.args['q'] = 'index'

        # Add api key and user ID if possible
        if self.api_key:
            endpoint.args['api_key'] = self.api_key
        if self.user_id:
            endpoint.args['user_id'] = self.user_id

        return endpoint

    def __fetch_root(self, url):
        '''Blocking helper: download ``url`` and return the XML root element.

        Returns None when the body is not well-formed XML. Errors raised
        while opening the URL propagate to the caller.
        '''
        # The context manager closes the response even if parsing fails, and
        # nothing needs closing when urlopen itself raises.
        with urlreq.urlopen(url) as response:
            try:
                return ET.parse(response).getroot()
            except ET.ParseError:
                return None

    async def __request(self, endpoint):
        '''Fetch ``endpoint`` in the default executor and return its XML root.

        urlopen blocks, so it is run in a worker thread to keep the event
        loop free while the request is in flight.
        '''
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.__fetch_root, str(endpoint))

    async def __search(self, tags, limit):
        '''Run a tag search and return a non-empty XML root, or None.

        Starts at ``self.page_num``. While the page comes back empty, a new
        page is drawn from a shrinking range (0-4, 0-3, ... 0-0), so the last
        of the five retries always asks for page 0. ``self.page_num`` is
        updated to the last page requested.
        '''
        endpoint = self.__endpoint('post')
        endpoint.args['limit'] = limit
        endpoint.args['pid'] = self.page_num

        tag_list = self.__tagifier(tags)
        if tag_list:
            # One space-separated value: assigning the list itself would make
            # furl emit a separate ``tags=`` parameter per tag.
            endpoint.args['tags'] = ' '.join(tag_list)

        root = await self.__request(endpoint)
        for upper in range(4, -1, -1):
            if root is None or len(root) > 0:
                break
            self.page_num = randint(0, upper)
            endpoint.args['pid'] = self.page_num
            root = await self.__request(endpoint)

        if root is None or len(root) == 0:
            return None
        return root

    # Private function to create a post URL and a related image URL
    def __link_images(self, response):
        '''Turn post attribute dicts into ``post_url``/``image_url``/``id`` dicts.

        Each item of ``response`` must provide the keys 'id' and 'file_url'.
        '''
        post_url = 'https://gelbooru.com/index.php?page=post&s=view&id='
        return [
            {
                'post_url': post_url + f'{post["id"]}',
                'image_url': post['file_url'],
                'id': post['id'],
            }
            for post in response
        ]

    def __tagifier(self, tags) -> list:
        '''Split a comma-separated tag string into API-style tags.

        Each tag is stripped, lower-cased and has inner spaces replaced by
        underscores. Empty entries are dropped; a falsy ``tags`` gives [].
        '''
        if not tags:
            return []
        return [
            tag.strip().lower().replace(' ', '_')
            for tag in tags.split(',')
            if tag.strip()
        ]

    # Get a bunch of posts based on a limit and tags that the user enters.
    async def get_posts(self, tags='', limit=100) -> Optional[list]:
        '''User can pass in tags separated by a comma
        Using a dash before a tag will exclude it
        e.g. (cat ears, blue eyes, rating:safe, -nude)
        The limit parameter has a default value of 100

        Returns a list with one dict per post, or None when the search found
        nothing or the response was not valid XML.'''
        root = await self.__search(tags, limit)
        if root is None:
            return None

        return self.__link_images([post.attrib for post in root])

    # Get a single image based on tags that the user enters.
    async def get_single_post(self, tags='') -> Optional[dict]:
        '''User can pass in tags separated by a comma
        Using a dash before a tag will exclude it
        e.g. (cat ears, blue eyes, rating:safe, -nude)

        Requests up to 100 posts and returns one of them chosen at random,
        or None when the search found nothing.'''
        root = await self.__search(tags, 100)
        if root is None:
            return None

        chosen = root[randint(0, len(root) - 1)]
        return self.__link_images([chosen.attrib])[0]

    # Chooses an image out of 5000000+ images!
    async def get_random_post(self) -> Optional[dict]:
        '''Simply, returns a random image out of 5000000+ possible images.

        Reads the ``count`` attribute of the post listing, draws a post ID
        between 1 and that count and looks the post up. Returns None when the
        count is unavailable or the drawn ID does not resolve to a post.'''
        endpoint = self.__endpoint('post')
        # Only the root element's ``count`` attribute is needed here.
        endpoint.args['limit'] = 1

        listing = await self.__request(endpoint)
        if listing is None:
            return None
        try:
            total = int(listing.attrib['count'])
        except (KeyError, ValueError):
            return None
        if total < 1:
            return None

        endpoint.args['id'] = randint(1, total)
        root = await self.__request(endpoint)
        if root is None or len(root) == 0:
            return None

        return self.__link_images([root[0].attrib])[0]

    # Get comments from a post using post_id
    async def get_comments(self, post_id):
        '''Pass in a post ID to get the comments for the post.
        If no comments are found, returns None.

        Each comment is a dict with the keys 'author' and 'comment'. A failed
        request or an unparsable response also gives None.'''
        endpoint = self.__endpoint('comment')
        endpoint.args['post_id'] = post_id
        try:
            root = await self.__request(endpoint)
        except OSError:
            # Best effort: network failures (URLError is an OSError) count as
            # "no comments".
            return None
        if root is None:
            return None

        comment_list = [
            {'author': node.attrib['creator'], 'comment': node.attrib['body']}
            for node in root
        ]
        return comment_list or None

    # Get data for a post
    async def get_post_data(self, post_id) -> 'Optional[DataContainer]':
        '''User can pass in a post ID to get all of its data

        Returns a DataContainer built from the post's attributes, or None
        when the request fails, the response is not valid XML or no post
        with that ID is returned.'''
        endpoint = self.__endpoint('post')
        endpoint.args['id'] = post_id
        try:
            root = await self.__request(endpoint)
        except OSError:
            # Best effort: network failures (URLError is an OSError) give None.
            return None
        if root is None or len(root) == 0:
            return None

        return DataContainer(root[0].attrib)
289