// Revision dated Dec 07, 2019, 10:22 AM
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.IO;
using Amazon.S3.Model;
using Irender.Transport.AWS.S3.Model;
using Irender.Transport.Events;
using Irender.Transport.Model;
using Irender.Utility;

namespace SnapBoxProject
{
    /// <summary>
    /// Downloads objects from Amazon S3 to the local file system.
    /// Objects smaller than four part-sizes are fetched with a single GET request; larger
    /// objects are split into byte-range parts that are downloaded in parallel and then
    /// joined into the destination file.
    /// </summary>
    public class S3DownloadFileOptimizeEdited
    {
        private static readonly slf4net.ILogger _logger = slf4net.LoggerFactory.GetLogger(typeof(S3DownloadFileOptimizeEdited));

        // Serializes client creation in CreateS3Client.
        static readonly object lockObject = new object();

        /// <summary>Attempts per part within one download round before the part is re-queued.</summary>
        private const int PartDownloadAttempts = 3;

        /// <summary>
        /// Upper bound on download rounds, so a part that can never be fetched fails the
        /// download instead of retrying forever.
        /// </summary>
        private const int MaxDownloadRounds = 10;

        /// <summary>Attempts made to join all downloaded parts into the destination file.</summary>
        private const int JoinAttempts = 5;

        /// <summary>Attempts made to append a single part file while joining.</summary>
        private const int JoinPartAttempts = 5;

        /// <summary>Buffer size for the destination and part file streams (100KB).</summary>
        private const int JoinBufferSize = 102400;

        /// <summary>Part size used when a request does not specify a positive one (5 MiB).</summary>
        private readonly long defaultPartSize = 5L * 1024 * 1024;

        private readonly AmazonS3Config _config;
        private readonly AWSCredentials _aWSCredentials;
        private readonly IAmazonS3 s3Client;

        /// <summary>
        /// Creates a downloader for the ap-southeast-1 region using static credentials.
        /// </summary>
        /// <param name="accessKey">AWS access key id.</param>
        /// <param name="secretKey">AWS secret access key.</param>
        public S3DownloadFileOptimizeEdited(string accessKey, string secretKey)
        {
            this._config = new AmazonS3Config()
            {
                RegionEndpoint = RegionEndpoint.APSoutheast1
            };
            this._aWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
        }

        /// <summary>
        /// Creates a downloader from explicit credentials and client configuration.
        /// </summary>
        /// <param name="aWSCredentials">Credentials used for every S3 client this instance creates.</param>
        /// <param name="config">Client configuration used for every S3 client this instance creates.</param>
        public S3DownloadFileOptimizeEdited(AWSCredentials aWSCredentials, AmazonS3Config config)
        {
            this._config = config;
            this._aWSCredentials = aWSCredentials;
            this.s3Client = new AmazonS3Client(aWSCredentials, config);
        }

        /// <summary>Caller-supplied state copied into the start, finish and error events.</summary>
        public object ObjectState { get; set; }

        /// <summary>
        /// Downloads <paramref name="s3KeyName"/> from <paramref name="s3BucketName"/> into
        /// <paramref name="downloadFilePath"/> using the default part size.
        /// </summary>
        public async Task DownloadFile(string downloadFilePath, string s3BucketName, string s3KeyName)
        {
            await DownloadFile(downloadFilePath, s3BucketName, s3KeyName, 0);
        }

        /// <summary>
        /// Downloads <paramref name="s3KeyName"/> from <paramref name="s3BucketName"/> into
        /// <paramref name="downloadFilePath"/>.
        /// </summary>
        /// <param name="partSize">Size of each downloaded part in bytes; values &lt;= 0 select the default part size.</param>
        public async Task DownloadFile(string downloadFilePath, string s3BucketName, string s3KeyName, long partSize)
        {
            var request = new DownloadFileRequest()
            {
                FilePath = downloadFilePath,
                BucketName = s3BucketName,
                Key = s3KeyName,
                PartSize = partSize
            };
            await DownloadFile(request);
        }

