· 9 years ago · Dec 20, 2016, 06:08 PM
"""
Base classes for Athena queries and loads.
"""
4
5import datetime
6
7import jaydebeapi;
8import luigi
9import luigi.s3
10
11import sources
12
13
class AthenaQuery(luigi.Task):
    """
    Base class for Athena queries.

    Provides methods to execute a query against Athena. Typically the
    only Luigi methods a subclass needs to implement are requires() and
    run(); run() will usually just call query_store() to generate its
    output at S3.

    Subclasses must also define ``results_path_template``: a %-format
    string that is interpolated with ``date`` and appended to
    ``results_path_base`` to build ``results_path``. That path is both
    Athena's S3 staging directory and this task's output().

    Note that ".csv.metadata" keys under ``results_path`` are removed
    after each query to allow later loads to Redshift. This may impact
    viewing the query's results in the Athena web UI.
    """

    results_path_base = "s3://500px-emr/athena-results/"

    # Each instance claims one unit of the "athena_query" scheduler
    # resource; the available capacity is configured outside this file.
    resources = {"athena_query": 1}

    # NOTE: this default is evaluated once, when the module is imported, so
    # a long-running process keeps using the import-day date.
    date = luigi.DateParameter(default=datetime.date.today())

    # AWS credentials, sent to Athena as the JDBC user/password.
    # NOTE(review): as ordinary (significant) Luigi parameters these may end
    # up in task ids / scheduler output -- consider significant=False.
    access_key = luigi.Parameter(
        default="",
        config_path={"section": "s3", "name": "aws_access_key_id"})
    secret_key = luigi.Parameter(
        default="",
        config_path={"section": "s3", "name": "aws_secret_access_key"})

    # Path to the Athena JDBC driver jar; handed to jaydebeapi as `jars`.
    athena_jdbc_driver_path = luigi.Parameter(
        default="",
        config_path={"section": "athena", "name": "jdbc_driver_path"})

    def __init__(self, *args, **kwargs):
        super(AthenaQuery, self).__init__(*args, **kwargs)

        # results_path_template comes from the subclass. Trailing slashes
        # are dropped so the "%s/%s" key joins in _remove_metadata() stay clean.
        self.results_path = (
            self.results_path_base + self.results_path_template % self.date
        ).rstrip("/")

    def run(self):
        raise NotImplementedError("must override run() method")

    def _connect(self):
        """Open a JDBC connection to Athena that stages results at results_path."""
        props = {"s3_staging_dir": self.results_path,
                 "user": self.access_key,
                 "password": self.secret_key}

        # A dict driver_args is converted by jaydebeapi into the JDBC
        # connection Properties.
        return jaydebeapi.connect(
            "com.amazonaws.athena.jdbc.AthenaDriver",
            "jdbc:awsathena://athena.us-east-1.amazonaws.com:443",
            driver_args=props,
            jars=self.athena_jdbc_driver_path)

    def _remove_metadata(self):
        """Delete ".csv.metadata" keys under results_path so Redshift can load it."""
        s3_client = luigi.s3.S3Client()
        # Materialize the listing before deleting anything from it.
        metadata_keys = [k for k in s3_client.list(self.results_path)
                         if k.endswith(".csv.metadata")]
        for key in metadata_keys:
            s3_client.remove("%s/%s" % (self.results_path, key))

    def _execute_query(self, sql):
        """
        Execute `sql` against Athena, then strip ".csv.metadata" keys from
        results_path.

        Returns a (connection, cursor) pair; the cursor holds the query's
        result set. The caller is responsible for closing both. If anything
        fails here, the connection is closed before the exception propagates.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            self._remove_metadata()
        except Exception:
            conn.close()
            raise
        return conn, cursor

    def query_store(self, sql):
        """
        Query Athena and discard the result set immediately. Results will
        be at self.results_path.
        """
        conn, cursor = self._execute_query(sql)
        try:
            cursor.close()
        finally:
            conn.close()

    def query_retrieve(self, sql):
        """
        Query Athena and yield each result row, as returned by the DB-API
        cursor's fetchone().

        This is a generator: the query runs on the first iteration. The
        cursor and connection are closed once the rows are exhausted or the
        generator is closed.
        """
        conn, cursor = self._execute_query(sql)
        try:
            row = cursor.fetchone()
            while row is not None:
                yield row
                row = cursor.fetchone()
        finally:
            try:
                cursor.close()
            finally:
                conn.close()

    def output(self):
        return luigi.s3.S3Target(self.results_path)