· 4 years ago · Jun 18, 2021, 03:06 PM
1import 'dart:convert';
2import 'dart:io';
3import 'dart:core';
4import 'dart:async';
5import 'dart:math';
6import 'dart:typed_data';
7import 'package:cure/crypto.dart';
8import 'package:pedantic/pedantic.dart';
9import 'package:web_socket_channel/web_socket_channel.dart';
10import 'package:aws_url_signer/aws_url_signer.dart';
11
12// import 'aws-signature-v4.dart';
13
14
15// import 'aws-signature-v4.dart';
16
17
/// Returns the value of the environment variable [name].
///
/// Throws a [StateError] when the variable is unset or empty, so a
/// misconfigured environment is reported before any request is signed.
String _requireEnv(String name) {
  final value = Platform.environment[name];
  if (value == null || value.isEmpty) {
    throw StateError('Environment variable $name must be set');
  }
  return value;
}

// SECURITY: credentials must never be committed to source control. A key pair
// was previously hard-coded here; treat it as compromised and rotate it.
// The SCREAMING_CAPS names are kept because other code refers to them.

/// AWS access key id, read from the `AWS_ACCESS_KEY_ID` environment variable
/// on first use.
final String AWS_ACCESS_KEY_ID = _requireEnv('AWS_ACCESS_KEY_ID');

/// AWS secret access key, read from the `AWS_SECRET_ACCESS_KEY` environment
/// variable on first use.
final String AWS_SECRET_ACCESS_KEY = _requireEnv('AWS_SECRET_ACCESS_KEY');
20
21
/// Reads the four bytes of [bytesList] beginning at [start] and combines them,
/// most significant byte first, into a single integer.
///
/// Throws a [RangeError] when `start .. start + 4` is not inside [bytesList].
int _read32bitInt(Uint8List bytesList, int start) {
  final window = bytesList.sublist(start, start + 4);
  var result = 0;
  for (final byte in window) {
    // Shift what has been read so far up one byte and append the next one.
    result = (result << 8) | byte;
  }
  return result;
}
27
28
29
/// One name/type/value header of an event-stream message.
///
/// Instances are immutable; the constructor is `const` so that fixed headers
/// can be created as compile-time constants.
class EventStreamHeader {
  /// Header name, e.g. `:message-type`.
  final String name;

  /// Numeric code of the value type as written on the wire. Every header
  /// built in this file uses `7`.
  final int type;

  /// Header value as text.
  final String value;

  /// Creates a header from its [name], [type] code and [value].
  const EventStreamHeader(this.name, this.type, this.value);
}
36
37
/// Thrown when the service answers with an event-stream message whose
/// `:message-type` is `exception`.
class EventStreamResponseException implements Exception {
  /// Value of the `:exception-type` header of the failing message.
  final String type;

  /// Body of the failing message, as text.
  final String message;

  /// Creates the exception.
  ///
  /// Both arguments default to the empty string so that the fields are never
  /// `null`, which also keeps the declaration valid under sound null safety.
  EventStreamResponseException({this.type = '', this.message = ''});

