IT 삽질기

Airflow Sensor 사용법 본문

BigData/Airflow

Airflow Sensor 사용법

화이팅빌런 2021. 6. 21. 22:41

Airflow Sensor란?

Sensor는 시간, 파일, 외부 이벤트를 기다리며 해당 조건을 충족해야만 이후의 작업을 진행할 수 있게 해주는 Airflow의 기능으로 Operator와 같이 하나의 task가 될 수 있으며 filesystem, hdfs, hive 등 다양한 형식을 제공한다.

이전글에서 작성한 것과 같이 airflow 2.x의 버전에서는 third party로 분류되는 서비스의 경우 airflow와 별개로 설치가 필요하며 필요에 따라 원하는 package를 설치를 진행하면 된다.

2021.04.03 - [BigData/Airflow] - Airflow2.x providers 설치하기

 

sensor의 종류는 아래의 링크를 참고하기 바란다.

airflow sensor

 

FileSensor

이 글에서는 FileSensor을 이용하는 예시를 살펴보도록 하자.

먼저 FileSensor을 코드를 살펴보자

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/sensors/filesystem.html#FileSensor

 

FileSensor을 사용하기 위해서는 fs_conn_id, filepath가 필요한대 fs_conn_id는 connection에서 등록하여 사용하면 된다.

 

Connection 만들기

생성한 Connection은 아래와 같다.

Conn Type는 File(path)로 설정하고, extra에 path key를 추가하여 dir까지의 경로를 입력한다.

여기서 입력하는 path는 FileSensor에서 filepath와 조합하여 해당 파일이 있는지 없는지 여부를 계속 확인하게 된다.

 

Connection을 만들었으니 이제 sensor dag를 만들어서 테스트를 진행한다.

 

FileSensor 예제 dag 만들기

작성해놓은 FileSensor은 아래의 링크에서 확인할 수 있다.

https://github.com/dydwnsekd/airflow_example/blob/main/dags/sensor.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args= {
    'start_date': days_ago(1),
    'retries': 0,
    'catchup': False,
    'retry_delay': timedelta(minutes=5),
}
    
dag = DAG(
        'sensor_test', 
        default_args=default_args, 
        schedule_interval="@once",
)

t1 = FileSensor(
    task_id='sensor_a',
    fs_conn_id='file_sensor',
    filepath='a.txt',
    dag=dag,
)

t2 = BashOperator(
    task_id='cat_a',
    bash_command='cat /opt/airflow/sensor/a.txt',
    dag=dag,
)

t1 >> t2

각 task의 역할을 확인해보자.

t1은 FileSensor로 해당 경로에 파일이 있는지 확인하는 역할을 하며, 1분마나 한번씩 체크를 진행해 파일이 있는 경우에만 다음 단계로 넘어갈 수 있게 된다. Connection에서 설정한 path를 포함하여 /opt/airflow/sensor/a.txt 파일이 존재하는 경우 성공하는것이 되겠다.

t2는 단순하게 /opt/airflow/sensor/a.txt의 내용을 보여주는 BashOperator이다

 

실제 실행 결과를 확인해보자.

해당 경로에 파일이 없는 경우 작업은 더 이상 진행되지 않는다

 

log를 살펴보자.

sensor_a task의 log를 확인해보면 /opt/airflfow/sensor/a.txt를 1분마다 한번씩 계속 확인하고 있는 것을 알 수 있다.

그럼 이제 a.txt 파일을 만들어보자.

a.txt파일을 생성했다.

생성 이후 바로 확인이 되는것이 아니라 일정 시간이 지나야 확인한다는 것이기 때문에 시간이 조금 걸릴 수 있다.

생성이 완료된 이후 작업도 정상적으로 종료된 것을 확인할 수 있다.

log를 확인해보자.

주기적으로 체크를 진행하다가 a.txt가 만들어진 것을 확인한 후 task가 정상적으로 종료되었다.

 

이렇게 FileSensor를 이용하는 방법을 알아보았다.

FileSensor뿐만 아니라 hive, hdfs 등 다양한 Sensor을 제공하기 때문에 필요에 따라 사용할 수 있다.

'BigData > Airflow' 카테고리의 다른 글

Airflow 서비스 등록하기  (0) 2021.09.04
Airflow Local Executor와 Celery Executor  (0) 2021.08.20
Airflow KakaoWork bot 사용하기  (0) 2021.06.19
Airflow TeamsWebHook 사용하기  (0) 2021.06.03
Airflow Variables  (0) 2021.05.27
Comments