// · 6 years ago · Oct 11, 2019, 05:26 PM
// Initialise Kafka: one client/producer pair used by the exports of this module.
// Each binding gets its own `const` declaration; the original comma-chained `var`
// was missing a comma before `KeyedMessage`, which silently turned it into an
// implicit global.
const kafka = require('kafka-node');
const Producer = kafka.Producer;
const KeyedMessage = kafka.KeyedMessage;
const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' });
const producer = new Producer(client);
7
// Import database interactions (lookup of the stored BitMEX API keys)
const { bitmexKeysAll } = require('../../utils/db');
12
// Start from a fresh exports object and alias it as `exports` so members can be
// attached with the short `exports.name = ...` form.
// `var` is deliberate: CommonJS already binds `exports` for the module, so a
// `let`/`const` declaration of the same name would be a redeclaration error.
module.exports = {};
var exports = module.exports;
15
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 * Await it to pause an async function.
 *
 * @param {number} ms - time to wait, in milliseconds
 * @returns {Promise<void>}
 */
const delay = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
18
// Application error type and the shared response codes passed to it
const ExceptionHandler = require('../../utils/ExceptionHandler');
const RESPONSE_CODES = require('../../utils/constants').RESPONSE_CODES;
21
22
23// TODO : Use the 1m trade bin for BTC
24//Open a websocket to bitmex to retrieve the trade bins for the speecified asset and the specified timeframe
25exports.btc1mTradeBin = async () => {
26 // Retrive all of the API keys for each of the bots
27 let APIkeys = await bitmexKeysAll()
28
29 // Asign to a var the object for the first api key
30 // We realy only need one API key in this instance so no need to loop through al of them
31 let bitmexAPI = APIkeys[0].key
32
33 // Get the position from the bitmex websocket
34 let btc1m = new Promise(async function(resolve, reject) {
35 await bitmexAPI.monitorStream('XBTUSD', 'tradeBin1m', data =>{
36 // Data
37 let priceBin = JSON.stringify(data)
38
39 // Set up the payload Topic Message and Partition
40 payloads = [
41 { topic: 'price', messages: priceBin , partition: 0 },
42 ],
43
44 // Send the message to the Kafka Broker
45 producer.send(payloads, function (err, res) {
46 })
47 if (reject){
48 throw new ExceptionHandler(RESPONSE_CODES.APPLICATION_ERROR, 'PAYLOAD ISSUE : ' + global.jsonErrorMessage)
49 }
50 else{
51 resolve()
52 }
53 });
54 })
55
56 await btc1m.catch(function(error) {
57 console.error("boi shit failed");
58 })
59
60}