        /// <summary>
        /// Combines <paramref name="root"/> and <paramref name="fileName"/>.
        /// One leading backslash is removed from the file name first: a rooted name would make
        /// <see cref="Path.Combine(string, string)"/> discard <paramref name="root"/>.
        /// A null root is treated as empty, so the file name is then used unchanged.
        /// </summary>
        private string StanddardAndCombine(string root, string fileName)
        {
            if (fileName.StartsWith("\\", StringComparison.Ordinal))
            {
                fileName = fileName.Substring(1);
            }
            return Path.Combine(root ?? string.Empty, fileName);
        }

        /// <summary>
        /// Downloads the object described by <paramref name="request"/>.
        /// Raises the request's start event before the transfer, its error event on failure,
        /// and its finished event in every case once the transfer has ended.
        /// </summary>
        /// <param name="request">
        /// Bucket, key, local target (LocalDir combined with FilePath) and part size.
        /// Its ContentLength is filled in from S3 before the transfer starts.
        /// </param>
        /// <param name="ct">
        /// Checked on entry, before each part of a large object, and passed to the GET request
        /// of a small object.
        /// </param>
        /// <returns>
        /// The ETag of the downloaded object, or an empty string when the transfer failed.
        /// Failures after the start event are reported through the error event, not thrown.
        /// </returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was already cancelled on entry.</exception>
        /// <exception cref="AmazonS3Exception">The object does not exist.</exception>
        public async Task<string> DownloadFile(DownloadFileRequest request, CancellationToken ct = default)
        {
            // CancellationToken is a struct, so there is nothing to null-check.
            if (ct.IsCancellationRequested)
            {
                _logger.Info($"Task DownloadFile: {request.Key} was cancelled before it got started.");
                ct.ThrowIfCancellationRequested();
            }

            string eTag = string.Empty;
            string s3BucketName = request.BucketName;
            string s3KeyName = request.Key;
            long partSize = ResolvePartSize(request.PartSize);
            string fullName = string.Empty;

            // Look the object up first; a missing object throws before any event is raised.
            IAmazonS3 client = CreateS3Client();
            try
            {
                S3FileInfo s3File = new S3FileInfo(client, s3BucketName, s3KeyName);
                if (!s3File.Exists) throw new AmazonS3Exception("File not found");
                request.ContentLength = s3File.Length;
                fullName = s3File.FullName;
            }
            finally
            {
                ReleaseS3Client(client);
            }

            string downloadFilePath = StanddardAndCombine(request.LocalDir, request.FilePath);

            var sw = Stopwatch.StartNew();
            var startDownloadEvent = new UpDownLoadEventArg()
            {
                BeginUpDownLoadAt = Stopwatch.GetTimestamp(),
                EndUpDownLoadAt = -1,
                NumberOfFailObject = 0,
                NumberOfSuccessObject = 0,
                ObjectState = this.ObjectState,
                TotalObject = 1,
                CurrentObjectName = downloadFilePath,
            };
            request.OnDownloadStart(this, startDownloadEvent);

            var endDownloadEvent = new UpDownLoadEventArg()
            {
                BeginUpDownLoadAt = startDownloadEvent.BeginUpDownLoadAt,
                EndUpDownLoadAt = -1, // stamped in the finally block once the transfer has ended
                NumberOfFailObject = 0,
                NumberOfSuccessObject = 0,
                ObjectState = this.ObjectState,
                TotalObject = 1,
                CurrentObjectName = downloadFilePath,
            };
            try
            {
                long contentLength = request.ContentLength;
                if (contentLength < partSize * 4)
                {
                    // The core variant throws, so a failed small download reaches the catch below
                    // instead of being reported as a success with an empty ETag.
                    eTag = await DownloadSmallObjectCore(downloadFilePath, s3BucketName, s3KeyName, ct);
                    _logger.Info("Download small file {0} with size: {1}", downloadFilePath, contentLength);
                }
                else
                {
                    eTag = await DownloadBigObject(request, ct);
                    _logger.Info("Download big file {0} with size: {1}", downloadFilePath, contentLength);
                }
                endDownloadEvent.NumberOfSuccessObject = 1;
            }
            catch (Exception ex)
            {
                _logger.Error("Download file: {0} with error: {1}", fullName, ex.Message, ex);
                _logger.Error(ex, ex.Message);
                endDownloadEvent.Error = ex;
                endDownloadEvent.NumberOfFailObject = 1;
                endDownloadEvent.NumberOfSuccessObject = 0;

                request.OnDownloadError(this, new UpDownLoadErrorEventArg()
                {
                    BeginUpDownLoadAt = startDownloadEvent.BeginUpDownLoadAt,
                    EndUpDownLoadAt = Stopwatch.GetTimestamp(),
                    ObjectState = this.ObjectState,
                    CurrentObjectName = startDownloadEvent.CurrentObjectName,
                });
            }
            finally
            {
                sw.Stop();
                endDownloadEvent.EndUpDownLoadAt = Stopwatch.GetTimestamp();
                endDownloadEvent.UpDownLoadMiliseconds = sw.ElapsedMilliseconds;
                request.OnDownloadFinished(this, endDownloadEvent);
                _logger.Info("Duration download: {0} minutes", TimeSpan.FromMilliseconds(sw.ElapsedMilliseconds).ToString("c"));
            }
            return eTag;
        }

