· 4 years ago · Feb 18, 2021, 02:24 PM
1import Event from "./Event";
2import Command from "./Commands/Command";
3import ProductCommand from "./Commands/ProductCommand";
4import Database from "./Database";
5import {Pool} from "pg";
6import Query from "./Query";
7import Publisher, {PublisherDTO} from "./Publisher";
8import {SubscriberDTO} from "./Subscriber";
9import axios from "axios";
10import {AxiosResponse} from "axios";
11import PublisherCommandHandler from "./Commands/PublisherCommandHandler";
12
13export default class SubmitHandler {
14 private handlers: Map<string, Command> = new Map<string, Command>();
15 private eventStore: Map<string, Map<string, Event[]>> = new Map<string, Map<string, Event[]>>();
16
17 private static connectionString: string = process.env.DATABASE_URL || `Postgresql://postgres:deinemama@localhost:5432/testshopdb`;
18 private static DBSconnectionString: string = `Postgresql://postgres:deinemama@localhost:5432/`;
19 private static warehouseUrl: string = 'https://deine-mama-server-shop.herokuapp.com';
20
21 public dataBase: Database = new Database();
22
23 constructor() {
24 this.handlers.set("products", new ProductCommand(this));
25 this.handlers.set("publishers", new PublisherCommandHandler(this));
26 }
27
28 async start() {
29 await this.createDatabase();
30 await this.createEventTable();
31 await this.subscribeAtWarehouse();
32 }
33
34 async subscribeAtWarehouseForTopic(publisherId: string, topic: string, subscriberId: string) {
35 // subscribe at warehouse
36 let publisher: Publisher = null;
37 const publisherDTO: PublisherDTO = {
38 id: publisherId,
39 url: SubmitHandler.warehouseUrl + '/api/query',
40 topic,
41 lastKnown: '1970-01-01T00:00:00.000Z'
42 };
43
44 publisher = this.dataBase.havePublisher(publisherDTO);
45
46 const subscriberDTO: SubscriberDTO = {
47 id: subscriberId,
48 url: SubmitHandler.warehouseUrl + '/api/submit',
49 topic,
50 lastKnown: publisher.lastKnown
51 };
52
53 const query: Query = {
54 topic: 'subscribers',
55 payload: subscriberDTO
56 };
57
58 try {
59 const response: AxiosResponse = await axios.post(publisher.url, query);
60 const eventList = response.data.eventList;
61
62 for (const event of eventList) {
63 const handler = this.handlers.get(event.topic);
64 await handler.invokeAction(event);
65 }
66
67 console.log(eventList);
68 } catch (error) {
69 console.log('warehouse is offline');
70 }
71 }
72
73 async subscribeAtWarehouse() {
74 await this.subscribeAtWarehouseForTopic('warehouse-products', 'products', 'shop-products');
75 await this.subscribeAtWarehouseForTopic('warehouse-palettes', 'palettes', 'shop-palettes');
76 }
77
78 async clearDatabase() {
79 const sql: string = "DROP TABLE IF EXISTS events;";
80 const pool = new Pool(process.env.DATABASE_URL ? {
81 connectionString: SubmitHandler.connectionString,
82 ssl: {
83 rejectUnauthorized: false
84 }
85 } : { connectionString: SubmitHandler.connectionString});
86 const res = await pool.query(sql);
87 console.log("drop events table done");
88 await pool.end();
89 }
90
91 async createEventTable() {
92 const pool = new Pool(process.env.DATABASE_URL ? {
93 connectionString: SubmitHandler.connectionString,
94 ssl: {
95 rejectUnauthorized: false
96 }
97 } : { connectionString: SubmitHandler.connectionString});
98
99 const sql: string = `
100 CREATE TABLE IF NOT EXISTS events (
101 topic varchar NOT NULL,
102 id varchar NOT NULL,
103 storetime varchar NOT NULL,
104 json varchar NOT NULL,
105 PRIMARY KEY (topic, id)
106 );`;
107
108
109 try {
110 const res = await pool.query(sql);
111 console.log("create events table done");
112 } catch (error) {
113 console.log(error);
114 }
115 await pool.end();
116 }
117
118 async createDatabase() {
119 const pool = new Pool( process.env.DATABASE_URL ? {
120 connectionString: SubmitHandler.DBSconnectionString,
121 ssl: {
122 rejectUnauthorized: false
123 }
124 } : { connectionString: SubmitHandler.DBSconnectionString });
125
126 try {
127 const res = await pool.query("CREATE DATABASE testshopdb;");
128 console.log("create testshopdb done");
129 } catch (error) {
130 console.log("Database exists");
131 }
132 await pool.end();
133 }
134
135 async handle(req, res) {
136 const clientData = req.body;
137 const event: Event = clientData;
138
139 const handler = this.handlers.get(event.topic);
140 await handler.invokeAction(event);
141
142 const result = {
143 status: "OK",
144 clientData
145 }
146
147 res.json(result);
148 }
149
150 async getLatestEvents(topic: string): Promise<Event[]> {
151 const eventList: Event[] = [];
152
153 const pool = new Pool(process.env.DATABASE_URL ? {
154 connectionString: SubmitHandler.connectionString,
155 ssl: {
156 rejectUnauthorized: false
157 }
158 } : { connectionString: SubmitHandler.connectionString});
159
160 const sql = `
161 SELECT topic, id, storetime, json
162 FROM events
163 WHERE topic=$1;`;
164
165
166 try {
167 const res = await pool.query(sql, [topic]);
168
169 for (const row of res.rows) {
170 const jsonString: string = row.json;
171 const event: Event = JSON.parse(jsonString);
172 eventList.push(event);
173 }
174
175 } catch (error) {
176 console.log("Error, while querying for an eventlist");
177 }
178 await pool.end();
179
180 return eventList;
181 }
182
183 async getEventHistory(topic: string, id: string) : Promise<Event[]> {
184 const eventList: Event[] = [];
185
186 const pool = new Pool(process.env.DATABASE_URL ? {
187 connectionString: SubmitHandler.connectionString,
188 ssl: {
189 rejectUnauthorized: false
190 }
191 } : { connectionString: SubmitHandler.connectionString});
192
193 const sql = `
194 SELECT topic, id, storetime, json
195 FROM events
196 WHERE topic=$1 AND id=$2;`;
197
198
199 try {
200 const res = await pool.query(sql, [topic, id]);
201
202 for (const row of res.rows) {
203 const jsonString: string = row.json;
204 const event: Event = JSON.parse(jsonString);
205 eventList.push(event);
206 }
207
208 } catch (error) {
209
210 }
211 await pool.end();
212
213 return eventList;
214 }
215
216 async putIntoEventStore(event: Event) {
217 const pool = new Pool(process.env.DATABASE_URL ? {
218 connectionString: SubmitHandler.connectionString,
219 ssl: {
220 rejectUnauthorized: false
221 }
222 } : { connectionString: SubmitHandler.connectionString});
223
224 const json: string = JSON.stringify(event, null, 3);
225 let sql = `
226 INSERT INTO events (topic, id, storetime, json)
227 VALUES ($1, $2, '2021-01-19', $3);`;
228
229
230 try {
231 const res = await pool.query(sql, [event.topic, event.id, json]);
232 } catch (error) {
233 sql = `
234 UPDATE events
235 SET storetime='2021-01-19', json='${json}'
236 WHERE topic='${event.topic}' AND id='${event.id}';`;
237
238 try {
239 const res = await pool.query(sql);
240 } catch (error) { }
241 }
242 await pool.end();
243 }
244}
245