::: IT인터넷 :::

AirFlow의 DAG을 파이썬 패키지로 구성하기 (2)

곰탱이푸우 2022. 7. 7. 08:20
DAG를 파이썬 패키지로 구성하는 방법에 대해 정리한다.
  1. 기능 정의
  2. 프로젝트 생성
  3. DAG 코드 작성
  4. 기능 코드 작성 (1)
  5. 기능 코드 작성 (2)
  6. 테스트 코드 작성
  7. 패키지 정의
  8. 테스트
  9. 형상 관리, 빌드, 배포
 
기능 정의, 프로젝트 생성, DAG 코드 작성은 아래 포스팅을 참고한다.
기능 코드는 항목이 많아서 나눠서 정리한다.
먼저 __init__.py, info.py, initializer.py, callback.py에 대해 살펴본다.
 
 

기능 코드 작성

기본적인 DAG 구성을 마쳤으니 DAG의 설정과 실행에 필요한 기능들을 정의한다.
 

__init__.py

DAG 코드 작성의 __init__.py 항목을 참고한다.
 
아래 포스팅을 참고한다.

info.py

패키지의 이름과 버전을 정의한다.
패키지 작성하거나 수정할 때 가장 먼저 수정해야 하는 값이다.
__package_name__ = 'srtest-airflow'
__version__ = '1.0.0'
 
개발 중인 버전이므로 버전 뒤에 dev를 붙이는 것이 좋다.
예를 들어 최초 작성 버전이 1.0.0이면 1.0.0.dev와 같은 형식이다.
테스트를 위한 예제이므로 dev 없이 진행한다.
기능 코드에서 사용할 일이 없으면 setup.py 파일과 같은 경로에 위치하는 것이 좋다.
만약 기능 코드에서 패키지 이름이나 버전을 사용해야 한다면 기능을 정의한 폴더에 위치하는 것이 편리하다.
 

initializer.py

파이썬 패키지에 포함 된 DAG와 필요한 코드들을 AirFlow의 dags 폴더에 복사한다.
 
Bitnami에서 배포한 Docker 이미지인 경우 해당 경로는 BITNAMI_PKG_EXTRA_DIRS 환경 변수에 정의되어 있다.
해당 환경 변수의 값은 /opt/bitnami/airflow/dags이다.
 
컨테이너 내부의 환경 변수를 사용하여 AirFlow에 DAG 파일을 등록한다.
따라서 해당 작업은 AirFlow 웹서버 컨테이너 내부에서 실행하는 것을 가정하고 작성한다.
 
 
임포트와 함수 정의
DAG 코드에서 사용할 라이브러리를 임포트하고 함수를 정의한다.
import argparse 
import os 
import shutil 

def initializer():   
    """ 
    pip install 을 통해 airflow 를 설치하고 나면,   
    dag파일(srtest_pipeline.py)과 srtestairflow 폴더를 airflow에 복사하는 과정이 필요하다 
    """
 
각 라이브러리들의 용도는 다음과 같다.
  • argparse - 실행 옵션 처리
  • os - 경로 조작
  • shutil - 파일 복사
 
실행 옵션 처리
명령어를 실행할 때 지정한 옵션을 처리한다.
argparse 라이브러리를 사용한다.
def initializer():
    ... 중략 ...

    parser = argparse.ArgumentParser() 
    subparsers = parser.add_subparsers(dest="mode")

    subparser = subparsers.add_parser("initialize") 
    subparser.set_defaults(cmd="initialize")

    args = parser.parse_args()

    if args.mode is None:
        parser.parse_args(['-h'])
 
예제이기 때문에 AirFlow의 경로에 DAG과 구성 파일들을 복사하는 initialize 옵션만 정의했다.
필요에 따라 DAG과 구성 파일들을 해당 파일들을 제거하는 옵션을 추가해도 좋을 것 같다.
 
옵션을 지정하지 않고 명령어만 실행한 경우 mode에는 None이 전달된다.
옵션을 지정하지 않은 경우 도움말을 출력하고 종료한다.
 
 
대상 경로 생성
실행 옵션이 initialize인 경우에 대해 조건문으로 구분한다.
 
해당 부분에서는 원본 폴더의 경로와 복사할 대상 폴더의 경로를 생성한다.
def initializer():
    ... 중략 ...

    elif args.mode == "initialize":   
        # 현재 실행 중인 파이썬 코드의 실제 경로(절대 경로)에서 폴더 경로만 추출 
        srtest_folder = os.path.dirname(os.path.realpath(__file__))   
        dags_folder = os.path.join(os.path.dirname(srtest_folder), "dags") 
     
        # airflow 경로에 srtestairflow 폴더를 복사하기 위한 경로 설정   
        # /opt/bitnami/airflow/dags/srtest 
        dst_folder = os.path.join(os.environ.get("BITNAMI_PKG_EXTRA_DIRS"), "srtestairflow") 
        dst_dags = os.environ.get("BITNAMI_PKG_EXTRA_DIRS") 
     
        # 해당 환경 변수가 없으면 AirFlow 기본 경로(AIRFLOW_HOME)를 활용하여 dags 경로 생성 
        if dst_dags is None:     
            dst_dags = os.path.join(os.environ.get("AIRFLOW_HOME"), "dags") 
       
        # dags 폴더는 AirFlow의 기본 구성 폴더이므로 없으면 예외 발생 
        if not os.path.exists(dst_dags): 
            raise FileNotFoundError(f"dags_dst_folder is not exist: {dst_dags}!")
 
