· 3 years ago · Feb 05, 2022, 01:10 PM
1package main
2
3import (
4 "bufio"
5 "context"
6 "database/sql"
7 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "strings"
12 "sync"
13 "time"
14)
15
// Download and unpack the data file, start the database, then run the importer.
// Tune the batch size and writer count to get the best throughput on your system.
// The importer uses goroutines heavily to minimize import time.
//
// ```bash
// wget https://datasets.imdbws.com/title.basics.tsv.gz
// mkdir -p data && gunzip -c title.basics.tsv.gz > data/data.tsv
// service mysql start
//
// # run the compiled binary directly
// bin/imdb-andrejs "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
// # or run from source
// go run ./main.go "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
//
// # where
// # 3000 = batch size (rows per INSERT statement)
// # 120  = writer count (concurrent DB connections)
// ```
34
35
36import (
37 "bufio"
38 "context"
39 "database/sql"
40 "fmt"
41 _ "github.com/go-sql-driver/mysql"
42 "log"
43 "os"
44 "strconv"
45 "strings"
46 "sync"
47 "time"
48)
49
type (
	// Job is one batch of tab-split TSV rows handed from allocate to a
	// worker. from and till are allocate's running data-row counter (header
	// excluded) before and after the batch; result uses them for progress
	// output.
	Job struct {
		from, till uint32
		lines      [][]string
	}

	// Result reports whether a worker's INSERT for a Job succeeded.
	Result struct {
		job     Job
		success bool
	}
)

// jobs carries batches from allocate to the workers, and results carries
// their outcome to the progress reporter. Both channels are created in main.
var (
	jobs    chan Job
	results chan Result
)

const (
	// insertQuery is the multi-row INSERT prefix; one queryVals group is
	// appended per row of a batch.
	insertQuery = "INSERT INTO titles (`id`, `titleType`, `primaryTitle`, `originalTitle`, `isAdult`, `startYear`, `endYear`, `runtimeMinutes`, `genres`) VALUES "
	// queryVals is the placeholder group for a single row (nine columns).
	queryVals = "(?, ?, ?, ?, ?, ?, ?, ?, ?)"
)

