· 5 years ago · Apr 19, 2020, 07:20 PM
1const amqp = require('amqplib/callback_api');
2const mysql = require('mysql');
3const bodyParser = require('body-parser')
4const express = require('express');
5const Ticket = require('./Ticket')
6const app = express();
7const ROUTING_KEYS = require('./RoutingKeys');
8console.log("Routing keys : ", ROUTING_KEYS)
9
10const EXCHANGE_NAME = "Ticket"
11
12var connection = mysql.createConnection({
13 host: 'localhost',
14 user: 'theophile',
15 password: 'password',
16});
17
18function initialisation(callback) {
19 connection.connect(function (err) {
20 if (err) throw err;
21 console.log("Connected!");
22
23 createAndUseDatabase(() => {
24 createTable(callback);
25 })
26 });
27 amqp.connect('amqp://localhost', function (err, connection) {
28 if (err) throw err;
29 console.log("Connected to RabbitMQ")
30 subscribeTicket(connection);
31 });
32
33 app.use(express.urlencoded({ extended: true }));
34 app.use(express.json());
35 configureRoutes();
36 app.listen(3000);
37}
38
39function configureRoutes() {
40 app.post("/ticket/create", (req, res, next) => {
41 console.log(req.body)
42 Ticket.create(req.body);
43 res.json({});
44 })
45}
46
47function subscribeTicket(connection) {
48 connection.createChannel(function (err, channel) {
49 if (err)
50 throw err;
51
52 channel.assertExchange(EXCHANGE_NAME, 'direct', {
53 durable: false
54 });
55
56 channel.assertQueue('',
57 {
58 exclusive: true
59 },
60 function (err, q) {
61 if (err)
62 throw err;
63
64 Object.keys(ROUTING_KEYS).forEach(key => {
65 channel.bindQueue(q.queue, EXCHANGE_NAME, ROUTING_KEYS[key]);
66 console.log("Queue binded to routing key : %s", ROUTING_KEYS[key])
67 })
68
69 channel.consume(q.queue, function (msg) {
70 console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
71 let dataReceived = JSON.parse(msg.content);
72
73 addToEventStore({
74 internalId: dataReceived.option1,
75 other: dataReceived.option2,
76 operation: msg.fields.routingKey
77 })
78 }, {
79 noAck: true
80 });
81 });
82 });
83}
84
85function createAndUseDatabase(callback) {
86 connection.query("CREATE DATABASE IF NOT EXISTS EventStoreDB", function (err, result) {
87 if (err) throw err;
88 console.log("Database EventStoreDB created");
89 connection.query("USE EventStoreDB", (err, result) => {
90 if (err) throw err;
91 console.log("Using EventStoreDB")
92 if (callback !== undefined)
93 callback();
94 });
95 });
96}
97
98function createTable(callback) {
99 connection.query(`CREATE TABLE IF NOT EXISTS EventStore (
100 uid INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
101 aid INT(6) NOT NULL,
102 timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
103 internalId INT(6) NOT NULL,
104 other VARCHAR(30),
105 operation VARCHAR(30) NOT NULL
106 )`,
107 function (error, results, fields) {
108 if (error) throw error;
109 console.log('Table EventStore created');
110 if (callback != undefined)
111 callback();
112 }
113 );
114}
115
116function addToEventStore(data) {
117 function addToDatabase(aid, internalId, other, operation) {
118 connection.query(`INSERT INTO EventStore (aid, internalId, other, operation)
119 VALUES (${aid}, ${internalId}, ${other || null}, '${operation}')`,
120 function (err, results, fields) {
121 if (err)
122 throw err;
123 console.log(results, fields);
124 });
125 }
126
127 if (data.operation == ROUTING_KEYS.TicketCreated) {
128 addToDatabase(1, data.internalId, data.other, data.operation)
129 } else {
130 connection.query(`SELECT max(aid) from EventStore where internalId = ${data.internalId}`,
131 function (error, results, fields) {
132 if (error) throw error;
133 let aid = results[0]["max(aid)"];
134 console.log("L'AID C'EST : ", aid)
135
136 addToDatabase(aid + 1, data.internalId, data.other, data.operation);
137 }
138 );
139 }
140}
141
142initialisation(() => {
143 addToEventStore({
144 operation: ROUTING_KEYS.TicketCreated,
145 internalId: 2,
146 other: null
147 })
148});
149/*
150amqp.connect('amqp://localhost', function (error0, connection) {
151 if (error0) {
152 throw error0;
153 }
154 connection.createChannel(function (error1, channel) {
155 if (error1) {
156 throw error1;
157 }
158 var queue = 'hello';
159 var msg = 'Hello world';
160
161 channel.assertQueue(queue, {
162 durable: false
163 });
164
165 channel.sendToQueue(queue, Buffer.from(msg));
166 console.log(" [x] Sent %s", msg);
167 });
168});
169*/
170
171// uid | aid | timestamp | internalId | externalId | Operation
172// 1 | 1 | - | 1 | null | TicketCreated
173// 1 | 2 | - | 1 | Lady Gaga | TicketNameSet
174// 1 | 3 | - | 1 | 19/04/20 | TicketDateSet
175
176/*
177CREATE TABLE EventStore (
178uid INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
179aid INT(6) NOT NULL,
180timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
181internalId INT(6) NOT NULL,
182externalId INT(6),
183operation VARCHAR(30) NOT NULL
184)
185*/