· 6 years ago · Jul 01, 2019, 03:56 AM
/// <summary>
/// Synchronous wrapper around <c>GetAmazonS3Select</c>: runs the S3 Select query
/// and blocks the calling thread until the resulting table is available.
/// </summary>
/// <param name="accessKey">Access key forwarded to <c>GetAmazonS3Select</c>.</param>
/// <param name="SecretKey">Secret key forwarded to <c>GetAmazonS3Select</c>.</param>
/// <param name="bucketName">Bucket name forwarded to <c>GetAmazonS3Select</c>.</param>
/// <param name="fileName">Object key forwarded to <c>GetAmazonS3Select</c>.</param>
/// <param name="region">Region endpoint forwarded to <c>GetAmazonS3Select</c>.</param>
/// <returns>The table produced by <c>GetAmazonS3Select</c>.</returns>
/// <exception cref="AggregateException">
/// Wraps any exception thrown by the asynchronous query.
/// </exception>
public DataTable GetS3SelectDetails(string accessKey, string SecretKey, string bucketName, string fileName, Amazon.RegionEndpoint region)
{
    // Blocking directly on the task from a thread that owns a SynchronizationContext
    // (WinForms, WPF, classic ASP.NET) can deadlock: the awaits inside the query try
    // to resume on the very thread that is blocked here. Starting the work on the
    // thread pool keeps those continuations off the caller's context.
    Task<DataTable> query = Task.Run(() => GetAmazonS3Select(accessKey, SecretKey, bucketName, fileName, region));

    // .Result is kept on purpose (instead of GetAwaiter().GetResult()) so failures
    // still surface as AggregateException, exactly as existing callers expect.
    return query.Result;
}
7
/// <summary>
/// Runs the S3 Select query for the given object and loads the returned
/// semicolon-delimited text into a <see cref="DataTable"/> whose columns are all strings.
/// </summary>
/// <param name="accessKey">Access key forwarded to <c>GetSelectObjectContentEventStream</c>.</param>
/// <param name="SecretKey">Secret key forwarded to <c>GetSelectObjectContentEventStream</c>.</param>
/// <param name="bucketName">Bucket name forwarded to <c>GetSelectObjectContentEventStream</c>.</param>
/// <param name="fileName">Object key forwarded to <c>GetSelectObjectContentEventStream</c>.</param>
/// <param name="region">Region endpoint forwarded to <c>GetSelectObjectContentEventStream</c>.</param>
/// <returns>
/// A table whose column names come from the first record and whose rows come from
/// the remaining records. The table is empty (no columns, no rows) when the query
/// returns no record data.
/// </returns>
public async Task<DataTable> GetAmazonS3Select(string accessKey, string SecretKey, string bucketName, string fileName, Amazon.RegionEndpoint region)
{
    string text;

    using (var eventStream = await GetSelectObjectContentEventStream(accessKey, SecretKey, bucketName, fileName, region).ConfigureAwait(false))
    using (var buffer = new MemoryStream())
    {
        // The result can arrive as several RecordsEvent chunks. Collect the raw bytes
        // of ALL of them before decoding: reading only the first event drops data, and
        // decoding each chunk separately could break a multi-byte UTF-8 character
        // (or a record) that happens to straddle two chunks.
        foreach (RecordsEvent records in eventStream.OfType<RecordsEvent>())
        {
            using (Stream payload = records.Payload)
            {
                payload.CopyTo(buffer);
            }
        }

        buffer.Position = 0;
        using (var reader = new StreamReader(buffer, Encoding.UTF8))
        {
            text = reader.ReadToEnd();
        }
    }

    return BuildTableFromDelimitedText(text);
}

// Turns delimited text (records separated by '\n', fields by ';') into a DataTable:
// the first record supplies the column names, every later record becomes a row.
private static DataTable BuildTableFromDelimitedText(string text)
{
    var table = new DataTable();

    // A trailing record delimiter would otherwise make Split produce one extra,
    // empty record and therefore a phantom blank row at the end of the table.
    if (text.EndsWith("\n", StringComparison.Ordinal))
    {
        text = text.Substring(0, text.Length - 1);
    }

    if (text.Length == 0)
    {
        return table;
    }

    // NOTE(review): the first record is treated as the header row, as before.
    // Confirm that the select request really returns the header line in its output.
    // NOTE(review): fields are split naively on ';', so a quoted field containing
    // ';' or a line break is not handled - confirm the data never needs quoting.
    string[] lines = text.Split('\n');

    for (int lineIndex = 0; lineIndex < lines.Length; lineIndex++)
    {
        // Tolerate CRLF line endings carried over from the source file.
        string[] fields = lines[lineIndex].TrimEnd('\r').Split(';');

        if (lineIndex == 0)
        {
            foreach (string columnName in fields)
            {
                table.Columns.Add(columnName, typeof(string));
            }

            continue;
        }

        DataRow row = table.NewRow();
        for (int fieldIndex = 0; fieldIndex < fields.Length; fieldIndex++)
        {
            row[fieldIndex] = fields[fieldIndex];
        }

        table.Rows.Add(row);
    }

    return table;
}
50
/// <summary>
/// Issues an S3 Select request (<c>select * from S3Object</c>) against a CSV object
/// and returns the response's event stream.
/// </summary>
/// <param name="accessKey">Access key used to construct the S3 client.</param>
/// <param name="SecretKey">Secret key used to construct the S3 client.</param>
/// <param name="bucketName">Bucket that is set on the request.</param>
/// <param name="fileName">Object key that is set on the request.</param>
/// <param name="region">Region endpoint used to construct the S3 client.</param>
/// <returns>The <c>Payload</c> event stream of the select response.</returns>
/// <remarks>
/// Failures raised by the client are propagated to the caller. Previously they were
/// swallowed by an empty catch block and resurfaced as a NullReferenceException on
/// <c>response1.Payload</c>, which hid the real cause.
/// </remarks>
public async Task<ISelectObjectContentEventStream> GetSelectObjectContentEventStream(string accessKey, string SecretKey, string bucketName, string fileName, Amazon.RegionEndpoint region)
{
    IAmazonS3 client = new AmazonS3Client(accessKey, SecretKey, region);

    try
    {
        var request = new SelectObjectContentRequest
        {
            Bucket = bucketName,
            Key = fileName,
            ExpressionType = Amazon.S3.ExpressionType.SQL,
            Expression = "select * from S3Object",

            // The input is read as CSV with the service's default field delimiter.
            // NOTE(review): with FileHeaderInfo.Use the first input line is treated as
            // a header; verify whether that line is still present in the query output,
            // because the consumer of this stream uses the first output record as
            // column names.
            InputSerialization = new InputSerialization
            {
                CSV = new CSVInput
                {
                    FileHeaderInfo = FileHeaderInfo.Use
                }
            },

            // The output is CSV with ';' between fields.
            OutputSerialization = new OutputSerialization
            {
                CSV = new CSVOutput
                {
                    FieldDelimiter = ";"
                }
            }
        };

        SelectObjectContentResponse response = await client.SelectObjectContentAsync(request).ConfigureAwait(false);

        // NOTE(review): as in the original code, the client is not disposed on
        // success, because the caller still has to read the returned stream. Confirm
        // whether disposing it earlier is safe, or reuse one shared client instead of
        // creating a new one per call.
        return response.Payload;
    }
    catch
    {
        // Nothing was handed to the caller, so release the client before letting
        // the original exception continue unchanged.
        client.Dispose();
        throw;
    }
}