· 3 years ago · Jun 20, 2022, 04:20 PM
import abc
import json
import queue
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Type

import requests
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
10
11
class PokeAPIParser(abc.ABC):
    """Abstract base class that fetches and parses one PokeAPI endpoint.

    Typical flow:

    1. ``fetch_resources_concurrently`` pages through the endpoint's list view
       and puts every NamedAPIResource (a dict with ``name`` and ``url`` keys)
       into ``unprocessed_resources``.
    2. ``fetch_endpoint_concurrently`` drains ``unprocessed_resources``,
       requests every resource, trims the response with ``parse_json`` and
       puts the result into ``processed_resources``.
    3. ``dump_processed`` / ``dump_unprocessed`` write a queue to S3 as a JSON
       list; ``load_unprocessed_resources`` reads a dumped list back.

    Subclasses provide ``ENDPOINT`` and ``PATHS`` as plain class attributes.
    """

    def __init__(self, poke_api_url: str, sleep_duration: float, threads_number: int,
                 processed_folder: Optional[str] = None,
                 unprocessed_folder: Optional[str] = None, **kwargs) -> None:
        """Configure the parser.

        :param poke_api_url: API base URL. ``ENDPOINT`` is appended to it
            directly, so it presumably ends with ``/``.
        :param sleep_duration: Pause, in seconds, that worker threads take
            between requests.
        :param threads_number: Number of threads used by the
            ``*_concurrently`` methods.
        :param processed_folder: S3 URL prefix of the processed dump; the file
            is ``<prefix><ENDPOINT>.json``.
        :param unprocessed_folder: S3 URL prefix of the unprocessed dump.
        :param kwargs: Accepted and ignored.
        """
        self.poke_api_url = poke_api_url
        self.sleep_duration = sleep_duration
        self.threads_number = threads_number
        self.processed_folder = processed_folder
        self.unprocessed_folder = unprocessed_folder

        # queue.Queue is thread-safe, so all worker threads share these queues.
        self.processed_resources = queue.Queue()
        self.unprocessed_resources = queue.Queue()
        self.hook = S3Hook()

    # Plain abstract properties: chaining @classmethod with @property was
    # deprecated in Python 3.11 and removed in 3.13. Subclasses override
    # these with ordinary class attributes.
    @property
    @abc.abstractmethod
    def ENDPOINT(self) -> str:
        """Endpoint name appended to ``poke_api_url`` (e.g. ``'pokemon'``)."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def PATHS(self) -> List[str]:
        """Key paths to keep from each resource, keys separated by ``.``."""
        raise NotImplementedError

    def _get_value(self, path: str, json: Dict, res: Dict) -> None:
        """Copy the value(s) addressed by ``path`` from ``json`` into ``res``.

        ``res`` is modified in place. ``path`` is a sequence of keys of
        ``json`` separated by ``.``. Whenever a key maps to a list, the
        remaining path is applied to every element of that list, and the
        results are stored in a list of the same length. Whenever a key maps to
        a dict, the remaining path is applied inside it. When the last key is
        ``'url'``, the numeric id at the end of the URL is stored under
        ``'id'`` instead of the URL itself.

        Example with a list (``res`` starts empty)::

            path = 'k1.k21'
            json = {'k1': [{'k21': 1, 'k22': 2}, {'k21': 3, 'k22': 4}]}
            # res -> {'k1': [{'k21': 1}, {'k21': 3}]}

        Example with a URL (``res`` starts empty)::

            path = 'stat.url'
            json = {'stat': {'name': 'hp', 'url': 'https://pokeapi.co/api/v2/stat/1/'}}
            # res -> {'stat': {'id': 1}}

        :raises KeyError: if a key of ``path`` is missing in ``json``.
        """
        key, _, rest = path.partition('.')
        value = json[key]
        if isinstance(value, dict):
            # Reuse a container already filled by an earlier path with the same
            # prefix (e.g. 'stats.base_stat' then 'stats.stat.url').
            if res.get(key) is None:
                res[key] = dict()
            self._get_value(rest, value, res[key])
        elif isinstance(value, list):
            if res.get(key) is None:
                res[key] = [dict() for _ in value]
            for index, item in enumerate(value):
                self._get_value(rest, item, res[key][index])
        elif key == 'url':
            # Save the id as int instead of the whole URL to save space;
            # rstrip makes this work with or without a trailing slash.
            res['id'] = int(value.rstrip('/').rsplit('/', 1)[-1])
        else:
            res[key] = value

    def parse_json(self, json: Dict) -> Dict:
        """Trim a JSON resource fetched from PokeAPI.

        Keeps ``id`` and ``name`` plus every value addressed by ``PATHS``
        (see ``_get_value``).
        """
        res = dict()
        res['id'] = json['id']
        res['name'] = json['name']
        for path in self.PATHS:
            self._get_value(path, json, res)
        return res

    def _get_json_content(self, url: str) -> Dict:
        """Send a GET request to ``url`` and return the decoded JSON body.

        :raises requests.HTTPError: on a 4xx/5xx response. This replaces a
            later, less clear decode or missing-key error.
        """
        response = requests.get(url)
        response.raise_for_status()
        return response.json()

    def _fetch_resource(self, offset: int, limit: int) -> Optional[str]:
        """Fetch one page of the endpoint's NamedAPIResource list.

        A NamedAPIResource is a dict with ``name`` and ``url`` keys; each one on
        the page is put into ``unprocessed_resources``.

        :returns: the page's ``next`` field (falsy when there is no next page).
        """
        url = f'{self.poke_api_url}{self.ENDPOINT}?limit={limit}&offset={offset}'
        content = self._get_json_content(url)
        for resource in content['results']:
            self.unprocessed_resources.put(resource)
        return content['next']

    def _fetch_resources(self, begin: int, step: int, limit: int) -> None:
        """Fetch pages at offsets ``begin``, ``begin + step``, ... until a page
        reports that no next page exists."""
        counter = 0
        while self._fetch_resource(begin + counter * step, limit):
            time.sleep(self.sleep_duration)
            counter += 1

    def _concurrent_running(self, target: Callable, kwargs_list: List[Dict[str, Any]]) -> None:
        """Run ``target(**kwargs)`` in one thread per entry of ``kwargs_list``
        and wait for all threads to finish.

        An exception raised inside a thread does not propagate to the caller;
        ``threading.excepthook`` reports it.
        """
        threads = []
        for kwargs in kwargs_list:
            thread = threading.Thread(target=target, kwargs=kwargs)
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

    def fetch_resources_concurrently(self, limit: int) -> None:
        """Fetch the endpoint's NamedAPIResource list with ``threads_number`` threads.

        Pages of ``limit`` items are interleaved between the threads: thread
        ``i`` requests offsets ``limit * i``, ``limit * (i + T)``,
        ``limit * (i + 2T)``, ... where ``T`` is ``threads_number``.
        """
        kwargs_list = [
            {'begin': limit * i, 'step': limit * self.threads_number, 'limit': limit}
            for i in range(self.threads_number)
        ]
        self._concurrent_running(self._fetch_resources, kwargs_list)

    def _fetch_endpoint(self) -> None:
        """Move resources from the unprocessed queue to the processed queue.

        Each resource is requested and trimmed with ``parse_json``. Returns
        once the unprocessed queue is empty. If fetching or parsing a resource
        fails, the resource is put back into ``unprocessed_resources`` and the
        exception is re-raised, which ends this worker. The resource therefore
        stays available to ``dump_unprocessed`` and to other workers.
        """
        while True:
            time.sleep(self.sleep_duration)
            try:
                resource = self.unprocessed_resources.get(block=False)
            except queue.Empty:
                return
            try:
                content = self._get_json_content(resource['url'])
                parsed_content = self.parse_json(content)
            except Exception:
                self.unprocessed_resources.put(resource)
                raise
            self.processed_resources.put(parsed_content)

    def fetch_endpoint_concurrently(self) -> None:
        """Drain ``unprocessed_resources`` with ``threads_number`` worker threads."""
        kwargs_list = [{} for _ in range(self.threads_number)]
        self._concurrent_running(self._fetch_endpoint, kwargs_list)

    def _s3_path(self, prefix: Optional[str]) -> str:
        """Build the S3 URL ``<prefix><ENDPOINT>.json``.

        :raises ValueError: if ``prefix`` is None. Without this check the key
            would silently become ``None<ENDPOINT>.json``.
        """
        if prefix is None:
            raise ValueError(f'S3 folder for endpoint {self.ENDPOINT!r} is not configured')
        return f'{prefix}{self.ENDPOINT}.json'

    def load_unprocessed_resources(self) -> None:
        """Read the unprocessed dump from S3 and enqueue every resource in it."""
        bucket, key = self.hook.parse_s3_url(self._s3_path(self.unprocessed_folder))
        data_string = self.hook.read_key(
            bucket_name=bucket,
            key=key
        )
        decoded_data = json.loads(data_string)
        for resource in decoded_data:
            self.unprocessed_resources.put(resource)

    def dump_unprocessed(self) -> None:
        """Dump unprocessed data into S3."""
        self._load_string(self.unprocessed_folder, self.unprocessed_resources)

    def dump_processed(self) -> None:
        """Dump processed data into S3."""
        self._load_string(self.processed_folder, self.processed_resources)

    def _load_string(self, prefix: Optional[str], resources_queue: queue.Queue) -> None:
        """Write the queue's items to ``<prefix><ENDPOINT>.json`` in S3 as a JSON list.

        Reads the queue's underlying deque without consuming it. This is
        presumably only called after the worker threads have finished.
        """
        key = self._s3_path(prefix)
        encoded_data = json.dumps(list(resources_queue.queue))
        self.hook.load_string(
            string_data=encoded_data,
            key=key, replace=True
        )
180
181
class PokemonParser(PokeAPIParser):
    """Parser for the ``pokemon`` endpoint.

    Besides ``id`` and ``name``, keeps every entry of ``stats`` reduced to its
    ``base_stat`` and the id of the referenced stat.
    """

    PATHS: List[str] = ['stats.base_stat', 'stats.stat.url']
    ENDPOINT: str = 'pokemon'
185
186
class TypeParser(PokeAPIParser):
    """Parser for the ``type`` endpoint.

    Besides ``id`` and ``name``, keeps the ids of the Pokémon listed under
    ``pokemon``.
    """

    PATHS: List[str] = ['pokemon.pokemon.url']
    ENDPOINT: str = 'type'
190
191
class StatParser(PokeAPIParser):
    """Parser for the ``stat`` endpoint; keeps only ``id`` and ``name``."""

    PATHS: List[str] = []
    ENDPOINT: str = 'stat'
195
196
class GenerationParser(PokeAPIParser):
    """Parser for the ``generation`` endpoint; keeps only ``id`` and ``name``."""

    PATHS: List[str] = []
    ENDPOINT: str = 'generation'
200
201
class SpeciesParser(PokeAPIParser):
    """Parser for the ``pokemon-species`` endpoint.

    Besides ``id`` and ``name``, keeps the generation id and the ids of the
    Pokémon listed under ``varieties``.
    """

    PATHS: List[str] = ['generation.url', 'varieties.pokemon.url']
    ENDPOINT: str = 'pokemon-species'
205
206
class MoveParser(PokeAPIParser):
    """Parser for the ``move`` endpoint.

    Besides ``id`` and ``name``, keeps the ids listed under
    ``learned_by_pokemon``.
    """

    PATHS: List[str] = ['learned_by_pokemon.url']
    ENDPOINT: str = 'move'
210
211
# Concrete parser classes. The list holds the classes themselves, not
# instances, hence ``Type[PokeAPIParser]``.
# NOTE(review): the order may matter to whoever iterates this list; kept as is.
parsers: List[Type[PokeAPIParser]] = [
    SpeciesParser,
    GenerationParser,
    StatParser,
    TypeParser,
    PokemonParser,
    MoveParser,
]
219]