· 6 years ago · Jun 23, 2019, 06:50 AM
1package main
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "flag"
8 "fmt"
9 "log"
10 "sync"
11 "time"
12
13 _ "github.com/lib/pq"
14)
15
// Command-line flags. Each variable is the pointer returned by the flag
// package, so it only holds the user-supplied value after flag.Parse runs.
var (
	// jitter is the value of -jitter-ms (usage: "jitter in millis").
	jitter = flag.Int("jitter-ms", 10, "jitter in millis")

	// timeout is the value of -timeout-ms (usage: "query timeout in millis").
	timeout = flag.Int("timeout-ms", 1000, "query timeout in millis")

	// reps is the value of -reps (usage: "number of times the query is made").
	reps = flag.Int("reps", 100, "number of times the query is made")

	// doTx is the value of -tx; when set, queries run inside a transaction.
	doTx = flag.Bool("tx", true, "run queries within a transaction")

	// workaround is the value of -workaround; when set, errors are passed
	// through PQCancelErrWorkaround before being reported.
	workaround = flag.Bool("workaround", false, "try the workaround")
)
21
22func main() {
23 flag.Parse()
24
25 query := fmt.Sprintf(`SELECT pg_sleep_for('%d ms') || 'a string to return'`, *timeout)
26 fmt.Println(query)
27
28 var timeouts []time.Duration
29 for i := 0; i < *jitter*1000; i++ {
30 millis := time.Duration(*timeout-i) * time.Millisecond
31 for j := 0; j < 100; j++ {
32 timeouts = append(timeouts, millis-(time.Duration(j)*10*time.Microsecond))
33 }
34 }
35
36 db, openErr := openAndPing("postgres", "postgres:///pq_test?sslmode=disable")
37 if openErr != nil {
38 log.Fatalf("failed to open a connection: %s", openErr)
39 }
40
41 _, createErr := db.Exec(`CREATE TABLE IF NOT EXISTS test (something text)`)
42 if createErr != nil {
43 log.Fatalf("failed to create table: %s", createErr)
44 }
45
46 for _, timeout := range timeouts {
47 wg := sync.WaitGroup{}
48
49 for i := 0; i < *reps; i++ {
50
51 wg.Add(1)
52 go func(rep int, timeout time.Duration) {
53 defer wg.Done()
54
55 ctx, cancel := context.WithTimeout(context.Background(), timeout)
56 defer cancel()
57
58 var tx *sql.Tx
59 var err error
60
61 if *doTx {
62 tx, err = db.BeginTx(ctx, nil)
63
64 if *workaround {
65 err = PQCancelErrWorkaround(ctx, err)
66 }
67
68 if err != nil {
69 log.Printf("timeout=%.08fs rep=%d status=error error=%q", timeout.Seconds(), rep, err)
70 return
71 }
72 }
73
74 var result string
75
76 if *doTx {
77 err = tx.QueryRowContext(ctx, query).Scan(&result)
78 } else {
79 err = db.QueryRowContext(ctx, query).Scan(&result)
80 }
81
82 if *workaround {
83 err = PQCancelErrWorkaround(ctx, err)
84 }
85
86 if err != nil {
87 log.Printf("timeout=%.08fs rep=%d status=error error=%q", timeout.Seconds(), rep, err)
88 } else {
89 log.Printf("timeout=%.08fs rep=%d status=ok", timeout.Seconds(), rep)
90 }
91
92 }(i, timeout)
93 }
94
95 wg.Wait()
96 time.Sleep(1 * time.Second)
97 }
98}
99
// openAndPing opens a database handle for the given driver name and data
// source name and verifies with a ping that a connection can actually be
// established.
//
// It returns the usable handle, or nil and the error from sql.Open or Ping.
// When the ping fails the handle is closed before returning so it does not
// leak.
func openAndPing(driver, source string) (*sql.DB, error) {
	// Use the caller's arguments; a hard-coded driver/DSN here would make the
	// parameters meaningless.
	db, err := sql.Open(driver, source)
	if err != nil {
		return nil, err
	}

	if err := db.Ping(); err != nil {
		// Best-effort cleanup: the ping error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}
	return db, nil
}
111
// PQCancelErrWorkaround replaces a sql.ErrNoRows error with the context's own
// error when ctx has already finished, and returns err unchanged in every
// other case (including a nil err).
//
// NOTE(review): judging by the name, this presumably compensates for the
// driver reporting sql.ErrNoRows for a query that was really cancelled by its
// context; confirm against the driver's behaviour.
//
// The context error is returned as-is (context.DeadlineExceeded or
// context.Canceled) so callers can match it with errors.Is. For an expired
// deadline its message is still "context deadline exceeded".
func PQCancelErrWorkaround(ctx context.Context, err error) error {
	if !errors.Is(err, sql.ErrNoRows) {
		return err
	}

	// ctx.Err is non-nil exactly when ctx.Done is closed.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}
	return err
}
123}