일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 정규표현식
- 람다 아키텍처
- LDAP
- HDP
- MapReduce
- 빅데이터
- hadoop
- Lambda architecture
- Namenode
- Example DAG
- Windows
- HBase
- ambari
- jupyter
- SlackWebhookOperator
- execution_date
- slack app
- Service
- HDFS
- airflow
- re
- HIVE
- java
- Kafka
- Scala
- slack
- docker
- yarn
- NoSQL
- python
- Today
- Total
IT 삽질기
Airflow KakaoWork bot 사용하기 본문
해당 글은 KakaoWorkOperator, Hook의 내용이 변경되면 계속해서 수정될 예정이며, 업데이트 과정에서 코드와 사용 예가 동작하지 않을 수 있으니 양해 부탁드립니다.
수정 내용
2021-06-16 최초 문서 작성
2021-06-19 KakaoWorkHook 관련 내용 수정 및 사용 설명 수정, 실사용 예 제거 추후 추가 예정
Airflow KakaoWorkOperator
지난번 글에서도 언급한 것처럼 Airflow에서 기본적으로 제공하는 업무용 협업툴에는 slack, discord가 있지만 teams, kakaowork 등에 대한 지원은 하지 않는다. 따라서 custom을 통해 사용해야 하는데 지난번 글에서는 teams webhook과 연동하는 것을 알아보았다.
2021.06.03 - [BigData/Airflow] - Airflow TeamsWebHook 사용하기
KakaoWorkOperator은 이전에 만들었던 TeamsWebHook를 활용하여 개발을 진행하였으며 공식문서를 참고해 block를 사용할 수 있도록 진행중에 있다. kakaowork에 대한 자세한 내용은 아래의 공식문서를 참고하기 바란다.
https://docs.kakaoi.ai/kakao_work/
KakaoWorkHook
개발한 KakaoWorkHook에 대해 살펴보자.
https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/KakaoWorkHook.py
import json
import warnings
from typing import Optional
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
class KakaoWorkhookHook(HttpHook):
def __init__(
self,
http_conn_id=None,
text='',
block=False,
*args,
**kwargs,
):
super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
conn = self.get_connection(http_conn_id)
self.Kakao_URl = conn.host
self.conversation_id = conn.extra_dejson.get('conversation_id')
self.text = text
self.block = block
self.block_list = []
app_key = conn.extra_dejson.get('app_key')
self.headers = {
"Authorization": "Bearer {app_key}".format(app_key=app_key),
"Content-Type": "application/json"
}
def makeKakaoMessage(self, text):
if not self.block:
return json.dumps({
"conversation_id": "{conversation_id}".format(conversation_id=self.conversation_id),
"text": "{text}".format(text=text)
})
else:
return json.dumps({
"conversation_id": "{conversation_id}".format(conversation_id=self.conversation_id),
"text": "{text}".format(text=text),
"blocks": self.block_list
})
def kakaoBlocksMaker(
self,
type='text',
text='text',
style='default',
image_url='https://airflow.apache.org/docs/apache-airflow/stable/_images/pin_large.png',
action_type='open_system_browser',
value="localhost:8080"
):
if type == 'text':
self.block_list.append({
"type": "text",
"text": "{text}".format(text=text),
"markdown": False
})
elif type == 'button':
# style : ['default', 'primary', 'danger']
self.block_list.append({
"type": "button",
"text": "{text}".format(text=text),
"style": "{style}".format(style=style),
"action_type": "{action_type}".format(action_type=action_type),
"value": "{value}".format(value=value)
})
# header을 사용하는 경우 제일 상위에서만 사용 가능
# kakaoBlocksMaker에서 제일 먼저 add 필요
elif type == 'header':
# style : ['blue', 'red', 'yellow']
style = 'blue'
self.block_list.append({
"type": "header",
"text": "{text}".format(text=text),
"style": "{style}".format(style=style)
})
elif type == 'image_link':
self.block_list.append({
"type": "image_link",
"url": "{url}".format(url=image_url)
})
def sendMessage(self, data) -> None:
r = requests.post(self.Kakao_URl, data=data, headers=self.headers)
def execute(self) -> None:
data = self.makeKakaoMessage(self.text)
r = requests.post(self.Kakao_URl, data=data, headers=self.headers)
hook에서는 메세지의 기본인 text뿐만 아니라 block를 이용한 header, button, text, image_link에 대한 기능을 제공한다.
현재 사용할 수 있는 block의 종류와 파라미터는 아래와 같으니 참고하기 바란다.
type | 함수명 | parameter | 설명 | 기본값 |
text | textBlock() | text | 전달할 text message 기본값 | text |
buttton | buttonBlock() | text | button에 적힐 text값 | text |
style | 버튼 스타일 default, primary, danger로 설정 가능 | default | ||
action_type | 버튼 클릭시 수행할 작업 | open_system_browser | ||
action_name | 사용자가 어떤 것을 눌렀는지 구분하기 위해 사용 | |||
value | 버튼 클릭 시 수행할 정보 json, query 등의 형식 지원 |
localhost:8080 | ||
header | headerBlock() | text | header에 들어갈 text값 | text |
style | 색상선택으로 blue, red, yellow 사용 가능 | blue | ||
imagelink | imagelinkBlock() | url | 이미지 url 설정 | airflow 로고 |
KakaoWorkOperator
다음으로 kakaoOperator에 대해서 살펴보자.
https://github.com/dydwnsekd/airflow_example/blob/main/customOperator/KakaoWorkOperator.py
from typing import Any, Dict, Optional
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from custom_operator.kakaowork.KakaoHook import KakaoWorkhookHook
class kakaoWorkOperator(SimpleHttpOperator):
@apply_defaults
def __init__(
self,
*,
http_conn_id: str,
text: str,
block: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.http_conn_id = http_conn_id
self.text = text
self.block = block
self.hook: Optional[KakaoWorkhookHook] = None
def execute(self, context: Dict[str, Any]) -> None:
self.hook = KakaoWorkhookHook(
self.http_conn_id,
self.text,
self.block,
)
self.hook.execute()
operator은 간단하게 작성했다.
operator을 사용하는 경우 text만 사용할 수 있게 구현이 되어 있어 간단하게 text만 사용하기를 원하는 경우 Operator을 사용이 가능하다.
'BigData > Airflow' 카테고리의 다른 글
Airflow Local Executor와 Celery Executor (0) | 2021.08.20 |
---|---|
Airflow Sensor 사용법 (0) | 2021.06.21 |
Airflow TeamsWebHook 사용하기 (0) | 2021.06.03 |
Airflow Variables (0) | 2021.05.27 |
Airflow dag_run.conf 사용하기 (2) | 2021.05.26 |