· 7 years ago · Oct 03, 2018, 08:12 PM
1package com.vertigo.user;
2
3import java.util.Collections;
4import java.util.Map;
5import java.util.concurrent.atomic.AtomicReference;
6import com.amazonaws.auth.AWSStaticCredentialsProvider;
7import com.amazonaws.auth.BasicAWSCredentials;
8import com.amazonaws.regions.Regions;
9import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
10import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
11import com.amazonaws.services.dynamodbv2.model.AttributeValue;
12import com.amazonaws.services.dynamodbv2.model.ScanRequest;
13import com.amazonaws.services.dynamodbv2.model.ScanResult;
14import io.reactivex.BackpressureStrategy;
15import io.reactivex.Flowable;
16import lombok.val;
17import org.apache.commons.lang3.tuple.Pair;
18
19public class DynaRun {
20
21 public static void main(String[] args) {
22 val awsCredentials = new BasicAWSCredentials(
23 "access-key",
24 "secret-key"
25 );
26 val client = AmazonDynamoDBClientBuilder
27 .standard()
28 .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
29 .withRegion(Regions.US_WEST_2)
30 .build();
31
32 val disposable = Flowable.range(0, Integer.MAX_VALUE)
33 .zipWith(flowableScan(client, "load-post", 1000), Pair::of)
34 // .map(ScanResult::getItems)
35 .subscribe(pair ->
36 System.out.println(
37 "page: " + pair.getLeft() +
38 ", items: " + pair.getRight().getCount()
39 )
40 );
41 disposable.dispose(); // not needed in this case, but in general we might want to do it
42
43 // other example: the flow of batches of keys
44 // Flowable<Set<String>> itemsFlow =
45 // flowableScan(client, "staging-post", 1000)
46 // .flatMapIterable(ScanResult::getItems)
47 // .map(Map::keySet);
48
49 }
50
51 public static Flowable<ScanResult> flowableScan(
52 AmazonDynamoDB client, String table, int batchSize
53 ) {
54 val ref = new AtomicReference<Map<String, AttributeValue>>(Collections.emptyMap());
55 return Flowable.create(e -> {
56 if (e.isCancelled()) {
57 return;
58 }
59 try {
60 do {
61 ScanRequest scanRequest = new ScanRequest()
62 .withTableName(table)
63 .withAttributesToGet("postid", "body")
64 .withLimit(batchSize);
65 if (!ref.get().isEmpty()) {
66 scanRequest = scanRequest.withExclusiveStartKey(ref.get());
67 }
68 val scanResult = client.scan(scanRequest);
69 // the last page returns getLastEvaluatedKey == null
70 ref.set(scanResult.getLastEvaluatedKey());
71
72 e.onNext(scanResult);
73 } while (ref.get() != null && !ref.get().isEmpty());
74 } catch (Exception ex) {
75 e.onError(ex);
76 }
77 }, BackpressureStrategy.BUFFER);
78 }
79
80}
81
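82// pom.xml dependencies (Maven); the code above also imports Lombok (lombok.val) and Apache Commons Lang 3 (org.apache.commons.lang3.tuple.Pair), which are not listed below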
83 <dependency>
84 <groupId>com.amazonaws</groupId>
85 <artifactId>aws-java-sdk-core</artifactId>
86 <version>1.11.421</version>
87 </dependency>
88 <dependency>
89 <groupId>com.amazonaws</groupId>
90 <artifactId>aws-java-sdk-dynamodb</artifactId>
91 <version>1.11.421</version>
92 </dependency>
93 <dependency>
94 <groupId>io.reactivex.rxjava2</groupId>
95 <artifactId>rxjava</artifactId>
96 <version>2.2.2</version>
97 </dependency>