· 4 years ago · Aug 16, 2021, 05:04 PM
1class EncryptionUploadTask {
2 int i = 0;
3
4 Map<String, String> chunkNonces = {};
5
6 void setState(String s) {
7 progress.add(s);
8 }
9
10 final progress = StreamController<String>.broadcast();
11
12 Stream<List<int>> encryptStreamInBlocks(Stream<List<int>> source,
13 CipherWithAppendedMac cipher, SecretKey secretKey) async* {
14 i = 0;
15 int internalI = 0;
16
17 chunkNonces = {};
18
19 // Wait until a new chunk is available, then process it.
20 await for (var chunk in source) {
21 // print('crypt $internalI');
22 internalI++;
23 while (i < internalI - 3) {
24 await Future.delayed(Duration(milliseconds: 20));
25 }
26
27 final chunkNonce = Nonce.randomBytes(16);
28 chunkNonces[internalI.toString()] = base64.encode(chunkNonce.bytes);
29
30 yield await cipher.encrypt(
31 chunk,
32 secretKey: secretKey,
33 nonce: chunkNonce,
34 );
35 }
36 // print('done');
37 }
38
39 Future<List<String>> uploadChunkedStreamToSkynet(
40 int fileSize, Stream<List<int>> byteUploadStream) async {
41 final totalChunks = (fileSize / (chunkSize + 32)).abs().toInt() + 1;
42
43 // print('Total chunks: $totalChunks');
44
45 final uploaderFileId = Uuid().v4();
46 // print('send $uploaderFileId');
47
48 setState('Encrypting and uploading file... (Chunk 1 of $totalChunks)');
49
50 List<String> skylinks = List.generate(totalChunks, (index) => null);
51
52 _uploadChunk(final Uint8List chunk, final int currentI) async {
53 // print('_uploadChunk $currentI');
54
55 String skylink;
56
57 // print('up $currentI');
58
59 while (skylink == null) {
60 try {
61 skylink = await uploadFileToSkynet(chunk);
62
63 if ((skylink ?? '').isEmpty) throw Exception('oops');
64 } catch (e, st) {
65 print(e);
66 print(st);
67 print('retry');
68 }
69 }
70
71 skylinks[currentI] = skylink;
72 i++;
73
74 setState(
75 'Encrypting and uploading file... ($i/$totalChunks Chunks done)');
76/* setState(
77 'Encrypting and uploading file... $i/$totalChunks Chunks uploaded (16 MB each)'); */
78 }
79
80 int internalI = 0;
81
82 // TODO Parallel chunk uploading to multiple portals
83 await for (final chunk in byteUploadStream) {
84 // print('chunk $internalI');
85
86 _uploadChunk(chunk, internalI);
87
88 while (i < internalI - 2) {
89 await Future.delayed(Duration(milliseconds: 20));
90 }
91 internalI++;
92 }
93
94 // print('done');
95
96 while (true) {
97 await Future.delayed(Duration(milliseconds: 20));
98 bool notASingleNull = true;
99
100 for (final val in skylinks) {
101 if (val == null) {
102 notASingleNull = false;
103 break;
104 }
105 }
106 if (notASingleNull) break;
107 }
108 // print(skylinks);
109 return skylinks;
110 }
111
112 Future<String> uploadFileToSkynet(List<int> chunk) async {
113 var byteStream = new http.ByteStream.fromBytes(chunk);
114
115 var uri = Uri.parse('https://${SkynetConfig.portal}/skynet/skyfile');
116
117 // print(uri);
118
119 var request = new http.MultipartRequest("POST", uri);
120
121 var multipartFile = new http.MultipartFile(
122 'file',
123 byteStream,
124 chunk.length,
125 filename: 'blob',
126 contentType: MediaType('application', 'octet-stream'),
127 );
128
129 request.files.add(multipartFile);
130
131 var response = await request.send();
132
133 if (response.statusCode != 200) {
134 throw Exception('HTTP ${response.statusCode}');
135 }
136
137 final res = await response.stream.transform(utf8.decoder).join();
138
139 final resData = json.decode(res);
140
141 if (resData['skylink'] == null) throw Exception('Skynet Upload Fail');
142
143 return resData['skylink'];
144 }
145}