IT 삽질기

Airflow Xcom 사용하기 본문

BigData/Airflow

Airflow Xcom 사용하기

화이팅빌런 2021. 9. 24. 00:10

이번 글에서는 Airflow Xcom에 대해서 알아보도록 하자.

Airflow Version : 2.1.3에서 테스트를 진행했다.

Xcom이란

Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, CeleryExecutor를 예로 들면, 각 task들이 각기 다른 Worker에서 실행될 수 있으며, Xcom은 이러한 경우 task간 데이터 전달을 가능하게 한다. Variables와 마찬가지로 key-value의 형식으로 사용되지만, Variables과는 달리 Xcom은 DAG내에서만 공유할 수 있는 변수라는 점이다. Xcom을 이용해 데이터를 전달하는 경우 DataFrame이나 많은 양의 데이터를 전달하는 것은 지원하지 않으며, 소량의 데이터만 전달하는 것을 권장한다. Xcom을 사용하기 위해서는 각 task에서 push, pull 하는 방식으로 기본적으로 사용되지만, PythtonOperator의 경우 return이 자동적으로 Xcom 변수로 지정되게 된다.

보다 자세한 사항은 Airflow 공식문서를 참고하기 바란다.

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=xcom 

Xcom 사용하기

이제 Xcom을 사용하는 몇 가지 방법을 알아보고 예제를 통해 살펴보도록 하자.

  • PythonOperator return 값을 이용한 Xcom 사용
  • push-pull을 이용한 Xcom 사용
  • Jinja templates을 이용한 Xcom 사용

Jinja templates와 Airflow에 관련된 내용은 아래의 글을 참고하기 바란다.

2021.05.19 - [BigData/Airflow] - Airflow에서 Jinja template 사용하기

사용법에 대해 하나씩 살펴보도록 하자.

PythonOperator return 값을 이용한 Xcom 사용

PythonOperator에서 return을 하면 Airflow xcom에 자동으로 push되기에 return하는 함수를 만들어 하나의 task로 실행시켰다.

def return_xcom():
    return "xcom!"
    
return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)

push-pull을 이용한 Xcom 사용

PythonOperator에서 return을 하는 방법 이외에도 아래와 같이 context['task_instance']를 이용하여 xcom에 push, pull 하여 데이터를 주고받는 것 또한 가능한데, 여기서 알아야 할 내용들에 대해 간단하게 설명한다.

먼저 context['task_instance']와 context['ti']는 동일한 의미로 ti = task_instance로 간단하게 축약하여 사용할 수 있다.

다음으로 PythonOperator을 사용하는 경우 return과 push를 하나의 task에서 중복하여 사용할 수 있으며, 해당 데이터를 전달받는 곳에서 xcom_pull(key=~) 혹은 xcom_pull(task_ids=~)를 이용해 전달받는 방식이 서로 다른 것을 인지하자.

return으로 xcom을 사용하는 경우 xcom_pull(task_ids)를 사용해 데이터를 전달받고,

push하는 경우에는 key-value 형식에 따라 데이터를 주고받게 된다.

def xcom_push_test(**context):
    xcom_value = "xcom_push_value"
    context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)

    return "xcom_return_value"

def xcom_pull_test(**context):
    xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
    xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
    xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')

    print("xcom_return : {}".format(xcom_return))
    print("xcom_push_value : {}".format(xcom_push_value))
    print("xcom_push_return_value : {}".format(xcom_push_return_value))
    
xcom_push_task = PythonOperator(
    task_id = 'xcom_push_task',
    python_callable = xcom_push_test,
    dag = dag
)

xcom_pull_task = PythonOperator(
    task_id = 'xcom_pull_task',
    python_callable = xcom_pull_test,
    dag = dag
)

Jinja templates을 이용한 Xcom 사용

마지막으로 jinja template를 이용하여 전달받는 방식이다.

여기서는 BashOperator을 사용해 예제를 표현했지만 다른 Operator에서도 동일하게 사용하는 것이 가능하다.

앞서 설명했던 것과 동일하게 task_instance(ti)를 이용하는 것과 동일하게 사용이 가능하며 push, pull 모두 사용이 가능하다.

bash_xcom_taskids = BashOperator(
    task_id='bash_xcom_taskids',
    bash_command='echo "{{ task_instance.xcom_pull(task_ids="xcom_push_task") }}"',
    dag=dag
)

bash_xcom_key = BashOperator(
    task_id='bash_xcom_key',
    bash_command='echo "{{ ti.xcom_pull(key="xcom_push_value") }}"',
    dag=dag
)

bash_xcom_push = BashOperator(
    task_id='bash_xcom_push',
    bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
    dag=dag
)

bash_xcom_pull = BashOperator(
    task_id='bash_xcom_pull',
    bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
    dag=dag
)

 

전체 예제 dag는 아래와 같다.

https://github.com/dydwnsekd/airflow_example/blob/main/dags/xcom.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id = 'xcom_test',
    start_date = datetime(2021,9,23),
    catchup=False,
    schedule_interval='@once'
)

def return_xcom():
    return "xcom!"

def xcom_push_test(**context):
    xcom_value = "xcom_push_value"
    context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)

    return "xcom_return_value"

def xcom_pull_test(**context):
    xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
    xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
    xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')

    print("xcom_return : {}".format(xcom_return))
    print("xcom_push_value : {}".format(xcom_push_value))
    print("xcom_push_return_value : {}".format(xcom_push_return_value))


return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)

xcom_push_task = PythonOperator(
    task_id = 'xcom_push_task',
    python_callable = xcom_push_test,
    dag = dag
)

xcom_pull_task = PythonOperator(
    task_id = 'xcom_pull_task',
    python_callable = xcom_pull_test,
    dag = dag
)

bash_xcom_taskids = BashOperator(
    task_id='bash_xcom_taskids',
    bash_command='echo "{{ task_instance.xcom_pull(task_ids="xcom_push_task") }}"',
    dag=dag
)

bash_xcom_key = BashOperator(
    task_id='bash_xcom_key',
    bash_command='echo "{{ ti.xcom_pull(key="xcom_push_value") }}"',
    dag=dag
)

bash_xcom_push = BashOperator(
    task_id='bash_xcom_push',
    bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
    dag=dag
)

bash_xcom_pull = BashOperator(
    task_id='bash_xcom_pull',
    bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
    dag=dag
)

return_xcom >> xcom_push_task >>xcom_pull_task >> bash_xcom_taskids >> bash_xcom_key >> bash_xcom_push >> bash_xcom_pull

 

실행 결과

실행 결과를 살펴보도록 하자.

모든 task가 성공하고 pull하여 데이터를 확인하는 부분만 살펴보도록 한다.

 

xcom_pull_task

bash_xcom_taskids

bash_xcom_key

bash_xcom_pull

Airflow Web UI에서 확인하는 것 또한 가능하다.

Admin-XComs 메뉴에서 확인이 가능하며 Dag id, task id, key, value 등의 정보를 확인할 수 있다.

참고자료

https://airflow.apache.org/docs/apache-airflow/stable/index.html

https://moons08.github.io/programming/airflow-xcom/

'BigData > Airflow' 카테고리의 다른 글

Airflow execution_date 이해하기  (4) 2021.09.30
Airflow Example DAG 제거하기  (0) 2021.09.19
Airflow SlackOperator 사용하기  (0) 2021.09.16
Airflow 서비스 등록하기  (0) 2021.09.04
Airflow Local Executor와 Celery Executor  (0) 2021.08.20
Comments