· 5 years ago · Sep 10, 2020, 03:44 PM
import functools
import sqlite3
2
3
def ensure_connection(func):
    """Decorator that runs ``func`` with a fresh SQLite connection.

    A connection to ``test.db`` is opened for every call and handed to
    ``func`` as the keyword argument ``conn`` (overwriting any ``conn`` the
    caller passed). The call runs inside a transaction that is committed when
    ``func`` returns and rolled back when it raises; the connection is always
    closed afterwards.

    Because ``conn`` is injected by keyword, a wrapped function whose first
    parameter is ``conn`` must receive its other arguments by keyword too
    (positional arguments would collide with ``conn``).

    :param func: callable accepting a ``conn`` keyword argument. Its return
        value must not rely on the connection staying open (e.g. return
        fetched rows rather than a live cursor).
    :return: the wrapped callable, with ``func``'s name and docstring kept.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        conn = sqlite3.connect('test.db')
        try:
            # ``with conn`` only scopes the transaction (commit/rollback);
            # it does not close the connection, hence the explicit ``finally``.
            with conn:
                kwargs['conn'] = conn
                return func(*args, **kwargs)
        finally:
            conn.close()

    return inner
12
13
@ensure_connection
def init_db(conn, force: bool = False):
    """Make sure the ``user_message`` table exists.

    :param conn: SQLite connection, injected by ``ensure_connection``.
    :param force: when true, drop any existing ``user_message`` table first,
        so the table is recreated empty.
    """
    cursor = conn.cursor()

    # Collect the statements first, then run them in order.
    statements = []
    if force:
        statements.append('DROP TABLE IF EXISTS user_message')
    statements.append('''
        CREATE TABLE IF NOT EXISTS user_message (
            user_id INTEGER NOT NULL,
            text TEXT NOT NULL
        )
    ''')

    for statement in statements:
        cursor.execute(statement)

    conn.commit()
30
31
@ensure_connection
def add_message(conn, user_id: int, text: str):
    """Insert one ``(user_id, text)`` row into ``user_message`` and commit.

    :param conn: SQLite connection, injected by ``ensure_connection``.
    :param user_id: value stored in the ``user_id`` column.
    :param text: value stored in the ``text`` column.
    """
    insert_sql = 'INSERT INTO user_message (user_id, text) VALUES (?, ?)'
    values = (user_id, text)

    cursor = conn.cursor()
    cursor.execute(insert_sql, values)
    conn.commit()
37
38
@ensure_connection
def count_messages(conn, user_id: int):
    """Return the number of ``user_message`` rows whose ``user_id`` matches.

    :param conn: SQLite connection, injected by ``ensure_connection``.
    :param user_id: value to match against the ``user_id`` column.
    :return: the ``COUNT(*)`` value reported by SQLite.
    """
    count_sql = 'SELECT COUNT(*) FROM user_message WHERE user_id = ? LIMIT 1'

    cursor = conn.cursor()
    cursor.execute(count_sql, (user_id, ))
    # The aggregate yields a single one-column row; unpack its only value.
    [total] = cursor.fetchone()
    return total
45
46
@ensure_connection
def list_messages(conn, user_id: int, limit: int = 10):
    """Return up to ``limit`` messages of a user, highest row id first.

    The ``user_message`` table created by ``init_db`` has no ``id`` column,
    so the row identifier is read from SQLite's implicit ``rowid`` (exposed
    under the name ``id``). If the table does declare an
    ``id INTEGER PRIMARY KEY`` column, ``rowid`` is the same value.

    :param conn: SQLite connection, injected by ``ensure_connection``.
    :param user_id: value to match against the ``user_id`` column.
    :param limit: maximum number of rows to return.
    :return: list of ``(id, text)`` tuples ordered by id descending, which is
        normally most-recently-inserted first.
    """
    c = conn.cursor()
    c.execute(
        'SELECT rowid AS id, text FROM user_message '
        'WHERE user_id = ? ORDER BY rowid DESC LIMIT ?',
        (user_id, limit),
    )
    return c.fetchall()