Mar 27, 2018, 08:08 AM
[2018-03-22 13:37:02,762] {models.py:1428} INFO - Executing <Task(SparkSubmitOperator): ImportCrawlJob> on 2018-03-22 13:37:00
[2018-03-22 13:37:02,763] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'sudo -H -u hdfs airflow run dag_extract_jobs ImportCrawlJob 2018-03-22T13:37:00 --job_id 21 --raw -sd DAGS_FOLDER/run_extract_jobs.py --cfg_path /tmp/tmpir3e3r32']
[2018-03-22 13:37:04,194] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,193] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-03-22 13:37:04,356] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,356] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/run_extract_jobs.py
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: File "/usr/bin/airflow", line 27, in <module>
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 353, in run
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: dag = get_dag(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 130, in get_dag
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: 'parse.'.format(args.dag_id))
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: dag_id could not be found: dag_extract_jobs. Either the dag did not exist or it failed to parse.
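
The second log line shows the task runner re-invoking airflow run through sudo -H -u hdfs, so run_extract_jobs.py has to import cleanly for the hdfs user as well, not only for the airflow user that normally fills the DagBag. A minimal sketch for checking that in isolation, assuming the DAG file path from the log above (run it once as airflow and once via sudo -H -u hdfs to compare):

from airflow.models import DagBag

# Load only the file in question; path taken from the log above.
dagbag = DagBag(dag_folder='/home/airflow/airflow/dags/run_extract_jobs.py')

# If parsing succeeded for this user, 'dag_extract_jobs' appears here ...
print(list(dagbag.dags))
# ... otherwise the exception raised while importing the file is recorded here.
print(dagbag.import_errors)
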
# Imports assumed from the classes used below (Airflow 1.9-era module paths):
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Parameters to initialize Spark:
access_id = Variable.get("AWS_ACCESS_KEY")
bucket_name = 'cb-scrapinghub'
secret_key = Variable.get("AWS_SECRET_KEY")
timestamp = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

DAG = DAG(
    dag_id='dag_extract_jobs',
    description='Run Extract Jobs',
    schedule_interval='@once',
    start_date=datetime(2018, 1, 1),
    catchup=False,
    default_args=default_args,
)

# Spark Job that runs ImportS3CrawlData:
importCrawlJob = SparkSubmitOperator(
    task_id='ImportCrawlJob',
    ...
    run_as_user='hdfs',
    dag=DAG,
)

# Spark Job that runs ExtractAboutText:
extractAboutText = SparkSubmitOperator(
    task_id='ExtractAboutText',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractAboutText.set_upstream(importCrawlJob)

# Spark Job that runs ExtractCompanyInfo:
extractCompanyInfo = SparkSubmitOperator(
    task_id='ExtractCompanyInfo',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractCompanyInfo.set_upstream(importCrawlJob)

# Spark Job that runs ExtractWebPeople:
extractWebPeople = SparkSubmitOperator(
    task_id='ExtractWebPeople',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractWebPeople.set_upstream(importCrawlJob)
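
For reference, the three set_upstream calls above express a simple fan-out from importCrawlJob; the same dependencies can be declared in one place, since set_downstream also accepts a list of tasks (a sketch using the task variables defined in this file):

# Equivalent to the three set_upstream calls: importCrawlJob runs first,
# then the three extract jobs become eligible to run.
importCrawlJob.set_downstream([extractAboutText, extractCompanyInfo, extractWebPeople])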