· 5 years ago · Sep 29, 2020, 08:08 AM
1package main
2
3// https://github.com/novalagung/dasarpemrogramangolang-example/blob/master/chapter-D-insert-1mil-csv-record-into-db-in-a-minute/main.go
4// docker
5// https://gabrieltanner.org/blog/golang-file-uploading
6
import (
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/gorilla/mux"
)
25
// Package-level settings for the CSV import pipeline.
var (
	// dbMaxIdleConns is handed to (*sql.DB).SetMaxIdleConns.
	dbMaxIdleConns = 4
	// dbMaxConns is handed to (*sql.DB).SetMaxOpenConns.
	dbMaxConns = 100
	// totalWorker sizes the insert worker pool started by dispatchWorkers.
	totalWorker = 100

	// dataHeaders names the destination columns of the `domain` table. Each
	// CSV record's fields are bound to these columns positionally, so the
	// uploaded file is expected to use exactly this column order.
	dataHeaders = []string{
		"GlobalRank", "TldRank", "Domain", "TLD",
		"RefSubNets", "RefIPs", "IDN_Domain", "IDN_TLD",
		"PrevGlobalRank", "PrevTldRank", "PrevRefSubNets", "PrevRefIPs",
	}
)
46
// Result is the JSON body written by the /upload handler. Field names and
// order are part of the wire format.
type Result struct {
	IsSuccess bool   // success flag reported by the upload use case
	FileName  string // name generated for the uploaded file
}
51
52// CREATE DATABASE IF NOT EXISTS test;
53// USE test;
54// CREATE TABLE IF NOT EXISTS domain (
55// GlobalRank int,
56// TldRank int,
57// Domain varchar(255),
58// TLD varchar(255),
59// RefSubNets int,
60// RefIPs int,
61// IDN_Domain varchar(255),
62// IDN_TLD varchar(255),
63// PrevGlobalRank int,
64// PrevTldRank int,
65// PrevRefSubNets int,
66// PrevRefIPs int
67// );
68
69func main() {
70 setupRoutes()
71}
72
73type uploaderUseCase struct {
74}
75
76type UploaderUseCase interface {
77 uploadFile(w http.ResponseWriter, r *http.Request) (bool, string)
78}
79
80func NewUploaderUseCase() UploaderUseCase {
81 return &uploaderUseCase{}
82}
83
84// uploaderHandler ...
85type uploaderHandler struct {
86 uploaderUseCase UploaderUseCase
87}
88
89// UploaderHandler ...
90type UploaderHandler interface {
91 uploadFile(w http.ResponseWriter, r *http.Request)
92}
93
94// NewUploaderHandler ...
95func NewUploaderHandler(uploaderUseCase UploaderUseCase) UploaderHandler {
96 return &uploaderHandler{
97 uploaderUseCase: uploaderUseCase,
98 }
99}
100
101func (handler *uploaderHandler) uploadFile(w http.ResponseWriter, r *http.Request) {
102 success, filename := handler.uploaderUseCase.uploadFile(w, r)
103 result := Result{
104 IsSuccess: success,
105 FileName: filename,
106 }
107 data, err := json.Marshal(result)
108 if err != nil {
109 log.Println("ERROR", err)
110 w.WriteHeader(http.StatusInternalServerError)
111 return
112 }
113 w.Write(data)
114}
115
116func openDbConnection() (*sql.DB, error) {
117 host := os.Getenv("DBHOST") //127.0.0.1
118 port := os.Getenv("DBPORT") //30338
119 user := os.Getenv("DBUSER") //vardocker
120 pass := os.Getenv("DBPASSWORD") //tentanglo
121 dbname := os.Getenv("DBNAME") //graphdb
122 dbConnString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", user, pass, host, port, dbname)
123
124 db, err := sql.Open("mysql", dbConnString)
125 if err != nil {
126 return nil, err
127 }
128
129 db.SetMaxOpenConns(dbMaxConns)
130 db.SetMaxIdleConns(dbMaxIdleConns)
131
132 return db, nil
133}
134
// openCsvFile creates (or truncates) file, opened for both reading and
// writing. It returns a csv.Reader over it plus the underlying *os.File,
// which the caller must Close. Reader and writer share one descriptor, so a
// caller that writes data first must Seek back to the start before reading.
func openCsvFile(file string) (*csv.Reader, *os.File, error) {
	log.Println("=> open csv file")
	// O_RDWR (the original O_WRONLY made every read fail with EBADF) and
	// O_TRUNC so a reused name never keeps stale trailing bytes.
	f, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		// With O_CREATE this typically means a missing parent directory or a
		// permission problem; report it rather than terminating the server.
		return nil, nil, fmt.Errorf("opening csv file %q: %w", file, err)
	}
	return csv.NewReader(f), f, nil
}
147
148func dispatchWorkers(db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
149 for workerIndex := 0; workerIndex <= totalWorker; workerIndex++ {
150 go func(workerIndex int, db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
151 counter := 0
152 for job := range jobs {
153 log.Println("ok")
154 doTheJob(workerIndex, counter, db, job)
155 wg.Done()
156 counter++
157 }
158 }(workerIndex, db, jobs, wg)
159 }
160}
161
// readCsvFilePerLineThenSendToWorker reads records from csvReader. It skips
// the first (header) record and sends each remaining record to jobs as a
// []interface{} of its string fields, ready to be used as SQL arguments.
// wg.Add(1) is called before every send so the caller can wg.Wait for the
// consumers. jobs is closed when input is exhausted or a read error stops
// the loop; non-EOF errors are logged.
func readCsvFilePerLineThenSendToWorker(csvReader *csv.Reader, jobs chan<- []interface{}, wg *sync.WaitGroup) {
	defer close(jobs)

	isHeader := true
	for {
		row, err := csvReader.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			// A malformed record ends the import; log it so a short import
			// is explained instead of looking like a short file.
			log.Println("ERROR reading csv:", err)
			return
		}

		if isHeader {
			isHeader = false
			continue
		}

		rowOrdered := make([]interface{}, len(row))
		for i, each := range row {
			rowOrdered[i] = each
		}

		wg.Add(1)
		jobs <- rowOrdered
	}
}
188
189func doTheJob(workerIndex, counter int, db *sql.DB, values []interface{}) {
190 for {
191 var outerError error
192 func(outerError *error) {
193 defer func() {
194 if err := recover(); err != nil {
195 *outerError = fmt.Errorf("%v", err)
196 }
197 }()
198
199 conn, err := db.Conn(context.Background())
200 query := fmt.Sprintf("insert into domain (%s) values (%s)",
201 strings.Join(dataHeaders, ","),
202 strings.Join(generateQuestionsMark(len(dataHeaders)), ","),
203 )
204
205 _, err = conn.ExecContext(context.Background(), query, values...)
206 if err != nil {
207 log.Fatal(err.Error())
208 }
209
210 err = conn.Close()
211 if err != nil {
212 log.Fatal(err.Error())
213 }
214 }(&outerError)
215 if outerError == nil {
216 break
217 }
218 }
219
220 if counter%100 == 0 {
221 log.Println("=> worker", workerIndex, "inserted", counter, "data")
222 }
223}
224
// generateQuestionsMark returns n "?" placeholders for a parameterized SQL
// statement. A non-positive n yields an empty, non-nil slice.
func generateQuestionsMark(n int) []string {
	if n <= 0 {
		return []string{}
	}
	// The length is known up front, so allocate once instead of growing.
	marks := make([]string, n)
	for i := range marks {
		marks[i] = "?"
	}
	return marks
}
232
233func (uploaderUseCase *uploaderUseCase) uploadFile(w http.ResponseWriter, r *http.Request) (bool, string) {
234 start := time.Now()
235
236 db, err := openDbConnection()
237 if err != nil {
238 log.Fatal(err.Error())
239 }
240
241 file, handler, err := r.FormFile("file")
242 fileArray := strings.Split(handler.Filename, ".")
243 fileName := fileArray[0] + "-" + time.Now().Format("20060102150405") + "." + fileArray[1]
244
245 if err != nil {
246 return false, fileName
247 }
248 defer file.Close()
249
250 fileCSV := "static/" + fileName
251 csvReader, csvFile, err := openCsvFile(fileCSV)
252 if err != nil {
253 log.Fatal(err.Error())
254 }
255 defer csvFile.Close()
256
257 _, err = io.Copy(csvFile, file)
258 if err != nil {
259 log.Println("File " + fileName + " Fail uploaded")
260 return false, fileName
261 }
262
263 jobs := make(chan []interface{}, 0)
264 wg := new(sync.WaitGroup)
265
266 go dispatchWorkers(db, jobs, wg)
267 readCsvFilePerLineThenSendToWorker(csvReader, jobs, wg)
268
269 wg.Wait()
270
271 duration := time.Since(start)
272 fmt.Println("done in", int(math.Ceil(duration.Seconds())), "seconds")
273
274 return true, fileName
275}
276
277// func home(w http.ResponseWriter, r *http.Request) {
278// var data = map[string]string{
279// "username": "muharik",
280// "message": "Welcome to the Go !",
281// }
282// var t, err = template.ParseFiles("html/view.html")
283// if err != nil {
284// fmt.Println(err.Error())
285// return
286// }
287// t.Execute(w, data)
288// return
289// }
290
291func setupRoutes() {
292 r := mux.NewRouter()
293 r.Use(CORSMiddleware)
294 api := r.PathPrefix("/").Subrouter()
295
296 uploaderUseCase := NewUploaderUseCase()
297 uploaderHandler := NewUploaderHandler(uploaderUseCase)
298
299 // http.HandleFunc("/", home)
300 api.HandleFunc("/upload", uploaderHandler.uploadFile).Methods("POST")
301 // http.HandleFunc("/upload", uploadFile)
302
303 port := ":8080"
304 fmt.Println(fmt.Sprintf("running on port %s !\n", port))
305 server := http.Server{
306 ReadTimeout: 30 * time.Minute,
307 WriteTimeout: 30 * time.Minute,
308 Handler: r,
309 Addr: fmt.Sprintf("%s", port),
310 }
311
312 log.Fatal(server.ListenAndServe())
313
314 // http.ListenAndServe(":8080", nil)
315}
316
// CORSMiddleware sets permissive CORS response headers before delegating to
// next:
//   - Access-Control-Allow-Origin: any origin.
//   - Access-Control-Allow-Headers: Content-Type.
//   - Access-Control-Allow-Methods: the listed methods.
//
// After next returns, it writes an access-log line with the remote address,
// method, URL, user agent and elapsed time in milliseconds.
func CORSMiddleware(next http.Handler) http.Handler {
	wrapped := func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()

		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Access-Control-Allow-Headers", "Content-Type")
		headers.Set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")

		next.ServeHTTP(w, r)

		elapsedMs := time.Now().Sub(began).Seconds() * 1000
		log.Println(r.RemoteAddr, r.Method, r.URL, r.UserAgent(), elapsedMs)
	}
	return http.HandlerFunc(wrapped)
}
342