IT 삽질기

Airflow KakaoWork bot 사용하기 본문

BigData/Airflow

Airflow KakaoWork bot 사용하기

화이팅빌런 2021. 6. 19. 21:07
해당 글은 KakaoWorkOperator, Hook의 내용이 변경되면 계속해서 수정될 예정이며, 업데이트 과정에서 코드와 사용 예가 동작하지 않을 수 있으니 양해 부탁드립니다.

수정 내용
2021-06-16 최초 문서 작성
2021-06-19 KakaoWorkHook 관련 내용 수정 및 사용 설명 수정, 실사용 예 제거 추후 추가 예정

Airflow KakaoWorkOperator

지난번 글에서도 언급한 것처럼 Airflow에서 기본적으로 제공하는 업무용 협업툴에는 slack, discord가 있지만 teams, kakaowork 등에 대한 지원은 하지 않는다. 따라서 custom을 통해 사용해야 하는데 지난번 글에서는 teams webhook과 연동하는 것을 알아보았다.

2021.06.03 - [BigData/Airflow] - Airflow TeamsWebHook 사용하기

 

KakaoWorkOperator은 이전에 만들었던 TeamsWebHook를 활용하여 개발을 진행하였으며 공식문서를 참고해 block를 사용할 수 있도록 진행중에 있다. kakaowork에 대한 자세한 내용은 아래의 공식문서를 참고하기 바란다.

https://docs.kakaoi.ai/kakao_work/

KakaoWorkHook

개발한 KakaoWorkHook에 대해 살펴보자.

https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/KakaoWorkHook.py

import json
import warnings
from typing import Optional
 
import requests
from requests.auth import HTTPBasicAuth
 
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
 
class KakaoWorkhookHook(HttpHook):
    def __init__(
        self,
        http_conn_id=None,
        text='',
        block=False,
        *args,
        **kwargs,
    ):
        super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
        conn = self.get_connection(http_conn_id)
        self.Kakao_URl = conn.host
        self.conversation_id = conn.extra_dejson.get('conversation_id')
        self.text = text
        self.block = block
        self.block_list = []
 
        app_key = conn.extra_dejson.get('app_key')
        self.headers = {
            "Authorization": "Bearer {app_key}".format(app_key=app_key),
            "Content-Type": "application/json"
        }
     
 
    def makeKakaoMessage(self, text):
        if not self.block:
            return json.dumps({
                "conversation_id": "{conversation_id}".format(conversation_id=self.conversation_id),
                "text": "{text}".format(text=text)
            })
        else:
            return json.dumps({
                "conversation_id": "{conversation_id}".format(conversation_id=self.conversation_id),
                "text": "{text}".format(text=text),
                "blocks": self.block_list
            })
 
    def kakaoBlocksMaker(
        self,
        type='text',
        text='text',
        style='default',
        image_url='https://airflow.apache.org/docs/apache-airflow/stable/_images/pin_large.png',
        action_type='open_system_browser',
        value="localhost:8080"
    ):
     
        if type == 'text':
            self.block_list.append({
                "type": "text",
                "text": "{text}".format(text=text),
                "markdown": False
            })
 
        elif type == 'button':
            # style : ['default', 'primary', 'danger']
            self.block_list.append({
                "type": "button",
                "text": "{text}".format(text=text),
                "style": "{style}".format(style=style),
                "action_type": "{action_type}".format(action_type=action_type),
                "value": "{value}".format(value=value)
            })
         
        # header을 사용하는 경우 제일 상위에서만 사용 가능
        # kakaoBlocksMaker에서 제일 먼저 add 필요
        elif type == 'header':
            # style : ['blue', 'red', 'yellow']
            style = 'blue'
            self.block_list.append({
                "type": "header",
                "text": "{text}".format(text=text),
                "style": "{style}".format(style=style)
            })
             
        elif type == 'image_link':
            self.block_list.append({
                "type": "image_link",
                "url": "{url}".format(url=image_url)
            })
 
    def sendMessage(self, data) -> None:
        r = requests.post(self.Kakao_URl, data=data, headers=self.headers)
 
    def execute(self) -> None:
        data = self.makeKakaoMessage(self.text)
        r = requests.post(self.Kakao_URl, data=data, headers=self.headers)

hook에서는 메세지의 기본인 text뿐만 아니라 block를 이용한 header, button, text, image_link에 대한 기능을 제공한다.

현재 사용할 수 있는 block의 종류와 파라미터는 아래와 같으니 참고하기 바란다.

type 함수명 parameter 설명 기본값
text textBlock() text 전달할 text message 기본값 text
buttton buttonBlock() text button에 적힐 text값 text
style 버튼 스타일 default, primary, danger로 설정 가능 default
action_type 버튼 클릭시 수행할 작업 open_system_browser
action_name 사용자가 어떤 것을 눌렀는지 구분하기 위해 사용  
value 버튼 클릭 시 수행할 정보
json, query 등의 형식 지원
localhost:8080
header headerBlock() text header에 들어갈 text값 text
style 색상선택으로 blue, red, yellow 사용 가능 blue
imagelink imagelinkBlock() url 이미지 url 설정 airflow 로고

KakaoWorkOperator

다음으로 kakaoOperator에 대해서 살펴보자.

https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/KakaoWorkOperator.py

from typing import Any, Dict, Optional
 
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from custom_operator.kakaowork.KakaoHook import KakaoWorkhookHook
 
class kakaoWorkOperator(SimpleHttpOperator):
 
    @apply_defaults
    def __init__(
        self,
        *,
        http_conn_id: str,
        text: str,
        block: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.http_conn_id = http_conn_id
        self.text = text
        self.block = block
        self.hook: Optional[KakaoWorkhookHook] = None
 
 
    def execute(self, context: Dict[str, Any]) -> None:
        self.hook = KakaoWorkhookHook(
            self.http_conn_id,
            self.text,
            self.block,
        )
        self.hook.execute()

operator은 간단하게 작성했다.

operator을 사용하는 경우 text만 사용할 수 있게 구현이 되어 있어 간단하게 text만 사용하기를 원하는 경우 Operator을 사용이 가능하다.

'BigData > Airflow' 카테고리의 다른 글

Airflow Local Executor와 Celery Executor  (0) 2021.08.20
Airflow Sensor 사용법  (0) 2021.06.21
Airflow TeamsWebHook 사용하기  (0) 2021.06.03
Airflow Variables  (0) 2021.05.27
Airflow dag_run.conf 사용하기  (2) 2021.05.26
Comments