· 4 years ago · Mar 17, 2021, 06:06 PM
1import asyncio
2from dataclasses import dataclass
3from itertools import repeat
4from typing import Literal, Optional, Union, Dict, List, Tuple, Any
5from aiohttp import ClientTimeout, ClientSession, ContentTypeError
6from aiohttp.client import DEFAULT_TIMEOUT
7
8
@dataclass(frozen=True)
class Response:
    """
    Immutable result of one HTTP request.

    Instances are frozen: fields cannot be reassigned after creation.
    """
    # HTTP status code of the reply.
    status_code: int
    # Body of the reply: a decoded JSON payload or the raw body; None is allowed.
    response_data: Optional[Union[dict, str, bytes, bytearray]]
15
16
class RequestAuthError(Exception):
    """
    Error signalling incorrect authentication of POST or GET data.

    In this file it is raised by ``HttpXParser._request`` when a JSON reply
    was required but the server answered with a non-JSON content type.
    """
22
23
class HttpBase:
    """
    Common base class for the HTTP parsers.

    Meant to hold the abstract methods of a parser; it declares none yet.
    """
29
30
class HttpXParser(HttpBase):
    """
    Parser for the Django site; collects additional information.

    Sends authenticated requests to the API with aiohttp. Default request
    headers are kept on the instance and can be changed with item
    assignment (``parser['Header'] = 'value'``) and read back as attributes.
    """
    _sleep_time = 2

    # Defining __eq__ already removes the inherited hash; this is stated
    # explicitly because instances are mutable (headers can change).
    __hash__ = None

    def __init__(self):
        """Set up the default headers, target URL and request timeout."""
        self._base_headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36',
            'Accept-Language': "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
            # 'Accept': 'application/json, text/plain, */*',
        }
        # Kept for backward compatibility. _request no longer stores its
        # temporary session here: concurrent requests overwrote each other's
        # value and the attribute ended up pointing at a closed session.
        self._session: Optional[ClientSession] = None
        self.url = 'http://127.0.0.1/api/'
        # 30 s for the whole request, 5 s to establish the socket connection.
        self._timeout = ClientTimeout(total=2 * 15, connect=None, sock_connect=5, sock_read=None)

    async def _request(
            self,
            url: Optional[str] = None,
            get_json: bool = False,
            method: Literal['POST', 'GET'] = 'POST',
            set_timeout: bool = True,
            data: Optional[Dict[str, Union[str, int, List[Union[str, int]]]]] = None,
            headers: Optional[Dict[str, Union[str, int]]] = None,
            session: Optional[ClientSession] = None,
            **client_kwargs) -> Response:
        """
        Send one request and wrap the reply in a ``Response``.

        :param url: where to send the request; falls back to ``self.url``
        :param get_json: when True a non-JSON reply is an error instead of
            being returned as raw bytes
        :param method: request type, POST or GET
        :param set_timeout: use the instance timeout (True) or aiohttp's
            default timeout (False); only applies to a temporary session
        :param data: request payload; the auth fields are added to a copy
        :param headers: extra headers for this request; they are merged over
            the instance defaults and win on conflicts
        :param session: an existing aiohttp.ClientSession to reuse; it is
            left open. Without one a temporary session is created and closed
        :param client_kwargs: key/value pairs for aiohttp.ClientSession
            initialization; only used for a temporary session
        :return: Response instance
        :raises RequestAuthError: ``get_json`` is True and the reply does not
            have the ``application/json`` content type
        """
        if isinstance(headers, dict):
            # dict.update() returns None, so its result must not be used as
            # the merged mapping; build a new dict and leave the caller's
            # dict and the instance defaults untouched.
            request_headers = {**self._base_headers, **headers}
        else:
            request_headers = self._base_headers

        request_kwargs = {
            'method': method,
            'url': url or self.url,
            'data': self._set_auth(data),
            'headers': request_headers,
        }

        if isinstance(session, ClientSession):
            # Reuse the caller's session; its lifetime belongs to the caller.
            return await self._exchange(session, get_json, **request_kwargs)

        async with ClientSession(
                timeout=self._timeout if set_timeout else DEFAULT_TIMEOUT,
                **client_kwargs
        ) as own_session:
            return await self._exchange(own_session, get_json, **request_kwargs)

    @staticmethod
    async def _exchange(session: ClientSession, get_json: bool, **request_kwargs) -> Response:
        """Perform the request on ``session`` and decode the reply body."""
        # The context manager releases the connection even when decoding fails.
        async with session.request(**request_kwargs) as response:
            try:
                payload = await response.json(
                    content_type="application/json"
                )
            except ContentTypeError as ex:
                if get_json:
                    raise RequestAuthError() from ex
                # Not JSON and JSON was not required: hand back the raw body.
                payload = await response.read()
            return Response(
                status_code=response.status,
                response_data=payload
            )

    @staticmethod
    def _set_auth(
            data: Optional[
                Dict[str, Union[str, int, List[Union[str, int]], Tuple[Union[str, int]]]]
            ] = None) -> Dict[str, Any]:
        """
        Add the Django API credentials to a request payload.

        :param data: payload (or headers) dict; anything that is not a dict
            is treated as an empty payload
        :return: a new dict with ``SECRET_KEY`` and ``SECRET_CODE`` added;
            the dict passed in is not modified
        """
        from djangoProject.settings import SECRET_KEY, SECRET_CODE
        # NOTE(review): the credentials are attached to every request,
        # including ones sent to a caller-supplied url - confirm that the
        # target is always the trusted API.
        payload = dict(data) if isinstance(data, dict) else {}
        payload['SECRET_KEY'] = SECRET_KEY
        payload['SECRET_CODE'] = SECRET_CODE
        return payload

    async def fetch(self, *, times: int = 10, **kwargs) -> List[Response]:
        """
        Run the same request several times concurrently.

        :param times: number of requests to send
        :param kwargs: HttpXParser._request kwargs
        :return: the responses in the order in which they completed
        """
        pending = [self._request(**kwargs) for _ in range(times)]
        return [await finished for finished in asyncio.as_completed(pending)]

    def fast(self) -> 'HttpXParser':
        """
        Switch asyncio to the faster uvloop event loop when it is installed.

        Without uvloop the current event loop policy is left as it is.

        :return: self, so the call can be chained
        """
        try:
            from uvloop import EventLoopPolicy
        except ImportError:
            # uvloop is optional. Installing asyncio.AbstractEventLoopPolicy
            # as a fallback would break asyncio.run(), because that class
            # cannot create event loops; keep the active policy instead.
            return self
        asyncio.set_event_loop_policy(EventLoopPolicy())
        return self

    def __getattr__(self, item: Any) -> Any:
        """
        Look up a default header by attribute name.

        Only called when normal attribute lookup fails.

        :param item: key name of the _base_headers dict
        :return: the header value, or None when there is no such header
        :raises AttributeError: for names starting with an underscore, so
            that protocol probes (dunder lookups) behave normally and a
            missing ``_base_headers`` cannot cause infinite recursion
        """
        if isinstance(item, str) and item.startswith('_'):
            raise AttributeError(item)
        return self._base_headers.get(item)

    def __eq__(self, other: Any) -> bool:
        """
        Compare two parsers by target URL and default headers.

        :param other: other object
        :return: True when ``other`` is a parser of this class with the same
            url and the same default headers
        """
        return (
            isinstance(other, self.__class__)
            and other.url == self.url
            and other._base_headers == self._base_headers
        )

    def __setitem__(self, key, value) -> None:
        """
        Set a default header.

        :param key: key of the base_headers dict
        :param value: value of the base_headers dict
        :return: None
        """
        self._base_headers[key] = value
181
182
if __name__ == '__main__':
    # Demo run: add one extra default header, switch to uvloop when it is
    # available, then send 1000 concurrent requests (POST by default).
    # NOTE(review): _request attaches SECRET_KEY / SECRET_CODE to the request
    # data, so this run sends them to the external URL below - confirm intent.
    parser = HttpXParser()
    parser['Product'] = 'True'
    demo_job = parser.fast().fetch(times=1000, url='https://www.bia.pp.ua/')
    asyncio.run(demo_job)
187
188 # asyncio.run(parser.fast().fetch(1))
189