일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- execution_date
- re
- Lambda architecture
- Namenode
- LDAP
- NoSQL
- hadoop
- 빅데이터
- yarn
- MapReduce
- java
- Service
- Scala
- SlackWebhookOperator
- ambari
- Example DAG
- HDFS
- HIVE
- airflow
- Kafka
- 정규표현식
- slack
- HDP
- slack app
- jupyter
- 람다 아키텍처
- python
- docker
- Windows
- HBase
- Today
- Total
IT 삽질기
Airflow Variables 본문
Airflow Variables란?
Variables은 Airflow 전역에서 사용할 수 있는 값을 미리 저장해두고 DAG에서 공통적으로 사용할 수 변수를 말한다.
WebServer UI에서 쉽게 설정이 가능하다.
Variables는 key-value 형식으로 구성되며, key값을 이용해 value를 사용하는 방식이다.
공식 문서는 아래와 같이 2개의 문서로 정리되어 있다.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
Airflow Varialbes 설정하기
Webserver에서 Variables를 설정하는 방법에 대해서 알아보자.
Admin - Variable 메뉴로 들어가보자.
이미 등록된 Variable를 확인할 수 있으며 등록 또한 가능하다.
JSON으로 여러개의 Variable를 등록하는 것 또한 가능하다.
Import를 test할 JSON 파일은 아래와 같다.
https://github.com/dydwnsekd/airflow_example/blob/main/Variables.json
{
"a_user": "a",
"a_password": "password",
"a_secret": "secret",
"a_passwd": "passwd",
"a_authorization": "authorization",
"a_api_key": "api_key",
"a_apikey": "apikey",
"a_access_token": "access_token"
}
파일은 선택한 후 Import Variables을 눌러 Import를 진행한다.
Import가 완료되면 아래와 같이 Variables들이 등록된다.
여기서 몇몇 value들은 block 처리가 되어 WebServer에서 보여지게 되는데 이는 key값 때문이다.
key로 지정하면 value가 암호화되는 keyword는 아래와 같다.
- password
- secret
- passwd
- authorization
- api_key
- apikey
- access_token
위의 예제에서도 마찬가지로 해당 keyword가 포함되어 있는 value들만 block 처리 되었다.
이제 WebServer에서 등록하는 것도 진행해보자.
Variables List 상단에 + 버튼을 이용해 생성하면 된다.
Key-Value 형식으로 등록하면 된다.
새로운 값이 추가된 것을 확인할 수 있다.
Variables 사용하기
이제 등록 완료된 Variables을 사용해보도록 하자.
Variables를 사용하는 방법은 크게 2가지로 나뉘는데 예제 코드와 함께 살펴보도록 하자.
https://github.com/dydwnsekd/airflow_example/blob/main/dags/Variables.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.models.variable import Variable
a_user = Variable.get("a_user")
a_password = Variable.get("a_password")
default_args = {
'start_date': days_ago(1),
}
dag = DAG(
'Varialbes_dag',
default_args=default_args,
schedule_interval="@once",
)
t1 = BashOperator(
task_id='variable_get',
bash_command='echo {a_user} / {a_password}'.format(a_user=a_user, a_password=a_password),
dag=dag,
)
t2 = BashOperator(
task_id='variable_jinja',
bash_command='echo {{ var.value.a_user }} / {{ var.value.a_password }}',
dag=dag,
)
t1 >> t2
먼저 t1에 대해서 살펴보도록 하자.
from airflow.models.variable import Variable
a_user = Variable.get("a_user")
a_password = Variable.get("a_password")
t1 = BashOperator(
task_id='variable_get',
bash_command='echo {a_user} / {a_password}'.format(a_user=a_user, a_password=a_password),
dag=dag,
)
Variable를 import 하여 Variable.get를 이용하는 방식으로 Variable.get("key")를 이용하여 사용한다.
다음으로 t2에 대해서 살펴보면 지난 포스팅에서 계속해서 나왔던 JinjaTemplate방식을 사용하면 된다.
2021.05.19 - [BigData/Airflow] - Airflow에서 Jinja template 사용하기
t2 = BashOperator(
task_id='variable_jinja',
bash_command='echo {{ var.value.a_user }} / {{ var.value.a_password }}',
dag=dag,
)
JinjaTemplate를 사용할 때는 {{ var.value.[key] }} 를 사용하면 된다.
위의 2가지 예제에서 모두 같은 값을 출력하도록 설정했다.
결과를 살펴보자.
t1
t2
위의 설정에서 한 것처럼
a_user의 값으로 a
a_password의 값으로 password가 출력되는 것을 볼 수 있다.
Variable는 DAG들에서 공통적으로 사용할 수 있는 값을 지정하는 것으로 Server에 대한 host명, password 등을 저장해놓고 사용하기에 용이할 것으로 판단되며, 이후 변경 작업이 발생하는 경우에도 Variable의 value 값만 변경해주면 되기 때문에 다양하게 활용할 수 있을 것 같다.
'BigData > Airflow' 카테고리의 다른 글
Airflow KakaoWork bot 사용하기 (0) | 2021.06.19 |
---|---|
Airflow TeamsWebHook 사용하기 (0) | 2021.06.03 |
Airflow dag_run.conf 사용하기 (2) | 2021.05.26 |
Airflow에서 Jinja template 사용하기 (0) | 2021.05.19 |
Docker를 이용한 Airflow 2.0.2 실행하기(1) (2) | 2021.05.13 |