        /// <summary>
        /// Downloads an object with a single GET request and writes it to <paramref name="downloadFilePath"/>.
        /// The original author noted this is meant for directory downloads.
        /// Any failure is logged and reported as an empty ETag rather than thrown.
        /// </summary>
        /// <param name="downloadFilePath">Local file to write.</param>
        /// <param name="s3BucketName">Source bucket.</param>
        /// <param name="s3KeyName">Source object key.</param>
        /// <returns>The object's ETag, or <see cref="string.Empty"/> when the download failed.</returns>
        public async Task<string> DownloadSmallObject(string downloadFilePath, string s3BucketName, string s3KeyName)
        {
            try
            {
                return await DownloadSmallObjectCore(downloadFilePath, s3BucketName, s3KeyName, CancellationToken.None);
            }
            catch (Exception ex)
            {
                // Keep the public "empty ETag on failure" contract, but do not lose the cause.
                _logger.Error(ex, ex.Message);
                return string.Empty;
            }
        }

        /// <summary>
        /// Single-GET download that lets failures propagate.
        /// Shared by <see cref="DownloadSmallObject"/> and <see cref="DownloadFile(DownloadFileRequest, CancellationToken)"/>.
        /// </summary>
        private async Task<string> DownloadSmallObjectCore(string downloadFilePath, string s3BucketName, string s3KeyName, CancellationToken ct)
        {
            IAmazonS3 client = CreateS3Client();
            try
            {
                var getRequest = new GetObjectRequest()
                {
                    BucketName = s3BucketName,
                    Key = s3KeyName,
                };
                using (var response = await client.GetObjectAsync(getRequest, ct))
                {
                    response.WriteResponseStreamToFile(downloadFilePath);
                    return response.ETag;
                }
            }
            finally
            {
                ReleaseS3Client(client);
            }
        }

        /// <summary>
        /// Downloads a large object as parallel byte-range parts and joins them into the target file.
        /// <c>request.ContentLength</c> must already hold the object size.
        /// </summary>
        /// <returns>The ETag of the joined file, or an empty string when no parts were produced.</returns>
        public Task<string> DownloadBigObject(DownloadFileRequest request, CancellationToken cancellationToken)
        {
            return DownloadObjectAsync(request, cancellationToken);
        }

        /// <summary>
        /// Returns an S3 client.
        /// A new client is built from the stored credentials and configuration; if either is
        /// missing, the client created by the constructor is returned instead.
        /// Release the client with <see cref="ReleaseS3Client"/>, not by disposing it directly.
        /// </summary>
        public IAmazonS3 CreateS3Client()
        {
            lock (lockObject)
            {
                if (_aWSCredentials == null || _config == null)
                {
                    return this.s3Client;
                }
                return new AmazonS3Client(_aWSCredentials, _config);
            }
        }

