· 6 years ago · Jun 15, 2019, 11:24 AM
1package main
2
3import (
4 "database/sql"
5 "flag"
6 "fmt"
7 "github.com/gorilla/mux"
8 "github.com/lib/pq"
9 _ "github.com/lib/pq"
10 "io/ioutil"
11 "log"
12 "net/http"
13 "net/http/pprof"
14 "os"
15 "path/filepath"
16 "regexp"
17 "runtime"
18 "strings"
19 "sync"
20 "time"
21 "unicode/utf8"
22)
23
// PostgreSQL connection settings. main copies them from the -dbHost, -dbUser,
// -dbName and -dbPassword flags right after flag.Parse, and initDB reads them
// when it builds the connection string.
var (
	host string
	user string
	name string
	pass string
)
27
// dsn is the connection-string template that initDB fills and passes to
// sql.Open("postgres", ...). Its %s verbs take, in order: host, user,
// password, database name.
const dsn = "host=%s user=%s password=%s dbname=%s sslmode=disable"

// Column names of the clear, md5 and sha1 tables, used when building the
// COPY statement.
const (
	username = "username"
	password = "password"
)
33
// Store wraps the database handle used by the importer together with the
// transaction that NewStmtWithTx starts lazily on first use.
//
// The embedded Storer and StorerWithTx fields are never assigned anywhere, so
// they are always nil. The methods declared below implement both interfaces
// directly on *Store, so calls such as Commit never dispatch through those nil
// interface values (which would panic).
type Store struct {
	Storer
	StorerWithTx
	sync.RWMutex // serialises access to tx in the methods that lock it

	db *sql.DB
	tx *sql.Tx
}

// StorerWithTx is implemented by values that can finish a transaction.
type StorerWithTx interface {
	Commit() error
	Rollback() error
}

// Storer is implemented by values that can prepare SQL statements.
type Storer interface {
	NewStmt(string) (*sql.Stmt, error)
}

// NewStmt prepares query on the store's open transaction, or on the
// connection pool when no transaction is open.
func (s *Store) NewStmt(query string) (*sql.Stmt, error) {
	s.RLock()
	tx := s.tx
	s.RUnlock()
	if tx != nil {
		return tx.Prepare(query)
	}
	return s.db.Prepare(query)
}

// Commit commits the store's open transaction and detaches it, so the next
// NewStmtWithTx call starts a fresh one. It returns sql.ErrTxDone when no
// transaction is open.
func (s *Store) Commit() error {
	return s.finishTx((*sql.Tx).Commit)
}

// Rollback aborts the store's open transaction and detaches it. It returns
// sql.ErrTxDone when no transaction is open.
func (s *Store) Rollback() error {
	return s.finishTx((*sql.Tx).Rollback)
}

// finishTx detaches the open transaction under the lock and ends it with end.
func (s *Store) finishTx(end func(*sql.Tx) error) error {
	s.Lock()
	tx := s.tx
	s.tx = nil
	s.Unlock()
	if tx == nil {
		return sql.ErrTxDone
	}
	return end(tx)
}
51
// creds is one parsed credential line: the table it belongs in (credType)
// and the username/password pair to store there.
type creds struct {
	credType credType
	user     string
	pass     string
}

// credType names the table a credential is written to. Its string value is
// used verbatim as the table name of the COPY statement.
type credType string

