IT 삽질기

Airflow 2.0 설치하기(4) 본문

BigData/Airflow

Airflow 2.0 설치하기(4)

화이팅빌런 2021. 5. 3. 23:02

지난번까지 Airflow를 설치하고, mysql과 연동하는 작업까지 마무리 했다.

오늘은 실질적으로 DAG를 만들어 동작시켜보도록 하자

2021.01.27 - [BigData/Airflow] - Airflow 2.0 설치하기(1)

2021.01.29 - [BigData/Airflow] - Airflow 2.0 설치하기(2)

2021.03.21 - [BigData/Airflow] - Airflow 2.0 설치하기(3)

 

설치가 완료되었으니 webserver, scheduler을 실행시켜보도록 하자

webserver 실행

-D 옵션은 daemon 형식으로 실행하는 옵션으로 -D 옵션만 이용한다

airflow webserver -D

 

WebServer가 실행되었으면 PC에서 확인이 가능하다

기존에 접근했던 것과 같이 localhost:8080을 이용해 접근하여 로그인을 하면 위와 같은 메세지가 나오는데,

해당 메세지는 Scheduler이 올라오지 않아 나오는 메세지로 Scheduler이 정상적으로 실행되면 없어지게 된다.

Scheduler 실행

Scheduler도 마찬가지 방법으로 실행시킨다.

airflow scheduler -D

실행이 완료된 이후 webserver에서 새로고침을 해보면 아래와 같이 scheduler에 대한 메세지가 사라진 것을 볼 수 있다

 

Airflow Web의 UI를 살펴보면 예제로 나와있는 DAGs들을 볼 수 있는데, DAGs가 하나의 작업으로 순서와 방향성을 설정해 원하는 작업을 실행시킬 수 있다.

 

DAG를 실제로 작성해 테스트를 진행해보도록 하자.

여기서는 BashOperator, PythonOperator을 이용해 DAG를 만들고 실행하는 예제를 통해 테스트를 진행한다.

지난글까지의 순서대로 Airflow를 설치했다면, Docker container의 dags dir과 로컬 dir을 연결시키지 않았기 때문에 container에서 dags를 작성해보도록 하자.

 

DAGs dir 생성

먼저 DAG를 어디에 작성해야 하는지를 알아보도록 하자.

airflow가 설치된 docker container에서 airflow.cfg를 열어보자

4번 line에 dags_folder에 관한 설정값이 있는데, 해당 경로에 dags를 작성하면 된다.

기본적으로는 해당 경로에 /root/airflow/dags라는 경로에 dags라는 dir이 존재하지 않으며, 해당 dir을 만들어주면 된다.

mkdir dags

 

DAG만들기

DAG는 아래와 같이 만들었으며, 자세한 설명은 추후 진행하도록 한다.

코드는 github에도 존재한다.

github.com/dydwnsekd/airflow_example/blob/main/dags/firstDAG.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'start_date': days_ago(1),
}

dag = DAG(
    'firstDAG_test',
    default_args=default_args,
    schedule_interval="@once",
)

def Hello_airflow():
    print("Hello airflow")

t1 = BashOperator(
    task_id='bash',
    bash_command='echo "Hello airflow"',
    dag=dag,
)

t2 = PythonOperator(
    task_id='python',
    python_callable=Hello_airflow,
    dag=dag,
)

t1 >> t2

DAG code에 에러가 없다면 위와 webserver에서 확인할 수 있는데,

example로 있는 다른 dag들이 많아 보기 복잡할 수 있어 검색기능을 이용해 작성한 dag만 표시되도록 했다

위의 이미지에서 보이는 firstDAG_test는 DAG파일 작성시 dag에 관한 부분인대

이 부분에 대해서만 간단히 설명을 하자면,

firstDAG_test는 DAG의 이름으로 unique한 값을 가져야 한다.

default_args는 dag에서 사용할 기본적인 파라미터 값으로 위에서 정의한 내용의 일부를 사용하며,

scheduler_interval은 해당 DAG가 언제 실행될지에 대한 것을 설정하는데 test용으로 "@once"를 이용해 한번만 실행되도록 했다.

dag = DAG(
    'firstDAG_test',
    default_args=default_args,
    schedule_interval="@once",
)

 

DAG 실행

DAG를 실행하기 위해서는 제일 왼쪽에 존재하는 on/off toggle키를 눌러주면 된다.

 

DAG 실행 확인

error이 없는 경우 위의 이미지에서 보는 것처럼 run에 초록색 원 안에 1, Recent Tasks 초록색 원 안에 2가 나오면 작성한 DAG가 정상적으로 동작했다는 뜻이며, 내용을 확인하기 위해 DAG 이름인 firstDAG_test를 눌러 확인한다

 

DAG 이름을 눌러 들어오면 해당 먼저 Tree View를 볼 수 있는데, Tree View에서는 해당 DAG의 시간에 따른 실행결과를 확인하기 좋다.

 

 

해당 DAG는 한번만 실행되었으며, 왼쪽편에 DAG가 어떤 순서로 동작하는지 task 이름으로 되어 있는 것을 확인할 수 있다. bash, python이라는 task가 존재하며 해당 task들은 순서대로 실행될 것이다.

task의 이름은 같은 DAG내에서만 중복되지 않으면 된다.

 

오른쪽편에 보이는 원과 사각형에 대해 살펴보면 먼저 원은 DAG를 나타내는 것으로 DAG가 정상적으로 종료되었다면, 초록색으로 표시가 된다.

사각형도 역시 마찬가지로 task를 나타내는 것으로 각 task들의 상태를 볼 수 있다.

이제 task의 실행 log를 확인해보도록 하자

bash task에 해당하는 사각형을 눌러 log를 살펴보자.

bash task에 대한 것을 확인하고 log를 눌러 작성한 task가 맞는지 정상적으로 동작했는지 확인이 가능하다

 

log를 살펴보면 위쪽은 설정값으로 어떤게 들어가 있는지에 대해 확인할 수 있으며, 작성한 bash_command, 실행결과까지 나온것을 확인할 수 있다.

 

마찬가지로 python task의 결과도 확인해보자.

python task의 결과에서도 Hello airflow가 잘 출력되는 것을 확인할 수 있다.

 

이렇게 4개의 글에 걸쳐 Airflow를 실행하고 DAG를 작성하는 방법까지 알아보았다.

Airflow에 대한 원래의 계획은 Airflow에서 제공하는 Executor인 CeleryExecutor 사용을 위해 RabbitMQ와의 연동과 테스트까지를 계획하였으나, Docker에 대한 지식 부족과 초반 단계 Docker Container에 대한 잘못된 설정으로 삽질은 여기까지만 진행한다.

Airflow 공식문서에서 Docker를 이용해 Airflow를 설치하는 것에 대한 내용이 되어 있으니, 해당 내용을 이용해 다시 설정을 진행할 예정이다.

airflow.apache.org/docs/apache-airflow/stable/start/docker.html

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

추가적으로 Operator에 대한 예제와 작성한 DAG에 대한 설명 등은 블로그에 계속 작성하고,

Docker에 대한 공부를 진행해 CeleryExecutor까지 적용하는 것도 추후 진행할 예정이다.

Comments