# Pasted-snippet metadata (original post: Oct 24, 2016) -- kept as a comment so the file parses.
1#!/usr/bin/env python
2
3import asyncore
4import uuid
5import time
6import subprocess
7import pyinotify
8import argparse
9import re
10import math
11import os
12import sys
13import logging
14import threading
15from queue import Queue
16
17log = logging.getLogger()
18log.setLevel(logging.INFO)
19ch = logging.StreamHandler(sys.stdout)
20ch.setLevel(logging.INFO)
21formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
22ch.setFormatter(formatter)
23log.addHandler(ch)
24
# Command-line interface.
# Fix: --parallel-workers is declared type=int -- previously a CLI-supplied
# value arrived as a string and crashed range() in main().
parser = argparse.ArgumentParser()
parser.add_argument('--log-dir', '-d', action="store", help='Log dir to watch', default="/var/log/nginx")
parser.add_argument('--path-pattern', '-p', action="store", help='Log name pattern match', default='.*(.1.gz)$')
parser.add_argument('--aws-s3-bucket', '-b', action="store", help='AWS S3 bucket name', required=True)
parser.add_argument('--aws-access-key', '-a', action="store", help='AWS access key or from ENV AWS_ACCESS_KEY_ID')
parser.add_argument('--aws-secret-key', '-s', action="store", help='AWS secret key or from ENV AWS_SECRET_ACCESS_KEY')
parser.add_argument('--parallel-workers', '-P', action="store", type=int, help='Number of parallel uploads in threads', default=3)
parser.add_argument('--s3-storage-class', '-S', action="store", help='S3 storage class in AWS', default="REDUCED_REDUNDANCY")
parser.add_argument('--s3-app-dir', '-A', action="store", help='S3 in bucket dir name for this app', required=True)
args = parser.parse_args()

# Work queue feeding the uploader threads; watchers put paths, workers get them.
q = Queue()
37
38
class MyEventHandler(pyinotify.ProcessEvent):
    """Forward inotify events on watched paths to parse_event.

    All three event types are handled identically, so they are aliases of
    one dispatcher method; pyinotify resolves handlers via getattr, which
    finds class-level aliases just like regular defs.
    """

    def _forward(self, event):
        parse_event(event.pathname)

    process_IN_CREATE = _forward
    process_IN_MODIFY = _forward
    process_IN_OPEN = _forward
48
49
def s3upload(pathname, keyname, bucketname, access_key, secret_key, dir_struc):
    """Upload *pathname* to s3://<bucketname>/<dir_struc>/<random>_<keyname>.

    Shells out to the aws CLI with the configured storage class. On failure
    logs an error and calls sys.exit(1).

    Fixes vs. original:
    - access_key/secret_key were accepted but never passed to the subprocess,
      so the --aws-access-key/--aws-secret-key flags had no effect; they are
      now injected into the child environment.
    - log.exception was used outside an except block (would log a bogus
      "NoneType: None" traceback); use log.error instead.
    - the debug line logged the global argparse namespace instead of the
      command being run.
    - command is built as an argument list with shell=False, so file names
      cannot be interpreted by the shell.
    """
    start = time.time()
    environ = os.environ.copy()
    if access_key:
        environ['AWS_ACCESS_KEY_ID'] = access_key
    if secret_key:
        environ['AWS_SECRET_ACCESS_KEY'] = secret_key
    destpath = "s3://{0}/{1}/{2}".format(bucketname, dir_struc, dstname(keyname))
    command = [
        "/usr/local/bin/aws", "s3", "cp",
        "--storage-class", args.s3_storage_class,
        pathname, destpath,
    ]
    log.info("Uploading file {0}".format(pathname))
    copy = subprocess.Popen(
        command,
        env=environ,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    log.debug("Running: {}".format(command))
    log.debug(copy.communicate())
    if copy.returncode == 0:
        log.info("Sending {0} to {1} SUCCESS! in {2} (sec)".format(pathname, destpath, time.time() - start))
    else:
        log.error("Error sending file {}".format(pathname))
        # NOTE(review): sys.exit() inside a worker thread only terminates that
        # thread, not the process -- confirm whether a hard abort is intended.
        sys.exit(1)
77
78
def parse_event(pathname):
    """Match *pathname* against the configured --path-pattern and hand the
    result to s3choose, which decides whether to queue it for upload."""
    matched = re.match(args.path_pattern, pathname)
    s3choose(matched, pathname)
83
84
def dstname(pathname):
    """Return *pathname* prefixed with an 8-char random token, to keep
    repeated uploads of the same file name from colliding in S3."""
    return "{0}_{1}".format(random_name(8), pathname)
88
89
def random_name(string_length=8):
    """Return the first *string_length* hex characters of a random UUID4."""
    # uuid4().hex is the dashless string form, same as replace("-", "").
    return uuid.uuid4().hex[:string_length]
94
95
def datedir():
    """Return the in-bucket prefix '<s3-app-dir>/YYYY/MM/DD/HH/MM' built
    from the current local time."""
    return "/".join([args.s3_app_dir, time.strftime('%Y/%m/%d/%H/%M')])
100
101
def worker():
    """Daemon-thread body: pull file paths off the queue and upload each.

    Credentials come from the CLI flags, falling back to the
    AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
    (KeyError if neither is available, same as the original).

    Fixes vs. original:
    - datedir() was evaluated once at thread start, so a long-running
      process uploaded everything under a stale minute-resolution prefix;
      it is now computed per item.
    - task_done() is guarded by try/finally so an upload failure cannot
      leave q.join() callers hanging.
    """
    bucketname = args.aws_s3_bucket
    access_key = args.aws_access_key or os.environ['AWS_ACCESS_KEY_ID']
    secret_key = args.aws_secret_key or os.environ['AWS_SECRET_ACCESS_KEY']

    while True:
        item = q.get()
        try:
            dir_struc = datedir()  # fresh timestamped prefix per upload
            keyname = os.path.basename(item)
            s3upload(item, keyname, bucketname, access_key, secret_key, dir_struc)
        finally:
            q.task_done()
121
122
def s3choose(parsedpath, pathname):
    """Queue *pathname* for upload when the regex match succeeded; otherwise
    log that it was skipped. *parsedpath* is the re match object (or None)."""
    if not parsedpath:
        log.info("file not match - {0} - skipping sending to S3".format(pathname))
        return
    q.put(pathname)
128
129
def main():
    """Start the upload worker threads, then block in the inotify/asyncore
    event loop watching --log-dir for newly created files."""
    wm = pyinotify.WatchManager()
    log.info("Watching {0} with pattern {1} ....".format(args.log_dir, args.path_pattern))

    # int() guards against argparse handing us the worker count as a string
    # when -P/--parallel-workers is supplied on the command line.
    for _ in range(int(args.parallel_workers)):
        t = threading.Thread(target=worker)
        t.daemon = True  # don't block interpreter exit
        t.start()

    # NOTE(review): the original called q.join() here; with a still-empty
    # queue it returned immediately, so it is dropped as a no-op.
    notifier = pyinotify.AsyncNotifier(wm, MyEventHandler())
    # Only IN_CREATE is subscribed even though the handler also implements
    # IN_MODIFY/IN_OPEN -- presumably log rotation *creates* the .1.gz file.
    wm.add_watch(args.log_dir, pyinotify.IN_CREATE, rec=True)
    asyncore.loop()


if __name__ == '__main__':
    main()