· 6 months ago · Mar 21, 2025, 03:00 PM
1import os
2import sys
3import signal
4import psycopg2
5import random
6import json
7from datetime import datetime
8import time
9from pathlib import Path
10import subprocess
11from multiprocessing import Process, Queue
12from configparser import ConfigParser
13import numpy as np
14import pika
15
# Per-symbol time.time() of the most recent 'orders' message forwarded by send();
# send() consults it to drop further 'orders' lines arriving within 10 seconds.
last_orders_mtype_time = {}
17
class Killer:
    """Relay SIGINT/SIGTERM to the process groups of registered children.

    Creating an instance installs :meth:`exit` as the handler for both
    signals. Children are registered by appending them to ``procs``.
    """

    # Class-level (shared) registry of child process objects exposing ``pid``.
    procs = []

    def __init__(self):
        for handled in (signal.SIGINT, signal.SIGTERM):
            signal.signal(handled, self.exit)

    def exit(self, signum, frame):
        """Signal handler: re-send ``signum`` to every registered child's process group.

        Args:
            signum: Number of the signal being handled; forwarded unchanged.
            frame: Current stack frame supplied by the signal machinery (unused).
        """
        # NOTE(review): nothing here asks this process to exit, so the handler
        # by itself does not stop the caller. Children presumably share this
        # process's group, in which case killpg also re-signals the caller --
        # confirm the intended shutdown path.
        for child in self.procs:
            group_id = os.getpgid(child.pid)
            os.killpg(group_id, signum)
28
class DbWriterClass:
    """psycopg2 helper that prepares per-symbol tables and INSERT templates.

    Instance attributes:
      connection   -- psycopg2 connection, set by start_connection().
      symbol_id    -- maps the symbol string given to init_symbol() to the
                      first column of the matching ``symbols`` row.
      query_string -- ``query_string[kind][symbol]`` is an INSERT statement
                      using psycopg2 ``%s`` placeholders.
    """

    # Kinds of records for which init_symbol() prepares an INSERT statement.
    _QUERY_KINDS = (
        'orders', 'book', 'public_trades', 'private_trades',
        'balance', 'position', 'status',
    )

    # (table-name prefix, column definitions) of every per-symbol table.
    _SYMBOL_TABLES = (
        ('book', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'ask_price DOUBLE PRECISION',
            'bid_price DOUBLE PRECISION',
        )),
        ('public_trade', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_buy BOOLEAN',
        )),
        ('user_trade', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_buy BOOLEAN',
            'strategy CHAR',
        )),
        ('balance', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'base DOUBLE PRECISION',
            'quote DOUBLE PRECISION',
            'price DOUBLE PRECISION',
            'strategy CHAR',
        )),
        ('position', (
            'id SERIAL PRIMARY KEY',
            'time TIMESTAMP',
            'ask_bound DOUBLE PRECISION',
            'bid_bound DOUBLE PRECISION',
        )),
        ('active_orders', (
            'id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY',
            'time TIMESTAMP',
            'price DOUBLE PRECISION',
            'quantity DOUBLE PRECISION',
            'is_bid BOOLEAN',
            'strategy CHAR',
        )),
    )

    def __init__(self):
        self.server = None
        self.connection = None
        self.symbol_id = {}
        # Also (re)set by start_connection(); created here so the attributes
        # exist even before a connection has been opened.
        self.query_string = {kind: {} for kind in self._QUERY_KINDS}
        self.s_hash = {}

    def load_config(self, filename, section):
        """Return the options of ``section`` in INI file ``filename`` as a dict.

        Raises:
            Exception: if the section is absent (an unreadable or missing
                file yields no sections and therefore also ends up here).
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception(
                "Section {0} not found in the {1} file".format(section, filename)
            )
        return dict(parser.items(section))

    def start_connection(self, filename = '/home/ubuntu/database.ini', section = 'postgresql'):
        """Connect to PostgreSQL, retrying until it succeeds.

        The configuration is re-read on every attempt. Any failure is printed
        and followed by a random 0.2-2.0 second pause. On success the
        prepared statements and ``s_hash`` are reset.
        """
        while True:
            try:
                self.connection = psycopg2.connect(**self.load_config(filename, section))
                break
            except Exception as e:
                print(e)
                delay = random.randint(1, 10) / 5
                time.sleep(delay)

        print("Database connected")

        self.query_string = {kind: {} for kind in self._QUERY_KINDS}
        self.s_hash = {}

    def execute_select(self, query, params=None):
        """Run ``query`` and return all rows.

        Args:
            query: SQL text, optionally containing psycopg2 placeholders.
            params: Optional values bound to the placeholders; prefer this
                over formatting values into ``query``.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(query, params)
            return cursor.fetchall()

    def init_symbol(self, symbol):
        """Look up ``symbol``, create its tables and prepare its INSERT statements.

        ``symbol`` is split on ``_``: the first two parts form the value
        matched against ``symbols.symbol`` and the remaining parts, lower-cased,
        form the value matched against ``exchanges.exchange_name``.

        Returns:
            True when the symbol was found and set up, False when the lookup
            returned no row.
        """
        parts = symbol.split('_')
        market = '_'.join(parts[2:]).lower()
        sym = '_'.join(parts[:2])
        with self.connection.cursor() as cursor:
            print(sym)
            print(market)
            # Values are bound as parameters rather than formatted into the
            # SQL text, so quotes or other SQL syntax in them stay inert.
            cursor.execute(
                "SELECT s.* FROM symbols s JOIN exchanges e USING (exchange_id) "
                "WHERE symbol = %s AND exchange_name = %s",
                (sym, market),
            )
            rows = cursor.fetchall()

        try:
            self.symbol_id[symbol] = rows[0][0]
        except IndexError as e:
            print(e)
            # End the transaction opened by the SELECT instead of leaving it
            # open until the next statement on this connection.
            self.connection.rollback()
            return False

        self._create_symbol_tables(self.symbol_id[symbol])
        print(symbol)
        self._register_queries(symbol)

        return True

    def _create_symbol_tables(self, table_id):
        """Create every per-symbol table for ``table_id`` if it is missing."""
        # NOTE(review): table_id comes from the symbols table and is formatted
        # into the identifier as-is; this assumes it is a plain id -- confirm.
        for prefix, columns in self._SYMBOL_TABLES:
            ddl = "CREATE TABLE IF NOT EXISTS {0}_{1} (\n    {2}\n)".format(
                prefix, table_id, ",\n    ".join(columns)
            )
            with self.connection.cursor() as cursor:
                cursor.execute(ddl)
            # Committed per table, so tables created before a failure persist.
            self.connection.commit()

    def _register_queries(self, symbol):
        """Store the INSERT statement of every record kind for ``symbol``."""
        table_id = self.symbol_id[symbol]
        statements = {
            # NOTE(review): targets active_orders_new_<id>, while the table
            # created by this class is active_orders_<id> -- confirm which
            # one is intended.
            'orders': f'INSERT INTO active_orders_new_{table_id} (time, price, quantity, is_bid, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)",
            'book': f'INSERT INTO book_{table_id} (time, ask_price, bid_price)\n' + "VALUES (%s, %s, %s)",
            'public_trades': f'INSERT INTO public_trade_{table_id} (time, price, quantity, is_buy)\n' + "VALUES (%s, %s, %s, %s)",
            'private_trades': f'INSERT INTO user_trade_{table_id} (time, price, quantity, is_buy, strategy)\n' + "VALUES (%s, %s, %s, %s, %s)",
            'balance': f'INSERT INTO balance_{table_id} (time, base, quote, strategy)\n' + "VALUES (%s, %s, %s, %s)",
            # NOTE(review): columns (bound, base, quote) differ from the
            # position_<id> table created by this class (ask_bound,
            # bid_bound) -- confirm against the live schema.
            'position': f'INSERT INTO position_{table_id} (time, bound, base, quote)\n' + "VALUES (%s, %s, %s, %s)",
            'status': 'INSERT INTO status (time, symbol_id, status, strategy)\n' + "VALUES (%s, %s, %s, %s)",
        }
        for kind, statement in statements.items():
            self.query_string[kind][symbol] = statement
