일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 람다 아키텍처
- HBase
- execution_date
- Service
- docker
- Kafka
- Scala
- slack app
- LDAP
- Namenode
- Lambda architecture
- HDFS
- java
- jupyter
- HDP
- re
- Windows
- MapReduce
- slack
- Example DAG
- hadoop
- yarn
- python
- NoSQL
- ambari
- 빅데이터
- 정규표현식
- airflow
- HIVE
- SlackWebhookOperator
- Today
- Total
IT 삽질기
Airflow에서 Jinja template 사용하기 본문
Airflow에서는 Jinja2 template를 내장하고 있어 이를 활용할 수 있는데,
Jinja2 template에 대한 자세한 내용은 Jinja Document를 참고하기 바란다.
https://jinja.palletsprojects.com/en/3.0.x/
Jinja2 template를 활용할 수 있는 방법에는 어떤것이 있을까
Airflow를 이용할 때, batch job을 실행하는 경우 현재 날짜에 대한 정보가 필요할 수 있다.
예를 들어 글 작성 날짜인 2021-05-19 라는 값을 Hive Query에 이용할 수 있을 것이다.
이런 경우 어떻게 해야할까
Dags에 직접 입력하는 방법도 있지만 그렇게 동작시키는 경우 매번 값을 바꿔 실행해주어야 한다.
이런 경우 Jinja template와 Airflow에서 제공하는 macro를 사용할 수 있다.
Airflow에서 제공하는 macro 목록은 아래의 링크에서 확인할 수 있다,
https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html?highlight=macro
그렇다면 Macro를 어떻게 사용할 수 있을까 예제를 통해 알아보도록 하자.
https://github.com/dydwnsekd/airflow_example/blob/main/dags/jinja_example.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.hooks.hive import *
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
default_args= {
'start_date': days_ago(1),
'retries': 0,
'catchup': False,
'retry_delay': timedelta(minutes=5),
}
templated_command = """
echo "ds : {{ ds }}"
echo "ds_nodash : {{ ds_nodash }}"
echo "prev_ds : {{ prev_ds }}"
echo "prev_ds_nodash : {{ prev_ds_nodash }}"
echo "next_ds : {{ next_ds }}"
echo "next_ds_nodash : {{ next_ds_nodash }}"
echo "yesterday_ds : {{ yesterday_ds }}"
echo "yesterday_ds_nodash : {{ yesterday_ds_nodash }}"
echo "tomorrow_ds : {{ tomorrow_ds }}"
echo "tomorrow_ds_nodash : {{ tomorrow_ds_nodash }}"
echo "ts : {{ ts }}"
echo "ts_nodash : {{ ts_nodash }}"
echo "ts_nodash_with_tz : {{ ts_nodash_with_tz }}"
echo "macros.ds_add(ds, 2) : {{ macros.ds_add(ds, 2) }}"
echo "macros.ds_add(ds, -2) : {{ macros.ds_add(ds, -2) }}"
echo "macros.ds_format(ds, '%Y-%m-%d', '%Y__%m__%d'): {{ macros.ds_format(ds, '%Y-%m-%d', '%Y__%m__%d') }}"
echo "macros.datetime.now() : {{ macros.datetime.strftime(macros.datetime.now(), '%Y%m%d%H') }}"
"""
def templated_test(d1):
print("{{ ds }}")
print("ds test:", d1)
dag = DAG(
'templated_test',
default_args=default_args,
schedule_interval="@daily",
)
t1 = BashOperator(
task_id='bash_templated',
bash_command=templated_command,
dag=dag,
)
t2 = PythonOperator(
task_id='python_task',
python_callable=templated_test,
op_args=["{{ ds }}"],
dag=dag,
)
t1 >> t2
위의 예제에서는 BashOperator, PythonOperator을 사용해 macro를 사용하는 예제가 나타나있다.
HiveOperator을 사용하는 경우는 query에 BashOperator에서 사용한 것과 동일한 방식으로 {{ ds }}와 같은 형식으로 사용하면 되며, 다른 Operator에 대한 내용은 테스트를 진행하지 않았다.
먼저 BashOperator에 있는 macro들에 대해 하나씩 알아보도록 하자. 실행 결과는 아래와 같다.
macro문서에 있는것과 동일한 결과를 표시하며, prev_ds와 next_ds의 경우에는 오늘 날짜를 기준으로 어제, 오늘이 아닌 이전 scheduling날짜 기준의 이전, 이후인 것을 기억하자 예를 들어 scheduling이 일주일 단위였다면, prev_ds는 2021-05-09 next_ds는 2021-05-23이 표시될 것이다.
아래에 macro.ds_add를 이용하여 연산도 가능한 것을 알 수 있다.
그런데 macro에서 사용하는 날짜들을 살펴보면 대부분 yyyymmdd에 대한 정보만을 표시한다.
시간에 대한 정보까지 원하는 형식으로 변경하기 위해서는 python에서 제공하는 datetime을 함께 사용할 수 있다.
macros.datetime를 이용하면 python datetime library에서 제공하는 메서드들을 모두 사용할 수 있어 원하는 형식으로 변환이 가능하다.
다음으로 PythonOperator에서 사용하는 것을 확인해보자.
templated_test 함수의 경우 print에서 직접 사용하는 {{ ds }}와 파라미터 형식으로 전달받아 사용하는 {{ ds }}를 print를 출력하고 있는데, 이렇게 사용하는 경우 파라미터로 전달받은 {{ ds }}만 원하는대로 출력이 된다.
아래의 실행결과를 확인해보자.
다른 marco들도 PythonOperator에서 동일하게 사용이 가능하다.
macro는 필요에 따라 plugin형식으로 선언이 가능하다고 Airflow문서에 명시되어 있으며, 자주 사용하는 경우 이를 이용하여 등록해 사용할 수 있을 것 같다.
'BigData > Airflow' 카테고리의 다른 글
Airflow Variables (0) | 2021.05.27 |
---|---|
Airflow dag_run.conf 사용하기 (2) | 2021.05.26 |
Docker를 이용한 Airflow 2.0.2 실행하기(1) (2) | 2021.05.13 |
Airflow 2.0 설치하기(4) (0) | 2021.05.03 |
Airflow HiveOperator LDAP 연결 (0) | 2021.04.27 |