# Paste header: 7 years ago · Nov 23, 2018, 08:12 AM
1import tqdm
2import psycopg2
3import random
4from multiprocessing import Pool
5
6
7
def go():
    """Recreate the ``chat`` and ``message`` tables and distribute them.

    Connects to the ``postgres`` database on 127.0.0.1, drops any existing
    ``message`` and ``chat`` tables, recreates them, and then registers both
    with ``create_distributed_table`` (``chat`` sharded on ``id``,
    ``message`` sharded on ``chat_id``).

    The connection and cursor are released even if one of the statements
    fails, so a failed setup does not leave a session open on the server.
    """
    conn = psycopg2.connect("dbname=postgres user=postgres host=127.0.0.1")
    try:
        conn.set_session(autocommit=True)
        # Level 0 is psycopg2's autocommit isolation level; kept from the
        # original alongside set_session(autocommit=True).
        conn.set_isolation_level(0)

        with conn.cursor() as cursor:
            # Drop the referencing table first so the FK on chat is gone
            # before chat itself is dropped.
            cursor.execute('DROP TABLE IF EXISTS message')
            cursor.execute('DROP TABLE IF EXISTS chat')

            cursor.execute('''
create table chat
(
    id bigserial not null
        constraint chat_pkey
            primary key,
    title text not null
)
            ''')

            # The primary key includes chat_id, the column the table is
            # distributed on below.
            cursor.execute('''

CREATE TABLE message (
    id bigserial,
    chat_id bigint REFERENCES chat (id),
    content text NOT NULL,

    PRIMARY KEY (chat_id, id)

--    FOREIGN KEY (company_id, ad_id) -- added
--    REFERENCES ads (company_id, id)
);



            ''')

            cursor.execute('''
                SELECT create_distributed_table('chat', 'id');
            ''')
            cursor.execute(''' SELECT create_distributed_table('message', 'chat_id');
''')
    finally:
        conn.close()
54
55
def go2():
    """Recreate the ``db_boner`` database with plain ``chat``/``message`` tables.

    First connects to the ``postgres`` database to drop and recreate
    ``db_boner``, then connects to the new database and creates the ``chat``
    and ``message`` tables (no ``create_distributed_table`` calls here).

    Both connections and their cursors are released even when a statement
    fails.
    """
    # Phase 1: (re)create the database from the maintenance connection.
    conn = psycopg2.connect("dbname=postgres user=postgres host=127.0.0.1")
    try:
        # CREATE/DROP DATABASE cannot run inside a transaction block, so the
        # session must be in autocommit mode.
        conn.set_session(autocommit=True)
        conn.set_isolation_level(0)

        with conn.cursor() as cursor:
            cursor.execute('DROP DATABASE IF EXISTS db_boner')
            cursor.execute('CREATE DATABASE db_boner')
    finally:
        conn.close()

    # Phase 2: build the schema inside the freshly created database.
    conn = psycopg2.connect("dbname=db_boner user=postgres host=127.0.0.1")
    try:
        conn.set_session(autocommit=True)
        conn.set_isolation_level(0)

        with conn.cursor() as cursor:
            # Kept from the original; the database was just recreated above.
            cursor.execute('DROP TABLE IF EXISTS message')
            cursor.execute('DROP TABLE IF EXISTS chat')

            cursor.execute('''
                create table chat
                (
                    id bigserial PRIMARY KEY,
                    title text not null
                )
            ''')

            cursor.execute('''
                CREATE TABLE message (
                    id bigserial PRIMARY KEY,
                    chat_id bigint REFERENCES chat (id),
                    content text NOT NULL


                --    FOREIGN KEY (company_id, ad_id) -- added
                --    REFERENCES ads (company_id, id)
                );

            ''')
    finally:
        conn.close()
97
def run_on_citus(i):
    """Benchmark worker: insert 300 chats and 1000 messages into ``postgres``.

    Each message is attached to a random ``chat_id`` in ``[1, 299]``. Every
    statement is sent individually on an autocommit connection.

    Args:
        i: Worker index passed in by ``Pool.map``; only used in the progress
            print.
    """
    print('citus', i)
    conn = psycopg2.connect("dbname=postgres user=postgres host=127.0.0.1")
    try:
        conn.set_session(autocommit=True)
        # Level 0 is psycopg2's autocommit isolation level; kept from the
        # original alongside set_session(autocommit=True).
        conn.set_isolation_level(0)

        with conn.cursor() as cursor:
            # Values are passed as query parameters instead of being spliced
            # into the SQL text; distinct loop names avoid shadowing ``i``.
            for chat_no in range(300):
                cursor.execute(
                    'INSERT INTO chat (title) VALUES (%s)',
                    ('title' + str(chat_no),))

            for message_no in range(1000):
                cursor.execute(
                    'INSERT INTO message (content, chat_id) VALUES (%s, %s)',
                    ('title' + str(message_no), random.randint(1, 299)))
    finally:
        conn.close()
117
def run_on_original(i):
    """Benchmark worker: insert 300 chats and 1000 messages into ``db_boner``.

    Each message is attached to a random ``chat_id`` in ``[1, 299]``. Every
    statement is sent individually on an autocommit connection.

    Args:
        i: Worker index passed in by ``Pool.map``; only used in the progress
            print.
    """
    print(i)
    conn = psycopg2.connect("dbname=db_boner user=postgres host=127.0.0.1")
    try:
        conn.set_session(autocommit=True)
        # Level 0 is psycopg2's autocommit isolation level; kept from the
        # original alongside set_session(autocommit=True).
        conn.set_isolation_level(0)

        with conn.cursor() as cursor:
            # Values are passed as query parameters instead of being spliced
            # into the SQL text; distinct loop names avoid shadowing ``i``.
            for chat_no in range(300):
                cursor.execute(
                    'INSERT INTO chat (title) VALUES (%s)',
                    ('title' + str(chat_no),))

            for message_no in range(1000):
                cursor.execute(
                    'INSERT INTO message (content, chat_id) VALUES (%s, %s)',
                    ('title' + str(message_no), random.randint(1, 299)))
    finally:
        conn.close()
136
import time

# The guard is required for multiprocessing: under the "spawn" start method
# each worker re-imports this module, and without the guard every worker
# would re-run the setup (dropping the database) and start its own Pool.
if __name__ == "__main__":
    # TEST ORIGINAL
    go2()
    start_time = time.time()
    with Pool(4) as p:
        p.map(run_on_original, range(4))

    print("--- %s seconds ---" % (time.time() - start_time))

    # TEST CITUS
    go()

    start_time = time.time()
    with Pool(4) as p:
        p.map(run_on_citus, range(4))

    print("--- %s seconds ---" % (time.time() - start_time))
155print("--- %s seconds ---" % (time.time() - start_time))