실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG에서 수행하는 작업을 의미하는 Operator에 대해 정리한다.
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
AirFlow에서 Task
DAG을 구성하는 작업 단위를 Task라고 하며, DAG이 수행할 작업들을 의미한다.
하나 또는 여러 개의 Task를 연결해서 DAG을 생성하며, Task에는 Operator, Sensor, Hook이 있다.
Task는 Bash, Python을 포함해서 다양한 작업을 수행할 수 있다.
아래 사이트를 참고한다.
또한 잘 알려진 서비스나 오픈소스 백엔드에 대한 작업을 수행할 수 있도록 다양한 Providers Package를 제공한다.
아래 사이트를 참고한다.
Providers Package의 경우 pip를 사용해서 설치 가능하다.
아래 기술 문서를 참고한다.
Operator
Operator는 task를 어떻게 실행시킬지를 나타낸다.
하나의 워크플로우 안에서 하나의 테스크를 나타낸다.
Sensor와 Hook도 있지만 일반적으로 Operator를 대부분 사용한다.
Operator는 Action Operator와 Transfer Operator로 구분된다.
-
Action Operator는 작업을 수행하거나 다른 시스템에 작업을 수행하도록 지시한다.
-
Transfer Operator는 특정 시스템에서 다른 시스템으로 데이터를 이동시킨다.
AirFlow에서 제공하는 기본 Operator의 종류는 Bash와 Python을 포함해서 상당히 많다.
자세한 사항은 아래 기술 문서를 참고한다.
참고로 _operator로 끝나는 Operator들은 대부분 Defrecated 되었다.
대신 이름에서 _operator를 제외한 Operator를 사용하면 된다.
Operator에 공통적으로 **kwargs라는 Keyword Arguments 를 전달하는 부분이 있다.
해당 부분은 DAG을 정의할 때 정의했던 default_args가 전달된다고 이해하면 된다.
자세한 내용은 아래 포스팅을 참고한다.
일반적으로 많이 사용하는 주요 Operator만 살펴본다.
Dummy Operator
아무 작업을 하지 않는 Operator이다.
보통 시작과 종료를 나타내거나, 다른 작업들을 그룹화하는데 사용한다.
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.dummy
class DummyOperator(**kwargs)
자세한 사항은 아래 페이지를 참고한다.
사용 예제
다음과 같이 사용한다.
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime
dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
task1 = BashOperator(task_id="task2", bash_command="echo task1", dag=dag)
### DummyOperator 예제
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
start >> task1 >> end
다른 작업을 그룹화 하는 경우 다음과 같이 사용한다.
DAG과 TaskGroup을 with와 함께 사용하면 계층적 구성이 가능하다.
각 Operator에 dag를 지정하는 전달 인자를 사용하지 않은 것에 주목한다.
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow import DAG
from datetime import datetime
with DAG(dag_id="testdag", start_date=datetime(2022, 4, 7)) as testdag:
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
# START TaskGroup Example
with TaskGroup(group_id="group1", tooltip="Tasks for group1") as group1:
task1 = DummyOperator(task_id="task1")
task2 = BashOperator(task_id="task2", bash_command="echo test_group")
task3 = DummyOperator(task_id="task3")
task1 >> [task2, task3]
start >> group1 >> end
위의 코드를 도식화하면 아래와 같다.

Bash Operator
Bash Shell 스크립트를 실행하는 Operator이다.
리눅스 명령어 실행도 가능하며 프로그램 실행도 가능하다.
실행하려는 작업을 프로그램으로 작성한 경우, Bash Opearator로 실행할 수 있기 때문에 중요하다.
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.bash
class BashOperator(
bash_command: str,
env: Optional[Dict[str, str]] = None,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
cwd: str = None,
**kwargs)
전달 인자는 다음과 같다.
구분
|
타입
|
기본값
|
내용
|
bash_command
|
str
|
|
실행할 Bash 명령어
|
env
|
Optional[Dict[str, str]]
|
None
|
Dict 형태의 환경 변수 전달
|
output_encoding
|
str
|
"utf-8"
|
Bash 명령어 출력값 인코딩 방식
|
skip_exit_code
|
int
|
99
|
명령어 결과인 exit_code로 성공 여부 확인 여부
None이면 0이 아닌 모든 값은 실패로 간주
|
cwd
|
str
|
None
|
명령어가 실행 될 경로 지정
None이면 임시 경로에서 실행
|
provide_context
|
bool
|
False
|
True로 설정하면 Task 인스턴스의 attribute들을 kwargs로 사용 가능
BaseOperator에 정의 되어 있다
|
명령어 실행 이후 exit_code로 명령어의 실행이 정상으로 종료되었는지 확인하려면 skit_exit_code를 사용한다.
특정 경로에서 명령어 실행을 원하는 경우 cwd를 사용한다.
자세한 사항은 아래 페이지를 참고한다.
사용 예제
다음과 같이 사용한다.
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime
dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
### BashOperator 예제
test_bash = BashOperator(
task_id = "test_bash",
bash_command = "echo 'This is the message'",
output_encoding="utf-8",
dag=dag)
start >> test_bash >> end
환경 변수를 전달하고자 하는 경우에는 다음과 같이 사용한다.
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime
dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
### BashOperator 예제
test_bash = BashOperator(
task_id = "test_bash",
bash_command = "echo 'This is the ds: \'$msg\''",
env = { "msg": '{{ dag_run.conf.ds | d("ds not found!") }}' },
output_encoding="utf-8",
dag=dag)
start >> test_bash >> end
그냥 실행하면 ds라는 환경 변수가 존재하지 않아 ds not found! 메시지가 출력된다.
문자열을 d()로 감싼 것에 주의한다.