// dbConnStr is the MySQL DSN; main sets it from a default or os.Args[1].
var dbConnStr string
68
69func getDb() *sql.DB {
70 db, dbErr := sql.Open("mysql", dbConnStr)
71 if dbErr != nil {
72 log.Fatal(dbErr)
73 return nil
74 }
75
76 return db
77}
78
79func processLines(lines [][]string, ctx context.Context) bool {
80 db := getDb()
81 db.SetMaxOpenConns(1)
82
83 var inserts []string
84 var params []interface{}
85
86 var startYear, endYear, runtimeMinutes int
87 for _, columns := range lines {
88 inserts = append(inserts, queryVals)
89 startYear, _ = strconv.Atoi(columns[5])
90 endYear, _ = strconv.Atoi(columns[6])
91 runtimeMinutes, _ = strconv.Atoi(columns[7])
92
93 params = append(params, columns[0], columns[1], columns[2], columns[3], columns[4], startYear, endYear, runtimeMinutes, columns[8])
94 }
95 queryVals := strings.Join(inserts, ",")
96
97 query := insertQuery + queryVals
98
99 stmt, err := db.PrepareContext(ctx, query)
100 if err != nil {
101 db.Close()
102 log.Printf("#1Error %s", err)
103 log.Fatal(err)
104 }
105
106 res, err := stmt.ExecContext(ctx, params...)
107 if err != nil {
108 db.Close()
109 stmt.Close()
110 log.Printf("#2Error %s", err)
111 return false
112 }
113
114 stmt.Close()
115 db.Close()
116
117 _, errAffected := res.RowsAffected()
118 if errAffected != nil {
119 log.Printf("Error %s when finding rows affected", errAffected)
120 log.Fatal(errAffected)
121
122 return false
123 }
124
125 return true
126}
127
128func worker(wg *sync.WaitGroup) {
129 ctx := context.Background()
130 for job := range jobs {
131 output := Result{job, processLines(job.lines, ctx)}
132 results <- output
133 }
134 wg.Done()
135}
136
137func createWorkerPool(noOfWorkers uint32) {
138 var wg sync.WaitGroup
139 var i uint32
140
141 for i = 0; i < noOfWorkers; i++ {
142 wg.Add(1)
143 go worker(&wg)
144 }
145 wg.Wait()
146 close(results)
147}
148
149func allocate(scanner *bufio.Scanner, batchSize uint32) {
150 var from, k uint32
151 var header = true
152 var stopIndexSize, i uint32
153
154 batchedLines := make([][]string, batchSize)
155 stopIndexSize = batchSize - 1
156
157 for scanner.Scan() {
158 if header {
159 header = false
160 continue
161 }
162 k++
163
164 columns := strings.Split(scanner.Text(), "\t")
165 if columns == nil {
166 continue
167 }
168
169 batchedLines[i] = columns
170
171 if i >= stopIndexSize {
172 copyBatchedLines := make([][]string, batchSize)
173 copy(copyBatchedLines, batchedLines)
174
175 jobs <- Job{from, k, copyBatchedLines}
176 i = 0
177 from = k
178 continue
179 }
180
181 i++
182 }
183
184 z := k % batchSize
185
186 copyBatchedLines := make([][]string, z)
187 copy(copyBatchedLines, batchedLines)
188 jobs <- Job{from, k, copyBatchedLines}
189
190 close(jobs)
191}
192
193func result(done chan bool, startTime *time.Time) {
194 for result := range results {
195 var perSec int
196 elapsed := time.Since(*startTime)
197 d := int(elapsed) / 1000000000
198 if d > 0 {
199 perSec = int(result.job.till) / d
200 }
201 go fmt.Printf("%d - %d = %v (%v /sec)\n", result.job.from, result.job.till, result.success, perSec)
202 }
203 done <- true
204}
205
206func main() {
207 var batchSize uint32 = 2000
208 var noOfWriters uint32 = 120
209 var file = "data/data.tsv"
210 dbConnStr = "root:test@tcp(127.0.0.1:3306)/imdb"
211
212 argsLen := len(os.Args)
213 if argsLen >= 2 {
214 dbConnStr = string(os.Args[1])
215 }
216 if argsLen >= 3 {
217 file = string(os.Args[2])
218 }
219 if argsLen >= 4 {
220 if val, err := strconv.ParseUint(os.Args[3], 0, 0); err == nil {
221 batchSize = uint32(val)
222 }
223 }
224 if argsLen >= 5 {
225 if val, err := strconv.ParseUint(os.Args[4], 0, 0); err == nil {
226 noOfWriters = uint32(val)
227 }
228 }
229
230 var noOfWorkers = noOfWriters + uint32(noOfWriters/10)
231
232 var sleep time.Duration = 3
233 fmt.Printf("Processing with Batch size = %v Writers = %v Workers = %v in %d sec..\n", batchSize, noOfWriters, noOfWorkers, sleep)
234
235 db := getDb()
236 createDBTable(db)
237 db.Close()
238
239 time.Sleep(sleep * time.Second)
240
241 // start calculation after table is created
242 startTime := time.Now()
243
244 jobs = make(chan Job, noOfWorkers)
245 results = make(chan Result, noOfWriters)
246
247 scanner := getScanner(file)
248 go allocate(scanner, batchSize)
249
250 done := make(chan bool)
251 go result(done, &startTime)
252
253 createWorkerPool(noOfWriters)
254 <-done
255
256 elapsed := time.Since(startTime)
257 log.Printf("TIME %s", elapsed)
258}
259
// getScanner opens filename and returns a line scanner over it. The process
// exits via log.Fatal if the file cannot be opened.
//
// The scanner's maximum line length is raised from bufio's default
// (bufio.MaxScanTokenSize, 64 KiB) to maxLineBytes. An unusually long TSV
// row is then read instead of ending the scan with bufio.ErrTooLong.
//
// The underlying *os.File is not closed here. Only the scanner is returned,
// so the file stays open until the process exits.
func getScanner(filename string) *bufio.Scanner {
	const maxLineBytes = 16 << 20 // 16 MiB

	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	return scanner
}
268
269func createDBTable(db *sql.DB) {
270 tableCreate := "CREATE TABLE IF NOT EXISTS `imdb`.`titles` (" +
271 " `id` VARCHAR(25) NOT NULL," +
272 " `titleType` VARCHAR(25) NULL," +
273 " `primaryTitle` VARCHAR(511) NULL," +
274 " `originalTitle` VARCHAR(511) NULL," +
275 " `isAdult` INT(1) NULL," +
276 " `startYear` INT(4) NULL," +
277 " `endYear` INT(4) NULL," +
278 " `runtimeMinutes` INT(5) NULL," +
279 " `genres` VARCHAR(255) NULL," +
280 " PRIMARY KEY (`id`)) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;"
281
282 var sqls = []string{
283 "CREATE DATABASE IF NOT EXISTS `imdb`;",
284 "DROP TABLE IF EXISTS `imdb`.`titles`;",
285 tableCreate,
286 }
287
288 execQuery(db, sqls)
289}
290
// execQuery runs each statement in sqls, in order, with db.Exec. The process
// exits via log.Fatal on the first failure.
//
// Exec replaces Query: these are DDL statements that return no rows, and the
// *sql.Rows that Query returned were never closed, so each statement pinned
// a pooled connection. The SetMaxOpenConns(len(sqls)) call is also gone. It
// changed the caller's pool limit as a side effect and is unnecessary now
// that every Exec releases its connection.
func execQuery(db *sql.DB, sqls []string) {
	for _, statement := range sqls {
		if _, err := db.Exec(statement); err != nil {
			log.Fatal(err)
		}
	}
}
301