· 5 years ago · Sep 30, 2020, 04:38 PM
1package main

// References used while building this service:
//
// Inserting one million CSV records into a database in about a minute:
// https://github.com/novalagung/dasarpemrogramangolang-example/blob/master/chapter-D-insert-1mil-csv-record-into-db-in-a-minute/main.go
//
// Docker:
// https://gabrieltanner.org/blog/golang-file-uploading
//
// Building a CSV and Excel Parser REST API in Golang using Gorilla Mux:
// https://medium.com/containerocean/building-a-csv-and-excel-parser-rest-api-in-golang-using-gorilla-mux-7be62ba83437

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/gorilla/mux"
)

// Connection-pool limits applied by openDbConnection, and the worker-count
// bound used by dispatchWorkers.
var (
	dbMaxConns     = 100
	dbMaxIdleConns = 4
	totalWorker    = 100
)

// dataHeaders names the columns of the domain table, in the order CSV fields
// are bound to the INSERT placeholders. Presumably this mirrors the column
// order of the uploaded CSV (majestic_million.csv layout) — confirm.
var dataHeaders = []string{
	"GlobalRank", "TldRank", "Domain", "TLD",
	"RefSubNets", "RefIPs", "IDN_Domain", "IDN_TLD",
	"PrevGlobalRank", "PrevTldRank", "PrevRefSubNets", "PrevRefIPs",
}

// Result is the JSON body returned by the upload endpoint. The fields carry
// no json tags, so they are encoded under their Go names ("IsSuccess",
// "FileName").
type Result struct {
	// IsSuccess reports whether the upload and import completed.
	IsSuccess bool

	// FileName is the name the upload was stored under.
	FileName string
}

// Example MySQL schema for the "domain" table the upload handler inserts
// into (the column names match dataHeaders):
//
//	CREATE DATABASE IF NOT EXISTS test;
//	USE test;
//	CREATE TABLE IF NOT EXISTS domain (
//	    GlobalRank int,
//	    TldRank int,
//	    Domain varchar(255),
//	    TLD varchar(255),
//	    RefSubNets int,
//	    RefIPs int,
//	    IDN_Domain varchar(255),
//	    IDN_TLD varchar(255),
//	    PrevGlobalRank int,
//	    PrevTldRank int,
//	    PrevRefSubNets int,
//	    PrevRefIPs int
//	);

74func main() {
75 setupRoutes()
76}
77
78type uploaderUseCase struct {
79}
80
81type UploaderUseCase interface {
82 uploadFile(w http.ResponseWriter, r *http.Request) (bool, string)
83}
84
85func NewUploaderUseCase() UploaderUseCase {
86 return &uploaderUseCase{}
87}
88
89// uploaderHandler ...
90type uploaderHandler struct {
91 uploaderUseCase UploaderUseCase
92}
93
94// UploaderHandler ...
95type UploaderHandler interface {
96 uploadFile(w http.ResponseWriter, r *http.Request)
97}
98
99// NewUploaderHandler ...
100func NewUploaderHandler(uploaderUseCase UploaderUseCase) UploaderHandler {
101 return &uploaderHandler{
102 uploaderUseCase: uploaderUseCase,
103 }
104}
105
106func (handler *uploaderHandler) uploadFile(w http.ResponseWriter, r *http.Request) {
107 success, filename := handler.uploaderUseCase.uploadFile(w, r)
108 result := Result{
109 IsSuccess: success,
110 FileName: filename,
111 }
112 data, err := json.Marshal(result)
113 if err != nil {
114 log.Println("ERROR", err)
115 w.WriteHeader(http.StatusInternalServerError)
116 return
117 }
118 w.Write(data)
119}
120
121func openDbConnection() (*sql.DB, error) {
122 host := os.Getenv("DBHOST") //127.0.0.1
123 port := os.Getenv("DBPORT") //30338
124 user := os.Getenv("DBUSER") //vardocker
125 pass := os.Getenv("DBPASSWORD") //tentanglo
126 dbname := os.Getenv("DBNAME") //graphdb
127 dbConnString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", user, pass, host, port, dbname)
128
129 db, err := sql.Open("mysql", dbConnString)
130 if err != nil {
131 return nil, err
132 }
133
134 db.SetMaxOpenConns(dbMaxConns)
135 db.SetMaxIdleConns(dbMaxIdleConns)
136
137 return db, nil
138}
139
// openCsvFile opens filePath for reading and returns a csv.Reader over a
// buffered view of it, plus the underlying *os.File, which the caller must
// Close. Any open error, including a missing file, is returned wrapped with
// the path instead of terminating the process, so an HTTP handler can report
// it. On error both the reader and the file are nil.
func openCsvFile(filePath string) (*csv.Reader, *os.File, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, nil, fmt.Errorf("opening csv file %q: %w", filePath, err)
	}
	return csv.NewReader(bufio.NewReader(f)), f, nil
}

