· 5 years ago · Sep 01, 2020, 04:36 AM
1package main
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 _ "github.com/lib/pq"
9 "go.mongodb.org/mongo-driver/bson"
10 "go.mongodb.org/mongo-driver/mongo"
11 "go.mongodb.org/mongo-driver/mongo/options"
12 "log"
13 "os/exec"
14 "strconv"
15 "strings"
16 "time"
17)
18
19// Mg Containing database client
20type Mg struct {
21 *mongo.Client
22}
23
// Pg wraps a database/sql handle. Embedding *sql.DB promotes the handle's
// methods, so they can be called directly on a Pg value.
type Pg struct{ *sql.DB }
27
// Customer is a customer record. The struct tags fix the key names used for
// each field in its BSON and JSON encodings.
type Customer struct {
	// Name is encoded under the key "name".
	Name string `bson:"name" json:"name"`
	// Age is encoded under the key "age".
	Age int `bson:"age" json:"age"`
}
32
// portBinding is a single host-side binding listed for a container port
// under NetworkSettings.Ports in the container engine's inspect output.
type portBinding struct {
	HostIP   string `json:"HostIp"`
	HostPort string `json:"HostPort"`
}

// postgresDoc mirrors the slice of podman inspect JSON output needed to
// find the host bindings of a container's 5432/tcp port. Every other key in
// the inspect output is ignored when decoding.
type postgresDoc []struct {
	NetworkSettings struct {
		Ports struct {
			TCPPort []portBinding `json:"5432/tcp"`
		} `json:"Ports"`
	} `json:"NetworkSettings"`
}

