// · 7 years ago · Sep 25, 2018, 04:26 AM
// Project settings: AWS credentials/region plus the MongoDB and DynamoDB options.
const config = require('./config');
const AWS = require('aws-sdk');
const { MongoClient } = require('mongodb');

// Copy the credentials and region onto the SDK's global configuration object.
Object.assign(AWS.config, {
  accessKeyId: config.aws.accessKeyId,
  secretAccessKey: config.aws.secretAccessKey,
  region: config.aws.region
});

// Low-level DynamoDB client, plus a document client that has
// `convertEmptyValues` switched on.
const dynamodb = new AWS.DynamoDB();
const docClient = new AWS.DynamoDB.DocumentClient({ convertEmptyValues: true });

// MongoDB connection URL and the name of the database to read from.
const url = `mongodb://${config.mongodb.host}:${config.mongodb.port}`;
const dbName = config.mongodb.db;
16
// Entry point: connect to MongoDB, then hand the client to createDynamoTable,
// which makes sure the target table exists and starts the migration.
MongoClient.connect(url, { useNewUrlParser: true }, function (err, mongoClient) {
  if (err) {
    // No client was created, so there is nothing to migrate from (or to close).
    // Report the real cause instead of failing later on an undefined client.
    console.error('Unable to connect to MongoDB:', err);
    process.exitCode = 1;
    return;
  }

  createDynamoTable(config.mongodb.collection, dynamodb, mongoClient);
});
20
/**
 * Makes sure the DynamoDB table described by `config.dynamodb` exists, then
 * starts migrating the given MongoDB collection into it.
 *
 * The table is created only when DynamoDB reports that it does not exist; any
 * other failure is logged and the MongoDB client is closed so the process can
 * exit instead of hanging on the open connection.
 *
 * @param {string} collectionName - MongoDB collection to migrate.
 * @param {object} dynamodb - Low-level DynamoDB client (describeTable, createTable, waitFor).
 * @param {object} mongoClient - Connected MongoDB client; closed here on failure.
 */
const createDynamoTable = function (collectionName, dynamodb, mongoClient) {
  const dynamoTable = config.dynamodb;
  const tableName = dynamoTable.TableName;

  const startMigration = function () {
    migrateDocuments(mongoClient.db(dbName), collectionName, mongoClient, tableName);
  };

  // Log the failure and release the MongoDB connection.
  const abort = function (message, err) {
    console.error(message, JSON.stringify(err, null, 2));
    mongoClient.close();
  };

  dynamodb.describeTable({ TableName: tableName }, function (describeErr) {
    if (!describeErr) {
      // Table already exists.
      startMigration();
      return;
    }

    // Only a missing table justifies creating one; credentials, network or
    // throttling errors must not be mistaken for "table does not exist".
    if (describeErr.code !== 'ResourceNotFoundException') {
      abort('Unable to describe table. Error JSON:', describeErr);
      return;
    }

    dynamodb.createTable(dynamoTable, function (createErr, data) {
      if (createErr) {
        abort("Unable to create table. Error JSON:", createErr);
        return;
      }

      console.log("Created table. Table description JSON:", JSON.stringify(data, null, 2));
      console.log('Please wait...');

      // Poll until the table actually exists instead of blocking the event
      // loop for a fixed amount of time and hoping the table is ready.
      dynamodb.waitFor('tableExists', { TableName: tableName }, function (waitErr) {
        if (waitErr) {
          abort('Table did not become available. Error JSON:', waitErr);
          return;
        }
        startMigration();
      });
    });
  });
};
42
/**
 * Copies every document of a MongoDB collection into a DynamoDB table.
 *
 * Each document is converted with `prepareDocument` and written with
 * `docClient.put`. The MongoDB client is closed exactly once, after the cursor
 * is exhausted and every submitted write has reported back. This also covers
 * an empty collection.
 *
 * @param {object} mongodb - MongoDB database handle (provides `collection`).
 * @param {string} collectionName - Name of the collection to read.
 * @param {object} mongoClient - MongoDB client to close when the migration ends.
 * @param {string} dynamoTableName - Target DynamoDB table name.
 */