        /// <summary>
        /// Disposes a client obtained from <see cref="CreateS3Client"/>.
        /// The shared instance stored in <c>s3Client</c> is left alone, because later calls
        /// will hand it out again.
        /// </summary>
        private void ReleaseS3Client(IAmazonS3 client)
        {
            if (client != null && !ReferenceEquals(client, this.s3Client))
            {
                client.Dispose();
            }
        }

        /// <summary>Returns <paramref name="requestedPartSize"/> when positive, otherwise the default part size.</summary>
        private long ResolvePartSize(long requestedPartSize)
        {
            return requestedPartSize > 0 ? requestedPartSize : defaultPartSize;
        }

        /// <summary>Runs the blocking multipart download on the thread pool.</summary>
        private Task<string> DownloadObjectAsync(DownloadFileRequest request, CancellationToken ct)
        {
            return Task.Run(() => DownloadObjectCore(request, ct));
        }

        /// <summary>
        /// Splits the object into parts, downloads them in parallel and joins them.
        /// Stores the part plan on <paramref name="request"/> (DownloadPartRequests, PartNumber).
        /// </summary>
        /// <returns>The joined file's ETag, or an empty string when the object produced no parts.</returns>
        private string DownloadObjectCore(DownloadFileRequest request, CancellationToken ct)
        {
            try
            {
                _logger.Info("Downloading parts");

                Dictionary<int, DownloadPartRequest> downloadPartRequestSummary = PrepareDownload(request);
                request.DownloadPartRequests = downloadPartRequestSummary;
                request.PartNumber = downloadPartRequestSummary.Count;

                List<DownloadPartResponse> downloadResponses = DownloadParallel(request, ct);
                if (downloadResponses.Count == 0)
                {
                    // Nothing was split into parts, so there is nothing to join.
                    return string.Empty;
                }

                var completeRequest = new CompleteMultipartDownloadRequest
                {
                    BucketName = request.BucketName,
                    Key = request.Key,
                    PartDownloadeds = downloadResponses.OrderBy(x => x.PartNumber).ToList(),
                };
                return JoinParts(request, completeRequest);
            }
            catch (Exception exception)
            {
                _logger.Error("An AmazonS3Exception was thrown: {0}", exception.Message, exception);
                throw;
            }
        }

        /// <summary>
        /// Joins the downloaded parts, retrying until the joined length matches
        /// <c>request.ContentLength</c>. On success the part files are deleted.
        /// </summary>
        /// <returns>The ETag reported by the join.</returns>
        /// <exception cref="IOException">Every attempt produced a file of the wrong length.</exception>
        private string JoinParts(DownloadFileRequest request, CompleteMultipartDownloadRequest completeRequest)
        {
            Exception lastError = null;
            for (int attempt = 1; attempt <= JoinAttempts; attempt++)
            {
                try
                {
                    CompleteMultipartDownloadResponse joined = CompleteMultipartDownload(completeRequest);
                    lastError = null;
                    if (request.ContentLength == joined.ContentLength)
                    {
                        DeletePartFiles(completeRequest.PartDownloadeds);
                        return joined.ETag;
                    }
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, ex.Message);
                    lastError = ex;
                }
            }

            if (lastError != null)
            {
                // Rethrow the last join failure without resetting its stack trace.
                ExceptionDispatchInfo.Capture(lastError).Throw();
            }
            throw new IOException($"Join file {request.Key} fail!");
        }

