IT 삽질기

Airflow에서 Jinja template 사용하기 본문

BigData/Airflow

Airflow에서 Jinja template 사용하기

화이팅빌런 2021. 5. 19. 22:23

Airflow에서는 Jinja2 template를 내장하고 있어 이를 활용할 수 있는데,

Jinja2 template에 대한 자세한 내용은 Jinja Document를 참고하기 바란다.

https://jinja.palletsprojects.com/en/3.0.x/

 

Jinja2 template를 활용할 수 있는 방법에는 어떤것이 있을까

Airflow를 이용할 때, batch job을 실행하는 경우 현재 날짜에 대한 정보가 필요할 수 있다.

예를 들어 글 작성 날짜인 2021-05-19 라는 값을 Hive Query에 이용할 수 있을 것이다.

이런 경우 어떻게 해야할까

Dags에 직접 입력하는 방법도 있지만 그렇게 동작시키는 경우 매번 값을 바꿔 실행해주어야 한다.

이런 경우 Jinja template와 Airflow에서 제공하는 macro를 사용할 수 있다.

Airflow에서 제공하는 macro 목록은 아래의 링크에서 확인할 수 있다,

https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html?highlight=macro 

 

그렇다면 Macro를 어떻게 사용할 수 있을까 예제를 통해 알아보도록 하자.

https://github.com/dydwnsekd/airflow_example/blob/main/dags/jinja_example.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.hooks.hive import *
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args= {
    'start_date': days_ago(1),
    'retries': 0,
    'catchup': False,
    'retry_delay': timedelta(minutes=5),
}

templated_command = """
    echo "ds : {{ ds }}"
    echo "ds_nodash : {{ ds_nodash }}"
    echo "prev_ds : {{ prev_ds }}"
    echo "prev_ds_nodash : {{ prev_ds_nodash }}"
    echo "next_ds : {{ next_ds }}"
    echo "next_ds_nodash : {{ next_ds_nodash }}"
    echo "yesterday_ds : {{ yesterday_ds }}"
    echo "yesterday_ds_nodash : {{ yesterday_ds_nodash }}"
    echo "tomorrow_ds : {{ tomorrow_ds }}"
    echo "tomorrow_ds_nodash : {{ tomorrow_ds_nodash }}"
    echo "ts : {{ ts }}"
    echo "ts_nodash : {{ ts_nodash }}"
    echo "ts_nodash_with_tz : {{ ts_nodash_with_tz }}"
    echo "macros.ds_add(ds, 2) : {{ macros.ds_add(ds, 2) }}"
    echo "macros.ds_add(ds, -2) : {{ macros.ds_add(ds, -2) }}"
    echo "macros.ds_format(ds, '%Y-%m-%d', '%Y__%m__%d'): {{ macros.ds_format(ds, '%Y-%m-%d', '%Y__%m__%d') }}"
    echo "macros.datetime.now() : {{ macros.datetime.strftime(macros.datetime.now(), '%Y%m%d%H') }}"
"""

def templated_test(d1):
    print("{{ ds }}")
    print("ds test:", d1)
    
dag = DAG(
        'templated_test', 
        default_args=default_args, 
        schedule_interval="@daily",
)

t1 = BashOperator(
    task_id='bash_templated',
    bash_command=templated_command,
    dag=dag,
)

t2 = PythonOperator(
    task_id='python_task',
    python_callable=templated_test,
    op_args=["{{ ds }}"],
    dag=dag,
)

t1 >> t2

위의 예제에서는 BashOperator, PythonOperator을 사용해 macro를 사용하는 예제가 나타나있다.

HiveOperator을 사용하는 경우는 query에 BashOperator에서 사용한 것과 동일한 방식으로 {{ ds }}와 같은 형식으로 사용하면 되며, 다른 Operator에 대한 내용은 테스트를 진행하지 않았다.

먼저 BashOperator에 있는 macro들에 대해 하나씩 알아보도록 하자. 실행 결과는 아래와 같다.

macro문서에 있는것과 동일한 결과를 표시하며, prev_ds와 next_ds의 경우에는 오늘 날짜를 기준으로 어제, 오늘이 아닌 이전 scheduling날짜 기준의 이전, 이후인 것을 기억하자 예를 들어 scheduling이 일주일 단위였다면, prev_ds는 2021-05-09 next_ds는 2021-05-23이 표시될 것이다.

아래에 macro.ds_add를 이용하여 연산도 가능한 것을 알 수 있다.

그런데 macro에서 사용하는 날짜들을 살펴보면 대부분 yyyymmdd에 대한 정보만을 표시한다.

시간에 대한 정보까지 원하는 형식으로 변경하기 위해서는 python에서 제공하는 datetime을 함께 사용할 수 있다.

macros.datetime를 이용하면 python datetime library에서 제공하는 메서드들을 모두 사용할 수 있어 원하는 형식으로 변환이 가능하다.

 

다음으로 PythonOperator에서 사용하는 것을 확인해보자.

templated_test 함수의 경우 print에서 직접 사용하는 {{ ds }}와 파라미터 형식으로 전달받아 사용하는 {{ ds }}를 print를 출력하고 있는데, 이렇게 사용하는 경우 파라미터로 전달받은 {{ ds }}만 원하는대로 출력이 된다.

아래의 실행결과를 확인해보자.

다른 marco들도 PythonOperator에서 동일하게 사용이 가능하다.

 

macro는 필요에 따라 plugin형식으로 선언이 가능하다고 Airflow문서에 명시되어 있으며, 자주 사용하는 경우 이를 이용하여 등록해 사용할 수 있을 것 같다.

'BigData > Airflow' 카테고리의 다른 글

Airflow Variables  (0) 2021.05.27
Airflow dag_run.conf 사용하기  (2) 2021.05.26
Docker를 이용한 Airflow 2.0.2 실행하기(1)  (2) 2021.05.13
Airflow 2.0 설치하기(4)  (0) 2021.05.03
Airflow HiveOperator LDAP 연결  (0) 2021.04.27
Comments