· 4 years ago · Mar 23, 2021, 06:14 PM
1from typing import Union
2
3import asyncpg
4from asyncpg import Connection
5from asyncpg.pool import Pool
6
7from data import config
8
9
10class Database:
11
12 def __init__(self):
13 self.pool: Union[Pool, None] = None
14
15 async def create(self):
16 self.pool = await asyncpg.create_pool(
17 user=config.DB_USER,
18 password=config.DB_PASS,
19 host=config.DB_HOST,
20 database=config.DB_NAME
21 )
22
23 async def execute(self, command, *args,
24 fetch: bool = False,
25 fetchval: bool = False,
26 fetchrow: bool = False,
27 execute: bool = False
28 ):
29 async with self.pool.acquire() as connection:
30 connection: Connection
31 async with connection.transaction():
32 if fetch:
33 result = await connection.fetch(command, *args)
34 elif fetchval:
35 result = await connection.fetchval(command, *args)
36 elif fetchrow:
37 result = await connection.fetchrow(command, *args)
38 elif execute:
39 result = await connection.execute(command, *args)
40 return result
41
42 async def create_table_users(self):
43 sql = """
44 CREATE TABLE IF NOT EXISTS Users (
45 id SERIAL PRIMARY KEY,
46 full_name VARCHAR(255) NOT NULL,
47 username varchar(255) NULL,
48 telegram_id BIGINT NOT NULL UNIQUE
49 );
50 """
51 await self.execute(sql, execute=True)
52
53 @staticmethod
54 def format_args(sql, parameters: dict):
55 sql += " AND ".join([
56 f"{item} = ${num}" for num, item in enumerate(parameters.keys(),
57 start=1)
58 ])
59 return sql, tuple(parameters.values())
60
61 async def add_user(self, full_name, username, telegram_id):
62 sql = "INSERT INTO users (full_name, username, telegram_id) VALUES($1, $2, $3) returning *"
63 return await self.execute(sql, full_name, username, telegram_id, fetchrow=True)
64
65 async def select_all_users(self):
66 sql = "SELECT * FROM Users"
67 return await self.execute(sql, fetch=True)
68
69 async def select_user(self, **kwargs):
70 sql = "SELECT * FROM Users WHERE "
71 sql, parameters = self.format_args(sql, parameters=kwargs)
72 return await self.execute(sql, *parameters, fetchrow=True)
73
74 async def count_users(self):
75 sql = "SELECT COUNT(*) FROM Users"
76 return await self.execute(sql, fetchval=True)
77
78 async def update_user_username(self, username, telegram_id):
79 sql = "UPDATE Users SET username=$1 WHERE telegram_id=$2"
80 return await self.execute(sql, username, telegram_id, execute=True)
81
82 async def delete_users(self):
83 await self.execute("DELETE FROM Users WHERE TRUE", execute=True)
84
85 async def drop_users(self):
86 await self.execute("DROP TABLE Users", execute=True)