IT 삽질기

Airflow TeamsWebHook 사용하기 본문

BigData/Airflow

Airflow TeamsWebHook 사용하기

화이팅빌런 2021. 6. 3. 20:17

Airflow - TeamsWebHook 사용하기

Airflow에서는 기본적으로 teams에 대한 Operator, Hook을 제공하지 않고 slack, discord에 대한 내용은 제공

작성일 기준 MS teams에 대한 package는 제공하지 않고 있어 사용을 위한 Operator을 custom 개발을 진행했다.

Teams에서 webhook으로 사용하는 커넥터는 아래와 같으며, 해당 커넥터에 대한 테스트가 완료되었다.

Custom Operator 개발하기

custom Operator 개발은 Hook과 Operator로 나누어 개발을 진행했으며, slack webhook 관련 Hook과 Operator을 참고했다.

teams메세지에서 button에 link를 연결하는 메세지를 보내는 것이 가능하지만 추가 개발 사항으로 포함시킬지는 아직 결정하지 않았다.

TeamshookHook

먼저 TeamshookHook class의 내용을 살펴보자.

HttpHook를 상속받아 사용하며, 메세지 생성하는 _build_teams_message와 실제 메세지를 전송하는 execute로 구성된다.

http_conn_id를 전달받아서 host명으로 데이터를 전달하기 때문에 connection을 추가해야하며, host를 webhook url로 설정하면 된다

해당 파일의 소스는 아래의 링크에서 확인할 수 있다. 

https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/TeamshookHook.py

import json
import warnings
from typing import Optional

import requests
from requests.auth import HTTPBasicAuth

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook

# channel, username, icon_emoji, icon_url, link_names, block
class TeamshookHook(HttpHook):
    def __init__(
        self,
        http_conn_id=None,
        title='',
        title_text='',
        activityTitle='',
        activitySubtitle='',
        message='',
        color='green',
        *args,
        **kwargs,
    ):
        print(http_conn_id)
        super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
        self.conn = self.get_connection(http_conn_id)
        self.teams_url = self.conn.host
        self.title = title
        self.title_text = title_text
        self.activityTitle = activityTitle
        self.activitySubtitle = activitySubtitle
        self.message = message
        self.color = color
        self.color_dict = {'red': 'FF0000', 'green': '00FF00', 'blue': '0000FF'}
        

    def _build_teams_message(self) -> str:
        return {
            "themeColor": self.color_dict[self.color],
            "title": self.title,
            "text": self.title_text,
            "sections" : [
                {
                    "activityTitle": self.activityTitle,
                    "activitySubtitle": self.activitySubtitle,
                    "activityImage":"",
                    "text": self.message
                }
            ]
        }

    def execute(self) -> None:
        teams_message = self._build_teams_message()
        # self.run(
        #     endpoint='',
        #     data=teams_message,
        #     headers = {"cache-control": "no-cache"},
        #     extra_options={'check_response': False},
        # )

        headers = {"cache-control": "no-cache"}
 
        r = requests.post(self.teams_url, json=teams_message, headers=headers)

TeamsWebhookOperator

SimpleHttpOperator를 상속받아 구현했으며, TeamsWebhookOperator는 TeamshookHook을 사용하기 위해 parameter을 전달받고, 메세지를 생성해 전달하는 방식이다.

https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/TeamsWebhookOperator.py

from typing import Any, Dict, Optional

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from custom_operator.teams.TeamsHook import TeamshookHook

class TeamsWebhookOperator(SimpleHttpOperator):

    @apply_defaults
    def __init__(
        self,
        *,
        http_conn_id: str,
        title: str='',
        title_text: str='',
        activityTitle: str='',
        activitySubtitle: str='',
        message: str = '',
        color: str = 'green',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.http_conn_id = http_conn_id
        self.title = title
        self.title_text = title_text
        self.activityTitle = activityTitle
        self.activitySubtitle = activitySubtitle
        self.message = message
        self.color = color
        self.hook: Optional[TeamshookHook] = None

    def execute(self, context: Dict[str, Any]) -> None:
        self.hook = TeamshookHook(
            self.http_conn_id,
            self.title,
            self.title_text,
            self.activityTitle,
            self.activitySubtitle,
            self.message,
            self.color,
        )
        self.hook.execute()

이제 사용법에 대해 알아보도록 하자.

 

connection 만들기

사용을 위해선 먼저 connection을 만들어야 한다.

Admin - Connections 에서 신규 connections를 생성한다.

 

생성하며 Conn id, Conn Type, Host만 지정하면 된다.

Conn Id는 다른 conn id와 다른 값으로 원하는 값으로 지정하면 된다.

Conn Type는 Hook에서 Http conn id로 사용하기로 했으므로 HTTP 형식으로 사용한다.

Host가 중요한 부분인데 teams web hook url을 확인하면 된다.

실행 dag 만들기

테스트를 위해 만들어진 dag는 아래와 같다.

TeamsWebhookOperator을 사용하기 위한 import를 진행하고 Operator을 실행하는 task를 만들어 실행하면 된다.

import 부분에 대해서만 간단하게 알아보자.

airflow의 dag들은 airflow.cfg에 설정된 dag_folder에 저장이 되며, custom으로 사용하는 class들 역시 dag_folder 하위에 존재하면 import하여 사용할 수 있다.

아래의 dag에서 customOperator.teams.TeamsOperator로 import를 한 이유는 디렉토리 구조가 아래와 같이 구성되어 있기 때문이다.

${dag_folder}
  ㄴ customOperator
      ㄴ teams
          ㄴ TeamsOperator
# -*- coding:utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import requests
from requests.auth import HTTPBasicAuth

from customOperator.teams.TeamsOperator import TeamsWebhookOperator
 
default_args= {
    'retries': 0,
    'catchup': False,
    'retry_delaty': timedelta(minutes=5),
}
 
dag = DAG(
    'webhook_test_dydwnsekd',
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once"
)
 
t1 = TeamsWebhookOperator(
    task_id='teams_test',
    http_conn_id='webhook_dydwnsekd',
    title='webhook title!',
    title_text='안녕하세요',
    message='메세지 보내기 성공!',
    color='green',
    dag=dag,
)
 
t1

 

결과 확인

dag 실행의 결과는 teams에서 확인할 수 있다.

'BigData > Airflow' 카테고리의 다른 글

Airflow Sensor 사용법  (0) 2021.06.21
Airflow KakaoWork bot 사용하기  (0) 2021.06.19
Airflow Variables  (0) 2021.05.27
Airflow dag_run.conf 사용하기  (2) 2021.05.26
Airflow에서 Jinja template 사용하기  (0) 2021.05.19
Comments