· 4 years ago · Aug 08, 2021, 02:44 AM
1package main
2
3import (
4 "fmt"
5 "log"
6 "strconv"
7 "strings"
8 "sync"
9 "time"
10
11 "github.com/yugabyte/gocql"
12)
13
14func main() {
15 cluster := gocql.NewCluster("127.0.0.1")
16 cluster.Timeout = 12 * time.Second
17
18 session, err := cluster.CreateSession()
19 if err != nil {
20 log.Fatal(err)
21 }
22 defer session.Close()
23
24 keyspaceStmt := `CREATE KEYSPACE IF NOT EXISTS benchmark
25 WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }
26 AND DURABLE_WRITES = true`
27 if err := session.Query(keyspaceStmt).Exec(); err != nil {
28 log.Fatal("failed to create keyspace: ", err)
29 }
30 fmt.Println("Created keyspace benchmark")
31
32 createStmt := `CREATE TABLE benchmark.objects (
33 database text,
34 bucket text,
35 name text,
36 data text,
37 PRIMARY KEY((database, bucket), name))
38 WITH default_time_to_live = 0`
39 if err := session.Query(createStmt).Exec(); err != nil {
40 log.Fatal(err)
41 }
42 defer session.Query("DROP table benchmark.objects").Exec()
43 fmt.Println("Created table benchmark.objects")
44
45 fmt.Println("Inserting data to benchmark.objects")
46 const (
47 N = 20000
48 Workers = 100
49 DataSize = 1024
50 )
51
52 var insertStmt string = `INSERT into benchmark.objects (database, bucket, name, data) VALUES (?, ?, ?, ?);`
53 db := "my_db"
54 bucket := "my_bucket_1"
55 name := "name"
56 data := strings.Repeat("X", DataSize)
57
58 queue := make(chan int)
59 var wg sync.WaitGroup
60
61 // spin up workers to insert concurrently
62 for i := 0; i < Workers; i++ {
63 wg.Add(1)
64 go func() {
65 writeQuery := session.Query(insertStmt)
66 for j := range queue {
67 writeQuery.Bind(db, bucket, name+strconv.Itoa(j), data)
68 if err := writeQuery.Exec(); err != nil {
69 log.Fatal(err)
70 }
71 }
72 writeQuery.Release()
73 wg.Done()
74 }()
75 }
76
77 // feed N items to concurrent workers for insert
78 start := time.Now()
79 for i := 0; i < N; i++ {
80 if i%2000 == 0 {
81 fmt.Printf("Progress: %d / %d\n", i, N)
82 }
83 queue <- i
84 }
85 close(queue)
86 wg.Wait()
87 end := time.Since(start)
88 fmt.Printf("Inserted %d in %s, %0.1f/s\n", N, end, N/end.Seconds())
89
90 // Read back N rows
91 fmt.Println("Reading data from benchmark.objects")
92 count := 0
93 start = time.Now()
94 iter := session.Query(`SELECT database, bucket, name, data FROM benchmark.objects`).Iter()
95 for iter.Scan(&db, &bucket, &name, &data) {
96 if len(data) != DataSize {
97 log.Fatalf("expected read data size %d, got %d", DataSize, len(data))
98 }
99 count++
100 }
101 if err := iter.Close(); err != nil {
102 log.Fatal(err)
103 }
104
105 end = time.Since(start)
106 fmt.Printf("Read %d in %s\n", count, end)
107}
108