· 5 years ago · Sep 29, 2020, 10:56 AM
1package main
2
3// https://github.com/novalagung/dasarpemrogramangolang-example/blob/master/chapter-D-insert-1mil-csv-record-into-db-in-a-minute/main.go
4
5// docker
6// https://gabrieltanner.org/blog/golang-file-uploading
7
8// Building a CSV and Excel Parser REST API in Golang using Gorilla Mux
9// https://medium.com/containerocean/building-a-csv-and-excel-parser-rest-api-in-golang-using-gorilla-mux-7be62ba83437
10
11import (
12 "bufio"
13 "context"
14 "database/sql"
15 "encoding/csv"
16 "encoding/json"
17 "fmt"
18 "io"
19 "log"
20 "math"
21 "net/http"
22 "os"
23 "strings"
24 "sync"
25 "time"
26
27 _ "github.com/go-sql-driver/mysql"
28 "github.com/gorilla/mux"
29)
30
// Tunables for the import pipeline and the column layout of the target table.
var (
	// dbMaxIdleConns is handed to (*sql.DB).SetMaxIdleConns.
	dbMaxIdleConns = 4

	// dbMaxConns is handed to (*sql.DB).SetMaxOpenConns.
	dbMaxConns = 100

	// totalWorker controls how many insert goroutines dispatchWorkers starts.
	totalWorker = 100

	// dataHeaders lists the columns of the domain table in the order used
	// when building the insert statement.
	dataHeaders = []string{
		"GlobalRank", "TldRank",
		"Domain", "TLD",
		"RefSubNets", "RefIPs",
		"IDN_Domain", "IDN_TLD",
		"PrevGlobalRank", "PrevTldRank",
		"PrevRefSubNets", "PrevRefIPs",
	}
)
51
// Result is the payload that the upload handler serialises to JSON and sends
// back to the client.
type Result struct {
	// IsSuccess is the success flag reported by the upload use case.
	IsSuccess bool
	// FileName is the file name reported by the upload use case.
	FileName string
}
56
57// CREATE DATABASE IF NOT EXISTS test;
58// USE test;
59// CREATE TABLE IF NOT EXISTS domain (
60// GlobalRank int,
61// TldRank int,
62// Domain varchar(255),
63// TLD varchar(255),
64// RefSubNets int,
65// RefIPs int,
66// IDN_Domain varchar(255),
67// IDN_TLD varchar(255),
68// PrevGlobalRank int,
69// PrevTldRank int,
70// PrevRefSubNets int,
71// PrevRefIPs int
72// );
73
74func main() {
75 setupRoutes()
76}
77
// uploaderUseCase is the stateless concrete implementation of UploaderUseCase.
type uploaderUseCase struct{}
80
// UploaderUseCase is the business-logic contract behind the upload endpoint.
type UploaderUseCase interface {
	// uploadFile processes the upload carried by r and returns a success flag
	// together with a file name.
	uploadFile(w http.ResponseWriter, r *http.Request) (bool, string)
}
84
85func NewUploaderUseCase() UploaderUseCase {
86 return &uploaderUseCase{}
87}
88
89// uploaderHandler ...
90type uploaderHandler struct {
91 uploaderUseCase UploaderUseCase
92}
93
// UploaderHandler is the set of HTTP handlers exposed for uploads.
type UploaderHandler interface {
	// uploadFile has the http.HandlerFunc shape so it can be registered
	// directly on a router.
	uploadFile(w http.ResponseWriter, r *http.Request)
}
98
99// NewUploaderHandler ...
100func NewUploaderHandler(uploaderUseCase UploaderUseCase) UploaderHandler {
101 return &uploaderHandler{
102 uploaderUseCase: uploaderUseCase,
103 }
104}
105
106func (handler *uploaderHandler) uploadFile(w http.ResponseWriter, r *http.Request) {
107 success, filename := handler.uploaderUseCase.uploadFile(w, r)
108 result := Result{
109 IsSuccess: success,
110 FileName: filename,
111 }
112 data, err := json.Marshal(result)
113 if err != nil {
114 log.Println("ERROR", err)
115 w.WriteHeader(http.StatusInternalServerError)
116 return
117 }
118 w.Write(data)
119}
120
121func openDbConnection() (*sql.DB, error) {
122 host := os.Getenv("DBHOST") //127.0.0.1
123 port := os.Getenv("DBPORT") //30338
124 user := os.Getenv("DBUSER") //vardocker
125 pass := os.Getenv("DBPASSWORD") //tentanglo
126 dbname := os.Getenv("DBNAME") //graphdb
127 dbConnString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", user, pass, host, port, dbname)
128
129 db, err := sql.Open("mysql", dbConnString)
130 if err != nil {
131 return nil, err
132 }
133
134 db.SetMaxOpenConns(dbMaxConns)
135 db.SetMaxIdleConns(dbMaxIdleConns)
136
137 return db, nil
138}
139
// openCsvFile opens the file at filePath and wraps it in a buffered CSV reader.
//
// It returns the reader, the underlying file (which the caller must Close)
// and any error from os.Open. On error both the reader and the file are nil.
func openCsvFile(filePath string) (*csv.Reader, *os.File, error) {
	f, err := os.Open(filePath)
	if err != nil {
		// Report the failure to the caller instead of terminating the
		// process: this helper runs while serving an HTTP request, and the
		// *os.PathError already names the operation and the path.
		return nil, nil, err
	}
	return csv.NewReader(bufio.NewReader(f)), f, nil
}
153
154func dispatchWorkers(db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
155 for workerIndex := 0; workerIndex <= totalWorker; workerIndex++ {
156 go func(workerIndex int, db *sql.DB, jobs <-chan []interface{}, wg *sync.WaitGroup) {
157 counter := 0
158 for job := range jobs {
159 doTheJob(workerIndex, counter, db, job)
160 wg.Done()
161 counter++
162 }
163 }(workerIndex, db, jobs, wg)
164 }
165}
166
// readCsvFilePerLineThenSendToWorker streams csvReader one record at a time,
// skips the first record (the header) and sends every following record to
// jobs as a []interface{} of its string fields.
//
// wg.Add(1) is called before each send, so whoever receives a job is
// expected to call wg.Done for it. Reading stops at end of input or at the
// first read error (which is logged); jobs is always closed before the
// function returns.
func readCsvFilePerLineThenSendToWorker(csvReader *csv.Reader, jobs chan<- []interface{}, wg *sync.WaitGroup) {
	// Closing the channel tells the receivers that no more rows will arrive.
	defer close(jobs)

	isHeader := true
	for {
		// Read, not ReadAll: ReadAll returns the whole file at once and
		// reports end of input as a nil error, so looping on it never ends.
		row, err := csvReader.Read()
		if err != nil {
			if err != io.EOF {
				log.Println("ERROR", err)
			}
			return
		}

		if isHeader {
			isHeader = false
			continue
		}

		// database/sql takes its arguments as interface values.
		rowOrdered := make([]interface{}, 0, len(row))
		for _, each := range row {
			rowOrdered = append(rowOrdered, each)
		}

		wg.Add(1)
		jobs <- rowOrdered
	}
}
193
194func doTheJob(workerIndex, counter int, db *sql.DB, values []interface{}) {
195 for {
196 var outerError error
197 func(outerError *error) {
198 defer func() {
199 if err := recover(); err != nil {
200 *outerError = fmt.Errorf("%v", err)
201 }
202 }()
203
204 conn, err := db.Conn(context.Background())
205 query := fmt.Sprintf("insert into domain (%s) values (%s)",
206 strings.Join(dataHeaders, ","),
207 strings.Join(generateQuestionsMark(len(dataHeaders)), ","),
208 )
209
210 _, err = conn.ExecContext(context.Background(), query, values...)
211 if err != nil {
212 log.Println("error => ini ya")
213 log.Fatal(err.Error())
214 }
215
216 err = conn.Close()
217 if err != nil {
218 log.Fatal(err.Error())
219 }
220 }(&outerError)
221 if outerError == nil {
222 break
223 }
224 }
225
226 if counter%100 == 0 {
227 log.Println("=> worker", workerIndex, "inserted", counter, "data")
228 }
229}
230
// generateQuestionsMark returns a slice holding n "?" placeholders; for n <= 0
// the slice is empty (but not nil).
func generateQuestionsMark(n int) []string {
	marks := make([]string, 0)
	for len(marks) < n {
		marks = append(marks, "?")
	}
	return marks
}
238
239func (uploaderUseCase *uploaderUseCase) uploadFile(w http.ResponseWriter, r *http.Request) (bool, string) {
240 start := time.Now()
241
242 db, err := openDbConnection()
243 if err != nil {
244 log.Fatal(err.Error())
245 }
246
247 file, handler, err := r.FormFile("file")
248 fileArray := strings.Split(handler.Filename, ".")
249 fileName := fileArray[0] + "-" + time.Now().Format("20060102150405") + "." + fileArray[1]
250
251 if err != nil {
252 return false, fileName
253 }
254 defer file.Close()
255
256 // fileCSV := "static/" + fileName
257 // csvReader, csvFile, err := openCsvFile(fileCSV)
258 // if err != nil {
259 // log.Fatal(err.Error())
260 // }
261 // defer csvFile.Close()
262
263 f, err := os.OpenFile("./static/"+fileName, os.O_WRONLY|os.O_CREATE, 0666)
264 if err != nil {
265 return false, fileName
266 }
267 defer f.Close()
268
269 _, err = io.Copy(f, file)
270 if err != nil {
271 log.Println("File " + fileName + " Fail uploaded")
272 return false, fileName
273 }
274
275 blobPath := "./static/" + fileName
276 csvReader, csvFile, err := openCsvFile(blobPath)
277 parsedJson, _ := json.Marshal(csvReader)
278 fmt.Println(string(parsedJson))
279 defer csvFile.Close()
280
281 jobs := make(chan []interface{}, 0)
282 wg := new(sync.WaitGroup)
283
284 go dispatchWorkers(db, jobs, wg)
285 readCsvFilePerLineThenSendToWorker(csvReader, jobs, wg)
286
287 wg.Wait()
288
289 duration := time.Since(start)
290 fmt.Println("done in", int(math.Ceil(duration.Seconds())), "seconds")
291
292 return true, fileName
293}
294
295// func home(w http.ResponseWriter, r *http.Request) {
296// var data = map[string]string{
297// "username": "muharik",
298// "message": "Welcome to the Go !",
299// }
300// var t, err = template.ParseFiles("html/view.html")
301// if err != nil {
302// fmt.Println(err.Error())
303// return
304// }
305// t.Execute(w, data)
306// return
307// }
308
309func setupRoutes() {
310 r := mux.NewRouter()
311 r.Use(CORSMiddleware)
312 api := r.PathPrefix("/").Subrouter()
313
314 uploaderUseCase := NewUploaderUseCase()
315 uploaderHandler := NewUploaderHandler(uploaderUseCase)
316
317 // http.HandleFunc("/", home)
318 api.HandleFunc("/upload", uploaderHandler.uploadFile).Methods("POST")
319 // http.HandleFunc("/upload", uploadFile)
320
321 port := ":8080"
322 fmt.Println(fmt.Sprintf("running on port %s !\n", port))
323 server := http.Server{
324 ReadTimeout: 30 * time.Minute,
325 WriteTimeout: 30 * time.Minute,
326 Handler: r,
327 Addr: fmt.Sprintf("%s", port),
328 }
329
330 log.Fatal(server.ListenAndServe())
331
332 // http.ListenAndServe(":8080", nil)
333}
334
// CORSMiddleware wraps next so that every response carries permissive CORS
// headers, and logs one access line per request (remote address, method, URL,
// user agent and the handling time in milliseconds) after next has returned.
func CORSMiddleware(next http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()

		// The headers have to be in place before next writes the response.
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Access-Control-Allow-Headers", "Content-Type")
		headers.Set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")

		next.ServeHTTP(w, r)

		elapsed := time.Since(began)
		log.Println(
			r.RemoteAddr,
			r.Method,
			r.URL,
			r.Header.Get("user-agent"),
			elapsed.Seconds()*1000,
		)
	}
	return http.HandlerFunc(serve)
}
360