# Pasted snippet — original upload dated May 06, 2019, 03:26 PM
1from airflow import DAG
2from datetime import datetime, timedelta
3from airflow.operators.python_operator import PythonOperator
4from airflow.operators.postgres_operator import PostgresOperator
5from airflow.hooks.postgres_hook import PostgresHook
6from psycopg2.extras import execute_values
7
# Task-level defaults applied to every operator in this DAG.
# NOTE(review): start_date here (2018-04-15) is overridden by the DAG-level
# start_date passed to the DAG constructor — confirm which date is intended.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 15),
    'email': ['example@email.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,                          # one automatic retry per task
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between retries
    # 'catchup' removed: it is a DAG-level parameter, not a task default —
    # inside default_args it was silently ignored. The DAG constructor below
    # already sets catchup=False.
}
19
# DAG definition: runs exactly once ('@once') with no backfill (catchup=False).
# NOTE(review): this start_date (2017-03-20) conflicts with the 2018-04-15 value
# in default_args; the DAG-level kwarg takes precedence — confirm which date is
# actually intended.
dag = DAG('example_python',
          default_args=default_args,
          schedule_interval='@once',
          start_date=datetime(2017, 3, 20),
          catchup=False)
25
def csvToPostgres():
    """Bulk-load /usr/local/airflow/dags/example.csv into example_table.

    Skips the CSV header row, then streams the rest via COPY. Commits on
    success and always closes the connection.
    """
    # Single hook — the original built two identical hooks and left one unused.
    pg_hook = PostgresHook(postgres_conn_id='airflow_db')
    conn = pg_hook.get_conn()
    try:
        # Anonymous cursor: psycopg2 named (server-side) cursors do not
        # support copy_from, so cursor("cursor") would fail at runtime.
        with conn.cursor() as curr, \
                open('/usr/local/airflow/dags/example.csv', 'r') as f:
            next(f)  # skip the header row
            curr.copy_from(f, 'example_table', sep=',')
        conn.commit()
    finally:
        conn.close()  # original leaked the connection
36
37
# Create the destination table if it does not already exist; autocommit=True
# commits the DDL immediately.
task1 = PostgresOperator(
    task_id='create_table',
    sql="create table if not exists example_table (test_id text, test_value text )",
    postgres_conn_id='airflow_db',
    autocommit=True,
    dag=dag,
)
47
# Run the CSV loader defined above as a Python task.
task2 = PythonOperator(
    task_id='csv_to_db',
    provide_context=False,
    python_callable=csvToPostgres,
    dag=dag,
)
52
53
54task1 >> task2