// Table names for each kind of password. Every constant carries an explicit
// credType type: in a const block, a line with its own value does not inherit
// the previous line's type. The values match the tables created by
// checkConnectionAndCreateTables (clear, md5, sha1).
const (
	CLEAR credType = "clear"
	SHA   credType = "sha1"
	MD5   credType = "md5"
)
66
67func main() {
68 // Flags
69 input := flag.String("input", "/Users/joshuahemmings/Documents/Dev/Personal/GoTxtToPostgres/testDocuments", "Data to Import [STRING]")
70 delimiters := flag.String("delimiters", ";:|", "delimiters list [STRING]")
71 concurrency := flag.Int("concurrency", 10, "Concurrency (amount of GoRoutines) [INT]")
72 copySize := flag.Int("copySize", 10, "How many rows get imported per execution [INT]")
73 dbUser := flag.String("dbUser", "pwned", "define DB username")
74 dbName := flag.String("dbName", "pwned", "define DB name")
75 // dbTable := flag.String("dbTable", "", "define DB table")
76 dbPassword := flag.String("dbPassword", "123", "define DB password")
77 dbHost := flag.String("dbHost", "192.168.178.206", "define DB host")
78 dbConnections := flag.Int("dbConnections", 20, "Amount of DB connections [INT]")
79 flag.Parse()
80
81 host = *dbHost
82 pass = *dbPassword
83 user = *dbUser
84 name = *dbName
85
86 compiledRegex := regexp.MustCompile("^(.*?)[" + *delimiters + "](.*)$")
87
88 md5Regex := regexp.MustCompile("^[a-f0-9]{32}$")
89 sha1Regex := regexp.MustCompile("\b[0-9a-f]{5,40}\b")
90
91 var hashesMap map[string]*regexp.Regexp
92 hashesMap = make(map[string]*regexp.Regexp)
93
94 hashesMap["MD5"] = md5Regex
95 hashesMap["SHA1"] = sha1Regex
96
97 var wgFileReader = sync.WaitGroup{}
98
99 credChannel := make(chan creds, 1000)
100 filePathChannel := make(chan string, *concurrency*4)
101 stopToolChannel := make(chan bool, 1)
102 stopFileWalkChannel := make(chan bool, 1)
103 dbConnectionChannel := make(chan bool, *dbConnections)
104
105 numberOfTxtFiles := 0
106 numberOfProcessedFiles := 0
107
108 // TODO: Remove for stable version
109 go func() {
110 // Create a new router
111 router := mux.NewRouter()
112
113 // Register pprof handlers
114 router.HandleFunc("/debug/pprof/", pprof.Index)
115 router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
116 router.HandleFunc("/debug/pprof/profile", pprof.Profile)
117 router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
118
119 router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
120 router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
121 router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
122 router.Handle("/debug/pprof/block", pprof.Handler("block"))
123 router.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
124 log.Fatal(http.ListenAndServe(":80", router))
125 }()
126
127 log.Println("Starting Import at", time.Now().Format("02-Jan-2006 15:04:05"))
128 defer timeTrack(time.Now(), "Txt To Postgres")
129
130 _ = filepath.Walk(*input,
131 func(path string, file os.FileInfo, err error) error {
132 if err != nil {
133 log.Fatalf("Error reading %s: %v", path, err)
134 return nil
135 }
136 if file.IsDir() {
137 return nil
138 }
139
140 if filepath.Ext(file.Name()) == ".txt" {
141 numberOfTxtFiles++
142 }
143 return nil
144 })
145
146 go fileWalk(input, filePathChannel, stopFileWalkChannel)
147 go textToPostgres(credChannel, *copySize, dbConnectionChannel)
148
149 for i := 0; i < *concurrency; i++ {
150 wgFileReader.Add(1)
151 go readFile(filePathChannel, compiledRegex, credChannel, numberOfTxtFiles, &numberOfProcessedFiles, &wgFileReader, hashesMap)
152 }
153
154 log.Println("Waiting to close Filepath Channel")
155 <-stopFileWalkChannel
156 log.Println("Closing Filepath Channel")
157 close(filePathChannel)
158
159 log.Println("WAITING")
160 wgFileReader.Wait()
161 log.Println("CLOSING LINE CHANNEL")
162 close(credChannel)
163
164 <-stopToolChannel
165}
166
167func readFile(filePathChannel chan string, delimiters *regexp.Regexp, credChannel chan creds, numberOfTxtFiles int, numberOfProcessedFiles *int, wg *sync.WaitGroup, hashesMap map[string]*regexp.Regexp) {
168 for {
169 path, morePaths := <-filePathChannel
170 if morePaths {
171 fileData, err := ioutil.ReadFile(path)
172 if err != nil {
173 log.Fatalf("Cannot read file %s", path)
174 return
175 }
176 fileAsString := string(fileData)
177 lines := strings.Split(fileAsString, "\n")
178
179 for _, line := range lines {
180 line = strings.TrimSpace(line)
181 if line != "" {
182 strings.Replace(line, "\u0000", "", -1)
183 insert := delimiters.ReplaceAllString(line, "${1}:$2")
184 splitLine := strings.SplitN(insert, ":", 2)
185
186 credentialForChan := creds{}
187
188 if len(splitLine) == 2 && utf8.ValidString(splitLine[0]) && utf8.ValidString(splitLine[1]) {
189
190 username := string(splitLine[0])
191 password := string(splitLine[1])
192
193 if hashesMap["MD5"].Match([]byte(password)) {
194 credentialForChan = creds{credType: "md5", user: username, pass: password}
195 } else if hashesMap["SHA1"].Match([]byte(password)) {
196 credentialForChan = creds{credType: "sha1", user: username, pass: password}
197 } else {
198 credentialForChan = creds{credType: "clear", user: username, pass: password}
199 }
200 }
201 credChannel <- credentialForChan
202 }
203 }
204
205 *numberOfProcessedFiles++
206 log.Printf("Read %v / %v Files", *numberOfProcessedFiles, numberOfTxtFiles)
207 runtime.GC()
208 } else {
209 log.Println("Closing readFile Goroutine")
210 break
211 }
212 }
213 wg.Done()
214}
215
// fileWalk walks the tree rooted at *dataSource. For every non-directory entry
// whose name has the ".txt" extension, it sends the path on filePathChannel,
// blocking until there is room. Any error reported for a path aborts the
// program via log.Fatalf. When the walk is done, it sends true on
// stopFileWalkChannel.
func fileWalk(dataSource *string, filePathChannel chan string, stopFileWalkChannel chan bool) {
	visit := func(path string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			log.Fatalf("Error reading %s: %v", path, walkErr)
		case info.IsDir():
			// Directories themselves are not sent; Walk still descends into them.
		case filepath.Ext(info.Name()) == ".txt":
			// log.Printf("reading %s, %vB", path, info.Size())
			filePathChannel <- path
		}
		return nil
	}

	// visit always returns nil, so Walk has no error to report here.
	_ = filepath.Walk(*dataSource, visit)

	log.Println("stop file walk channel")
	stopFileWalkChannel <- true
}
237
238func textToPostgres(credChannel chan creds, copySize int, dbConnectionChannel chan bool) {
239
240 server, err := initDB()
241 handleErr(err)
242
243 checkConnectionAndCreateTables(server.db)
244
245 log.Println("Started Text to postgres goroutine")
246 var lineCount int64 = 0
247
248 for {
249 credentialToInsert, more := <-credChannel
250 if !more {
251 log.Printf("Inserted %v lines", lineCount)
252 break
253 }
254 lineCount++
255
256 dbConnectionChannel <- true
257 go server.Exec(credentialToInsert, dbConnectionChannel)
258
259 if lineCount%int64(copySize) == 0 {
260 log.Printf("Inserted %v lines", lineCount)
261
262 }
263 }
264}
265
266func initDB() (*Store, error) {
267 connStr := fmt.Sprintf(dsn, host, user, pass, name)
268 log.Println("Conenction: ", connStr)
269 db, err := sql.Open("postgres", connStr)
270 return &Store{
271 db: db,
272 }, err
273}
274
275type TxFn func(tx StorerWithTx) error
276
277func (t *Store) WithTx(fn TxFn) error {
278 tx, err := t.db.Begin()
279 if err != nil {
280 return err
281 }
282
283 conn := &Store{db: t.db, tx: tx}
284
285 defer func() {
286 if p := recover(); p != nil {
287 // a panic occurred, rollback
288 if err = tx.Rollback(); err != nil {
289 log.Printf("err during recovery Rollback: %v", err)
290 }
291 } else if err != nil {
292 // something went wrong, rollback
293 if err = tx.Rollback(); err != nil {
294 log.Printf("err during Rollback: %v", err)
295 }
296 } else {
297 // all good, commit
298 if err = tx.Commit(); err != nil {
299 log.Printf("err during Commit: %v", err)
300 }
301 }
302 }()
303
304 return fn(conn)
305}
306
307func (s *Store) NewStmtWithTx(c credType) (stmt *sql.Stmt, err error) {
308 if s.tx == nil {
309 s.tx, err = s.db.Begin()
310 if err != nil {
311 return nil, err
312 }
313 }
314
315 return s.tx.Prepare(pq.CopyIn(string(c), username, password))
316}
317
318func (s *Store) NewQueryWithTx(user, pass string, stmt *sql.Stmt) error {
319 return s.WithTx(func (fn StorerWithTx) (err error) {
320 if s.tx == nil {
321 s.tx, err = s.db.Begin()
322 if err != nil {
323 return err
324 }
325 }
326
327 _, err = stmt.Exec(user, pass)
328
329 return err
330 })
331}
332
333
334func (s *Store) Exec(c creds, dbConnectionChannel chan bool) {
335 stmt, err := s.NewStmtWithTx(c.credType)
336 handleErr(err)
337
338 err = s.NewQueryWithTx(c.user, c.pass, stmt)
339 handleErr(err)
340 <- dbConnectionChannel
341}
342
// checkConnectionAndCreateTables verifies that db is reachable by querying
// the server version (and logging it), then creates the clear, md5 and sha1
// tables (username varchar, password varchar) when they are missing. Any
// failure is fatal and is logged together with the underlying error.
func checkConnectionAndCreateTables(db *sql.DB) {
	var version string
	if err := db.QueryRow("SHOW server_version").Scan(&version); err != nil {
		log.Fatal(err)
	}
	log.Println("Connected to:", version)

	// These names must match the credType values readFile assigns.
	for _, table := range []string{"clear", "md5", "sha1"} {
		query := `CREATE TABLE IF NOT EXISTS ` + table + ` (username varchar, password varchar)`
		if _, err := db.Exec(query); err != nil {
			log.Fatalf("Failed to create table %q: %v", table, err)
		}
	}
}
368
// handleErr logs err when it is non-nil and does nothing otherwise. It never
// stops the program and cannot intercept panics raised by its callers, so
// callers that cannot continue after an error must check err themselves.
func handleErr(err error) {
	if err != nil {
		log.Println(err)
	}
}
379
// timeTrack logs the wall-clock time at which the import finished, followed
// by how long the operation called name has run since start. main defers it
// with time.Now() so it reports the total import duration.
func timeTrack(start time.Time, name string) {
	took := time.Since(start)
	finishedAt := time.Now().Format("02-Jan-2006 15:04:05")

	log.Println("Finished Import at", finishedAt)
	log.Printf("%s took %s", name, took)
}