· 6 years ago · Jun 11, 2019, 05:08 PM
1package main
2
3import (
4 "database/sql"
5 "flag"
6 "fmt"
7 "github.com/gorilla/mux"
8 "github.com/lib/pq"
9 _ "github.com/lib/pq"
10 "io/ioutil"
11 "log"
12 "net/http"
13 "net/http/pprof"
14 "os"
15 "path/filepath"
16 "regexp"
17 "runtime"
18 "strings"
19 "sync"
20 "time"
21 "unicode/utf8"
22)
23
// Database connection settings. initDB reads these to fill in dsn; they are
// expected to be populated before initDB runs.
var (
	dbHost     string
	dbUser     string
	dbName     string
	dbPassword string
)

const (
	// dsn is a lib/pq key=value connection string template. Its %s verbs are
	// filled, in order, with host, user, password and database name.
	dsn = "host=%s user=%s password=%s dbname=%s sslmode=disable"

	// Column names shared by every credential table (see the CREATE TABLE
	// statements and the CopyIn column list).
	username = "username"
	password = "password"
)
33
// Storer is implemented by types that can prepare a statement from SQL text.
type Storer interface {
	NewStmt(query string) (*sql.Stmt, error)
}

// StorerWithTx is the finishing half of a transaction: commit or roll back.
type StorerWithTx interface {
	Commit() error
	Rollback() error
}

// Store bundles the database handle (db) with the transaction currently in
// use (tx, nil when none is open).
//
// The embedded Storer and StorerWithTx are left nil by every Store literal in
// this file (only db/tx are set), so calling the promoted NewStmt, Commit or
// Rollback on such a Store panics. The embedded RWMutex provides
// Lock/Unlock/RLock/RUnlock; because of it a Store must not be copied.
type Store struct {
	Storer
	StorerWithTx
	sync.RWMutex

	db *sql.DB
	tx *sql.Tx
}
51
// creds is one parsed credential line: the user and password fields plus the
// credType that selects the destination table.
type creds struct {
	credType credType
	user     string
	pass     string
}

// credType names the table a credential is written to; its string value is
// used directly as the COPY target table name.
type credType string

