# Snapshot metadata (extraction artifact): Jan 08, 2020, 12:10 PM
1import subprocess
2import json
3import logging
4import os
5import uuid
6
7from airflow.contrib.kubernetes.volume import Volume
8from airflow.contrib.kubernetes.volume_mount import VolumeMount
9#from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
10from airflow.contrib.kubernetes.pod import Resources
11from airflow_dags_sens_reproc_orch.kubernetes_pod_operator_mod.kubernetes_pod_operator import KubernetesPodOperator
12
13import airflow.utils.dates
14from airflow.models import DAG
15from airflow.operators.python_operator import PythonOperator
16from airflow.hooks.base_hook import BaseHook
17from airflow_dags_sens_reproc_orch.config import conf, k8s_config, conf_mapr
18from airflow_dags_sens_reproc_orch.modules.mapr_control_mystem_endpoint import MaprControlSystemEndpoint
19
20
# Export the MapR ticket location so hadoop/MapR client calls made by this
# DAG's tasks can authenticate against the cluster.
os.environ["MAPR_TICKETFILE_LOCATION"] = conf.MAPR_TICKETFILE_LOCATION
# Input file streams to reprocess; currently a single hard-coded stream.
streams = ("BN_CALIFR",)
# MapR REST API password comes from the Airflow connection store ('mapr_api').
connection = BaseHook.get_connection('mapr_api')
mapr_password = connection.password

# Base MapR-FS directory under which a per-run output volume is created.
output_dir = "/ad-vantage/data/store/reprocessed/sensor/airflow-test/"
27
28
# Manually triggered DAG (schedule_interval=None); run parameters ("rpus",
# "input-path") are supplied via the dag_run conf at trigger time.
dag = DAG(
    dag_id="sensor-reprocessing-ib",
    default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"},
    schedule_interval=None,
)
dag.doc_md = """
#### DAG Summary
This is a DAG for single IB-task on OpenShift.

#### On Failure Actions
DAG will rerun based on k8s retries parameters

#### Points of Contact
trachev@dxc.com or graychev@dxc.com

"""
45
46
47def make_json(name, files):
48 return {'reprocessingInputFileStreams': [{'key': name, "files": files}]}
49
50
51def create_volume():
52 volume_name = str(uuid.uuid1())
53 logging.info('Creating output dir.')
54 volume_output = output_dir + volume_name
55
56 mapr_conn = MaprControlSystemEndpoint(conf_mapr.config, mapr_password)
57 mapr_conn.create_volume(volume_output, volume_name)
58
59 logging.info(volume_output)
60 k8s_volume = volume_output.replace("/ad-vantage/data", "")
61
62 return volume_output, k8s_volume
63
64
65def load_config(**context):
66 rpus = context["dag_run"].conf["rpus"]
67 version = context["dag_run"].conf["rpus"][0]["version"]
68 ib_id = context["dag_run"].conf["rpus"][0]["id"]
69 input_path = context["dag_run"].conf["input-path"]
70 volume_output = context['ti'].xcom_pull(task_ids='create')[0]
71
72 for stream in streams:
73 path = "/ad-vantage/data/store/{0}/{1}/*.*".format(input_path, stream)
74 raw_files = "echo $(hadoop fs -ls {0} | awk '{{print $NF}}' | cut -d'/' -f1,4-)".format(path)
75 ec = subprocess.Popen(raw_files, shell=True, stdout=subprocess.PIPE)
76
77 out, err = ec.communicate()
78 list_files = out.decode('utf-8').split()
79
80 app_config = make_json(stream, list_files)
81 logging.info(app_config)
82
83 put_files = "hadoop fs -put - {0}/input.json <<< '{1}'".format(volume_output, json.dumps(app_config))
84 put_process = subprocess.Popen(put_files, shell=True)
85 put_process.communicate()
86
87 return version
88
89
90"""Kubernetes POD parameters"""
91
92CONFIG_PATH = k8s_config.CONFIG_PATH
93PROJECT = k8s_config.PROJECT
94
95compute_resource = Resources()
96compute_resource.request_cpu = '1'
97compute_resource.request_memory = '4096Mi'
98compute_resource.limit_cpu = '1'
99compute_resource.limit_memory = '4096Mi'
100
101vol_name = 'data-vol'
102claim_name = k8s_config.pvc
103mount_dir = '/store'
104mount_name = 'data-vol'
105
106vol_conf = {'persistentVolumeClaim': {'claimName': claim_name}}
107
108
# Volume/VolumeMount objects handed to the KubernetesPodOperator below.
data_vol = Volume(name=vol_name, configs=vol_conf)
data_volume_mount = VolumeMount(
    mount_name,
    mount_path=mount_dir,
    sub_path='',
    read_only=False
    )
116
117
118CreateVolumeTask = PythonOperator(
119 task_id='create',
120 python_callable=create_volume,
121 dag=dag)
122
123CreateVolumeTask.doc_md = """
124####Task for creating Mapr volume
125
126Creates a Mapr volume via Mapr REST API
127
128:return: Volume name
129 """
130
131LoadingTask = PythonOperator(
132 task_id="parse_config",
133 python_callable=load_config,
134 provide_context=True,
135 dag=dag)
136
137LoadingTask.doc_md = """
138 ###Task for loading the DAG run config and creating input.json file
139
140Loads the DAG run parameters. Lists the "input folder". Prepare input.json.
141Push input.json to the Mapr Volume created on previous task.
142
143:param rpus: should contain json object with id, version and input-path
144:return: Version
145 """
146
# Task 3: run the IB executable in an OpenShift pod. The image tag and the
# input/output paths are resolved at runtime via Jinja XCom pulls from the
# 'parse_config' and 'create' tasks.
KubernetesJobTask = KubernetesPodOperator(
    task_id='k8s_job',
    namespace=PROJECT,
    image="docker-registry.default.svc.cluster.local:5000/sensor-reprocessing-ib/ib:{{ ti.xcom_pull(task_ids='parse_config') }}",
    volumes=[data_vol],
    volume_mounts=[data_volume_mount],
    cmds=['bash', '-c'],
    # argv[1] = input.json path, argv[2] = output directory (both pod-relative).
    arguments=["./app {{ ti.xcom_pull(task_ids='create')[1] }}/input.json {{ ti.xcom_pull(task_ids='create')[1] }}"],
    in_cluster=False,  # Airflow itself runs outside the cluster; use kubeconfig
    config_file=CONFIG_PATH,
    resources=compute_resource,
    get_logs=True,
    is_delete_operator_pod=False,  # keep the pod around for post-mortem debugging
    image_pull_policy='IfNotPresent',
    startup_timeout_seconds=k8s_config.k8s_timeout,
    name='hil-job',
    retries=k8s_config.k8s_retries,
    retry_delay=k8s_config.k8s_wait_time,
    dag=dag)

KubernetesJobTask.doc_md = """
    #### Kubernetes Task

Spin up an OpenShift Pod, which run executable in the container.
The executable takes two input arguments:
-path to previosly created input.json file
-path to previosly created output directory (Volume)

The tag of the container image refers to the version, provided on DAG run.

:param k8s_config: config file containing information regarding Kubernetes configuration
(Project,PV,PVC, etc)
    """

# Linear pipeline: create volume -> stage config -> run pod job.
CreateVolumeTask >> LoadingTask >> KubernetesJobTask