        /// <summary>Best-effort deletion of the part files; failures are logged and skipped.</summary>
        private static void DeletePartFiles(IEnumerable<DownloadPartResponse> parts)
        {
            if (parts == null)
            {
                return;
            }
            foreach (var part in parts)
            {
                try
                {
                    File.Delete(part.FullPartName);
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, ex.Message);
                }
            }
        }

        /// <summary>
        /// Concatenates the part files, in list order, into the file named by the first part's
        /// FullName. The target is created or overwritten, and its directory is created if needed.
        /// </summary>
        /// <returns>A response carrying the key, the first part's ETag and the joined file length.</returns>
        private CompleteMultipartDownloadResponse CompleteMultipartDownload(CompleteMultipartDownloadRequest request)
        {
            _logger.Info($"Begin CompleteMultipartDownload join part: {request.Key}");
            CompleteMultipartDownloadResponse response = new CompleteMultipartDownloadResponse()
            {
                Key = request.Key,
            };
            var parts = request.PartDownloadeds;
            if (parts != null && parts.Count > 0)
            {
                string filePath = parts[0].FullName;
                response.ETag = parts[0].ETag;

                // Make sure the directory exists to write to.
                Directory.CreateDirectory(new FileInfo(filePath).DirectoryName);

                using (var downloadStream = new FileStream(filePath, FileMode.Create, FileAccess.ReadWrite, FileShare.Read, JoinBufferSize))
                {
                    for (int i = 0; i < parts.Count; i++)
                    {
                        var part = parts[i];
                        _logger.Info($"CompleteMultipartDownload join part: {i} - file: {part.FullPartName} - etag: {part.ETag}");
                        AppendPartFile(downloadStream, part.FullPartName);
                    }
                    response.ContentLength = downloadStream.Length;
                }
            }
            _logger.Info($"End CompleteMultipartDownload join part: {request.Key}");
            return response;
        }

        /// <summary>
        /// Appends the whole content of <paramref name="partFilePath"/> to <paramref name="target"/>.
        /// Retries up to <see cref="JoinPartAttempts"/> times, then rethrows the last error.
        /// </summary>
        private static void AppendPartFile(Stream target, string partFilePath)
        {
            long startPosition = target.Position;
            for (int attempt = 1; ; attempt++)
            {
                try
                {
                    using (var partStream = new FileStream(partFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, JoinBufferSize))
                    {
                        partStream.CopyTo(target, JoinBufferSize);
                    }
                    return;
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, ex.Message);
                    if (attempt >= JoinPartAttempts)
                    {
                        throw;
                    }
                    // Discard whatever the failed attempt already appended; otherwise the retry
                    // would duplicate those bytes in the joined file.
                    target.SetLength(startPosition);
                    target.Position = startPosition;
                }
            }
        }

        /// <summary>
        /// Downloads every part in <c>request.DownloadPartRequests</c> in parallel.
        /// Each part gets up to <see cref="PartDownloadAttempts"/> attempts per round. Parts that
        /// still fail are retried in a further round, up to <see cref="MaxDownloadRounds"/> rounds.
        /// A progress event is raised, off-thread, for each completed part.
        /// </summary>
        /// <returns>One successful response per part, in no particular order.</returns>
        /// <exception cref="IOException">Some parts could not be downloaded within the round limit.</exception>
        private List<DownloadPartResponse> DownloadParallel(DownloadFileRequest request, CancellationToken ct)
        {
            var options = new ParallelOptions { CancellationToken = ct };
            var allParts = request.DownloadPartRequests;
            // Written from the Parallel.ForEach bodies, so it must be thread-safe.
            var downloadResponses = new ConcurrentDictionary<int, DownloadPartResponse>();

            int totalParts = allParts.Count;
            int transferredParts = 0;
            int remainingParts = totalParts;
            long transferredBytes = 0;

            List<DownloadPartRequest> pending = allParts.Values.ToList();
            for (int round = 1; pending.Count > 0; round++)
            {
                if (round > MaxDownloadRounds)
                {
                    throw new IOException($"Download file {request.Key} failed: {pending.Count} part(s) could not be downloaded.");
                }

                Parallel.ForEach(pending, options, downloadRequest =>
                {
                    options.CancellationToken.ThrowIfCancellationRequested();

                    int partNumber = downloadRequest.PartNumber.Value;
                    _logger.Info($"Begin download part:{partNumber} of: {downloadRequest.FilePath}");

                    DownloadPartResponse downloadPartResponse = DownloadPartWithRetry(downloadRequest);
                    if (downloadPartResponse == null)
                    {
                        // Not recorded in downloadResponses, so the next round picks it up again.
                        _logger.Error($"Fail download and need retry part:{downloadRequest.PartNumber} of: {downloadRequest.FilePath}");
                        return;
                    }

                    _logger.Info($"End download part: {downloadPartResponse.FullPartName}");
                    downloadResponses[partNumber] = downloadPartResponse;

                    // Snapshot the counters atomically here; the event itself is raised off-thread.
                    int done = Interlocked.Increment(ref transferredParts);
                    int remaining = Interlocked.Decrement(ref remainingParts);
                    long bytes = Interlocked.Add(ref transferredBytes, downloadPartResponse.ContentLength);
                    var progress = new Irender.Transport.Events.DownloadProgressArgs()
                    {
                        FilePath = downloadPartResponse.FullName,
                        PercentDone = (int)(done * 100.0 / totalParts),
                        RemainPartItem = remaining,
                        TransferredBytes = bytes,
                        TotalBytes = request.ContentLength,
                    };
                    Task.Run(() =>
                    {
                        try
                        {
                            request.OnDownloadProgressEvent(this, progress);
                        }
                        catch (Exception ex)
                        {
                            _logger.Error(ex, ex.Message);
                        }
                    });
                });

                pending = allParts.Values
                    .Where(part => !downloadResponses.ContainsKey(part.PartNumber.Value))
                    .ToList();
                if (pending.Count > 0)
                {
                    _logger.Info($"Missing number of part is: {pending.Count}");
                    foreach (var part in pending)
                    {
                        _logger.Info($"Missing part number: {part.PartNumber.Value}");
                    }
                }
            }

            return downloadResponses.Values.ToList();
        }

        /// <summary>
        /// Tries to download one part up to <see cref="PartDownloadAttempts"/> times.
        /// </summary>
        /// <returns>The first response without an error, or null when every attempt failed.</returns>
        private DownloadPartResponse DownloadPartWithRetry(DownloadPartRequest downloadRequest)
        {
            for (int attempt = 1; attempt <= PartDownloadAttempts; attempt++)
            {
                DownloadPartResponse response = DownloadPartOnce(downloadRequest);
                if (response != null && response.Error == null)
                {
                    return response;
                }
            }
            return null;
        }

        /// <summary>
        /// Downloads one part with a client from <see cref="CreateS3Client"/>.
        /// Download exceptions are logged and reported as null.
        /// </summary>
        private DownloadPartResponse DownloadPartOnce(DownloadPartRequest downloadRequest)
        {
            IAmazonS3 client = CreateS3Client();
            try
            {
                return client.DownloadPart(downloadRequest);
            }
            catch (Exception ex)
            {
                _logger.Error("downloadRequest part number: {0} with error:{1} File name: {2}", downloadRequest.PartNumber, ex.Message, downloadRequest.FilePath, ex);
                return null;
            }
            finally
            {
                ReleaseS3Client(client);
            }
        }

        /// <summary>
        /// Splits <c>request.ContentLength</c> bytes into consecutive inclusive byte ranges of
        /// the resolved part size. The last range is clipped to the end of the object.
        /// </summary>
        /// <returns>Part requests keyed by their 1-based part number.</returns>
        private Dictionary<int, DownloadPartRequest> PrepareDownload(DownloadFileRequest request)
        {
            // A non-positive part size used to divide by zero and never terminate.
            long partSize = ResolvePartSize(request.PartSize);
            long contentLength = request.ContentLength;
            long totalPartNumber = (contentLength + partSize - 1) / partSize;
            var downloadPartRequestDic = new Dictionary<int, DownloadPartRequest>();

            long start = 0;
            for (int i = 1; i <= totalPartNumber; i++)
            {
                long end = Math.Min(start + partSize, contentLength) - 1;
                var downloadRequest = new DownloadPartRequest
                {
                    LocalDir = request.LocalDir,
                    BucketName = request.BucketName,
                    Key = request.Key,
                    RemovePrefixKey = request.RemovePrefixKey,
                    PartNumber = i,
                    ByteRange = new ByteRange(start, end),
                    FilePath = request.FilePath,
                    TotalFileSize = contentLength,
                    TotalPartNumber = totalPartNumber
                };

                _logger.Info($"PartPosition: {i}, length: {end - start + 1}");
                downloadPartRequestDic[i] = downloadRequest;
                start = end + 1;
            }

            _logger.Info("Total downloadPartRequests count: {0}", downloadPartRequestDic.Count);
            return downloadPartRequestDic;
        }
    }
}