· 7 years ago · Mar 26, 2018, 04:52 PM
1import time
2import json
3import gzip
4import base64
5from networking.base_slave import BaseSlave
6from api.bibox_api import BiboxAPI
7
8
class BiboxSlave(BaseSlave):
    """Slave that subscribes to Bibox spot depth channels and forwards them.

    Incoming listener frames are decoded, their order books are trimmed to
    ``MAX_DEPTH`` entries per side, and the result is passed on through
    ``_send_message``.

    NOTE(review): ``MAX_DEPTH``, ``thread_manager`` and ``_send_message`` are
    not defined in this class; presumably they come from ``BaseSlave`` --
    confirm against the base class.
    """

    # Not referenced anywhere in this class; presumably read by BaseSlave
    # (unit not visible here) -- TODO confirm.
    PRINT_DELAY = 120

    def __init__(self, ip, port, api_key, secret_key):
        """Initialise the base slave and create the Bibox API client.

        Args:
            ip: Forwarded unchanged to ``BaseSlave.__init__``.
            port: Forwarded unchanged to ``BaseSlave.__init__``.
            api_key: Bibox API key handed to ``BiboxAPI``.
            secret_key: Bibox secret key handed to ``BiboxAPI``.
        """
        super().__init__(ip, port)
        self.api = BiboxAPI(api_key, secret_key)
        # Currently active depth listener; None while nothing is subscribed.
        self.listener = None

    def _reset(self):
        """Close the active listener (if any), then run the base-class reset."""
        self._close_listener()
        super()._reset()

    def _start_depths(self, msg):
        """Subscribe to the depth channel of every symbol listed in ``msg``.

        Args:
            msg: Comma-separated symbol names.
        """
        symbols = msg.split(',')
        print('received symbols:', len(symbols), symbols)
        self._reset_listener(
            ['bibox_sub_spot_' + symbol + '_depth' for symbol in symbols]
        )

    def _reset_listener(self, channels):
        """Replace the current listener with a new one for ``channels``.

        The previous listener is closed first; the new listener's ``start``
        is handed to the thread manager.
        """
        self._close_listener()
        self.listener = self.api.create_listener(
            channels,
            self._on_listener_message,
            self._on_listener_error,
        )
        self.thread_manager.start_thread(self.listener.start)

    def _close_listener(self):
        """Close and forget the current listener; no-op when none is set."""
        if not self.listener:
            return
        self.listener.close()
        self.listener = None

    def _on_listener_message(self, ws, message):
        """Decode one listener frame and forward every depth payload in it.

        Args:
            ws: Supplied by the listener; unused here.
            message: JSON text holding a sequence of entries, each with a
                base64-encoded, gzip-compressed JSON document under ``'data'``.
        """
        if not self.thread_manager.running:
            return
        for entry in json.loads(message):
            compressed = base64.b64decode(entry['data'])
            payload = json.loads(gzip.decompress(compressed).decode())
            # NOTE(review): the original indentation was lost; forwarding is
            # taken to apply only to payloads carrying a 'pair' key -- confirm.
            if 'pair' not in payload:
                continue
            payload['asks'] = self._trim_orders(payload['asks'], reverse=True)
            payload['bids'] = self._trim_orders(payload['bids'], reverse=False)
            self._send_message(json.dumps(payload))

    def _on_listener_error(self, channels):
        """Wait ten seconds, then re-subscribe if the slave is still running.

        Args:
            channels: Channel names to subscribe to again.
        """
        time.sleep(10)
        if not self.thread_manager.running:
            return
        self._reset_listener(channels)

    def _trim_orders(self, data, reverse):
        """Return the last ``MAX_DEPTH`` orders of ``data`` sorted by price.

        Args:
            data: Orders, each a mapping whose ``'price'`` converts to float;
                a falsy value yields an empty list.
            reverse: True sorts prices descending (the tail keeps the lowest
                prices); False sorts ascending (the tail keeps the highest).
        """
        if not data:
            return []
        ordered = sorted(
            data,
            key=lambda order: float(order['price']),
            reverse=reverse,
        )
        return ordered[-self.MAX_DEPTH:]

    # The string literal below is never executed: it holds a disabled,
    # polling-based depth updater.
    '''
    def _update_depth(self, symbol):
        last_server_timestamp = 0
        last_request_time = time.time()
        while not self.failed:
            try:
                data = self.api.get_depth(symbol, size=5)
                request_time = data['request_timestamp']
                self._touch_fetch_time(data)
                self._touch_update_time(symbol)
                server_timestamp = data['payload']['result']['update_time']
                if server_timestamp > last_server_timestamp:
                    last_server_timestamp = server_timestamp
                    data['request_timestamp'] = last_request_time
                    self._send_message(json.dumps(data))
                    self._touch_delta_time(symbol)
                precision = time.time() - last_request_time
                time.sleep(max(0.3 - precision, 0))
                last_request_time = request_time
                self.max_usage = max(self.max_usage, self.api.public_rate_limit.usage_current)
            except Exception as e:
                print('exception occured while fetching depth', symbol, e)
                time.sleep(1)
    '''