174
def pipe(q, filename, symbol, strat):
    """Follow ``filename`` with ``tail -F`` and queue every line it prints.

    Intended to run in its own process; it returns only when ``tail``
    closes its stdout.

    Args:
        q: Queue receiving ``(line, symbol, strat)`` tuples; ``line`` is the
            decoded text including its trailing newline.
        filename: Path handed to ``tail -F``.
        symbol: Passed through unchanged as the second tuple element.
        strat: Passed through unchanged as the third tuple element.
    """
    # stderr is discarded: nothing reads it here, and an unread PIPE would
    # eventually fill up and block tail.
    p = subprocess.Popen(
        ['tail', '-F', filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    try:
        for line in iter(p.stdout.readline, b''):
            # Undecodable bytes are replaced so one bad line cannot stop
            # the reader.
            q.put((line.decode('utf-8', errors='replace'), symbol, strat))
    finally:
        # Do not leave tail running (or unreaped) once reading has stopped.
        p.kill()
        p.wait()
179
def send(channels, symbol, strat, line):
    """Turn one telemetry line into a JSON message and publish it on every channel.

    ``line`` is either the literal ``'init'`` or text of the form
    ``"<ts> <mtype> <payload>"`` separated by single spaces, where ``<ts>``
    is an integer and ``<payload>`` is a Python expression. Lines with fewer
    than three fields, or whose timestamp/payload cannot be evaluated, are
    reported with ``print`` and dropped. ``'orders'`` lines are forwarded at
    most once per 10 seconds per ``symbol``.

    Args:
        channels: Iterable of channels exposing ``basic_publish``.
        symbol: ``_``-separated name; its first two parts and the rest become
            the first two dot-separated components of the message name.
        strat: Third component of the message name.
        line: ``'init'`` or a telemetry line as described above.
    """
    global last_orders_mtype_time

    symbol_parts = symbol.split('_')
    name = f"{'_'.join(symbol_parts[:2])}.{'_'.join(symbol_parts[2:])}.{strat}"

    if line == 'init':
        message = {
            'name': name,
            'ts': int(time.time()),
            'mtype': 'init',
            'msg': ''
        }
    else:
        fields = line.split(' ')
        if len(fields) < 3:
            print(f'can not parse line: {line}')
            return

        mtype = fields[1]
        if mtype == 'orders':
            # Throttle: forward at most one 'orders' message per symbol
            # every 10 seconds.
            now = time.time()
            if now - last_orders_mtype_time.get(symbol, 0) < 10:
                return
            last_orders_mtype_time[symbol] = now

        try:
            ts = int(fields[0])
            # SECURITY NOTE(review): eval() executes whatever the log line
            # contains. It is kept because payloads may reference names
            # available in this module; if they are always plain literals,
            # ast.literal_eval would be the safe replacement -- confirm.
            payload = eval(' '.join(fields[2:]))
        except Exception as exc:
            # A single malformed or truncated line must not raise into the
            # caller and stop the forwarding loop.
            print(f'can not parse line: {line}')
            print(exc)
            return

        message = {
            'name': name,
            'ts': ts,
            'mtype': mtype,
            'msg': payload
        }

    if 'status' in line:
        print(line)

    body = json.dumps(message)  # serialised once, reused for every channel
    for channel in channels:
        channel.basic_publish(
            exchange="",
            routing_key="log_queue.old_prod",
            body=body,
            properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
        )
217
if __name__ == '__main__':
    # Script entry point: discover telemetry files, start one tail reader per
    # file, report per-file liveness and forward every queued line to RabbitMQ.
    import queue  # multiprocessing.Queue.get(timeout=...) raises queue.Empty

    # Rabbit init
    RABBITMQ_HOST = "18.178.242.45"
    RABBITMQ_TERMINAL_HOST = "80.93.176.68"
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, port=9669))
    connection_terminal = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_TERMINAL_HOST, port=5672))
    channel = connection.channel()
    channel_terminal = connection_terminal.channel()
    # Declare a queue
    channel.queue_declare(queue="log_queue.old_prod", durable=True)
    channel_terminal.queue_declare(queue="log_queue.old_prod", durable=True)
    channels = [channel, channel_terminal]

    # Directories scanned for telemetry files, in scan order.
    TELEMETRY_DIRS = (
        '/mnt/mm_telemetry',
        '/mnt/volume_telemetry',
        '/mnt/wolume_telemetry',
        '/mnt/tb_telemetry',
    )
    SCAN_INTERVAL = 60      # seconds that must elapse between directory scans
    QUEUE_POLL_TIMEOUT = 1  # seconds to wait for a line before re-checking the scan timer

    old_files = set()   # paths that already have a reader process
    q = Queue()
    killer = Killer()
    writer = DbWriterClass()
    args = {}
    if len(sys.argv) > 1:
        args['filename'] = sys.argv[1]
    writer.start_connection(**args)
    prev_time = 0
    # Per-path liveness state: 0 = just discovered, 1 = reported alive,
    # -1 = reported stale.
    working = {}
    print(writer.connection.closed)
    while True:
        cur_time = int(time.time())
        if cur_time - prev_time > SCAN_INTERVAL:
            files_path = [
                (entry, f'{directory}/{entry.name}')
                for directory in TELEMETRY_DIRS
                for entry in Path(directory).iterdir()
                if entry.is_file()
            ]
            for raw_file, file in files_path:
                # Sixth character of the path, i.e. the first letter of the
                # directory under /mnt ('M', 'V', 'W' or 'T').
                strat = file[5].upper()
                symbol = raw_file.name  # the file name doubles as the symbol
                mtime = int(raw_file.stat().st_mtime * 1000)
                ctime = int(time.time() * 1000)  # current time in ms
                if file not in old_files:
                    if not writer.init_symbol(symbol):
                        # Not registered yet: the lookup is retried on the next scan.
                        continue
                    send(channels, symbol, strat, 'init')
                    working[file] = 0
                    old_files.add(file)
                    killer.procs.append(Process(target=pipe, args=(q, file, symbol, strat)))
                    killer.procs[-1].start()
                # Seconds without modification after which a file counts as stale.
                cd = 180 if strat == 'V' else 60
                print(f'{file}: {mtime} {ctime}; {working[file]}')
                if mtime < ctime - cd * 1000:
                    # Stale: report once per transition.
                    if working[file] != -1:
                        working[file] = -1
                        send(channels, symbol, strat, f"{ctime} status {{'status': False}}")
                elif working[file] != 1:
                    # Fresh: report once per transition; the timestamp is
                    # deliberately one second ahead of the current time.
                    working[file] = 1
                    send(channels, symbol, strat, f"{ctime + 1000} status {{'status': True}}")
            prev_time = cur_time
        try:
            # Bounded wait: an unbounded get() would block for as long as no
            # reader produces a line, so the periodic scan above -- and with
            # it the stale-file report -- would never run once all files go quiet.
            z = q.get(timeout=QUEUE_POLL_TIMEOUT)
        except queue.Empty:
            continue
        send(channels, z[1], z[2], z[0])