153func dispatchWorkers(db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
154 for workerIndex := 0; workerIndex <= totalWorker; workerIndex++ {
155 go func(workerIndex int, db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
156 counter := 0
157 for job := range jobs {
158 doTheJob(workerIndex, counter, db, job)
159 defer wg.Done()
160 counter++
161 }
162 }(workerIndex, db, jobs, wg)
163 }
164}
165
// readCsvFilePerLineThenSendToWorker reads csvReader record by record and
// skips the first record (the header row). Every remaining record is sent to
// jobs as a []interface{} of its string fields, and wg.Add(1) is called before
// each send so the caller can wait for the workers. jobs is closed when the
// function returns.
//
// Reading stops at io.EOF or at the first read error. A non-EOF error (for
// example a row with the wrong number of fields) is logged rather than
// silently discarded, because it truncates the import.
func readCsvFilePerLineThenSendToWorker(csvReader *csv.Reader, jobs chan<- []interface{}, wg *sync.WaitGroup) {
	// Close on every exit path so workers ranging over jobs always terminate.
	defer close(jobs)

	isHeader := true
	for {
		row, err := csvReader.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Println("ERROR reading csv:", err)
			return
		}

		if isHeader {
			isHeader = false
			continue
		}

		rowOrdered := make([]interface{}, len(row))
		for i, each := range row {
			rowOrdered[i] = each
		}

		wg.Add(1)
		jobs <- rowOrdered
	}
}

193func doTheJob(workerIndex, counter int, db *sql.DB, values []interface{}) {
194 for {
195 var outerError error
196 func(outerError *error) {
197 defer func() {
198 if err := recover(); err != nil {
199 *outerError = fmt.Errorf("%v", err)
200 }
201 }()
202
203 conn, err := db.Conn(context.Background())
204 query := fmt.Sprintf("insert into domain (%s) values (%s)",
205 strings.Join(dataHeaders, ","),
206 strings.Join(generateQuestionsMark(len(dataHeaders)), ","),
207 )
208
209 _, err = conn.ExecContext(context.Background(), query, values...)
210 if err != nil {
211 log.Fatal(err.Error())
212 }
213
214 err = conn.Close()
215 if err != nil {
216 log.Fatal(err.Error())
217 }
218 }(&outerError)
219 if outerError == nil {
220 break
221 }
222 }
223
224 if counter%100 == 0 {
225 log.Println("=> worker", workerIndex, "inserted", counter, "data")
226 }
227}
228
// generateQuestionsMark returns a slice holding n "?" placeholders, ready to
// be joined into the VALUES list of a parameterised INSERT. For n <= 0 the
// result is an empty, non-nil slice.
func generateQuestionsMark(n int) []string {
	marks := []string{}
	for len(marks) < n {
		marks = append(marks, "?")
	}
	return marks
}

