· 6 years ago · Feb 06, 2020, 05:26 AM
1const API = require('../../utils/api');
2
3const
4 sql_query = {
5 'yelo.tb_jobs': `
6 SELECT
7 ??
8 FROM
9 yelo.tb_jobs
10 WHERE
11 (updated_at >= ?
12 OR (updated_at IS NULL
13 AND creation_datetime >= ? )
14 ) AND
15 marketplace_user_id = ?
16 ORDER BY job_id
17 `,
18 'yelo.tb_marketplace_storefronts': `
19 SELECT
20 ??
21 FROM
22 yelo.tb_marketplace_storefronts
23 WHERE
24 ( update_datetime >= ?
25 OR ( update_datetime IS NULL
26 AND creation_datetime >= ? )
27 ) AND
28 user_id IN (
29 SELECT
30 marketplace_storefront_user_id
31 FROM
32 yelo.tb_marketplace_storefront_mapping
33 WHERE
34 marketplace_user_id = ?
35 )
36 ORDER BY id
37 `,
38 'yelo.tb_app_settings': `
39 SELECT
40 ??
41 FROM
42 yelo.tb_app_settings
43 WHERE
44 ( updated_at >= ?
45 OR ( updated_at IS NULL
46 AND creation_datetime >= ? )
47 ) AND
48 user_id = ?
49 ORDER BY id
50 `,
51 'yelo.tb_job_payment_details': `
52 SELECT
53 ??
54 FROM
55 yelo.tb_job_payment_details
56 WHERE
57 ( update_datetime >= ?
58 OR ( update_datetime IS NULL
59 AND creation_datetime >= ? )
60 ) AND
61 user_id = ?
62 ORDER BY id
63 `,
64 'yelo.tb_vendors': `
65 SELECT
66 ??
67 FROM
68 yelo.tb_vendors
69 WHERE
70 ( updated_at >= ?
71 OR ( updated_at IS NULL
72 AND creation_datetime >=? )
73 ) AND
74 marketplace_user_id = ?
75 ORDER BY vendor_id
76 `,
77 'yelo.tb_job_sellers': `
78 SELECT
79 ??
80 FROM
81 yelo.tb_job_sellers
82 WHERE
83 ( update_datetime >= ?
84 OR ( update_datetime IS NULL
85 AND created_at >= ? )
86 ) AND
87 user_id IN (
88 SELECT
89 marketplace_storefront_user_id
90 FROM
91 yelo.tb_marketplace_storefront_mapping
92 WHERE
93 marketplace_user_id = ?
94 )
95 ORDER BY id
96 `,
97 'yelo.tb_order_items': `
98 SELECT
99 ??
100 FROM
101 yelo.tb_order_items ot
102 WHERE
103 (ot.updated_at >= ?
104 OR(ot.updated_at IS NULL
105 AND ot.created_at >= ?)
106 )
107 AND job_id IN (
108 SELECT
109 job_id
110 FROM
111 yelo.tb_jobs
112 WHERE
113 marketplace_user_id = ?
114 )
115 ORDER BY order_item_id
116 `,
117 'yelo.tb_products': `
118 SELECT
119 ??
120 FROM
121 yelo.tb_products
122 WHERE
123 ( updation_datetime >= ?
124 OR ( updation_datetime IS NULL
125 AND creation_datetime >= ? )
126 ) AND
127 user_id IN (
128 SELECT
129 marketplace_storefront_user_id
130 FROM
131 yelo.tb_marketplace_storefront_mapping
132 WHERE
133 marketplace_user_id = ?
134 )
135 ORDER BY product_id
136 `,
137 'yelo.tb_admin_catalogue': `
138 SELECT
139 ??
140 FROM
141 yelo.tb_admin_catalogue
142 WHERE
143 ( update_datetime >= ?
144 OR ( update_datetime IS NULL
145 AND creation_datetime >=? )
146 ) AND
147 user_id = ?
148 ORDER BY catalogue_id
149 `,
150 'tookan.tb_users': `
151 SELECT
152 ??
153 FROM
154 tookan.tb_users
155 WHERE
156 user_id = ?
157 ORDER BY user_id
158 `,
159 'tookan.tb_teams': `
160 SELECT
161 ??
162 FROM
163 tookan.tb_teams
164 WHERE
165 ( updated_at >= ?
166 OR ( updated_at IS NULL
167 AND creation_datetime >= ? )
168 ) AND
169 user_id = ?
170 ORDER BY team_id
171 `,
172 'tookan.tb_jobs': `
173 SELECT
174 ??
175 FROM
176 tookan.tb_jobs
177 WHERE
178 ( updated_at >= ?
179 OR ( updated_at IS NULL
180 AND creation_datetime >= ? )
181 ) AND
182 user_id = ?
183 ORDER BY job_id
184 `,
185 'tookan.tb_fleet_teams': `
186 SELECT
187 ??
188 FROM
189 tookan.tb_fleet_teams
190 WHERE
191 ( updated_at >= ?
192 OR ( updated_at IS NULL
193 AND creation_datetime >=? )
194 ) AND
195 user_id = ?
196 ORDER BY team_id
197 `,
198 'tookan.tb_fleets': `
199 SELECT
200 ??
201 FROM
202 tookan.tb_fleets
203 WHERE
204 ( updated_at >= ?
205 OR ( updated_at IS NULL
206 AND creation_datetime >=? )
207 ) AND
208 user_id = ?
209 ORDER BY fleet_id
210 `,
211 'tookan.tb_customers': `
212 SELECT
213 ??
214 FROM
215 tookan.tb_customers
216 WHERE
217 ( updated_at >= ?
218 OR ( updated_at IS NULL
219 AND creation_datetime >=? )
220 ) AND
221 user_id = ?
222 ORDER BY customer_id
223 `,
224 'jugnoo_live.tb_users': `
225 SELECT
226 ??
227 FROM
228 jugnoo_live.tb_users
229 WHERE
230 ( updated_at >= ?
231 OR ( updated_at IS NULL
232 AND created_at >= ? )
233 ) AND
234 operator_id = ?
235 ORDER BY user_id
236 `,
237 'jugnoo_live.tb_drivers': `
238 SELECT
239 ??
240 FROM
241 jugnoo_live.tb_drivers
242 WHERE
243 ( updated_at >= ?
244 OR ( updated_at IS NULL
245 AND created_at >= ? )
246 ) AND
247 operator_id = ?
248 ORDER BY id
249 `,
250 'jugnoo_live.tb_engagements': `
251 SELECT
252 ??
253 FROM
254 jugnoo_live.tb_engagements
255 WHERE
256 ( updated_at >= ?
257 OR ( updated_at IS NULL
258 AND created_at >= ? )
259 ) AND
260 operator_id_x = ?
261 ORDER BY engagement_id
262 `,
263 'jugnoo_live.tb_session': `
264 SELECT
265 ??
266 FROM
267 jugnoo_live.tb_session
268 WHERE
269 ( updated_at >= ?
270 OR ( updated_at IS NULL
271 AND created_at >= ? )
272 ) AND
273 operator_id = ?
274 ORDER BY session_id
275 `,
276 'jugnoo_live.tb_cities': `
277 SELECT
278 ?
279 FROM
280 jugnoo_live.tb_cities
281 WHERE
282 ( updated_at >= ?
283 OR ( updated_at IS NULL
284 AND created_at >= ? )
285 ) AND
286 city_id IN(
287 SELECT
288 city_id
289 FROM
290 jugnoo_live.tb_operator_cities
291 WHERE
292 operator_id = ?
293 )
294 ORDER BY city_id
295 `,
296 'jugnoo_live.tb_fare': `
297 SELECT
298 ??
299 FROM
300 jugnoo_live.tb_fare
301 WHERE
302 ( updated_at >= ?
303 OR ( updated_at IS NULL
304 AND created_at >= ? )
305 ) AND
306 operator_id = ?
307 ORDER BY id
308 `,
309 //---------------------------
310 'jugnoo_live.tb_vehicle_type':`
311 SELECT
312 ??
313 FROM
314 jugnoo_live.tb_vehicle_type
315 WHERE
316 ( updated_at >= ?
317 OR ( updated_at IS NULL
318 AND created_at >= ? )
319 )
320
321 `
322 }
323;
324
325exports.getUserData = class extends API {
326
327 // type = 'yelo', 'tookan', 'jugnoo'
328 async getUserData() {
329 let response = [];
330 const user = [
331 {
332 userId: 226118,
333 sourceDb: 'tookanRead',
334 destinationDb: 'tupucaAzure',
335 table: ['tb_users', 'tb_teams','tb_jobs','tb_fleet_teams','tb_fleets','tb_customers'],
336 type: 'tookan'
337 },
338 {
339 userId: 40148,
340 sourceDb: 'tookanRead',
341 destinationDb: 'tupucaAzure',
342 table: ['tb_jobs', 'tb_marketplace_storefronts','tb_app_settings','tb_job_payment_details','tb_vendors','tb_job_sellers','tb_order_items','tb_products','tb_admin_catalogue'],
343 type: 'yelo'
344 }
345 // ,
346 // {
347 // userId: 3451,
348 // sourceDb: 'read',
349 // destinationDb: '',
350 // table: ['tb_users', 'tb_drivers','tb_engagements','tb_session','tb_cities','tb_fare','tb_vehicle_type'],
351 // type: 'jugnoo'
352 // }
353 ];
354
355 for(let data of user) {
356 const dataMigration = new DataMigration(data.userId, data.sourceDb, data.destinationDb, data.table, data.type);
357 response.push(await dataMigration.DataMigration());
358 }
359 return response;
360
361 }
362};
363
364class DataMigration extends API {
365
366 constructor(userId, sourceDb, destinationDb, table, type) {
367 super();
368
369 this.limit = 5000;
370 this.userId = userId;
371 this.sourceDb = sourceDb;
372 this.destinationDb = destinationDb;
373 this.table = table;
374 this.type = type;
375 }
376
377 async DataMigration() {
378
379 const
380 tables = await this.getDbNameWithTable(this.table, this.type);
381
382 let response = [];
383 this.columns = await this.getSchema(tables, this.destinationDb);
384 this.destinationConnection = this.destinationDb;
385 this.sourceConnection = this.sourceDb;
386
387 for(const table of tables) {
388 const warehouse = new WarehouseTable(table, this.userId, this);
389 response.push(await warehouse.main());
390 }
391 return response;
392 }
393
394 async getSchema(tables, connection) {
395 return await this.mysql.query(`
396 SELECT
397 CONCAT(TABLE_SCHEMA, ".", TABLE_NAME) table_name,
398 column_name
399 FROM
400 information_schema.columns
401 WHERE
402 column_name NOT LIKE("%_warehouse%")
403 HAVING
404 table_name IN (?)`,
405 [tables],
406 connection
407 );
408 }
409
410 async getDbNameWithTable(arr, type) {
411 if(type == 'tookan') {
412 arr = arr.map(i => 'tookan.' + i);
413
414 } else if(type == 'yelo') {
415 arr = arr.map(i => 'yelo.' + i);
416
417 } else if(type == 'jugnoo') {
418 arr = arr.map(i => 'jugnoo_live.' + i);
419
420 }
421
422 return arr;
423 }
424}
425
426class WarehouseTable {
427
428 constructor(table, userId, warehouse) {
429
430 this.table = table;
431 this.userId = userId;
432 this.warehouse = warehouse;
433 this.offset = 0;
434 }
435
436 async main() {
437
438 const
439 tableSyncStatus = {table: this.table, attempts: []}
440 ;
441 for await (const data of this.fetchData()) {
442
443 if(!data.length) {
444
445 tableSyncStatus.attempts.push({selectCount: data.length});
446 break;
447 }
448
449 (async () => {
450
451 const result = await this.insert(data);
452
453 tableSyncStatus.attempts.push({selectCount: data.length, insert: result});
454
455 })();
456
457 }
458
459 return tableSyncStatus;
460 }
461
462 get whitelistedColumns() {
463
464 if(!this.columnsList) {
465
466 this.columnsList = this.warehouse.columns.filter(x => (x.table_name == this.table)).map(x => x.column_name);
467 }
468
469 return this.columnsList;
470 }
471
472 async getLastUpdatedAt() {
473 if(this.last_updated_at) {
474
475 return this.last_updated_at
476 }
477
478 const [last_updated_at] = await this.warehouse.mysql.query(`
479 SELECT
480 MAX(updated_at_warehouse) updated_at_warehouse
481 FROM
482 ??
483 `,
484 [this.table],
485 this.warehouse.destinationConnection
486 );
487
488 return this.last_updated_at = last_updated_at.updated_at_warehouse;
489
490 }
491
492 async * fetchData() {
493
494 while(1) {
495
496 const newRecords = await this.warehouse.mysql.query(
497 sql_query[this.table] + ` LIMIT ${this.warehouse.limit} OFFSET ${this.offset}`,
498 [this.whitelistedColumns, await this.getLastUpdatedAt(), await this.getLastUpdatedAt(), this.userId],
499 this.warehouse.sourceConnection
500 );
501
502 this.offset += this.warehouse.limit;
503 yield newRecords.map(data => Object.values(data));
504
505 if(!newRecords.length) {
506 break;
507 }
508 }
509
510 }
511
512 async insert(formatedData) {
513
514 let inser_sql = `INSERT INTO ?? (??) VALUES ? ON duplicate key update `;
515 const param = [this.table, this.whitelistedColumns, formatedData];
516
517 for (const key of this.whitelistedColumns) {
518 inser_sql += ` ?? = VALUES(??),`;
519 param.push(key, key);
520 }
521
522 inser_sql = inser_sql.slice(0, -1);
523 return await this.warehouse.mysql.query(inser_sql, param, this.warehouse.destinationConnection);
524 }
525}