각 경로들의 의미는 다음과 같다.
구분
내용
srtest_folder
파이썬 패키지에 포함 된 srtestairflow 폴더 경로
dags_folder
파이썬 패키지에 포함 된 dags 폴더 경로
dst_folder
AirFlow dags 폴더 하위의 srtestairflow 경로
dst_dags
AirFlow dags 폴더 경로
 
Bitnami에서 배포한 Docker 이미지의 BITNAMI_PKG_EXTRA_DIRS 환경 변수 경로는 /opt/bitnami/airflow/dags 이다.
해당 환경 변수가 없으면 AirFlow 기본 Docker 이미지가 제공하는 AIRFLOW_HOME 환경 변수를 사용한다.
실제 경로는 /opt/bitnami/airflow 이다.
 
마지막에는 AirFlow의 dags 경로가 실제로 존재하는지 확인하고 없으면 예외를 발생시킨다.
AirFlow의 DAG은 기본 생성되어 있는 dags 폴더에 정의하도록 되어 있다.
따라서 해당 폴더가 없으면 AirFlow 환경이 아니거나 문제가 있는 것으로 판단해야 한다.
 
DAG 기능 코드 복사
DAG 패키지에서 DAG의 기능을 정의한 폴더와 코드들을 AirFlow dags 폴더 하위의 srtestairflow경로에 복사한다.
def initializer():
    ... 중략 ...

    elif args.cmd == "initialize":
        ... 중략 ...

        ## 기능 코드 폴더 복사
        # DAG의 기능 코드 폴더가 설치될 경로 확인 
        if os.path.exists(dst_folder) is False:   
            try: 
                # 파일 복사 
                shutil.copytree(srtest_folder, dst_folder) 
       
            # 복사 도중 오류가 발생할 경우 cleanup 과정 수행 
            except AttributeError: 
                if os.path.exists(dst_folder): 
                    shutil.rmtree(dst_folder) 
                   
                print(f"Copy failed!! : {dst_folder}") 
        else: 
            raise FileExistsError(f"dst_folder already exists: {dst_folder}!")

 

만약 폴더가 존재할 경우 기존 코드의 가용성을 유지하기 위해 예외를 발생시킨다.
기존 파일들을 삭제하는 기능 구현을 하지 않았으므로, 기존 파일들은 수동으로 삭제해야 한다.

 

 
DAG 파일 복사
파이썬 패키지에 포함 된 dags 폴더의 파일을 AirFlow의 dags 폴더에 복사한다.
dags 폴더 파일 중에 확장명이 .py인 파일들만 복사한다.
해당 폴더 내에 .py 파일이 없으면 예외를 발생시킨다.
def initializer():
    ... 중략 ...

    elif args.cmd == "initialize":
        ... 중략 ...

        ## DAG 파일 복사
        files = os.listdir(dags_folder)
        # 확장명이 .py가 아닌 파일은 제외 
        files = list(filter(lambda x: x.endswith(".py"), files)) 
     
        # 복사할 DAG 파일이 없는 경우   
        if files is False: 
            raise FileNotFoundError(f"DAG file is not exist!") 
     
        # dag 파일들을 /opt/bitnami/airflow/dags 경로에 복사   
        for filename in files:   
            file_path = os.path.join(dags_folder, filename) 
         
            try: 
                shutil.copy(file_path, dst_dags) 
            except AttributeError: 
                dst_file = os.path.join(dst_dags, filename) 
                print(f"Copy failed!! : {dst_file}") 
     
        print("Initialize is completed!")
 
예외 처리
예제 코드에서 실행 옵션은 initializer만 설정했다.
해당 옵션이  전달되지 않은 경우에는 실행 옵션에 대한 도움말을 출력하고 종료한다.
def initializer():
    ... 중략 ...

    elif args.cmd == "initialize":
        ... 중략 ...
    else:
        parser.parse_args(['-h'])
 
예제 코드에서는 AirFlow의 DAG에 등록하는 옵션만 구현했다.
 
필요에 따라 AirFlow에서 제거하는 옵션을 추가해도 좋을 것 같다.
해당 패키지의 버전이 변경되는 경우 기존 파일들을 수작업으로 제거하지 않고 자동으로 할 수 있다.
 
 

callback.py

특정 Task가 성공하거나 실패한 경우 Callback 형태로 Python 클래스나 함수를 호출 할 수 있다.
일반적으로 Task가 실패한 경우 이메일을 보내주는 함수를 구현해서 호출한다.
 