  @override
  String toString() => "EventStreamResponseException: $message";
}
46
47
/// Serialises [headers] into the event-stream header wire format.
///
/// Each header is written as:
///
/// * 1 byte: byte length of the name,
/// * the UTF-8 bytes of the name,
/// * 1 byte: the value type code,
/// * 2 bytes, big-endian: byte length of the value,
/// * the UTF-8 bytes of the value.
///
/// The two-byte value length matches what [decodeEventStreamHeaders] reads;
/// the previous single-byte length produced frames the decoder (and the
/// service) could not parse. Every value is written as a length-prefixed
/// string regardless of [EventStreamHeader.type].
///
/// Throws an [ArgumentError] when a name is longer than 255 bytes or a value
/// is longer than 65535 bytes, because the length would not fit its field.
Uint8List encodeEventStreamHeaders(List<EventStreamHeader> headers) {
  final out = BytesBuilder();

  for (final header in headers) {
    // Lengths are byte counts, so encode first and measure the encoded form.
    final nameBytes = utf8.encode(header.name);
    final valueBytes = utf8.encode(header.value);

    if (nameBytes.length > 0xFF) {
      throw ArgumentError.value(
          header.name, 'headers', 'Header name is longer than 255 bytes');
    }
    if (valueBytes.length > 0xFFFF) {
      throw ArgumentError.value(
          header.value, 'headers', 'Header value is longer than 65535 bytes');
    }

    out.add([nameBytes.length]);
    out.add(nameBytes);
    out.add([header.type]);
    out.add([valueBytes.length >> 8, valueBytes.length & 0xFF]);
    out.add(valueBytes);
  }

  return out.toBytes();
}
61
/// Parses the header section of an event-stream message.
///
/// [headersData] must contain exactly the header bytes (no prelude, payload
/// or CRC). Each header is read as: 1-byte name length, name, 1-byte type
/// code, 2-byte big-endian value length, value. Names and values are decoded
/// as UTF-8.
///
/// NOTE(review): every value is read as a length-prefixed string, whatever
/// its type code says; confirm the service only sends string headers.
///
/// Throws a [RangeError] when the data ends in the middle of a header.
List<EventStreamHeader> decodeEventStreamHeaders(Uint8List headersData) {
  final headers = <EventStreamHeader>[];
  var offset = 0;

  while (offset < headersData.length) {
    // All positions are relative to the current header, not to the start of
    // the buffer; otherwise the first header would be parsed forever.
    final nameLen = headersData[offset];
    final nameStart = offset + 1;
    final nameEnd = nameStart + nameLen;
    final name = utf8.decode(headersData.sublist(nameStart, nameEnd),
        allowMalformed: true);

    final type = headersData[nameEnd];

    // Parenthesised on purpose: `+` binds tighter than `<<` in Dart.
    final valueLen = (headersData[nameEnd + 1] << 8) | headersData[nameEnd + 2];
    final valueStart = nameEnd + 3;
    final valueEnd = valueStart + valueLen;
    final value = utf8.decode(headersData.sublist(valueStart, valueEnd),
        allowMalformed: true);

    headers.add(EventStreamHeader(name, type, value));
    offset = valueEnd;
  }

  return headers;
}
78
79
/// Wraps [audioChunk] in a single event-stream `AudioEvent` message.
///
/// Layout of the returned frame:
///
/// * 4 bytes: total frame length,
/// * 4 bytes: length of the header section,
/// * 4 bytes: CRC32 of the eight bytes above,
/// * the encoded headers,
/// * the payload ([audioChunk], copied byte for byte),
/// * 4 bytes: CRC32 of everything before it.
///
/// All integers are big-endian. The payload is forwarded unchanged: the
/// earlier version re-read it as 16-bit samples and appended those as single
/// bytes, which dropped every second byte and made the frame shorter than
/// the length declared in its prelude. An empty [audioChunk] is valid and
/// yields a frame with no payload.
Uint8List createEventStreamFrame(Uint8List audioChunk) {
  final headersData = encodeEventStreamHeaders([
    EventStreamHeader(":content-type", 7, "application/octet-stream"),
    EventStreamHeader(":event-type", 7, "AudioEvent"),
    EventStreamHeader(":message-type", 7, "event"),
  ]);

  // 8-byte prelude + 4-byte prelude CRC + 4-byte message CRC = 16 fixed bytes.
  final totalLength =
      16 + headersData.lengthInBytes + audioChunk.lengthInBytes;

  // Splits a 32-bit value into its four bytes, most significant first.
  List<int> int32ToBytes(int i) => [
        (i >> 24) & 0xFF,
        (i >> 16) & 0xFF,
        (i >> 8) & 0xFF,
        i & 0xFF,
      ];

  final crc = CRC.crc32();
  final messageBldr = BytesBuilder();
  messageBldr.add(int32ToBytes(totalLength));
  messageBldr.add(int32ToBytes(headersData.lengthInBytes));

  // The prelude CRC covers only the two length fields written so far.
  final preludeCrc = crc.calculate(messageBldr.toBytes());
  messageBldr.add(int32ToBytes(preludeCrc));

  messageBldr.add(headersData);
  messageBldr.add(audioChunk);

  // The message CRC covers prelude, prelude CRC, headers and payload.
  final messageCrc = crc.calculate(messageBldr.toBytes());
  messageBldr.add(int32ToBytes(messageCrc));

  return messageBldr.toBytes();
}
122
123
/// Decodes one event-stream message received from the service and returns
/// its JSON payload.
///
/// [resp] must start with a complete message: prelude (total length, headers
/// length), prelude CRC, headers, payload and message CRC.
///
/// Returns the decoded JSON value when the `:message-type` header is `event`.
///
/// Throws:
/// * [FormatException] when [resp] is too short or its declared lengths are
///   inconsistent, or when the payload is not valid UTF-8/JSON.
/// * [Exception] with `Prelude CRC failed` / `Message CRC failed` when a
///   checksum does not match.
/// * [EventStreamResponseException] when the `:message-type` is `exception`;
///   it carries the `:exception-type` header and the payload text.
/// * [Exception] for any other `:message-type`.
/// * [StateError] when a required header is missing.
dynamic decodeEventStreamResponse(Uint8List resp) {
  // Smallest possible message: 8-byte prelude + 2 CRCs, no headers/payload.
  if (resp.length < 16) {
    throw FormatException(
        'Event stream message is too short: ${resp.length} bytes');
  }

  final crc = CRC.crc32();

  // Validate the prelude before trusting the lengths it contains.
  final preludeBytes = resp.sublist(0, 8);
  final preludeCRC = _read32bitInt(resp, 8);
  if (!crc.verify(preludeBytes.toList(growable: false), preludeCRC)) {
    throw Exception("Prelude CRC failed");
  }

  final totalLen = _read32bitInt(resp, 0);
  final headersLen = _read32bitInt(resp, 4);
  if (totalLen > resp.length || totalLen < 16 + headersLen) {
    throw FormatException('Inconsistent event stream lengths: total=$totalLen, '
        'headers=$headersLen, received=${resp.length}');
  }

  // The message CRC sits in the last four bytes of the declared message and
  // covers everything before it.
  final messageBytes = resp.sublist(0, totalLen - 4);
  final messageCrc = _read32bitInt(resp, totalLen - 4);
  if (!crc.verify(messageBytes.toList(growable: false), messageCrc)) {
    throw Exception("Message CRC failed");
  }

  final payloadStart = 12 + headersLen;
  final headersBytes = resp.sublist(12, payloadStart);
  final payloadBytes = resp.sublist(payloadStart, totalLen - 4);

  final headers = decodeEventStreamHeaders(headersBytes);
  // Only the payload is the body; the prelude and headers are binary.
  final body = utf8.decode(payloadBytes);

  final messageType =
      headers.firstWhere((header) => header.name == ':message-type').value;

  if (messageType == "exception") {
    final exceptionType =
        headers.firstWhere((header) => header.name == ':exception-type').value;
    throw EventStreamResponseException(type: exceptionType, message: body);
  }

  if (messageType != 'event') {
    throw Exception("Unknown message-type: $messageType");
  }

  return jsonDecode(body);
}
171
/// Prints [bytes] on one line for debugging.
///
/// Printable ASCII bytes (space through `~`) are shown as characters; every
/// other byte is shown as its upper-case hexadecimal value in angle
/// brackets, e.g. `<0>` or `<FF>`. DEL (0x7F) is a control character and is
/// therefore shown as `<7F>` rather than emitted raw.
void _printBytes(Uint8List bytes) {
  final out = StringBuffer();
  for (final x in bytes) {
    if (x >= 32 && x < 127) {
      out.write(String.fromCharCode(x));
    } else {
      out.write("<${x.toRadixString(16).toUpperCase()}>");
    }
  }
  print(out.toString());
}
180
181
182
183
/// Streams the audio file `./bin/test5.dat` to the transcription WebSocket
/// endpoint and prints every message the service sends back.
///
/// The file is sent in fixed-size chunks, each wrapped by
/// [createEventStreamFrame], followed by one empty frame that tells the
/// service the audio is finished. Responses are printed with [_printBytes]
/// until the service closes the connection.
///
/// History: an earlier attempt was rejected with "The request signature we
/// calculated does not match the signature you provided" (canonical request
/// for `GET /stream-transcription-websocket`); the dump of that response and
/// the abandoned HTTP/2 experiments were removed from this function because
/// the dump embedded an access key id.
void main(List<String> arguments) async {
  print("ENDIAN: " + (Endian.host == Endian.little ? 'Little' : 'Big'));

  final audioBytes = File("./bin/test5.dat").readAsBytesSync();

  final signedUrl = getSignedWebSocketUrl(
      apiId: 'transcribestreaming',
      service: 'transcribe',
      accessKey: AWS_ACCESS_KEY_ID,
      secretKey: AWS_SECRET_ACCESS_KEY,
      region: REGION,
      stage: "stream-transcription-websocket",
      queryParams: {
        'language-code': 'en-US',
        'media-encoding': 'pcm',
        'sample-rate': '16000',
      },
      debug: false);

  final channel = WebSocketChannel.connect(Uri.parse(signedUrl));

  // 16000 bytes of 16-bit samples at the 16000 Hz requested above is half a
  // second of audio, so one chunk every 500 ms keeps pace with real time.
  const chunkSize = 16000;
  const chunkInterval = Duration(milliseconds: 500);

  // Set once the service has closed the connection, so the sender stops
  // writing to a sink nobody reads.
  var receiveDone = false;

  // Sends the file chunk by chunk, then the end-of-stream marker.
  Future<void> sendAudio() async {
    for (var head = 0;
        head < audioBytes.length && !receiveDone;
        head += chunkSize) {
      // Clamp the final chunk instead of reading past the end of the file.
      final readEnd = min(head + chunkSize, audioBytes.length);
      print("SENDING: ${head} to ${readEnd}");
      channel.sink.add(createEventStreamFrame(audioBytes.sublist(head, readEnd)));
      await Future.delayed(chunkInterval);
    }
    if (!receiveDone) {
      // An audio event with an empty payload ends the audio stream.
      channel.sink.add(createEventStreamFrame(Uint8List(0)));
    }
  }

  // Prints each incoming message. The channel stream is single-subscription,
  // so it is consumed by exactly one `await for`.
  Future<void> readResponses() async {
    try {
      await for (final resp in channel.stream) {
        if (resp is String) {
          // Text frames are not event-stream encoded; show them as they are.
          print(resp);
          continue;
        }
        final buffer = Uint8List.fromList(resp);
        print('RECEVIED!');
        _printBytes(buffer);
        // To parse instead of dumping: decodeEventStreamResponse(buffer).
      }
    } finally {
      receiveDone = true;
    }
  }

  await Future.wait([sendAudio(), readResponses()]);
  await channel.sink.close();
}
376
/// AWS region handed to the URL signer when [main] builds the WebSocket URL.
const String REGION = 'eu-central-1';
378
379//
380// await for (var message in stream.incomingMessages) {
381// if (message is HeadersStreamMessage) {
382// for (var header in message.headers) {
383// var name = utf8.decode(header.name);
384// var value = utf8.decode(header.value);
385// print('Header: $name: $value');
386// }
387// } else if (message is DataStreamMessage) {
388// // Use [message.bytes] (but respect 'content-encoding' header)
389// }
390// }
391// await transport.finish();
392
393
/// Opens a socket to the host and port of [uri].
///
/// For the `https` scheme a TLS connection is made that offers `h2` through
/// ALPN; any other scheme gets a plain TCP socket.
///
/// Throws an [Exception] when the server does not select `h2`. The TLS
/// socket is destroyed before throwing so the connection is not leaked.
Future<Socket> connect(Uri uri) async {
  if (uri.scheme != 'https') {
    return await Socket.connect(uri.host, uri.port);
  }

  final secureSocket = await SecureSocket.connect(uri.host, uri.port,
      supportedProtocols: ['h2']);
  if (secureSocket.selectedProtocol != 'h2') {
    // Nobody will use this connection; release it before reporting failure.
    secureSocket.destroy();
    throw Exception('Failed to negogiate http/2 via alpn. Maybe server '
        "doesn't support http/2.");
  }
  return secureSocket;
}
408