· 4 years ago · Mar 10, 2021, 05:36 PM
1from abc import abstractmethod
2from typing import Union
3
4import asyncpg
5
6from config import data
7from .connection_config import FormattedArgs
8
9
10class BaseConnection:
11 """Базовый класс для подключения к базе данных"""
12
13 def __init__(self):
14 self.user = data.PG_USER
15 self.password = data.PG_PASS
16 self.host = data.HOST
17 self.database = data.DATABASE
18 self.pool: Union[asyncpg.Pool, None] = None
19 self.connected = False
20
21 @abstractmethod
22 async def connect(self, **kwargs) -> None:
23 pool = await asyncpg.create_pool(
24 user=self.user, # Пользователь базы (postgres или ваше имя), для которой была создана роль
25 password=self.password, # Пароль к пользователю
26 host=self.host, # Ip адрес базы данных. Если локальный компьютер - localhost
27 database=self.database # Название базы данных. По умолчанию - postgres, если вы не создавали свою
28 )
29 self.pool = pool
30 self.connected = True
31
32 @abstractmethod
33 def is_connected(self) -> bool:
34 if self.connected:
35 return True
36 return False
37
38
39class ApiMixin:
40 """Миксин с форматированием операторов"""
41
42 @staticmethod
43 def and_(query: str, **kwargs) -> FormattedArgs:
44 query += " AND ".join([
45 f"{item} = ${num}" for num, item in enumerate(kwargs, start=1)
46 ])
47 return FormattedArgs(query=query, params=tuple(kwargs.values()))
48
49 @staticmethod
50 def or_(query: str, **kwargs):
51 query += " OR ".join([
52 f"{item} = ${num}" for num, item in enumerate(kwargs, start=1)
53 ])
54 return FormattedArgs(query=query, params=tuple(kwargs.values()))
55
56
57class BaseDatabase(BaseConnection, ApiMixin):
58 """Основной класс базы данных"""
59
60 def __init__(self):
61 super(BaseConnection, self).__init__()
62
63 async def connect(self) -> asyncpg.pool:
64 await super().connect()
65 return self.pool
66
67 def is_connected(self) -> bool:
68 pass
69
70--- client_code.py ---
71import asyncio
72
73from db_api.connection.connections import BaseDatabase
74from db_api.connection.exceptions import ConnectionError
75
76
77class Database(BaseDatabase):
78
79 async def create_table_users(self):
80 if not self.connected:
81 raise ConnectionError()
82 query = """CREATE TABLE IF NOT EXISTS users(
83 user_id int GENERATED ALWAYS AS IDENTITY,
84 name varchar(255) NOT NULL,
85 username varchar(255) NULL,
86 balance decimal DEFAULT 0 NOT NULL,
87 CONSTRAINT pk_users_user_id PRIMARY KEY (user_id)
88 )"""
89 await self.pool.execute(query)
90
91 async def select_user(self, **kwargs):
92 query = """SELECT name, username
93 FROM users
94 WHERE """
95 data = self.and_(query=query, **kwargs)
96 return await self.pool.fetchrow(
97 data.query,
98 *data.params
99 )
100
101 async def create_user(self, name: str, username: str):
102 query = """INSERT INTO users(name, username)
103 VALUES($1, $2)"""
104 await self.pool.execute(
105 query,
106 name,
107 username
108 )
109
110
111async def main():
112 db = Database()
113 await db.connect()
114 await db.create_table_users()
115
116
117if __name__ == '__main__':
118 loop = asyncio.get_event_loop()
119 try:
120 loop.run_until_complete(main())
121 finally:
122 loop.close()