· 6 years ago · Nov 20, 2019, 09:20 AM
1import asyncio
2from asyncio.events import AbstractEventLoop
3from typing import List, Callable, Optional, Any
4
5import asyncpg
6from asyncpg.pool import Pool
7from asyncpg.connection import Connection
8
9from .types import Table, Column, ProxyModel, PhoneModel
10from ..communicate.types import Phone
11from ..proxy.types import Proxy
12from ..helpers.time import get_utc_now_timestamp
13
14
15def _acquire_connection(method: Callable) -> Callable:
16 async def wrapper(self: "Database", *args, **kwargs) -> Any:
17 async with self._pool.acquire() as connect:
18 async with connect.transaction():
19 return await method(self, *args, **kwargs, __connect=connect)
20
21 return wrapper
22
23
24class Database:
25 def __init__(self, pool: Pool, loop: Optional[AbstractEventLoop] = None):
26
27 if loop is None:
28 loop = asyncio.get_event_loop()
29 self.loop = loop
30 self._pool = pool
31
32 @classmethod
33 async def init_from_auth(cls, host: str, port: int, database: str,
34 user: str, password: str, loop: Optional[AbstractEventLoop] = None) -> "Database":
35 pool = await asyncpg.create_pool(host=host, port=port, database=database, user=user,
36 password=password, loop=loop)
37 return cls(pool=pool, loop=loop)
38
39 @_acquire_connection
40 async def create_tables(self, __connect: Connection) -> None:
41 await __connect.execute(
42 f"""
43 CREATE TABLE IF NOT EXISTS {Table.USED_PROXIES} (
44 {Column.RECORD_ID} SERIAL PRIMARY KEY,
45 {Column.PROXY} TEXT UNIQUE NOT NULL,
46 {Column.ADD_UTC_TIMESTAMP} INTEGER NOT NULL
47 )
48 """
49 )
50
51 await __connect.execute(
52 f"""
53 CREATE TABLE IF NOT EXISTS {Table.USAGE_PHONES} (
54 {Column.RECORD_ID} SERIAL PRIMARY KEY,
55 {Column.PHONE} TEXT NOT NULL,
56 {Column.ADD_UTC_TIMESTAMP} INTEGER NOT NULL
57 )
58 """
59 )
60
61 @_acquire_connection
62 async def get_used_proxies(self, __connect: Connection) -> List[ProxyModel]:
63 records = await __connect.fetch(
64 f"""
65 SELECT *
66 FROM {Table.USED_PROXIES}
67 """
68 )
69
70 return [ProxyModel.init_from_star_record(record=record) for record in records]
71
72 @_acquire_connection
73 async def get_phone_usage(self, phone: Phone, __connect: Connection) -> List[PhoneModel]:
74 records = await __connect.fetch(
75 f"""
76 SELECT *
77 FROM {Table.USAGE_PHONES}
78 WHERE {Column.PHONE}=$1
79 """, str(phone)
80 )
81
82 return [PhoneModel.init_from_star_record(record=record) for record in records]
83
84 @_acquire_connection
85 async def add_used_proxy(self, proxy: Proxy, __connect: Connection) -> None:
86 await __connect.execute(
87 f"""
88 INSERT INTO {Table.USED_PROXIES} ({Column.PROXY}, {Column.ADD_UTC_TIMESTAMP})
89 VALUES ($1, $2)
90 """, str(proxy), get_utc_now_timestamp()
91 )
92
93 @_acquire_connection
94 async def add_phone_usage(self, phone: Phone, __connect: Connection) -> None:
95 await __connect.execute(
96 f"""
97 INSERT INTO {Table.USAGE_PHONES} ({Column.PHONE}, {Column.ADD_UTC_TIMESTAMP})
98 VALUES ($1, $2)
99 """, str(phone), get_utc_now_timestamp()
100 )
101
102 async def close(self):
103 await self._pool.close()