IT 삽질기

Airflow dag_run.conf 사용하기 본문

BigData/Airflow

Airflow dag_run.conf 사용하기

화이팅빌런 2021. 5. 26. 23:24

Airflow dag_run.conf

Airflow dag_run.conf 사용법에 대해 알아보도록 하자.

dag_run.conf는 dag를 trigger dag를 사용할 때, 나오는 옵션과 같은 것인데

Dag list나 Dag view에서 볼 수 있다.

 

Trigger Dag를 클릭하면 아래와 같은 화면으로 넘어간다.

Trigger DAG를 사용하는 경우 Optional하게 원하는 데이터를 JSON 형식으로 전달할 수 있는데, 사용 방법은 이전 포스팅에서 설명한 Jinja template 방식을 따른다.

2021.05.19 - [BigData/Airflow] - Airflow에서 Jinja template 사용하기

 

Trigger DAG의 아래 부분에도 친절하게 설명되어 있다.

{{ dag_run.conf }}

위와 같이 사용하면 되는데 실제 사용해보도록 하자.

DAG는 아래와 같이 작성한다.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args= {
    'start_date': days_ago(1),
    'retries': 0,
    'catchup': False,
    'retry_delay': timedelta(minutes=5),
}

templated_command = """
    echo "ds : {{ dag_run.conf.ds }}"
    """
    
dag = DAG(
        'templated_test', 
        default_args=default_args, 
        schedule_interval="@daily",
)

t1 = BashOperator(
    task_id='bash_templated',
    bash_command=templated_command,
    dag=dag,
)

t1

BashOperator을 이용해 ds라는 key값을 받아 출력해볼 예정이다.

 

Trigger DAG에는 아래와 같이 입력하도록 한다.

{ "ds": "today" }

출력 결과는 아래와 같다

{{ dag_run.conf.ds }} 가 today로 변경되어 출력되었다.

그렇다면 ds값을 입력하지 않는 경우는 어떻게 될까

이번에는 입력값 없이 Trigger DAG를 사용해보도록 하자.

결과는 아래와 같이 당연히 오류가 발생하게 된다.

ds값을 전달하지 않아 에러가 발생했다.

 

JinjaTemplate에 있는 기능을 사용하면 되는데, 만약 ds로 원하는 값이 특정값으로 정해져 있는 경우 아래와 같이 사용할 수 있다.

{{ dag_run.conf.ds | d("yesterday") }}

 

Trigger DAG에서 동일하게 아무값도 입력하지 않고 테스트를 진행해보자.

"yesterday"로 지정한 값이 나왔다.

다음으로 다시 ds 값을 넣어보자.

값을 이전과 같이 today로 넣은 경우 원하는 값이 출력된다.

위와 같이 d( ) 부분에 문자열을 넣지 않고 macro에 지정되어 있는 값을 활용하여 아래와 같이 사용하는 것 또한 가능하다.

{{ dag_run.conf.ds | d(ds) }}

 

최종 코드는 아래와 같다.

https://github.com/dydwnsekd/airflow_example/blob/main/dags/dag_run.conf.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args= {
    'start_date': days_ago(1),
    'retries': 0,
    'catchup': False,
    'retry_delay': timedelta(minutes=5),
}

templated_command = """
    echo "ds : {{ dag_run.conf.ds|d(execution_date) }}"
    """
    
dag = DAG(
        'templated_test', 
        default_args=default_args, 
        schedule_interval="@daily",
)

t1 = BashOperator(
    task_id='bash_templated',
    bash_command=templated_command,
    dag=dag,
)

t1

 

'BigData > Airflow' 카테고리의 다른 글

Airflow TeamsWebHook 사용하기  (0) 2021.06.03
Airflow Variables  (0) 2021.05.27
Airflow에서 Jinja template 사용하기  (0) 2021.05.19
Docker를 이용한 Airflow 2.0.2 실행하기(1)  (2) 2021.05.13
Airflow 2.0 설치하기(4)  (0) 2021.05.03
Comments