· 9 years ago · Jan 11, 2017, 10:52 AM
1package main
2
import (
	"bytes"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"mime"
	"os"
	"strings"
	"sync"

	"github.com/minio/minio-go"
	"github.com/veqryn/go-email/email"
)
15
// ApiRequest collects the pieces extracted from one parsed email message.
//
// In the code visible in this file only MessageId, Text, Html and
// Attachments are populated; Sender, Recipient and Headers are declared but
// never assigned here.
type ApiRequest struct {
	// MessageId holds the value of the message's Message-Id header.
	// Sender and Recipient are not filled in by the visible code.
	MessageId, Sender, Recipient string

	// Headers is not filled in by the visible code.
	Headers map[string][]string

	// Html and Text hold the body of the first text/html and the first
	// text/plain part respectively.
	Html, Text []byte

	// Attachments maps an attachment's filename to the object name it was
	// stored under.
	Attachments map[string]string
}
25
26var aL = &sync.Mutex{}
27var m minio.Client
28var bucketName string
29var bucketLocation string
30var sendTo string
31
32func init() {
33 if sendTo = os.Getenv("SEND_TO"); sendTo == "" {
34 //log.Fatal("Must have a destination to post end result.")
35 }
36
37 endpoint := os.Getenv("MINIO_ENDPOINT")
38 accessKey := os.Getenv("MINIO_ACCESS_KEY")
39 secretKey := os.Getenv("MINIO_SECRET_KEY")
40 useTls := os.Getenv("MINIO_TLS") == "true"
41
42 if endpoint == "" {
43 endpoint = "localhost:9000"
44 }
45
46 if accessKey == "" {
47 accessKey = "AKIAIOSFODNN7EXAMPLE"
48 }
49
50 if secretKey == "" {
51 secretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
52 }
53
54 if bucketName = os.Getenv("MINIO_BUCKET_NAME"); bucketName == "" {
55 bucketName = "attachments"
56 }
57
58 if bucketLocation = os.Getenv("MINIO_REGION"); bucketLocation == "" {
59 bucketLocation = "us-east-1"
60 }
61
62 m, err := minio.New(endpoint, accessKey, secretKey, useTls)
63
64 if err != nil {
65 log.Fatalln("S3 connection failed. ", err)
66 }
67
68 if err = m.MakeBucket(bucketName, bucketLocation); err != nil {
69 exists, err := m.BucketExists(bucketName)
70 if !exists {
71 log.Fatalln(err)
72 }
73 }
74}
75
76func parseMessage(buf *bytes.Buffer) (*email.Message, error) {
77 log.Printf("Length of buffer: %d", buf.Len())
78
79 msg, err := email.ParseMessage(buf)
80
81 if err != nil {
82 return nil, fmt.Errorf("Invalid message: %s", err)
83 }
84
85 return msg, nil
86}
87
88func main() {
89 buf := bytes.NewBuffer([]byte{})
90 io.Copy(buf, os.Stdin)
91
92 msg, err := parseMessage(buf)
93
94 if err != nil {
95 log.Fatal("Unable to parse message.", err)
96 }
97
98 w := &sync.WaitGroup{}
99 r := &ApiRequest{
100 MessageId: ExtractMessageIdFromHeaders(msg.Header),
101 }
102
103 for _, p := range(msg.Parts) {
104 ct := ExtractContentTypeFromHeaders(p.Header)
105
106 if strings.HasPrefix(ct, "text/plain") && len(r.Text) == 0 {
107 r.Text = p.Body
108 } else if strings.HasPrefix(ct, "text/html") && len(r.Html) == 0 {
109 r.Html = p.Body
110 } else {
111 w.Add(1)
112 StorePartAsAttachment(r, p, w)
113 }
114 }
115
116 w.Wait()
117
118 log.Print("Total parts: ", len(msg.Parts))
119 log.Print(r.Attachments)
120}
121
122func ExtractMessageIdFromHeaders(h email.Header) string {
123 if val, ok := h["Message-Id"]; ok {
124 return val[0]
125 }
126
127 return ""
128}
129
130func ExtractContentTypeFromHeaders(h email.Header) string {
131 if val, ok := h["Content-Type"]; ok {
132 return val[0]
133 }
134
135 return "text/plain"
136}
137
138func StorePartAsAttachment(r *ApiRequest, p *email.Message, w *sync.WaitGroup) {
139 go func () {
140 fn := ExtractFilenameFromAttachment(p)
141 pa := fn
142
143 c, err := m.PutObject(bucketName, pa, bytes.NewReader(p.Body), ExtractContentTypeFromHeaders(p.Header))
144 log.Fatal(err)
145
146 if err != nil {
147 log.Println("Received error uploading file: ", err)
148 log.Println("Will just continue whilst skipping file.")
149 w.Done()
150 return;
151 }
152
153 if int(c) != len(p.Body) {
154 log.Printf("Missmatch in uploaded file will continue anyway, original %dbytes stored %dbytes.\n", c, len(p.Body))
155 }
156
157 aL.Lock()
158 defer aL.Unlock()
159
160 r.Attachments[fn] = pa
161 w.Done()
162 }()
163}
164
165func ExtractFilenameFromAttachment(p *email.Message) string {
166 // todo check Content-Disposition for filename.
167 // todo check Content-Type for filename.
168 // todo if all fails return random string.
169 return "hey-you-guys.pdf"
170}