· 6 years ago · Apr 07, 2019, 08:26 PM
1from minio import Minio
2from minio.error import ResponseError, BucketAlreadyOwnedByYou, BucketAlreadyExists
3import os
4import glob
5import uuid
6
7MINIO_PORT = 9000
8HOST_URI = <Minio Host URI>
9OUTPUT_BUCKET_NAME = <BUCKET NAME>
10ACCESS_KEY = <ACCESS KEY>
11SECRET_KEY = <SECRET KEY>
12TEST_DOC_NAME = "test-doc-%s.txt" % str(uuid.uuid4())
13
14# Write a mock document
15with open("./%s" % TEST_DOC_NAME, "w") as f:
16 f.write("hello!")
17
18minioClient = Minio('%s:%s' % (HOST_URI, MINIO_PORT),
19 access_key= ACCESS_KEY,
20 secret_key= SECRET_KEY,
21 secure=False)
22
23try:
24 minioClient.make_bucket(OUTPUT_BUCKET_NAME)
25except BucketAlreadyOwnedByYou as err:
26 pass
27except BucketAlreadyExists as err:
28 pass
29except ResponseError as err:
30 raise
31
32try:
33 object = minioClient.fput_object(
34 bucket_name=OUTPUT_BUCKET_NAME, object_name=TEST_DOC_NAME, file_path="./%s" % TEST_DOC_NAME
35 )
36 print("FPUT COMPLETE!")
37 print(object)
38except ResponseError as err:
39 print(err)
40
41print("Cleaning up...")
42for f in glob.glob("./*.txt"):
43 os.remove(f)