· 4 years ago · Jun 14, 2021, 10:56 PM
1import json
2import requests
3import time
4import hmac
5import base64
6import hashlib
7
8class Api:
9
10 BASE_URL = "https://api.ndax.io:8443/AP"
11 OMS_ID = 1
12
13 def __init__(self, api_key, secret_key, user_id, user_name):
14 self.__api_key = api_key
15 self.__secret_key = secret_key
16 self.__user_id = user_id
17 self.__user_name = user_name
18 self.__nonce = None
19 self.__signature = None
20 self.__get__headers = None
21 self.__post__headers = None
22 self.__account_id = None
23
24 def init(self):
25 self.__nonce = self.__create_nonce()
26 self.__signature = self.__create_signature()
27
28 self.__get__headers = {
29 "Nonce": self.__nonce,
30 "APIKey": self.__api_key,
31 "Signature": self.__signature,
32 "UserId": self.__user_id
33 }
34
35 self.__post__headers = dict(self.__get__headers)
36 self.__post__headers["Content-Type"] = "application/json"
37
38 self.__account_id = self.__get_account_id()
39
40 def get_instrument_snapshot(self, instrument):
41 payload = {
42 "OMSId": Api.OMS_ID,
43 "InstrumentId": instrument["id"]
44 }
45
46 response = self.__post_json("GetLevel1", payload)
47
48 return response
49
50 def get_instruments(self):
51 payload = {
52 "omsId": Api.OMS_ID
53 }
54
55 response = self.__post_json("GetInstruments", payload)
56
57 return response
58
59 def get_products(self):
60 payload = {
61 "omsId": Api.OMS_ID
62 }
63
64 response = self.__post_json("GetProducts", payload)
65
66 return response
67
68 def get_account_positions(self):
69 payload = {
70 "OMSId": Api.OMS_ID,
71 "AccountId": self.__account_id
72 }
73
74 response = self.__post_json("GetAccountPositions", payload)
75
76 return response
77
78 def get_account_trades(self):
79 payload = {
80 "OMSId": Api.OMS_ID,
81 "AccountId": self.__account_id,
82 "StartIndex": 0,
83 "Count": 200
84 }
85
86 response = self.__post_json("GetAccountTrades", payload)
87
88 return response
89
90 def __get_account_id(self):
91 payload = {
92 "omsId": Api.OMS_ID,
93 "userId": self.__user_id,
94 "userName": self.__user_name
95 }
96
97 response = self.__get_json("GetUserAccounts", payload)
98
99 return response[0] if response else None
100
101 def __create_nonce(self):
102 return str(int(time.time() * 1000))
103
104 def __create_signature(self):
105 new_auth = self.__nonce + self.__user_id + self.__api_key
106 h = hmac.new(self.__secret_key.encode('latin-1'), new_auth.encode('latin-1'), hashlib.sha256)
107 binary = h.digest()
108 return base64.b16encode(binary).decode('latin-1').lower()
109
110 def __get_json(self, method, payload):
111 response = requests.get(
112 url = "%s/%s" % (Api.BASE_URL, method),
113 data = json.dumps(payload),
114 headers = self.__get__headers
115 )
116
117 return response.json() if response.status_code == 200 else None
118
119 def __post_json(self, method, payload):
120 response = requests.post(
121 url= "%s/%s" % (Api.BASE_URL, method),
122 data=json.dumps(payload),
123 headers= self.__post__headers
124 )
125
126 return response.json() if response.status_code == 200 else None