일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 빅데이터
- python
- MapReduce
- 정규표현식
- airflow
- SlackWebhookOperator
- LDAP
- HDP
- ambari
- slack
- yarn
- Windows
- NoSQL
- HIVE
- Scala
- Lambda architecture
- slack app
- docker
- jupyter
- Service
- Example DAG
- java
- Kafka
- hadoop
- execution_date
- 람다 아키텍처
- HBase
- HDFS
- Namenode
- re
- Today
- Total
IT 삽질기
Airflow Xcom 사용하기 본문
이번 글에서는 Airflow Xcom에 대해서 알아보도록 하자.
Airflow Version : 2.1.3에서 테스트를 진행했다.
Xcom이란
Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, CeleryExecutor를 예로 들면, 각 task들이 각기 다른 Worker에서 실행될 수 있으며, Xcom은 이러한 경우 task간 데이터 전달을 가능하게 한다. Variables와 마찬가지로 key-value의 형식으로 사용되지만, Variables과는 달리 Xcom은 DAG내에서만 공유할 수 있는 변수라는 점이다. Xcom을 이용해 데이터를 전달하는 경우 DataFrame이나 많은 양의 데이터를 전달하는 것은 지원하지 않으며, 소량의 데이터만 전달하는 것을 권장한다. Xcom을 사용하기 위해서는 각 task에서 push, pull 하는 방식으로 기본적으로 사용되지만, PythtonOperator의 경우 return이 자동적으로 Xcom 변수로 지정되게 된다.
보다 자세한 사항은 Airflow 공식문서를 참고하기 바란다.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=xcom
Xcom 사용하기
이제 Xcom을 사용하는 몇 가지 방법을 알아보고 예제를 통해 살펴보도록 하자.
- PythonOperator return 값을 이용한 Xcom 사용
- push-pull을 이용한 Xcom 사용
- Jinja templates을 이용한 Xcom 사용
Jinja templates와 Airflow에 관련된 내용은 아래의 글을 참고하기 바란다.
2021.05.19 - [BigData/Airflow] - Airflow에서 Jinja template 사용하기
사용법에 대해 하나씩 살펴보도록 하자.
PythonOperator return 값을 이용한 Xcom 사용
PythonOperator에서 return을 하면 Airflow xcom에 자동으로 push되기에 return하는 함수를 만들어 하나의 task로 실행시켰다.
def return_xcom():
return "xcom!"
return_xcom = PythonOperator(
task_id = 'return_xcom',
python_callable = return_xcom,
dag = dag
)
push-pull을 이용한 Xcom 사용
PythonOperator에서 return을 하는 방법 이외에도 아래와 같이 context['task_instance']를 이용하여 xcom에 push, pull 하여 데이터를 주고받는 것 또한 가능한데, 여기서 알아야 할 내용들에 대해 간단하게 설명한다.
먼저 context['task_instance']와 context['ti']는 동일한 의미로 ti = task_instance로 간단하게 축약하여 사용할 수 있다.
다음으로 PythonOperator을 사용하는 경우 return과 push를 하나의 task에서 중복하여 사용할 수 있으며, 해당 데이터를 전달받는 곳에서 xcom_pull(key=~) 혹은 xcom_pull(task_ids=~)를 이용해 전달받는 방식이 서로 다른 것을 인지하자.
return으로 xcom을 사용하는 경우 xcom_pull(task_ids)를 사용해 데이터를 전달받고,
push하는 경우에는 key-value 형식에 따라 데이터를 주고받게 된다.
def xcom_push_test(**context):
xcom_value = "xcom_push_value"
context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)
return "xcom_return_value"
def xcom_pull_test(**context):
xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')
print("xcom_return : {}".format(xcom_return))
print("xcom_push_value : {}".format(xcom_push_value))
print("xcom_push_return_value : {}".format(xcom_push_return_value))
xcom_push_task = PythonOperator(
task_id = 'xcom_push_task',
python_callable = xcom_push_test,
dag = dag
)
xcom_pull_task = PythonOperator(
task_id = 'xcom_pull_task',
python_callable = xcom_pull_test,
dag = dag
)
Jinja templates을 이용한 Xcom 사용
마지막으로 jinja template를 이용하여 전달받는 방식이다.
여기서는 BashOperator을 사용해 예제를 표현했지만 다른 Operator에서도 동일하게 사용하는 것이 가능하다.
앞서 설명했던 것과 동일하게 task_instance(ti)를 이용하는 것과 동일하게 사용이 가능하며 push, pull 모두 사용이 가능하다.
bash_xcom_taskids = BashOperator(
task_id='bash_xcom_taskids',
bash_command='echo "{{ task_instance.xcom_pull(task_ids="xcom_push_task") }}"',
dag=dag
)
bash_xcom_key = BashOperator(
task_id='bash_xcom_key',
bash_command='echo "{{ ti.xcom_pull(key="xcom_push_value") }}"',
dag=dag
)
bash_xcom_push = BashOperator(
task_id='bash_xcom_push',
bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
dag=dag
)
bash_xcom_pull = BashOperator(
task_id='bash_xcom_pull',
bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
dag=dag
)
전체 예제 dag는 아래와 같다.
https://github.com/dydwnsekd/airflow_example/blob/main/dags/xcom.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
dag_id = 'xcom_test',
start_date = datetime(2021,9,23),
catchup=False,
schedule_interval='@once'
)
def return_xcom():
return "xcom!"
def xcom_push_test(**context):
xcom_value = "xcom_push_value"
context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)
return "xcom_return_value"
def xcom_pull_test(**context):
xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')
print("xcom_return : {}".format(xcom_return))
print("xcom_push_value : {}".format(xcom_push_value))
print("xcom_push_return_value : {}".format(xcom_push_return_value))
return_xcom = PythonOperator(
task_id = 'return_xcom',
python_callable = return_xcom,
dag = dag
)
xcom_push_task = PythonOperator(
task_id = 'xcom_push_task',
python_callable = xcom_push_test,
dag = dag
)
xcom_pull_task = PythonOperator(
task_id = 'xcom_pull_task',
python_callable = xcom_pull_test,
dag = dag
)
bash_xcom_taskids = BashOperator(
task_id='bash_xcom_taskids',
bash_command='echo "{{ task_instance.xcom_pull(task_ids="xcom_push_task") }}"',
dag=dag
)
bash_xcom_key = BashOperator(
task_id='bash_xcom_key',
bash_command='echo "{{ ti.xcom_pull(key="xcom_push_value") }}"',
dag=dag
)
bash_xcom_push = BashOperator(
task_id='bash_xcom_push',
bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
dag=dag
)
bash_xcom_pull = BashOperator(
task_id='bash_xcom_pull',
bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
dag=dag
)
return_xcom >> xcom_push_task >>xcom_pull_task >> bash_xcom_taskids >> bash_xcom_key >> bash_xcom_push >> bash_xcom_pull
실행 결과
실행 결과를 살펴보도록 하자.
모든 task가 성공하고 pull하여 데이터를 확인하는 부분만 살펴보도록 한다.
xcom_pull_task
bash_xcom_taskids
bash_xcom_key
bash_xcom_pull
Airflow Web UI에서 확인하는 것 또한 가능하다.
Admin-XComs 메뉴에서 확인이 가능하며 Dag id, task id, key, value 등의 정보를 확인할 수 있다.
참고자료
https://airflow.apache.org/docs/apache-airflow/stable/index.html
'BigData > Airflow' 카테고리의 다른 글
Airflow execution_date 이해하기 (4) | 2021.09.30 |
---|---|
Airflow Example DAG 제거하기 (0) | 2021.09.19 |
Airflow SlackOperator 사용하기 (0) | 2021.09.16 |
Airflow 서비스 등록하기 (0) | 2021.09.04 |
Airflow Local Executor와 Celery Executor (0) | 2021.08.20 |