// Known credential types. Each value equals the name of the table created for
// it ("clear", "md5", "sha1"). All three are typed credType; previously only
// CLEAR was, and SHA was "sha", which matches no table.
const (
	CLEAR credType = "clear"
	SHA   credType = "sha1"
	MD5   credType = "md5"
)
66
67func main() {
68 // Flags
69 input := flag.String("input", "/Users/joshuahemmings/Documents/Dev/Personal/GoTxtToPostgres/testDocuments", "Data to Import [STRING]")
70 delimiters := flag.String("delimiters", ";:|", "delimiters list [STRING]")
71 concurrency := flag.Int("concurrency", 10, "Concurrency (amount of GoRoutines) [INT]")
72 copySize := flag.Int("copySize", 2, "How many rows get imported per execution [INT]")
73 dbUser := flag.String("dbUser", "pwned", "define DB username")
74 dbName := flag.String("dbName", "pwned", "define DB name")
75 // dbTable := flag.String("dbTable", "", "define DB table")
76 dbPassword := flag.String("dbPassword", "123", "define DB password")
77 dbHost := flag.String("dbHost", "192.168.178.206", "define DB host")
78 flag.Parse()
79
80 compiledRegex := regexp.MustCompile("^(.*?)[" + *delimiters + "](.*)$")
81
82 md5Regex := regexp.MustCompile("^[a-f0-9]{32}$")
83 sha1Regex := regexp.MustCompile("\b[0-9a-f]{5,40}\b")
84
85 var hashesMap map[string]*regexp.Regexp
86 hashesMap = make(map[string]*regexp.Regexp)
87
88 hashesMap["MD5"] = md5Regex
89 hashesMap["SHA1"] = sha1Regex
90
91 var wg = sync.WaitGroup{}
92
93 credChannel := make(chan creds, 1000)
94 filePathChannel := make(chan string, *concurrency*4)
95 stopToolChannel := make(chan bool, 1)
96 stopFileWalkChannel := make(chan bool, 1)
97
98 numberOfTxtFiles := 0
99 numberOfProcessedFiles := 0
100
101 // TODO: Remove for stable version
102 go func() {
103 // Create a new router
104 router := mux.NewRouter()
105
106 // Register pprof handlers
107 router.HandleFunc("/debug/pprof/", pprof.Index)
108 router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
109 router.HandleFunc("/debug/pprof/profile", pprof.Profile)
110 router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
111
112 router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
113 router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
114 router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
115 router.Handle("/debug/pprof/block", pprof.Handler("block"))
116 router.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
117 log.Fatal(http.ListenAndServe(":80", router))
118 }()
119
120 log.Println("Starting Import at", time.Now().Format("02-Jan-2006 15:04:05"))
121 defer timeTrack(time.Now(), "Txt To Postgres")
122
123 _ = filepath.Walk(*input,
124 func(path string, file os.FileInfo, err error) error {
125 if err != nil {
126 log.Fatalf("Error reading %s: %v", path, err)
127 return nil
128 }
129 if file.IsDir() {
130 return nil
131 }
132
133 if filepath.Ext(file.Name()) == ".txt" {
134 numberOfTxtFiles++
135 }
136 return nil
137 })
138
139 go fileWalk(input, filePathChannel, stopFileWalkChannel)
140 go textToPostgres(credChannel, *copySize)
141
142 for i := 0; i < *concurrency; i++ {
143 wg.Add(1)
144 go readFile(filePathChannel, compiledRegex, credChannel, numberOfTxtFiles, &numberOfProcessedFiles, &wg, hashesMap)
145 }
146
147 log.Println("Waiting to close Filepath Channel")
148 <-stopFileWalkChannel
149 log.Println("Closing Filepath Channel")
150 close(filePathChannel)
151
152 log.Println("WAITING")
153 wg.Wait()
154 log.Println("CLOSING LINE CHANNEL")
155 close(credChannel)
156
157 <-stopToolChannel
158}
159
160func readFile(filePathChannel chan string, delimiters *regexp.Regexp, credChannel chan creds, numberOfTxtFiles int, numberOfProcessedFiles *int, wg *sync.WaitGroup, hashesMap map[string]*regexp.Regexp) {
161 for {
162 path, morePaths := <-filePathChannel
163 if morePaths {
164 fileData, err := ioutil.ReadFile(path)
165 if err != nil {
166 log.Fatalf("Cannot read file %s", path)
167 return
168 }
169 fileAsString := string(fileData)
170 lines := strings.Split(fileAsString, "\n")
171
172 for _, line := range lines {
173 line = strings.TrimSpace(line)
174 if line != "" {
175 strings.Replace(line, "\u0000", "", -1)
176 insert := delimiters.ReplaceAllString(line, "${1}:$2")
177 splitLine := strings.SplitN(insert, ":", 2)
178
179 credentialForChan := creds{}
180
181 if len(splitLine) == 2 && utf8.ValidString(splitLine[0]) && utf8.ValidString(splitLine[1]) {
182
183 username := string(splitLine[0])
184 password := string(splitLine[1])
185
186 if hashesMap["MD5"].Match([]byte(password)) {
187 credentialForChan = creds{credType: "md5", user: username, pass: password}
188 } else if hashesMap["SHA1"].Match([]byte(password)) {
189 credentialForChan = creds{credType: "sha1", user: username, pass: password}
190 } else {
191 credentialForChan = creds{credType: "clear", user: username, pass: password}
192 }
193 }
194 credChannel <- credentialForChan
195 }
196 }
197
198 *numberOfProcessedFiles++
199 log.Printf("Read %v / %v Files", *numberOfProcessedFiles, numberOfTxtFiles)
200 runtime.GC()
201 } else {
202 log.Println("Closing readFile Goroutine")
203 break
204 }
205 }
206 wg.Done()
207}
208
// fileWalk walks the tree rooted at *dataSource and sends the path of every
// non-directory entry whose extension is ".txt" on filePathChannel. Any error
// reported by the walk is fatal. When the walk has finished, it sends true on
// stopFileWalkChannel so the caller knows no more paths will arrive.
func fileWalk(dataSource *string, filePathChannel chan string, stopFileWalkChannel chan bool) {
	visit := func(p string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			log.Fatalf("Error reading %s: %v", p, walkErr)
		case info.IsDir():
			// directories are descended into, never emitted
		case filepath.Ext(info.Name()) == ".txt":
			filePathChannel <- p
		}
		return nil
	}
	_ = filepath.Walk(*dataSource, visit)

	log.Println("stop file walk channel")
	stopFileWalkChannel <- true
}
230
231func textToPostgres(credChannel chan creds, copySize int) {
232
233 server, err := initDB()
234 handleErr(err)
235
236 checkConnectionAndCreateTables(server.db)
237
238 log.Println("Started Text to postgres goroutine")
239 var lineCount int64 = 0
240
241 for {
242 credentialToInsert, more := <-credChannel
243 if !more {
244 log.Printf("Inserted %v lines", lineCount)
245 break
246 }
247 lineCount++
248
249 if lineCount%int64(copySize*10) == 0 {
250 go server.Exec(credentialToInsert)
251 log.Printf("Inserted %v lines", lineCount)
252
253 }
254 }
255}
256
257func initDB() (*Store, error) {
258 connStr := fmt.Sprintf(dsn,dbHost, dbUser, dbPassword, dbName)
259 log.Println("Conenction: ", connStr)
260 db, err := sql.Open("postgres", connStr)
261 return &Store{
262 db: db,
263 }, err
264}
265
266type TxFn func(tx StorerWithTx) error
267
268func (t *Store) WithTx(fn TxFn) error {
269 tx, err := t.db.Begin()
270 if err != nil {
271 return err
272 }
273
274 conn := &Store{db: t.db, tx: tx}
275
276 defer func() {
277 if p := recover(); p != nil {
278 // a panic occurred, rollback
279 if err = tx.Rollback(); err != nil {
280 log.Printf("err during recovery Rollback: %v", err)
281 }
282 } else if err != nil {
283 // something went wrong, rollback
284 if err = tx.Rollback(); err != nil {
285 log.Printf("err during Rollback: %v", err)
286 }
287 } else {
288 // all good, commit
289 if err = tx.Commit(); err != nil {
290 log.Printf("err during Commit: %v", err)
291 }
292 }
293 }()
294
295 return fn(conn)
296}
297
298func (s *Store) NewStmtWithTx(c credType) (stmt *sql.Stmt, err error) {
299 if s.tx == nil {
300 s.tx, err = s.db.Begin()
301 if err != nil {
302 return nil, err
303 }
304 }
305
306 return s.tx.Prepare(pq.CopyIn(string(c), username, password))
307}
308
309func (s *Store) NewQueryWithTx(user, pass string, stmt *sql.Stmt) error {
310 return s.WithTx(func (fn StorerWithTx) (err error) {
311 if s.tx == nil {
312 s.tx, err = s.db.Begin()
313 if err != nil {
314 return err
315 }
316 }
317
318 _, err = stmt.Exec(user, pass)
319
320 return err
321 })
322}
323
324
325func (s *Store) Exec(c creds) {
326 stmt, err := s.NewStmtWithTx(c.credType)
327 handleErr(err)
328
329 err = s.NewQueryWithTx(c.user, c.pass, stmt)
330 handleErr(err)
331}
332
// checkConnectionAndCreateTables checks that db is reachable by querying
// "SHOW server_version" and logs the version. It then creates the clear, md5
// and sha1 tables (username varchar, password varchar) if they do not already
// exist. Any failure is fatal, and the fatal message includes the underlying
// error.
func checkConnectionAndCreateTables(db *sql.DB) {
	var version string
	if err := db.QueryRow("SHOW server_version").Scan(&version); err != nil {
		log.Fatal(err)
	}
	log.Println("Connected to:", version)

	tables := []struct{ name, query string }{
		{"clear", `CREATE TABLE IF NOT EXISTS clear (username varchar, password varchar)`},
		{"md5", `CREATE TABLE IF NOT EXISTS md5 (username varchar, password varchar)`},
		{"sha1", `CREATE TABLE IF NOT EXISTS sha1 (username varchar, password varchar)`},
	}
	for _, t := range tables {
		if _, err := db.Exec(t.query); err != nil {
			log.Fatalf("Failed to create table %s: %v", t.name, err)
		}
	}
}
358
// handleErr logs err if it is non-nil and does nothing for nil, so callers can
// pass every result through it without logging "<nil>" on success. Errors are
// only reported; execution continues in the caller.
func handleErr(err error) {
	if err == nil {
		return
	}
	log.Println(err)
}
367
// timeTrack logs the current time as the finish time and how long the work
// labelled name took since start. It is intended for
// defer timeTrack(time.Now(), "label").
func timeTrack(start time.Time, name string) {
	took := time.Since(start)
	finishedAt := time.Now().Format("02-Jan-2006 15:04:05")
	log.Println("Finished Import at", finishedAt)
	log.Printf("%s took %s", name, took)
}