· 6 years ago · Feb 12, 2019, 11:02 PM
1import os
2from multiprocessing import Pool, Lock
3import uuid
4
5import boto3
6
7ACCESS_KEY = os.environ.get('ACCESS_KEY')
8SECRET_KEY = os.environ.get('SECRET_KEY')
9
10ENDPOINT='http://zenko.dev.tmacs.space'
11BUCKET = 'test'
12FILE_SIZE = 1024 * 1024 * 100
13NUM_FILES = 100
14
15class FakeFile:
16 def __init__(self, size, content = b'0', parent=None):
17 self._size = size
18 self._pos = 0
19 self._content = bytes(content)
20 self._parent = parent
21
22 def _makebytes(self, num):
23 self._pos += num
24 return self._content * num
25
26 def read(self, num = -1):
27 if num == -1:
28 return self._makebytes(self._size)
29 if self._pos == self._size:
30 return self._makebytes(0)
31 elif self._size < self._pos + num:
32 return self._makebytes(self._size - self._pos)
33 return self._makebytes(num)
34
35 def seek(self, offset, from_what = os.SEEK_SET):
36 if from_what == os.SEEK_SET:
37 self._pos = offset
38 elif from_what == os.SEEK_CUR:
39 self._pos += offset
40 elif from_what == os.SEEK_END:
41 self._pos = self._size - offset
42 return self._pos
43
44 def tell(self):
45 return self._pos
46
47 def close(self):
48 pass
49
50
51def build_client():
52 return boto3.Session(
53 aws_access_key_id=ACCESS_KEY
54 aws_secret_access_key=SECRET_KEY,
55 endpoint_url=ENDPOINT
56 ).resource('s3').Bucket(BUCKET)
57
58def upload_file():
59 bucket = build_client()
60 name = uuid.uuid4()
61 bucket.put_object(Key=name, Body=FakeFile(FILE_SIZE))
62
63PENDING = NUM_FILES
64PENDING_LOCK = Lock()
65
66def success_callback(x):
67 print('Finished uploading')
68 with PENDING_LOCK:
69 PENDING -= 1
70 print('%s remaining'%PENDING)
71
72def failed_callback(err):
73 print('Failed to upload with %s'%str(err))
74 with PENDING_LOCK:
75 PENDING -= 1
76 print('%s remaining'%PENDING)
77
78if __name__ == '__main__':
79 pool = Pool(processes=20)
80 for i in range(NUM_FILES):
81 print(i)
82 pool.apply_async(upload_file, (), {}, success_callback, failed_callback)
83 input()