· 6 years ago · May 23, 2019, 09:56 AM
1const uuid = require('uuid/v4')
2const config = require('config')
3const Minio = require('minio')
4const { isString, isUndefined } = require('ramda-adjunct')
5const pkg = require('./../package.json')
6const {
7 BucketNotExistsError,
8 BucketAlreadyExistsError,
9 MinioPingError,
10 MinioInitializationError,
11} = require('./../error')
12
13const {
14 DEFAULT_REGION,
15 URL_EXPIRE,
16} = require('./../constants')
17
/**
 * File-storage service definition backed by a Minio client.
 *
 * The object exposes bucket and object operations as `actions`, shared helpers as
 * `methods`, and `created` / `started` / `stopped` lifecycle hooks.
 * NOTE(review): the shape (name / settings / actions / methods, `ctx.params`,
 * `this.logger`, `this.Promise`) looks like a Moleculer service schema — confirm
 * against the broker that loads this module.
 */
const FsService = {
  name: 'fs',
  settings: {
    endPoint: config.get('minio.endPoint'),
    port: config.get('minio.port'),
    accessKey: config.get('minio.accessKey'),
    secretKey: config.get('minio.secretKey'),
    useSSL: false,
    // Milliseconds between background pings started in `started()`;
    // a falsy value disables the periodic health check.
    minioHealthCheckInterval: 5000,
  },
  actions: {
    /**
     * Returns the `version` field of package.json.
     *
     * @actions
     */
    version: () => pkg.version,
    /**
     * Create a new Bucket
     *
     * @actions
     *
     * @param {string} bucket - The name of a bucket
     * @param {string} region - The region to create the bucket in (optional,
     * falls back to DEFAULT_REGION).
     *
     * @returns {PromiseLike<string|Error>} confirmation message; rejects with
     * BucketAlreadyExistsError when the bucket is already present.
     */
    'bucket.create': {
      params: {
        bucket: 'string',
        region: {
          type: 'string',
          optional: true,
        },
      },
      async handler (ctx) {
        const { bucket, region } = ctx.params
        if (await this.bucketExists(bucket)) {
          throw new BucketAlreadyExistsError()
        }
        await this.makeBucket(bucket, region)
        return `Bucket with name ${bucket} was created`
      },
    },
    /**
     * Checks if a bucket exists.
     *
     * @actions
     * @param {string} bucket - Name of the bucket
     */
    'bucket.exists': {
      params: {
        bucket: 'string',
      },
      handler (ctx) {
        return this.bucketExists(ctx.params.bucket)
      },
    },
    /**
     * List all buckets
     *
     * @actions
     * @returns {PromiseLike<Object[]|Error>} the client's bucket list, or an empty
     * array when the client yields `undefined`.
     */
    'bucket.list': {
      description: 'Lists all buckets.',
      handler () {
        return this.minio.listBuckets().then(buckets => (isUndefined(buckets) ? [] : buckets))
      },
    },
    /**
     * Removes a bucket
     *
     * @actions
     * @param {string} bucket - Name of the bucket
     */
    'bucket.destroy': {
      params: {
        bucket: 'string',
      },
      handler (ctx) {
        return this.minio.removeBucket(ctx.params.bucket)
      },
    },
    /**
     * Lists all objects in a bucket.
     *
     * @actions
     * @param {string} bucket - Name of the bucket
     * @param {string} prefix - The prefix of the objects that should be listed
     * (optional, default '').
     * @param {boolean} recursive - `true` indicates recursive style listing and false indicates
     * directory style listing delimited by '/'. (optional, default `false`).
     *
     * @returns {PromiseLike<Object[]|Error>}
     */
    'object.list': {
      params: {
        bucket: { type: 'string' },
        prefix: { type: 'string', optional: true },
        recursive: { type: 'boolean', optional: true },
      },
      handler (ctx) {
        const { bucket, prefix = '', recursive = false } = ctx.params
        return this.collectStream(() => this.minio.listObjects(bucket, prefix, recursive))
      },
    },
    /**
     * Lists partially uploaded objects in a bucket.
     *
     * @actions
     * @param {string} bucket - Name of the bucket
     * @param {string} prefix - The prefix of the objects that should
     * be listed (optional, default '').
     * @param {boolean} recursive - `true` indicates recursive style listing and false
     * indicates directory style listing delimited by '/'. (optional, default `false`).
     *
     * @returns {PromiseLike<Object[]|Error>}
     */
    'object.listIncompleteUploads': {
      params: {
        bucket: { type: 'string' },
        prefix: { type: 'string', optional: true },
        recursive: { type: 'boolean', optional: true },
      },
      handler (ctx) {
        const { bucket, prefix = '', recursive = false } = ctx.params
        return this.collectStream(
          () => this.minio.listIncompleteUploads(bucket, prefix, recursive),
        )
      },
    },
    /**
     * Uploads an object, creating the bucket first when it does not exist.
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {object} file - The payload handed to the client's `putObject`.
     * @param {string} name - Name of the object (optional, a fresh UUID is
     * generated when omitted or empty).
     * @param {object} meta - Metadata handed to the client's `putObject` (optional).
     *
     * @returns {PromiseLike<string|Error>} the name the object was stored under.
     */
    'object.put': {
      params: {
        bucket: 'string',
        file: 'object',
        name: {
          type: 'string',
          optional: true,
        },
        meta: {
          type: 'object',
          optional: true,
        },
      },
      async handler (ctx) {
        const { bucket, file, meta, name } = ctx.params
        // `uuid` is the generator function; it has to be invoked to get a string name.
        const filename = name || uuid()
        await this.createBucketIfNotExists(bucket)
        await this.minio.putObject(
          bucket,
          filename,
          file,
          meta,
        )
        return filename
      },
    },
    /**
     * Downloads an object as a stream.
     *
     * @actions
     * @param {string} bucket - Name of the bucket
     * @param {string} name - Name of the object.
     *
     * @returns {PromiseLike<ReadableStream|Error>}
     */
    'object.get': {
      params: {
        bucket: 'string',
        name: 'string',
      },
      handler (ctx) {
        const { bucket, name } = ctx.params
        return this.minio.getObject(bucket, name)
      },
    },
    /**
     * Downloads the specified range bytes of an object as a stream.
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {string} name - Name of the object.
     * @param {number} offset - `offset` of the object from where the stream will start.
     * @param {number} length - `length` of the object that will be read in the stream (optional,
     * if not specified we read the rest of the file from the offset).
     *
     * @returns {PromiseLike<ReadableStream|Error>}
     */
    'object.getPartial': {
      params: {
        bucket: { type: 'string' },
        name: { type: 'string' },
        offset: { type: 'number' },
        length: { type: 'number', optional: true },
      },
      handler (ctx) {
        const { bucket, name, offset, length } = ctx.params
        return this.minio.getPartialObject(bucket, name, offset, length)
      },
    },
    /**
     * Generates a presigned URL for the provided HTTP method, 'httpMethod'.
     * Browsers/Mobile clients may point to this URL to directly download objects even
     * if the bucket is private.
     * This presigned URL can have an associated expiration time in seconds after which
     * the URL is no longer valid.
     *
     * @actions
     * @param {string} httpMethod - The HTTP-Method (eg. `GET`). Default value is 'GET'
     * @param {string} bucket - Name of the bucket.
     * @param {string} name - Name of the object.
     * @param {number} expires - Expiry time in seconds (optional, defaults to URL_EXPIRE).
     * @param {number} expiry - Alias of `expires`, kept because the parameter schema
     * historically declared this name; `expires` wins when both are given. (optional)
     * @param {object} reqParams - request parameters. (optional)
     * @param {string} requestDate - An ISO date string, the url will be issued at.
     * Default value is now. (optional)
     * @returns {PromiseLike<String|Error>}
     */
    'object.url': {
      params: {
        httpMethod: {
          type: 'string',
          optional: true,
        },
        bucket: 'string',
        name: 'string',
        expires: {
          type: 'number',
          integer: true,
          optional: true,
        },
        expiry: {
          type: 'number',
          integer: true,
          optional: true,
        },
        reqParams: { type: 'object', optional: true },
        requestDate: { type: 'string', optional: true },
      },
      handler (ctx) {
        const {
          httpMethod = 'GET',
          bucket,
          name,
          // Destructuring defaults are evaluated left to right, so `expires`
          // falls back to `expiry`, which in turn falls back to URL_EXPIRE.
          expiry = URL_EXPIRE,
          expires = expiry,
          reqParams = {},
          requestDate = new Date(),
        } = ctx.params
        // The schema delivers a string; the client is handed a Date instance.
        const reqDate = isString(requestDate) ? new Date(requestDate) : requestDate

        return new this.Promise((resolve, reject) => {
          this.minio.presignedUrl(
            httpMethod,
            bucket,
            name,
            expires,
            reqParams,
            reqDate,
            (error, url) => {
              if (error) {
                reject(error)
              } else {
                resolve(url)
              }
            },
          )
        })
      },
    },
    /**
     * Gets metadata of an object.
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {string} name - Name of the object.
     *
     * @returns {PromiseLike<{size: {number}, metaData: {object},
     * lastModified: {string}, etag: {string}}|Error>}
     */
    'object.stat': {
      params: {
        bucket: 'string',
        name: 'string',
      },
      handler (ctx) {
        const { bucket, name } = ctx.params
        return this.minio.statObject(bucket, name)
      },
    },
    /**
     * Removes an Object
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {string} name - Name of the object.
     *
     * @returns {PromiseLike<undefined|Error>}
     */
    'object.remove': {
      params: {
        bucket: { type: 'string' },
        name: { type: 'string' },
      },
      handler (ctx) {
        // Read the keys the schema actually declares (`bucket`, `name`).
        const { bucket, name } = ctx.params
        return this.minio.removeObject(bucket, name)
      },
    },
    /**
     * Removes a list of Objects
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {string[]} names - Names of the objects.
     *
     * @returns {PromiseLike<undefined|Error>}
     */
    'object.removeMany': {
      params: {
        bucket: { type: 'string' },
        names: { type: 'array', items: 'string' },
      },
      handler (ctx) {
        // Read the keys the schema actually declares (`bucket`, `names`).
        const { bucket, names } = ctx.params
        return this.minio.removeObjects(bucket, names)
      },
    },
    /**
     * Removes a partially uploaded object.
     *
     * @actions
     * @param {string} bucket - Name of the bucket.
     * @param {string} name - Name of the object.
     *
     * @returns {PromiseLike<undefined|Error>}
     */
    'object.removeIncompleteUpload': {
      params: {
        bucket: { type: 'string' },
        name: { type: 'string' },
      },
      handler (ctx) {
        const { bucket, name } = ctx.params
        return this.minio.removeIncompleteUpload(bucket, name)
      },
    },
  },
  methods: {
    /**
     * Builds a Minio client from the connection values in `this.settings`.
     *
     * @returns {Minio.Client}
     */
    createMinioClient () {
      return new Minio.Client({
        endPoint: this.settings.endPoint,
        port: this.settings.port,
        accessKey: this.settings.accessKey,
        secretKey: this.settings.secretKey,
        useSSL: this.settings.useSSL,
      })
    },
    /**
     * Asks the Minio client whether a bucket exists.
     *
     * @param {string} bucketName - Name of the bucket.
     * @returns {PromiseLike<boolean|Error>} whatever the client's `bucketExists` yields.
     */
    bucketExists (bucketName) {
      return this.minio.bucketExists(bucketName)
    },
    /**
     * Creates a bucket and logs its name.
     *
     * @param {string} bucketName - Name of the bucket.
     * @param {string} region - Region to create the bucket in (default DEFAULT_REGION).
     */
    makeBucket (bucketName, region = DEFAULT_REGION) {
      this.logger.info('bucketName: ', bucketName)
      return this.minio.makeBucket(bucketName, region)
    },
    /**
     * Resolves to `true` when the bucket named in `ctx.params.bucket` exists.
     *
     * @param {object} ctx - Action context carrying `params.bucket`.
     * @throws {BucketNotExistsError} when the bucket is missing.
     */
    async assertBucketExist (ctx) {
      const exists = await this.bucketExists(ctx.params.bucket)
      if (!exists) throw new BucketNotExistsError()
      return true
    },
    /**
     * Creates the bucket (in the default region) unless it already exists.
     *
     * @param {string} bucket - Name of the bucket.
     */
    async createBucketIfNotExists (bucket) {
      const exists = await this.bucketExists(bucket)
      if (!exists) await this.makeBucket(bucket)
    },
    /**
     * Drains an object-mode stream into an array.
     *
     * @param {Function} createStream - Zero-argument factory returning the stream.
     * It is invoked inside the promise executor so that a synchronous throw from
     * the client becomes a rejection instead of an uncaught exception.
     * @returns {PromiseLike<Object[]|Error>} every 'data' item in arrival order;
     * rejects on the stream's 'error' event.
     */
    collectStream (createStream) {
      return new this.Promise((resolve, reject) => {
        const stream = createStream()
        const items = []
        stream.on('data', item => items.push(item))
        stream.on('end', () => resolve(items))
        stream.on('error', reject)
      })
    },
    /**
     * Checks that the Minio backend answers a `listBuckets` call in time.
     *
     * @param {object} options
     * @param {number} options.timeout - Milliseconds to wait (default 5000).
     * @returns {PromiseLike<boolean|Error>} `true` on success; rejects with
     * MinioPingError on timeout, or with the client's error when the call fails.
     */
    ping ({ timeout = 5000 } = {}) {
      return new this.Promise((resolve, reject) => {
        const timer = setTimeout(() => reject(new MinioPingError()), timeout)
        // Clear the timer once the backend has answered so it does not linger
        // for the rest of the timeout window.
        this.minio.listBuckets().then(
          () => {
            clearTimeout(timer)
            resolve(true)
          },
          (error) => {
            clearTimeout(timer)
            reject(error)
          },
        )
      })
    },
  },
  /**
   * Lifecycle hook: instantiates the Minio client used by every action.
   */
  created () {
    this.minio = this.createMinioClient()
  },
  /**
   * Lifecycle hook: pings the backend once and, when
   * `settings.minioHealthCheckInterval` is truthy, schedules a recurring ping
   * that only logs failures.
   *
   * @returns {Promise<Object|undefined>} the interval handle, or `undefined`
   * when the health check is disabled.
   * @throws {MinioInitializationError} when the initial ping fails.
   */
  async started () {
    try {
      await this.ping()
    } catch (e) {
      throw new MinioInitializationError(e.message)
    }

    if (!this.settings.minioHealthCheckInterval) return undefined

    this.healthCheckInterval = setInterval(
      () => this.ping().catch(e => this.logger.error('Minio backend can not be reached', e)),
      this.settings.minioHealthCheckInterval,
    )
    return this.healthCheckInterval
  },
  /**
   * Lifecycle hook: stops the recurring health check, if one was scheduled.
   */
  stopped () {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval)
      this.healthCheckInterval = undefined
    }
  },
}
427
428module.exports = FsService