IT 삽질기

Airflow SlackOperator 사용하기 본문

BigData/Airflow

Airflow SlackOperator 사용하기

화이팅빌런 2021. 9. 16. 22:51

이번 글에서는 Airflow에서 SlackOperator을 사용하는 방법에 대해서 알아보도록 하자.

이 글에서는 Slack App은 이미 생성되어 있다고 가정하며,

Slack App을 생성하는 방법은 이전 글을 참고하기 바란다.

2021.09.11 - [개발] - Slack App 만들어 메세지 보내기

Airflow Slack provider 설치하기

Airflow 2.x 버전을 사용하는 경우 slack provider을 설치해야 하며, 관련 정보는 Airflow 공식 문서에서 얻을 수 있다.

https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/index.html

아래의 명령어를 이용해 slack provider을 설치한다.

pip install apache-airflow-providers-slack[http]

Connection 설정하기

설치가 완료되면 Airflow에서 Connection을 설정한다.

Admin - Connections에서 설정할 수 있으며, slack.operator에 있는 slack_webhook을 사용할 것이므로 http_conn을 설정한다.

https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack_webhook/index.html

Conn Tyep는 HTTP로 설정한 후, Host에는 slack App을 만들 때 사용한 Webhook URL을 그대로 사용한다.

SlackOperator 사용하기

이제 준비는 모두 완료되었다. SlackWebhookOperator을 사용하기 위해 SlackWebhookOperator을 import 하고,

SlackWebhookOperator을 사용하기 위한 최소 파라미터 값인 task_id, http_conn_id, message를 입력한 후 테스트를 진행한다. http_conn_id는 위의 Connection에서 설정한 Conn Id 값와 동일하게 설정한다.

추가적인 옵션은 공식 페이지에서 확인하기 바란다.

https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack_webhook/index.html

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack_message = SlackWebhookOperator(
    task_id='send_slack',
    http_conn_id='slack_webhook',
    message='success'
)

전체 코드는 아래의 예제와 같으며, 정상적으로 실행이 완료된 경우 slack에서 메세지를 확인할 수 있다.

 

https://github.com/dydwnsekd/airflow_example/blob/main/dags/SlackWebhookOperator_example.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta
import requests
import logging
import json

dag = DAG(
    dag_id = 'slack_test',
    start_date = days_ago(1),
    max_active_runs = 1,
    catchup = False,
    schedule_interval="@once"
)

send_slack_message = SlackWebhookOperator(
    task_id='send_slack',
    http_conn_id='slack_webhook',
    message='Hello slack',
    dag=dag
)

send_slack_message

 

참고자료

https://airflow.apache.org/docs/

'BigData > Airflow' 카테고리의 다른 글

Airflow Xcom 사용하기  (0) 2021.09.24
Airflow Example DAG 제거하기  (0) 2021.09.19
Airflow 서비스 등록하기  (0) 2021.09.04
Airflow Local Executor와 Celery Executor  (0) 2021.08.20
Airflow Sensor 사용법  (0) 2021.06.21
Comments