dag_run의 ds가 존재하는 경우 해당 값을 출력한다.
실행할 때 해당 값을 전달해야 한다. 우측의 ▶ 버튼을 클릭하고 Trigger DAG w/ config 버튼을 클릭한다.

아래와 같이 Dictionary 포맷으로 ds 값을 정의하고, 아래의 Trigger 버튼을 클릭한다.

로그를 확인해보면 전달한 값이 정상적으로 출력되는 것을 확인할 수 있다.

Python Operator
파이썬 코드 (.py)를 실행하기 위한 Operator이다.
일반적으로 자동화 스크립트를 파이썬으로 작성하는 경우가 많다.
이러한 파이썬 코드들은 해당 Operator를 통해 실행할 수 있다.
whl로 배포한 프로그램의 경우 BashOperator를 통해 실행이 가능하다.
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.python
class PythonOperator(
python_callable: Callable,
op_args: Optional[Collection[Any]] = None,
op_kwargs: Optional[Mapping[str, Any]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[List[str]] = None,
**kwargs
)
전달 인자는 다음과 같다.
구분
|
타입
|
기본값
|
내용
|
python_callable
|
Callable
|
|
호출할 파이썬 개체 (함수, 클래스)
|
op_args
|
Optional[Collection[Any]]
|
None
|
파이썬 개체에 전달할 명시적 (explict) 인자
|
op_kwargs
|
Optional[Mapping[str, Any]]
|
None
|
파이썬 개체에 전달할 키워드 인자 (krawgs)
|
templates_dict
|
Optional[Dict]
|
None
|
AirFlow 엔진이 생성한 템플릿으로 전달되는
Dictionary 타입의 값 목록 |
templates_exts
|
Optional[List[str]]
|
None
|
템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록
|
provide_context
|
bool
|
False
|
True로 설정하면
Task 인스턴스의 attribute들을 kwargs로 사용 가능 BaseOperator에 정의 되어 있다
|
python_callable에서 DAG 코드 외부의 함수나 클래스를 사용하는 경우, DAG 코드 상단에 import 되어 있어야 한다.
자세한 사항은 아래 페이지를 참고한다.
참고로 template_dict 값은 Dictionary 타입의 Jinja Template으로 전달 된다.
DAG과 Task 실행에 관한 정보들이 전달된다.
자세한 사항은 아래 페이지를 참고한다.
사용 예제
다음과 같이 사용한다.
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime
import time
dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
### PythonOperator 예제
def test_function(sleep_time):
"""This is a function that will run within the DAG execution"""
print("Start sleep {sleep_time} seconds!".format(sleep_time=sleep_time))
time.sleep(sleep_time)
print("End sleep {sleep_time} seconds!".format(sleep_time=sleep_time))
test_bash = PythonOperator(
task_id = "test_python",
python_callable=test_function,
op_kwargs={"sleep_time": 10},
dag=dag)
start >> test_bash >> end
그 외 주요 Operator
제공하는 Operator가 많아서 특징만 살펴본다.
구분
|
클래스 경로
|
내용
|
EmailOperator
|
airflow.operators.email
|
이메일 발송
|
BranchPythonOperator
|
airflow.operators.branch
|
파이썬 실행 결과에 따른 분기를 설정하는 Operator
설정과 실행 Method로 구분
|
ShortCircuitOperator
|
airflow.operators.python
|
bool 조건에 맞을 때만 실행
bool 연산 로직은 python_callable로 전달
|
PythonVirtualenvOperator
|
airflow.operators.python
|
Python 가상 환경에서 실행
|
GenericTransfer
|
airflow.operators.generic_transfer
|
Database의 데이터를 다른 Database로 복사
|
SubDagOperator
|
airflow.operators.subdag
|
특정 DAG을 SubDag으로 실행
|
TriggerDagRunOperator
|
airflow.operators.trigger_dagrun
|
지정한 DAG 실행
|
각 Operator의 사용법은 아래 기술 문서를 참고한다.
'::: IT인터넷 :::' 카테고리의 다른 글
AirFlow DAG의 DAG Runs, Task, TaskFlow (0) | 2022.06.23 |
---|---|
AirFlow의 Sensor 이해하기 (0) | 2022.06.20 |
AirFlow DAG 소개와 기본 구조 (0) | 2022.06.13 |
AirFlow의 계정 생성, 권한 관리 방법 (CLI) (99) | 2022.06.09 |
AirFlow의 계정 생성, 권한 관리 방법 (WebUI) (0) | 2022.06.06 |