· 7 years ago · Nov 12, 2018, 10:52 AM
"""Airflow DAG ``opus_data``: runs a Hive script once per day through HiveOperator."""
import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.models import DAG

# Default arguments passed to the DAG via ``default_args``.
# NOTE(review): 'provide_context' is normally a PythonOperator option; it is
# kept here unchanged -- confirm whether it is still needed.
args = {
    'owner': 'raul',
    'start_date': datetime(2018, 11, 12),
    'provide_context': True,
    'depends_on_past': False,
    # A failed task is retried twice, five minutes apart.
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # Mail is sent on final failure only, not on each retry.
    'email': ['raul.gregglino@leroymerlin.ru'],
    'email_on_failure': True,
    'email_on_retry': False,
}

# Daily schedule, with at most six DAG runs active at the same time.
dag = DAG(
    'opus_data',
    default_args=args,
    max_active_runs=6,
    schedule_interval="@daily",
)

# Runs the HQL script that creates the import table.
# The file name must NOT carry trailing whitespace: Airflow only loads the
# value as a template file when it ends with one of the operator's template
# extensions (.hql / .sql); otherwise the text itself is sent to Hive as the
# query.
# NOTE(review): 'metastore_default' is conventionally a metastore connection,
# while HiveOperator expects a Hive CLI connection -- confirm the connection id.
import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql',
    hiveconf_jinja_translate=True,
    dag=dag,
)

# Maps a downstream task_id to the list of upstream task_ids it depends on.
deps = {}

# Explicitly define the dependencies in the DAG.
# ``items()`` is used instead of ``iteritems()`` so the file also loads on
# Python 3, where dicts have no ``iteritems`` method.
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)
* Note: I'm first testing the connection to check whether the table gets created. Only after that works will I try LOAD DATA, which is why the LOAD DATA statement is commented out below.
-- Create opus_data.fct_latest_values_new_data if it does not exist yet.
-- Rows are delimited text with comma-separated fields; all columns are STRING.
-- The delimiter clause needs the BY keyword: FIELDS TERMINATED BY ','.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product STRING,
    id_model STRING,
    id_attribute STRING,
    attribute_value STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- The load step is intentionally disabled while table creation is being tested.
-- Hive treats only "--" as a comment marker; a line starting with "#" is
-- passed to the parser and fails.
-- LOAD DATA LOCAL INPATH
-- '/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
-- OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;
50
-- Create opus_data.fct_latest_values_new_data when it is absent.
-- All four columns are STRING; rows are delimited text with comma-separated fields.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product      STRING,
    id_model        STRING,
    id_attribute    STRING,
    attribute_value STRING
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ',';