일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Lambda architecture
- NoSQL
- re
- java
- LDAP
- Namenode
- airflow
- hadoop
- Kafka
- Service
- 빅데이터
- 람다 아키텍처
- python
- Example DAG
- SlackWebhookOperator
- jupyter
- Windows
- MapReduce
- slack
- Scala
- execution_date
- HBase
- slack app
- ambari
- HIVE
- HDP
- yarn
- docker
- 정규표현식
- HDFS
- Today
- Total
IT 삽질기
Airflow HiveOperator LDAP 연결 본문
지난 글에서는 HiveServer2 인증 방법으로 LDAP을 사용하는 경우 Airflow HiveServer2Hook를 사용하는 방법에 대해서 알아보았다.
2021.04.23 - [BigData/Airflow] - Airflow HiveServer2Hook LDAP 연결
이번 글에서는 HiveServer2 인증 방법으로 LDAP을 사용하는 경우
Airflow HiveOperator를 사용해 hive에 query을 날리는 방법에 대해서 알아보도록 하자
Airflow의 버전은 마찬가지로 2.0.0을 사용했다
먼저 HiveOperator의 코드를 살펴보자
github.com/apache/airflow/blob/master/airflow/providers/apache/hive/operators/hive.py
HiveOperator에서는 기본적인 설정값을 입력받고 HiveCliHook을 사용하는 것을 볼 수 있는데,
HiveCliHook의 코드도 함께 살펴보자.
github.com/apache/airflow/blob/master/airflow/providers/apache/hive/hooks/hive.py#L55
HiveCliHook에서는 beeline을 지원하고 사용하고자 하는 경우 connection의 extra field에 아래의 코드를 추가하면 된다고 명시되어 있다.
beeline를 통해 인증과정을 진행할 것으로 해당 옵션을 추가할 예정이다.
{"use_beeline": true}
beeline에 대한 추가적인 내용은 아래의 링크와 스크린샷을 참고하기 바란다
cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-BeelineCommands
다음으로 auth에 관한 부분을 살펴보자 auth값이 입력되어 있지 않으면 noSasl로 대체되도록 설계되어 있는데, noSasl로 설정하는 경우 인증과정을 거치지 않게 되므로 이 부분도 extra 부분에 추가해주도록 하자
{"auth": "LDAP"}
위의 내용을 기억한 후 connection을 만들어보자
Conn Type로는 Hive Client Wrapper을 사용한다
Extra에는 beeline를 사용할 것이기 때문에 use_beeline를 true로 설정하고 auth로 LDAP를 넣는다
위에서 언급한대로 auth값을 넣지 않는 경우 noSasl 값이 들어가기 때문에 auth에 값을 넣어주었지만 꼭 넣을 필요는 없이 빈값을 넣어주어도 되는 것 같다. 테스트를 진행한 후 해당 내용을 수정할 예정
{"use_beeline": true, "auth": "LDAP"}
Connection을 만들 때 LDAP에 로그인이 가능한 username과 password를 정확하게 입력하도록 하자
이제 DAG를 작성해보자
github.com/dydwnsekd/airflow_example/blob/main/dags/HiveOperator_auth_LDAP.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.hooks.hive import *
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago
default_args= {
'start_date': days_ago(1),
'retries': 0,
'catchup': False,
'retry_delay': timedelta(minutes=0),
}
dag = DAG(
'HiveOperator_test',
default_args=default_args,
schedule_interval="@once"
)
hql='''
CREATE TABLE if not exists test_airflow.airflow(
id STRING,
name STRING
);
'''
t1 = HiveOperator(
task_id='HiveOperator_test',
hql=hql,
hive_cli_conn_id='hive_cli_connect',
run_as_owner=True,
dag=dag,
)
실행 결과는 아래와 같다
설정한 내용과 같이 server, username, password가 정상적으로 들어갔는지 확인하고,
정상적으로 실행되는지 확인하면 된다.
'BigData > Airflow' 카테고리의 다른 글
Docker를 이용한 Airflow 2.0.2 실행하기(1) (2) | 2021.05.13 |
---|---|
Airflow 2.0 설치하기(4) (0) | 2021.05.03 |
Airflow HiveServer2Hook LDAP 연결 (0) | 2021.04.23 |
Airflow2.x providers 설치하기 (0) | 2021.04.03 |
Airflow 2.0 설치하기(3) (0) | 2021.03.21 |