// mongoDoc mirrors the slice of podman inspect JSON output needed to find
// the host bindings of a container's 27017/tcp port. Every other key in the
// inspect output is ignored when decoding.
type mongoDoc []struct {
	NetworkSettings struct {
		Ports struct {
			TCPPort []portBinding `json:"27017/tcp"`
		} `json:"Ports"`
	} `json:"NetworkSettings"`
}
57
58func (p postgresDoc) GetHostPort() int {
59 port, _ := strconv.Atoi(p[0].NetworkSettings.Ports.TCPPort[0].HostPort)
60 return port
61}
62
63func (p postgresDoc) GetHostIP() string {
64 return p[0].NetworkSettings.Ports.TCPPort[0].HostIP
65}
66
67func (m mongoDoc) GetHostIP() string {
68 return m[0].NetworkSettings.Ports.TCPPort[0].HostIP
69}
70
71func (m mongoDoc) GetHostPort() int {
72 port, _ := strconv.Atoi(m[0].NetworkSettings.Ports.TCPPort[0].HostPort)
73 return port
74}
75
// CInspector is the set of behaviours used to read information out of a
// container's (Podman / Docker) network settings for integration testing
// purposes.
type CInspector interface {
	// GetHostIP reports the host IP found in the network settings.
	GetHostIP() string
	// GetHostPort reports the host port found in the network settings.
	GetHostPort() int
}
83
84// NewContainer creates new Container instance
85// takes image container name as param
86// for instance: "postgres:12-alpine"
87func newContainer(img string) (*Container, error) {
88 return &Container{
89 Engine: "/usr/sbin/podman",
90 ImageName: img,
91 }, nil
92}
93
// runCmd executes the binary cmdName directly (no shell is involved) with
// the given flags and returns its combined stdout and stderr with one
// trailing newline removed.
//
// When the command cannot be started or exits non-zero, runCmd returns the
// empty string and an error that wraps the underlying failure and includes
// whatever the command printed, so the cause is not lost.
func runCmd(cmdName string, flags ...string) (string, error) {
	out, err := exec.Command(cmdName, flags...).CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("%s %s: %w: %s",
			cmdName, strings.Join(flags, " "), err, strings.TrimSpace(string(out)))
	}
	// TrimSuffix, unlike slicing off the last byte, is safe on empty output
	// and leaves output that does not end in a newline intact.
	return strings.TrimSuffix(string(out), "\n"), nil
}
106
107func startContainer(args []string, imgName string) *Container {
108 arguments := []string{"run", "--rm", "-P", "-d"}
109 arguments = append(arguments, args...)
110 c, err := newContainer(imgName)
111 runCmd(c.Engine, "stop", "postgres")
112 runCmd(c.Engine, "stop", "mongo")
113 id, err := runCmd(
114 c.Engine, arguments...,
115 )
116 if err != nil {
117 log.Fatalf("could not start container: %v", err)
118 }
119 c.ID = strings.TrimSpace(id)
120 out, err := runCmd(c.Engine, "inspect", c.ID)
121 if err != nil {
122 log.Printf("Could not inspect container %s: %v", c.ID, err)
123 }
124 if strings.Contains(imgName, "postgres") {
125 var inspector postgresDoc
126 if err := json.Unmarshal([]byte(out), &inspector); err != nil {
127 log.Fatalf("Could not decode json: %v", err)
128 }
129 c.Inspector = inspector
130 } else if strings.Contains(imgName, "mongo") {
131 var inspector mongoDoc
132 if err := json.Unmarshal([]byte(out), &inspector); err != nil {
133 log.Fatalf("Could not decode json: %v", err)
134 }
135 c.Inspector = inspector
136 }
137 c.Host = fmt.Sprintf("127.0.0.1:%d", c.Inspector.GetHostPort())
138 return c
139}
140
141// StartMongoContainer runs a mongo container (podman) to execute commands
142func StartMongoContainer() *Container {
143 mongoImg := "mongo:4"
144 return startContainer([]string{
145 "--name", "mongo",
146 "-e", "MONGO_INITDB_DATABASE=mongodb",
147 "-e", "MONGO_INITDB_ROOT_USERNAME=mongo", "-e",
148 "MONGO_INITDB_ROOT_PASSWORD=mongo", mongoImg,
149 }, mongoImg)
150}
151
152// StartPGContainer runs postgresql container to execute commands
153func StartPostgresContainer() *Container {
154 postgresImg := "postgres:12-alpine"
155 return startContainer(
156 []string{
157 "--name", "postgres",
158 "-e", "POSTGRES_USER=postgres", "-e",
159 "POSTGRES_DB=postgres", "-e", "POSTGRES_PASSWORD=postgres",
160 postgresImg,
161 }, postgresImg)
162}
163
164// Container tracks info about a podman container started for tests.
165type Container struct {
166 Engine string // podman / docker
167 Inspector CInspector
168 ImageName string // postgres?
169 ID string
170 Host string // in the form of container_ip:port
171}
172
173func (c *Container) StopContainer() {
174 if err := exec.Command(c.Engine, "stop", c.ID).Run(); err != nil {
175 log.Fatalf("could not stop container: %v", err)
176 }
177 log.Printf("Stopped: %s", c.ID)
178}
179
180func (c *Container) DumpContainerLogs() {
181 out, err := exec.Command(c.Engine, "logs", c.ID).CombinedOutput()
182 if err != nil {
183 log.Fatalf("could not log container: %v", err)
184 }
185 log.Printf("Logs for %s\n%s:", c.ID, out)
186}
187
188// new: Create new database connection
189func connect(ctx context.Context, dburl string) (*Mg, error) {
190 timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
191 client, err := mongo.Connect(timeoutCtx, options.Client().ApplyURI(dburl))
192 defer cancel()
193 if err != nil {
194 return nil, err
195 }
196 return &Mg{
197 client,
198 }, nil
199}
200
201func pgconnect(dsn string) (*Pg, error) {
202 client, err := sql.Open("postgres", dsn)
203 if err != nil {
204 return nil, err
205 }
206 return &Pg{ client }, nil
207}
208
209func main() {
210 c := StartMongoContainer()
211 dsn := fmt.Sprintf(`mongodb://mongo:mongo@%s/mongodb?authsource=admin`, c.Host)
212 ctx := context.Background()
213 // grace time
214 time.Sleep(4 * time.Second)
215 mgc, err := connect(ctx, dsn)
216 if err != nil {
217 log.Fatalf("Cannot connect to mongo database: %s\n => %v", dsn, err)
218 }
219 log.Printf("waiting for mongodb to be ready\n")
220 var pingError error
221 for attempts := 1; attempts <= 20; attempts++ {
222 pingError = mgc.Ping(ctx, nil)
223 if pingError == nil {
224 break
225 }
226 time.Sleep(time.Duration(attempts) * 100 * time.Millisecond)
227 }
228 log.Printf("mongodb connected\n")
229 trdown := func() {
230 c.StopContainer()
231 }
232 if pingError != nil {
233 trdown()
234 mgc.Disconnect(ctx)
235 log.Fatalf("Waiting for mongodb database to be ready: %v", pingError)
236 }
237 mdb := mgc.Database("mongo")
238 customerColl := mdb.Collection("customers")
239 newCust := Customer{"Frank", 28}
240 newCust2 := Customer{"Trent", 32}
241 res, err := customerColl.InsertMany(ctx, []interface{}{newCust, newCust2})
242 if err != nil {
243 log.Printf("error inserting many stuff to mongo: %v\n", err)
244 }
245 log.Println(res)
246 curs, err := customerColl.Find(context.TODO(), bson.D{}, options.Find())
247 if err != nil {
248 log.Printf("error blah blah: %v", err)
249 }
250
251 var customers []Customer
252 for curs.Next(context.TODO()) {
253 var cus Customer
254 if err = curs.Decode(&cus); err != nil {
255 log.Printf("error decoding customer: %v\n", err)
256 }
257 customers = append(customers, cus)
258 }
259 log.Printf("Customers: %v", customers)
260 trdown()
261 // inserting to postgresql
262 pgcontainer := StartPostgresContainer()
263 defer pgcontainer.StopContainer()
264 log.Printf("waiting for postgres to be ready\n")
265 time.Sleep(3 * time.Second)
266 dsn = fmt.Sprintf("postgres://postgres:postgres@%s/postgres?sslmode=disable", pgcontainer.Host)
267 pgc, err := pgconnect(dsn)
268 if err != nil {
269 log.Println(err)
270 }
271 defer pgc.Close()
272 var pgPingError error
273 for attempts := 1; attempts <= 20; attempts++ {
274 pgPingError = pgc.Ping()
275 if pgPingError == nil {
276 break
277 }
278 time.Sleep(time.Duration(attempts) * 100 * time.Millisecond)
279 }
280 if pgPingError != nil {
281 log.Println(pingError)
282 }
283 _, err = pgc.ExecContext(context.Background(), "CREATE TABLE IF NOT EXISTS customers( id SERIAL NOT NULL PRIMARY KEY, customer JSONB NOT NULL);")
284 if err != nil {
285 log.Println(err)
286 }
287 tx, err := pgc.Begin()
288 if err != nil {
289 log.Println(err)
290 }
291 defer tx.Rollback()
292 stmt, err := tx.Prepare(`INSERT INTO customers(customer) VALUES($1)`)
293 if err != nil {
294 log.Println(err)
295 }
296 bjson, err := json.Marshal(customers)
297 if err != nil {
298 log.Println(err)
299 }
300 if _, err = stmt.Exec(bjson); err != nil {
301 log.Println(err)
302 }
303 if err = tx.Commit(); err != nil {
304 log.Println(err)
305 }
306 row := pgc.QueryRow(`SELECT customer FROM customers WHERE id = $1`, 1)
307 var byteResult []byte
308 if err = row.Scan(&byteResult); err != nil {
309 log.Println(err)
310 }
311 log.Println(string(byteResult))
312}