· 7 years ago · Apr 11, 2018, 07:02 PM
1package com.exceedlabs.spark.jobs.util
2
3import java.io.File
4
5import com.amazonaws.{AmazonClientException, AmazonServiceException}
6import com.amazonaws.auth.BasicAWSCredentials
7import com.amazonaws.services.s3.transfer.Transfer.TransferState
8import com.amazonaws.services.s3.transfer.{MultipleFileDownload, TransferManager}
9import org.slf4j.{Logger, LoggerFactory}
10
11object Downloader {
12 def logger: Logger = LoggerFactory.getLogger(this.getClass)
13
14 def downloadFromS3(bucketName: String, accessKey: String, secretKey: String): TransferState = {
15 val transferManager: TransferManager = new TransferManager(new BasicAWSCredentials(accessKey, secretKey))
16 val dir: File = new File(s"../download")
17
18 try {
19 val download: MultipleFileDownload = transferManager.downloadDirectory(bucketName, null, dir)
20
21 logger.info(s"State of the transfer: ${download.getState()}")
22 /**
23 * Blocking call to wait until the download finishes.
24 */
25 download.waitForCompletion()
26 transferManager.shutdownNow()
27 logger.info(s"State of the transfer: ${download.getState()}")
28 download.getState()
29 } catch {
30 case ace: AmazonClientException => {
31 logger.error(s"Errors encountered in the client while making the request or handling the response: ${ace.getMessage}")
32 TransferState.Failed
33 }
34 case ase: AmazonServiceException => {
35 logger.error(s"Errors occurred in Amazon S3 while processing the request: ${ase.getMessage}")
36 ase.printStackTrace()
37 TransferState.Failed
38 }
39 case ie: InterruptedException => {
40 logger.error(s"Transfer interrupted: ${ie.getMessage}")
41 ie.printStackTrace()
42 TransferState.Canceled
43 }
44 case e: Exception => {
45 logger.error(s"Error: ${e.getMessage}")
46 e.printStackTrace()
47 TransferState.Canceled
48 }
49 }
50 }
51
52}