const migrateDocuments = function (mongodb, collectionName, mongoClient, dynamoTableName) {
  console.log(`Import documents from collection \`${collectionName}\``);

  const collection = mongodb.collection(collectionName);

  let submitted = 0;     // documents handed over by the cursor
  let processed = 0;     // documents whose write (or preparation) has finished
  let exhausted = false; // the cursor has no more documents
  let closed = false;

  // Close the connection once nothing is left to read and nothing is in flight.
  const closeWhenDone = function () {
    if (!closed && exhausted && processed === submitted) {
      closed = true;
      mongoClient.close();
    }
  };

  collection.find({}).forEach(function (document) {
    submitted++;

    let params;
    try {
      params = prepareDocument(dynamoTableName, document);
    } catch (prepareErr) {
      // One malformed document must not abort the whole migration.
      processed++;
      console.error('Unable to prepare item:', prepareErr);
      closeWhenDone();
      return;
    }

    docClient.put(params, function (err) {
      processed++;
      if (err) {
        console.error("Unable to add item. Error JSON:", JSON.stringify(err, null, 2));
      } else {
        console.log(`Added ${processed} items`);
      }
      closeWhenDone();
    });
  }, function (err) {
    // Called once when iteration ends, with an error if reading failed.
    if (err) {
      console.error('Unable to read documents:', err);
    }
    exhausted = true;
    closeWhenDone();
  });
};
99
/**
 * Turns a MongoDB document into the parameter object for `docClient.put`.
 *
 * The document is modified in place: `_id` is stringified, every key listed in
 * `config.mongodb.ignoreColumns` is removed, and `created_at` is converted to a
 * UTC timestamp in milliseconds. The result is then passed through
 * `prepareItem`.
 *
 * @param {string} tableName - Target DynamoDB table name.
 * @param {object} item - MongoDB document; mutated by this function.
 * @returns {{TableName: string, Item: object}} Parameters for `docClient.put`.
 */
const prepareDocument = function (tableName, item) {
  item._id = item._id.toString();

  // Remove the fields that must not be migrated.
  for (const key of Object.keys(item)) {
    if (config.mongodb.ignoreColumns.includes(key)) {
      delete item[key];
    }
  }

  // Convert the creation date to a millisecond timestamp.
  const createdAt = item.created_at;
  if (typeof createdAt === 'string') {
    // Expected shape: "YYYY-MM-DD HH:MM:SS"; a missing time part counts as midnight.
    const [date, time = '00:00:00'] = createdAt.split(' ');
    const [year, month, day] = date.split('-');
    const [hour = 0, min = 0, sec = 0] = time.split(':');
    // Date.UTC takes a zero-based month index, the string holds a 1-based month.
    item.created_at = Date.UTC(year, month - 1, day, hour, min, sec);
  } else if (createdAt instanceof Date) {
    item.created_at = createdAt.getTime();
  }
  // Any other value (e.g. field absent or ignored) is left untouched.

  return {
    TableName: tableName,
    Item: prepareItem(item)
  };
};
122
/**
 * Recursively converts numeric strings inside an object to numbers, in place.
 *
 * Only non-blank strings that parse to a finite number are converted. Empty or
 * whitespace-only strings, booleans, `null` and non-numeric strings are left
 * as they are, so their original type is preserved.
 *
 * @param {object} item - Object (or array) to normalise; mutated in place.
 * @returns {object} The same `item` reference.
 */
const prepareItem = function (item) {
  if (item === null || typeof item !== 'object') {
    return item;
  }

  for (const [prop, value] of Object.entries(item)) {
    if (value !== null && typeof value === 'object') {
      prepareItem(value);
    } else if (typeof value === 'string' && value.trim() !== '') {
      const numeric = Number(value);
      // Skip NaN and +/-Infinity: neither is a usable numeric value here.
      if (Number.isFinite(numeric)) {
        item[prop] = numeric;
      }
    }
  }

  return item;
};
134
/**
 * Blocks the current thread by spinning until more than `milliseconds`
 * milliseconds have elapsed. Synchronous: nothing else runs in the meantime.
 *
 * @param {number} milliseconds - Minimum time to block.
 */
const sleep = (milliseconds) => {
  const startedAt = Date.now();
  let elapsed = Date.now() - startedAt;
  while (!(elapsed > milliseconds)) {
    elapsed = Date.now() - startedAt;
  }
};