· 6 years ago · Jul 04, 2019, 01:28 PM
1import com.datastax.oss.driver.api.core.CqlSession;
2import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
4import com.datastax.oss.driver.api.core.cql.BatchStatement;
5import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
6import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
7import com.datastax.oss.driver.api.core.cql.PreparedStatement;
8import com.datastax.oss.driver.api.core.cql.ResultSet;
9import com.datastax.oss.driver.api.core.cql.SimpleStatement;
10import com.datastax.oss.driver.shaded.guava.common.base.Strings;
11import java.io.IOException;
12import java.net.InetSocketAddress;
13import java.util.concurrent.TimeUnit;
14import java.util.stream.Stream;
15import java.util.stream.StreamSupport;
16import org.openjdk.jmh.annotations.Benchmark;
17import org.openjdk.jmh.annotations.BenchmarkMode;
18import org.openjdk.jmh.annotations.Fork;
19import org.openjdk.jmh.annotations.Measurement;
20import org.openjdk.jmh.annotations.Mode;
21import org.openjdk.jmh.annotations.OutputTimeUnit;
22import org.openjdk.jmh.annotations.Param;
23import org.openjdk.jmh.annotations.Scope;
24import org.openjdk.jmh.annotations.Setup;
25import org.openjdk.jmh.annotations.State;
26import org.openjdk.jmh.annotations.Warmup;
27import org.openjdk.jmh.infra.Blackhole;
28
29public class PagingIterableSpliteratorBenchmark {
30
31 @State(Scope.Benchmark)
32 public static class PagingIterableState {
33
34 @Param({"32", "128", "256", "512", "1024"})
35 public int batchSize;
36
37 private CqlSession session;
38
39 @Setup
40 public void setUp() throws IOException {
41 session =
42 CqlSession.builder()
43 .addContactPoint(InetSocketAddress.createUnresolved("127.0.1.1", 9042))
44 .withLocalDatacenter("Cassandra")
45 .withKeyspace("test")
46 .build();
47 DriverExecutionProfile slowProfile =
48 session
49 .getContext()
50 .getConfig()
51 .getDefaultProfile()
52 .withString(DefaultDriverOption.REQUEST_TIMEOUT, "30s");
53 session.execute(
54 SimpleStatement.builder(
55 "CREATE TABLE IF NOT EXISTS test (k0 int, k1 int, v text, PRIMARY KEY(k0, k1))")
56 .setExecutionProfile(slowProfile)
57 .build());
58 PreparedStatement prepared = session.prepare("INSERT INTO test (k0, k1, v) VALUES (?, ?, ?)");
59 for (int i = 0; i < 20_000; i += 100) {
60 BatchStatementBuilder batch = BatchStatement.builder(DefaultBatchType.UNLOGGED);
61 for (int j = 0; j < 100; j++) {
62 int n = i + j;
63 String text = String.valueOf(System.nanoTime());
64 batch.addStatement(prepared.bind(0, n, Strings.repeat(text, 15)));
65 }
66 session.execute(batch.setExecutionProfile(slowProfile).build());
67 }
68 }
69
70 Stream<String> pagingIterable(int batchSize) {
71 ResultSet rs = session.execute("SELECT v FROM test WHERE k0 = 0");
72 return StreamSupport.stream(
73 PagingIterableSpliterator.builder(rs).withChunkSize(batchSize).build(), true)
74 .map(row -> row.getString(0));
75 }
76 }
77
78 @Benchmark
79 @BenchmarkMode(Mode.AverageTime)
80 @OutputTimeUnit(TimeUnit.MILLISECONDS)
81 @Measurement(iterations = 3)
82 @Warmup(iterations = 1)
83 @Fork(1)
84 public void benchmarkPagingIterable(PagingIterableState state, Blackhole sink)
85 throws IOException {
86 try (Stream<String> lines = state.pagingIterable(state.batchSize)) {
87 sink.consume(
88 lines
89 .parallel()
90 .mapToLong(
91 line -> {
92 double d = 0;
93 for (int i = 0; i < line.length(); i++)
94 for (int j = 0; j < line.length(); j++)
95 d += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
96 sink.consume(d);
97 return 1;
98 })
99 .sum());
100 }
101 }
102}