· 6 years ago · Jun 19, 2019, 05:00 AM
1for {
2 currentGoroutinesChannel <- 1
3 path, morePaths := <-filePathChannel
4 if !morePaths {
5 log.Println("No more files to process")
6 close(lineChannel)
7 break
8
9 }
10 fmt.Println("processing file: ", path)
11 go readFile(path, compiledRegex, lineChannel, currentGoroutinesChannel)
12 fmt.Println("Number of Goroutines: ", runtime.NumGoroutine())
13 }
14
15package main
16
17import (
18 "database/sql"
19 "flag"
20 "fmt"
21 "github.com/lib/pq"
22 _ "github.com/lib/pq"
23 "io/ioutil"
24 "log"
25 "os"
26 "path/filepath"
27 "regexp"
28 "runtime"
29 "strings"
30)
31
32func main() {
33 // Flags
34 dataSource := flag.String("input", "/Users/joshuahemmings/Documents/Dev/Personal/GoTxtToPostgres/testDocuments", "Data to Import [STRING]")
35 delimiters := flag.String("delimiters", ";:|", "delimiters list [STRING]")
36 concurrency := flag.Int("concurrency", 2, "Concurrency (amount of GoRoutines) [INT]")
37 copySize := flag.Int("copySize", 5000, "How many rows get imported per execution [INT]")
38 flag.Parse()
39
40 compiledRegex := regexp.MustCompile("^(.?)[" + *delimiters + "](.)$")
41
42 lineChannel := make(chan string, 1000)
43 filePathChannel := make(chan string, 1000)
44 currentGoroutinesChannel := make(chan int, *concurrency)
45 stopToolChannel := make(chan bool, 1)
46
47 connStr := "user=todo dbname=golang password=123 sslmode=disable"
48 db, err := sql.Open("postgres", connStr)
49 if err != nil {
50 log.Fatal(err)
51 }
52
53 _ = filepath.Walk(*dataSource,
54 func(path string, file os.FileInfo, err error) error {
55 if err != nil {
56 log.Fatalf("Error reading %s: %v", path, err)
57 return nil
58 }
59 if file.IsDir() {
60 return nil
61 }
62
63 if filepath.Ext(file.Name()) == ".txt" {
64 log.Printf("reading %s, %vB", path, file.Size())
65 filePathChannel <- path
66 }
67 return nil
68 })
69
70 close(filePathChannel)
71
72 go textToPostgres(&lineChannel, *copySize, *db, &stopToolChannel)
73
74 for {
75 currentGoroutinesChannel <- 1
76 path, morePaths := <-filePathChannel
77 if !morePaths {
78 log.Println("No more files to process")
79 close(lineChannel)
80 break
81
82 }
83 fmt.Println("processing file: ", path)
84 go readFile(path, compiledRegex, lineChannel, currentGoroutinesChannel)
85 fmt.Println("Number of Goroutines: ", runtime.NumGoroutine())
86 }
87
88
89 fmt.Println("Number of Goroutines: ", runtime.NumGoroutine())
90
91 <-stopToolChannel
92 fmt.Println("Stopping tool")
93}
94
// readFile reads the whole file at path, trims each line, drops blank lines,
// and sends every remaining line to lineChannel. Before sending, each line is
// rewritten with delimiters.ReplaceAllString(line, "${1}:$2"), which replaces
// a match by capture group 1, a ':', and capture group 2.
//
// The caller must put one token into currentGoroutinesChannel before starting
// readFile. readFile removes that token only after its last send, so a caller
// that holds every slot knows no reader is still writing to lineChannel.
// An unreadable file terminates the program via log.Fatalf.
func readFile(path string, delimiters *regexp.Regexp, lineChannel chan string, currentGoroutinesChannel chan int) {
	// Release the concurrency slot only after the last send below.
	defer func() { <-currentGoroutinesChannel }()

	fileData, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatalf("Cannot read file %s: %v", path, err)
	}

	// Split on "\n". TrimSpace also removes the '\r' left over from CRLF files.
	for _, line := range strings.Split(string(fileData), "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		lineChannel <- delimiters.ReplaceAllString(line, "${1}:$2")
	}
	log.Printf("Done reading %s", path)
}
117
118func textToPostgres(lineChannel *chan string, copySize int, db sql.DB, stopToolChannel *chan bool) {
119
120 const query = `
121CREATE TABLE IF NOT EXISTS table (
122 col1 varchar(300),
123 col2 varchar(300)
124)`
125
126 _, err := db.Exec(query)
127 if err != nil {
128 log.Fatal("Failed to create table if exists")
129 }
130
131 lineCount := 0
132
133 txn, err := db.Begin()
134 if err != nil {
135 log.Fatal(err)
136 }
137
138 stmt, err := txn.Prepare(pq.CopyIn("table", "col1", "col2"))
139 if err != nil {
140 log.Fatal(err)
141 }
142
143 for {
144 line, more := <-*lineChannel
145 fmt.Println("Processing LINE: " + line)
146
147 copySize++;
148 splitLine := strings.SplitN(line, ":", 2)
149
150 if len(splitLine) == 2 {
151 if lineCount%copySize == 0 {
152 fmt.Println("Commiting: " + splitLine[0] + " Password: " + splitLine[1])
153 _, err = stmt.Exec(splitLine[0], splitLine[1])
154 if err != nil {
155 fmt.Println("Error on split Line")
156 log.Fatal(err)
157 }
158 }
159 }
160
161 if !more {
162 fmt.Println("NO MORE LINES")
163
164 _, err = stmt.Exec()
165 if err != nil {
166 log.Fatal(err)
167 }
168
169 err = stmt.Close()
170 if err != nil {
171 log.Fatal(err)
172 }
173
174 err = txn.Commit()
175 if err != nil {
176 log.Fatal(err)
177 }
178
179 break
180 }
181 }
182 *stopToolChannel <- true
183}