237func (uploaderUseCase *uploaderUseCase) uploadFile(w http.ResponseWriter, r *http.Request) (bool, string) {
238 // var buf bytes.Buffer
239 start := time.Now()
240
241 db, err := openDbConnection()
242 if err != nil {
243 log.Fatal(err.Error())
244 }
245
246 file, handler, err := r.FormFile("file")
247 fileArray := strings.Split(handler.Filename, ".")
248 fileName := fileArray[0] + "-" + time.Now().Format("20060102150405") + "." + fileArray[1]
249
250 if err != nil {
251 return false, fileName
252 }
253 defer file.Close()
254
255 // fileCSV := "static/" + fileName
256 // csvReader, csvFile, err := openCsvFile(fileCSV)
257 // if err != nil {
258 // log.Fatal(err.Error())
259 // }
260 // defer csvFile.Close()
261
262 f, err := os.OpenFile("./static/"+fileName, os.O_WRONLY|os.O_CREATE, 0666)
263 if err != nil {
264 return false, fileName
265 }
266 defer f.Close()
267
268 _, err = io.Copy(f, file)
269 if err != nil {
270 log.Println("File " + fileName + " Fail uploaded")
271 return false, fileName
272 }
273
274 blobPath := "./static/" + fileName
275 csvReader, csvFile, err := openCsvFile(blobPath)
276 // parsedJson, _ := json.Marshal(csvReader)
277 // fmt.Println(string(parsedJson))
278 defer csvFile.Close()
279
280 jobs := make(chan []interface{}, 0)
281 wg := new(sync.WaitGroup)
282
283 go dispatchWorkers(db, jobs, wg)
284 readCsvFilePerLineThenSendToWorker(csvReader, jobs, wg)
285
286 wg.Wait()
287
288 duration := time.Since(start)
289 fmt.Println("done in", int(math.Ceil(duration.Seconds())), "seconds")
290
291 log.Println("File " + fileName + " Uploaded successfully")
292
293 return true, fileName
294}
295
// Disabled HTML home handler (its route in setupRoutes is commented out too):
//
// func home(w http.ResponseWriter, r *http.Request) {
// 	var data = map[string]string{
// 		"username": "muharik",
// 		"message":  "Welcome to the Go !",
// 	}
// 	var t, err = template.ParseFiles("html/view.html")
// 	if err != nil {
// 		fmt.Println(err.Error())
// 		return
// 	}
// 	t.Execute(w, data)
// 	return
// }

310func setupRoutes() {
311 r := mux.NewRouter()
312 r.Use(CORSMiddleware)
313 api := r.PathPrefix("/").Subrouter()
314
315 uploaderUseCase := NewUploaderUseCase()
316 uploaderHandler := NewUploaderHandler(uploaderUseCase)
317
318 // http.HandleFunc("/", home)
319 api.HandleFunc("/upload", uploaderHandler.uploadFile).Methods("POST")
320 // http.HandleFunc("/upload", uploadFile)
321
322 port := ":8080"
323 fmt.Println(fmt.Sprintf("running on port %s !\n", port))
324 server := http.Server{
325 ReadTimeout: 30 * time.Minute,
326 WriteTimeout: 30 * time.Minute,
327 Handler: r,
328 Addr: fmt.Sprintf("%s", port),
329 }
330
331 log.Fatal(server.ListenAndServe())
332
333 // http.ListenAndServe(":8080", nil)
334}
335
// CORSMiddleware wraps next so that every response carries permissive CORS
// headers: any origin, the Content-Type request header, and the listed
// methods. After each request completes, it logs the remote address, method,
// URL, User-Agent and elapsed time in milliseconds.
//
// Response content headers (Content-Type, Content-Disposition) are left to
// the wrapped handler: the upload endpoint answers with JSON, so a blanket
// "text/csv" attachment label would be wrong.
func CORSMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		h := w.Header()
		h.Set("Access-Control-Allow-Origin", "*")
		h.Set("Access-Control-Allow-Headers", "Content-Type")
		h.Set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")

		next.ServeHTTP(w, r)

		log.Println(
			r.RemoteAddr,
			r.Method,
			r.URL,
			r.Header.Get("user-agent"),
			time.Since(start).Seconds()*1000,
		)
	})
}
