# Pasted snippet, captured Jun 05, 2019, 01:44 AM
# Stdlib imports first, third-party second (PEP 8 grouping); no semicolons.
from multiprocessing.dummy import Pool as ThreadPool

import psycopg2

# Tuning knobs: concurrent writer threads, and rows inserted per thread.
num_write_threads = 2
num_users = 10000
def create_table():
    """(Re)create the ``users`` table and its secondary index on ``ename``.

    Drops any pre-existing ``users`` table first, so previously loaded data
    is discarded. Runs in autocommit mode so each DDL statement takes
    effect immediately without an explicit commit.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        cur.execute("""DROP TABLE IF EXISTS users""")
        cur.execute("""CREATE TABLE IF NOT EXISTS users(
                         id text,
                         ename text,
                         age int,
                         PRIMARY KEY(id))
                    """)
        print("Created users table")
        print("====================")
        # Secondary index on a non-unique column; duplicate names across
        # threads will exercise it during the load phase.
        cur.execute("""
                    CREATE INDEX IF NOT EXISTS name_idx ON users(ename)
                    """)
        print("Created name_idx on table")
    finally:
        # Original leaked the connection; always release it, even on error.
        conn.close()
24
25
def load_data_slave(thread_num):
    """Insert ``num_users`` rows into ``users`` from one writer thread.

    Each worker opens its own connection (a psycopg2 connection must not be
    shared concurrently across threads). Row ids are namespaced with the
    thread id so the primary key never collides; ``ename`` values repeat
    across threads, producing duplicates in the ``name_idx`` index.

    Args:
        thread_num: integer identifying this worker; embedded in row ids.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users))
        # "name" column is indexed, and may have duplicates across threads.
        for idx in range(num_users):
            # Parameterized INSERT — values are passed separately, never
            # interpolated into the SQL string.
            cur.execute("""INSERT INTO users (id, ename, age) VALUES (%s, %s, %s)""",
                        ("user-" + thread_id + "-" + str(idx),
                         "name--" + str(idx),
                         20 + (idx % 50)))
        print("Thread-" + thread_id + ": Inserted %d rows" % (num_users))
    finally:
        # Original never closed the per-thread connection; release it here.
        conn.close()
40
def load_data():
    """Fan the row-loading work out across ``num_write_threads`` threads.

    Each thread runs :func:`load_data_slave` with its thread number.
    """
    # Context manager closes the pool when done (the original created the
    # pool and never closed/joined it). map() blocks until all workers
    # finish; its return value (all None) is intentionally discarded.
    with ThreadPool(num_write_threads) as pool:
        pool.map(load_data_slave, range(num_write_threads))
44
# Main entry point. Guarded so importing this module does not drop and
# rebuild the table as a side effect.
if __name__ == "__main__":
    create_table()
    load_data()