· 5 years ago · May 05, 2020, 05:22 PM
1import asyncio
2from asyncio import AbstractEventLoop
3from typing import Optional
4
5import asyncpg
6from asyncpg.pool import Pool
7from asyncpg.connection import Connection
8
9
class Engine:
    """Hold PostgreSQL connection settings and manage an asyncpg pool.

    Administrative helpers (``create_database`` / ``drop_database``) open a
    short-lived connection through ``open_connection``; that connection does
    not select ``db_name``. ``create_tables`` and the pool connect to
    ``db_name`` itself.
    """

    def __init__(self, host, port, db_name, user, password,
                 loop: Optional[AbstractEventLoop] = None):
        """Store the connection settings; no I/O is performed here.

        Args:
            host: Server host passed to asyncpg.
            port: Server port passed to asyncpg.
            db_name: Name of the application database.
            user: Role used to connect; also used as the database owner.
            password: Password for ``user``.
            loop: Optional event loop forwarded to asyncpg. ``None`` lets
                asyncpg pick the loop at call time.
        """
        # The loop is intentionally NOT resolved with asyncio.get_event_loop()
        # here: an Engine built before asyncio.run() would capture a loop
        # different from the one the coroutines later run on.
        self._loop = loop
        self.host = host
        self.port = port
        self.db_name = db_name
        self.user = user
        self.password = password
        self._pool: Optional[Pool] = None

    @staticmethod
    def _quote_identifier(name) -> str:
        """Return *name* as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def _require_pool(self):
        """Return the pool or raise ``RuntimeError`` if it was never created."""
        if self._pool is None:
            raise RuntimeError(
                "connection pool is not created; call create_pool() first"
            )
        return self._pool

    async def create_database(self) -> None:
        """Create ``db_name`` owned by ``user`` unless it already exists.

        PostgreSQL has no ``CREATE DATABASE IF NOT EXISTS``, so existence is
        checked in ``pg_database`` first. A concurrent creator can still win
        the race between the check and the ``CREATE``; in that case the
        server error propagates to the caller.
        """
        async with self.open_connection() as connect:
            already_exists = await connect.fetchval(
                "SELECT 1 FROM pg_database WHERE datname = $1", self.db_name
            )
            if already_exists:
                return

            # Identifiers cannot be bound as query parameters, so they are
            # quoted explicitly instead of being interpolated raw.
            database = self._quote_identifier(self.db_name)
            owner = self._quote_identifier(self.user)
            await connect.execute(f"""
                CREATE DATABASE {database}
                    WITH OWNER = {owner} ENCODING = 'UTF8'
                    LC_COLLATE = 'en_US.UTF-8' LC_CTYPE = 'en_US.UTF-8'
                    TEMPLATE template0;
            """)

    async def drop_database(self) -> None:
        """Drop ``db_name``.

        No ``IF EXISTS`` guard is added, so dropping a missing database still
        raises, as before.
        """
        database = self._quote_identifier(self.db_name)
        async with self.open_connection() as connect:
            await connect.execute(f"""
                DROP DATABASE {database};
            """)

    async def create_tables(self) -> None:
        """Create ``test_table`` inside ``db_name`` if it does not exist.

        A dedicated connection to ``db_name`` is used (rather than
        ``open_connection``) so the table is created in the same database
        the pool connects to.
        """
        connect = await self.get_connection(database=self.db_name)
        try:
            await connect.execute("""
                CREATE TABLE IF NOT EXISTS test_table (
                    row_id serial PRIMARY KEY,
                    test_column text NOT NULL,
                    used_timestamp timestamptz NOT NULL
                        DEFAULT now()
                );
            """)
        finally:
            await connect.close()

    async def create_pool(self) -> None:
        """Create the asyncpg pool bound to ``db_name`` and keep it on self."""
        self._pool = await asyncpg.create_pool(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.db_name,
            loop=self._loop,
        )

    def acquire_connect_from_pool(self):
        """Return ``pool.acquire()``.

        Raises:
            RuntimeError: If ``create_pool`` has not been called.
        """
        return self._require_pool().acquire()

    async def release_connect_to_pool(self, connect) -> None:
        """Give *connect* back to the pool.

        Raises:
            RuntimeError: If ``create_pool`` has not been called.
        """
        await self._require_pool().release(connect)

    async def close_pool(self) -> None:
        """Close the pool if one exists; calling it again is a no-op."""
        # Detach first so the closing pool can no longer be reached via self.
        pool, self._pool = self._pool, None
        if pool is not None:
            await pool.close()

    def open_connection(self):
        """Return an async context manager around a standalone connection."""
        return ConnectionContext(self)

    async def get_connection(self, database: Optional[str] = None) -> "Connection":
        """Open a standalone (non-pooled) connection.

        Args:
            database: Database to connect to. ``None`` (the default) leaves
                the choice to asyncpg, exactly as when the argument is
                omitted.

        Returns:
            The connection returned by ``asyncpg.connect``; the caller is
            responsible for closing it.
        """
        return await asyncpg.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=database,
            loop=self._loop,
        )
85
86
class ConnectionContext:
    """Async context manager that opens a standalone connection and closes it.

    On enter it awaits ``manager.get_connection()`` and yields the result; on
    exit it closes that connection, whether or not the body raised.
    """

    def __init__(self, manager: "Engine"):
        """Remember the manager; the connection is opened lazily on enter.

        Args:
            manager: Object providing an awaitable ``get_connection()``.
        """
        self._manager = manager
        self._connection: Optional[Connection] = None

    async def __aenter__(self) -> "Connection":
        """Open a connection through the manager and return it."""
        self._connection = await self._manager.get_connection()
        return self._connection

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection opened on enter; exceptions are not suppressed."""
        # Detach before closing so the instance never holds a closed
        # connection and a repeated exit does not close it a second time.
        connection, self._connection = self._connection, None
        if connection is not None:
            await connection.close()