일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- HDP
- yarn
- Windows
- python
- hadoop
- Kafka
- 람다 아키텍처
- slack app
- 정규표현식
- ambari
- Namenode
- HDFS
- docker
- slack
- Scala
- Service
- re
- java
- MapReduce
- HBase
- execution_date
- LDAP
- HIVE
- Example DAG
- airflow
- NoSQL
- Lambda architecture
- 빅데이터
- jupyter
- SlackWebhookOperator
- Today
- Total
IT 삽질기
Airflow SlackOperator 사용하기 본문
이번 글에서는 Airflow에서 SlackOperator을 사용하는 방법에 대해서 알아보도록 하자.
이 글에서는 Slack App은 이미 생성되어 있다고 가정하며,
Slack App을 생성하는 방법은 이전 글을 참고하기 바란다.
2021.09.11 - [개발] - Slack App 만들어 메세지 보내기
Airflow Slack provider 설치하기
Airflow 2.x 버전을 사용하는 경우 slack provider을 설치해야 하며, 관련 정보는 Airflow 공식 문서에서 얻을 수 있다.
https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/index.html
아래의 명령어를 이용해 slack provider을 설치한다.
pip install apache-airflow-providers-slack[http]
Connection 설정하기
설치가 완료되면 Airflow에서 Connection을 설정한다.
Admin - Connections에서 설정할 수 있으며, slack.operator에 있는 slack_webhook을 사용할 것이므로 http_conn을 설정한다.
Conn Tyep는 HTTP로 설정한 후, Host에는 slack App을 만들 때 사용한 Webhook URL을 그대로 사용한다.
SlackOperator 사용하기
이제 준비는 모두 완료되었다. SlackWebhookOperator을 사용하기 위해 SlackWebhookOperator을 import 하고,
SlackWebhookOperator을 사용하기 위한 최소 파라미터 값인 task_id, http_conn_id, message를 입력한 후 테스트를 진행한다. http_conn_id는 위의 Connection에서 설정한 Conn Id 값와 동일하게 설정한다.
추가적인 옵션은 공식 페이지에서 확인하기 바란다.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack_message = SlackWebhookOperator(
task_id='send_slack',
http_conn_id='slack_webhook',
message='success'
)
전체 코드는 아래의 예제와 같으며, 정상적으로 실행이 완료된 경우 slack에서 메세지를 확인할 수 있다.
https://github.com/dydwnsekd/airflow_example/blob/main/dags/SlackWebhookOperator_example.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import requests
import logging
import json
dag = DAG(
dag_id = 'slack_test',
start_date = days_ago(1),
max_active_runs = 1,
catchup = False,
schedule_interval="@once"
)
send_slack_message = SlackWebhookOperator(
task_id='send_slack',
http_conn_id='slack_webhook',
message='Hello slack',
dag=dag
)
send_slack_message
참고자료
'BigData > Airflow' 카테고리의 다른 글
Airflow Xcom 사용하기 (0) | 2021.09.24 |
---|---|
Airflow Example DAG 제거하기 (0) | 2021.09.19 |
Airflow 서비스 등록하기 (0) | 2021.09.04 |
Airflow Local Executor와 Celery Executor (0) | 2021.08.20 |
Airflow Sensor 사용법 (0) | 2021.06.21 |