파이썬의 smtplib 라이브러리를 사용해서 메일 서버의 SMTP 방식으로 메일을 발송한다.
일반적으로 사용할 메일 서버가 없는 경우 Gmail의 메일 서버를 활용하면 된다.
 
설정 방법은 다음과 같다.
1. Gmail 설정 → 전달 및 POP/IMAP 탭 → IMAP 사용에 체크
2. Google 계정 관리 → 보안 → Google에 로그인 → 앱 비밀번호 → 생성 클릭
3. 출력 된 비밀번호 복사해서 메모장에 붙여넣기
4. 아래 코드의 app_password 부분에 복사한 비밀번호 붙여넣기
5. 아래 코드의 your@gmail.address 부분에 실제 gmail 주소 입력
 
이메일 계정과 앱 비밀번호는 노출되지 않도록 주의한다.
 
임포트와 클래스 정의
Task가 실패하거나 DAG 실행이 성공한 경우 메일을 보내는 기능을 정의한다.
사용하는 라이브러리는 다음과 같다.
  • smtplib - SMTP 메일 서버에 연결해서 메일 발송
  • MIMEText - 이메일의 헤더 처리
  • textwrap - 이메일의 본문 처리
 
실제 코드는 다음과 같다.
import smtplib
from email.mime.text import MIMEText
import textwrap

class SrTestCallback:
 
send_fail_mail
Task 실행이 실패한 경우 메일을 발송하는 함수이다.
Operator의 callback 인자에서 python_callable 형태로 호출 되기 때문에 classmehod 데코레이터를 사용했다.
 
해당 함수에서는 context에 전달 된 DAG과 Task 관련 정보들을 조합하여 이메일 본문을 만들고 발송한다.
Gmail로 SMTP 메일 발송하는 코드는 아래 코드의 가장 마지막 부분을 참고한다.
class SrTestCallback:
    @classmethod
    def send_fail_mail(cls, **context) -> None:
        """
        Operator 실행이 실패한 경우 AirFlow에 의해 자동 호출되는 함수

        :param context: 구동된 Operator에 대한 정보를 담고 있는 dictionary 
        https://airflow.apache.org/docs/stable/_api/airflow/models/taskinstance/index.html
        """
        # DAG과 Task에 대한 정보를 context에서 추출
        task_id = str(context['task_instance'].task_id)
        dag_id = str(context['task_instance'].dag_id)
        execution_date = str(context['task_instance'].execution_date)
        log_url = str(context['task_instance'].log_url)

        # 발송할 메일 본문 템플릿
        message_body = textwrap.dedent(f"""
            AirFlow 구동 중 실패 발생

            Task: {task_id}
            DAG: {dag_id}

            수집 시점: {execution_date}
            로그 url: {log_url}
        """)

        message  MIMEText(message_body.encode('utf-8'), _charset='UTF-8')
        message['Subject'] = f"[AirFlow] Task {task_id} Failed!"
        message['From'] = "no-reply@address.com"
        message['To'] = "your@email.address"

        # Gmail 발송 코드
        # 이메일 주소와 앱 비밀번호는 본인의 것으로 수정한다.
        sender = smtplib.SMTP('smtp.gmail.com', 587)
        sender.starttls()
        sender.login('your@gmail.address', 'app_password') 
        sender.sendmail(message['From'], message['To'], message.as_string())
        sender.quit()
 
 
send_finish_mail
DAG 실행이 성공한 경우 메일을 발송하는 함수이다.
Task 실패가 아니므로 Task 정보는 제외한다.
 
Operator의 callback 인자에서 python_callable 형태로 호출 되기 때문에 classmehod 데코레이터를 사용했다.
 
해당 함수에서는 context에 전달 된 DAG 관련 정보들을 조합하여 이메일 본문을 만들고 발송한다.
Gmail로 SMTP 메일 발송하는 코드는 send_fail_mail 코드의 가장 마지막 부분을 참고한다.
 
나머지는 send_fail_mail 함수와 동일하다.
class SrTestCallback:
    ... 중략 ...

    def send_finish_mail(cls, **context) -> None:
        ... 중략 ...

        # DAG에 대한 정보를 context에서 추출
        dag_id = str(context['task_instance'].dag_id)
        execution_date = str(context['task_instance'].execution_date)
        log_url = str(context['task_instance'].log_url).replace("http://", "https://")

        # 발송할 메일 본문 템플릿
        message_body = textwrap.dedent(f"""
            AirFlow 실행 성공!

            DAG: {dag_id}

            수집 시점: {execution_date}
            로그 url: {log_url}
        """)

        # 이메일 제목, 발신인, 수신인
        message = MIMEText(message_body.encode('utf-8'), _charset='UTF-8')
        message['Subject'] = f"[AirFlow] {dag_id} {execution_date[0:10]} Success!"

        ... 중략 ...