· 7 years ago · Jan 17, 2019, 07:14 PM
1import datetime
2import sqlite3
3
4def connect():
5 db = sqlite3.connect("test.db", isolation_level=None, uri=True, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
6 cursor = db.cursor()
7 return cursor
8
9def create_table(cursor):
10 cursor.execute(
11 """CREATE TABLE IF NOT EXISTS test_table(
12 name TEXT NOT NULL,
13 time TIMESTAMP NOT NULL)"""
14 )
15
16def write_to_db(cursor):
17 row = {
18 "name": "testinput",
19 "ts": datetime.datetime.now(datetime.timezone.utc),
20 }
21 cursor.execute(
22 """INSERT INTO test_table
23 (name, time)
24 VALUES (:name, :ts)""",
25 row,
26 )
27
28def read_from_db(cursor):
29 cursor.execute(
30 """SELECT name, time FROM test_table"""
31 )
32 return cursor.fetchall()
33
34
35
36cursor = connect()
37create_table(cursor)
38write_to_db(cursor)
39rows = read_from_db(cursor)
40for row in rows:
41 print(row)