· 4 years ago · May 26, 2021, 07:58 PM
1import time
2import boto3
3from botocore.exceptions import ClientError
4
5def get_client(access_key, secret_key):
6 # athena client
7 athena = boto3.client(
8 'athena',
9 aws_access_key_id=access_key,
10 aws_secret_access_key=secret_key
11 )
12 return athena
13
14def get_query_results(athena_client, query):
15 # call query via athena
16 query_id = athena_client.start_query_execution(
17 QueryString=query,
18 ResultConfiguration={
19 'OutputLocation': 's3://cfadatalaketest-useast1-core-aws-athena-query-results/',
20 'EncryptionConfiguration': {
21 'EncryptionOption': 'SSE_S3'
22 }
23 }
24 )
25
26 # get query results
27 query_done = False
28 while not query_done:
29 try:
30 response = athena_client.get_query_results(
31 QueryExecutionId=query_id['QueryExecutionId']
32 )
33 query_done = True
34 except ClientError as e:
35 if 'Query has not yet finished' in e.response['Error']['Message']:
36 pass
37 else:
38 raise e
39 time.sleep(1)
40 results = []
41 for data in response['ResultSet']['Rows']:
42 row = []
43 for value in data['Data']:
44 row.append(value['VarCharValue'])
45 results.append(row)
46 headers = results[0]
47 results = results[1:]
48
49 return headers, results
50
51def main():
52 access_key = 'AKIATUGYBJBOXLKSUVRS'
53 secret_key = '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'
54
55 athena = get_client(access_key, secret_key)
56
57 query = '''
58 select count(*)
59 from draft.fusion_organizationpvo_silver
60 '''
61
62 headers, results = get_query_results(athena, query)
63
64 print(f'headers: {headers}')
65 print(f'results: {results}')
66
67main()