::: IT인터넷 :::

AirFlow의 DAG을 파이썬 패키지로 구성하기 (4)

곰탱이푸우 2022. 7. 14. 08:20
DAG를 파이썬 패키지로 구성하는 방법에 대해 정리한다.
  1. 기능 정의
  2. 프로젝트 생성
  3. DAG 코드 작성
  4. 기능 코드 작성 (1)
  5. 기능 코드 작성 (2)
  6. 테스트 코드 작성
  7. 패키지 정의
  8. 테스트
  9. 형상 관리, 빌드, 배포

 

기능 코드 작성은 아래 포스팅을 참고한다.
기능 정의, 프로젝트 생성, DAG 코드 작성은 아래 포스팅을 참고한다.
구현한 기능을 검증하는 코드 작성과 테스트,  형상 관리, 빌드, 배포 방법에 대해 정리한다.
 
 

테스트 코드 작성

기능 구현이 완료되면 정상적으로 동작하는지 확인하기 위한 테스트 코드를 작성한다.
구현한 기능을 테스트하는 개발 방법을 테스트 주도 개발 (TDD, Test Driven Development)이라고 한다.
TDD는 아래 문서에 잘 정리가 되어 있으므로 참고한다.
아래 코드들은 모두 tests 폴더 하위에 위치한다.
 

srtest-config/config.yml

파이썬 예제 코드에서 다뤘던 테스트 코드 작성의 srtest-config/config.yml 항목과 동일하다.
 
아래 포스팅의 srtest-config/config.yml 기능 부분을 참고한다.

__init__.py

DAG 코드 작성의 __init__.py 항목을 참고한다.
 
아래 포스팅을 참고한다.
 

test_emailclient.py

전체적인 구조와 설명은 파이썬 예제 코드에서 다뤘던 내용과 거의 동일하다.
 
아래 포스팅의 test_emailclient.py 항목을 참고한다.
아래는 변경 사항 위주로 정리한다.
import 부분
Display 함수에서 Arrow 타입으로 날짜 처리를 했으므로, arrow 라이브러리를 임포트한 것에 주목한다.
import pytest # 파이썬 유닛 테스트 (설치 필요) 
import os # 파이썬의 파일과 경로 처리 (파이썬 기본 제공) 
import yaml # yaml 포맷 처리 (설치 필요) 
import textwrap # 여러 행의 문자열을 처리 (파이썬 기본 제공) 
import arrow  # 날짜 처리 (설치 필요)

# emailclient.py에 구현한 설정 파일 처리 (직접 구현) 
from srtestairflow.emailclient import EmailClient
 
테스트 클래스 정의
DAG 코드에서 display Task를 정의할 때 Jinja Templating으로 DAG의 execution_date를 넘겨주었다.
테스트 코드에서 DAG의 execution_date를 얻을 수 없다.
 
따라서 테스트 하기 위해서는 동일한 타입을 가지는 값을 임의로 생성해서 사용한다.
이러한 값을 Mock 또는 Mock Object라고 한다.
 
테스트에서는 임의로 execution_date를 생성해서 Arrow 타입으로 변환해서 사용한다.
class TestEmailClient:
    @pytest.fixture(scope="function", autouse=True)
    def setup_teardown(self):
        """
        config.yml 파일을 읽고 dictionary 타입으로 저장한다.
        """
        test_folder = os.path.dirname(os.path.realpath(__file__))
        self.test_conf_file = os.path.join(test_folder, "resources", "config.yml")

        with open(self.test_conf_file, 'r', encoding='utf-8') as yaml_file:
            self.yaml_dict = yaml.load(yaml_file, Loader=yaml.Loader)

        self.mock_context = {'ts': "2020-03-25T10:00:00+00:00"}
        self.today = arrow.get(self.mock_context['ts']).to("Asia/Seoul")
 
전체적인 내용은 파이썬 예제 코드의 테스트 클래스 정의 항목을 참고한다.
 
test_set_curr_time  함수
해당 함수가 반환한 값이 setup_teardown 함수에서 생성한 Arrow 타입의 today 변수와 같은지 확인한다.
마지막에 값을 비교하는 부분이다.
class TestEmailClient:
    ... 중략 ...

    def test_set_curr_time(self):
        """
        set_curr_time을 통해 현재 시각을 제대로 기록하는지 확인한다.
        """
        test_cls = EmailClient()
        test_cls.set_curr_time(self.today)

        assert test_cls.curr_time == self.today
 
전체적인 내용은 파이썬 예제 코드의 test_set_curr_time  함수 항목을 참고한다.
 
test_set_email_info  함수
setup_teardown 함수에서 정의한 today 값을 전달해서 set_curr_time 함수를 호출한다.
그리고 set_email_info 함수를 호출해서 반환한 값이 today 변수의 날짜와 동일한지 확인한다.
 
해당 함수가 반환한 값이 setup_teardown 함수에서 생성한 Arrow 타입의 today 변수와 같은지 확인한다.
class TestEmailClient:
    ... 중략 ...

    def test_set_email_info(self):
        """
        config.yml에서 읽은 이메일 관련 정보가 예상대로 나오는지 확인한다.
        """
        test_cls = EmailClient()
        test_cls.set_curr_time(self.today)
        message = test_cls.set_email_info(self.test_conf_file)

        subject = message['Subject']
        expected_subject = f"test-mail"
        assert subject == expected_subject

        sender = message['From']
        expected_sender = f"no-reply@address.com"
        assert sender == expected_sender

        receiver = message['To']
        expected_receiver = f"your@email.address"
        assert receiver == expected_receiver

        mail_body = message['Body']
        expected_mail_body = textwrap.dedent(f"""
            2020-03-25에 테스트를 수행했습니다.

            From bearpooh.com auto-mailing
            """)
        assert mail_body == expected_mail_body
 
전체적인 내용은 파이썬 예제 코드의 test_set_curr_time  함수 항목을 참고한다.
 
test_applying_magic_keyword 함수
테스트에 사용할 연월일 값을 setup_teardown 함수에서 정의한 Arrow 타입의 today 변수에서 추출한다.
 
추출한 값을  _apply_value_to_magic_keyword 함수에 전달해서 결과를 비교한다.
class TestEmailClient:
    ... 중략 ...

    def test_applying_magic_keyword(self):
        """
        yml의 year, month, date를 정상적으로 치환하는지 확인한다.
        """
        payload = self.yaml_dict['common']['email']
     
        test_year = str(self.today.year)
        test_month = str(self.today.month).zfill(2)
        test_day = str(self.today.day).zfill(2)
     
        result_dict = EmailClient._apply_value_to_magic_keyword(dictionary=payload,
            year=test_year, month=test_month, day=test_day)
     
        expected_result = f"2020-03-25"
     
        assert result_dict['date'] == expected_result
 
전체적인 내용은 파이썬 예제 코드의 test_applying_magic_keyword 함수 항목을 참고한다.

test_srtest.py

전체적인 구조와 설명은 파이썬 예제 코드에서 다뤘던 test_srtest.py 내용과 거의 동일하다.
 
아래 포스팅의 test_srtest.py 항목을 참고한다.
아래는 변경 사항 위주로 정리한다.
 
 
import 부분
테스트 하면서 파일을 임시 경로에 생성할 것이므로 tempfile 라이브러리의 gettempdir 함수를 임포트한다.
import pytest
import os
import shutil
from srtestairflow.srtest import SrTest
from tempfile import gettempdir  # 임시 경로 (/tmp) 사용
import textwrap
 
테스트 클래스 정의
DAG 코드에서 display Task를 정의할 때 Jinja Templating으로 DAG의 execution_date를 넘겨주었다.
테스트 코드에서 DAG의 execution_date를 얻을 수 없다.
 
따라서 테스트 하기 위해서는 동일한 타입을 가지는 값을 임의로 생성해서 사용한다.
이러한 값을 Mock 또는 Mock Object라고 한다.
 
테스트에서는 임의로 Dictionary 타입의 mock_context 를 생성해서 DAG의 context를 대체한다.
class TestSrTest:
    @pytest.fixture(scope="function", autouse=True)
    def setup_teardown(self):
        """
        config.yml 파일을 읽고 dictionary 타입으로 저장한다.
        """
        base_folder = os.path.dirname(os.path.realpath(__file__))
        self.base_conf_file = os.path.join(base_folder, "resources", "config.yml")

        self.test_folder = os.path.join(gettempdir(), "srtestairflow")
        self.test_dst_folder = os.path.join(self.test_folder, "resources")
        self.test_conf_file = os.path.join(self.test_dst_folder, "config.yml")
       
        self.mock_context = {'ts': "2020-03-25T10:00:00+00:00"}
 
전체적인 내용은 파이썬 예제 코드의 테스트 클래스 정의 항목을 참고한다.
 
test_init 함수
setup_teardown 함수에서 정의한 파일 경로 변수를 전달해서 init 함수를 호출한다.
 
파일이 존재하지 않는 경우 의도한 예외가 발생하는지 확인한다.
또한 init 함수가 정상 동작 했을때 파일과 폴더가 생성되었는지 확인한다.
 
테스트가 종료되면 테스트를 위해 생성한 파일을 삭제한다.
해당 파일은 finalizer 함수를 구현해서 호출해도 된다.
class TestSrTest:
    ... 중략 ...

    def test_init(self):
        """
        init 함수가 정상적으로 동작하는지 확인한다. 
        """
        test_cls = SrTest()

        # 파일이 존재하는 경우 FileExistsError 예외 발생
        with pytest.raises(FileExistsError):
            test_cls.init(self.base_conf_file, self.base_conf_file)

        # init 함수 동작 테스트 (파일 복사 여부 확인)
        test_cls.init(self.base_conf_file, self.test_conf_file)
        assert os.path.exists(self.test_conf_file) is True
        assert os.path.exists(self.test_dst_folder) is True

        # 테스트를 위해 생성한 폴더 삭제
        if os.path.exists(self.test_dst_folder):
            shutil.rmtree(self.test_dst_folder)
            print("test_init cleanup is completed!")

 

test_display 함수
setup_teardown 함수에서 정의한 파일 경로와 시간 값을 전달해서 display 함수를 호출한다.
 
파일이 존재하지 않는 경우 의도한 예외가 발생하는지 확인한다.
또한 display 함수가 정상 동작 했을때 반환된 값이 의도한 값과 동일한지 확인한다.
class TestSrTest:
    ... 중략 ...

    def test_display(self):
        """
        display 함수가 정상적으로 동작하는지 확인한다. 
        """
        test_cls = SrTest()

        # 파일이 없으면 경우 FileNotFoundError 예외 발생
        with pytest.raises(FileNotFoundError):
            test_cls.display(self.test_conf_file, self.test_conf_file, self.mock_context["ts"])

        # display 함수 동작 테스트 (날짜 변경 여부 확인)
        test_cls.display(self.base_conf_file, self.base_conf_file, self.mock_context["ts"])

        result = test_cls.result['Body']
        expected_result = textwrap.dedent(f"""
            2020-03-25에 테스트를 수행했습니다.

            From bearpooh.com auto-mailing
            """)
        assert result == expected_result
 
 
test_cleanup 함수
setup_teardown 함수에서 정의한 파일 경로를 전달해서 test_cleanup 함수를 호출한다.
 
파일나 폴더가 존재하지 않는 경우 의도한 예외가 발생하는지 확인한다.
또한 cleanup 함수가 정상 동작 했을 때 해당 경로가 삭제 되었는지 확인한다.
class TestSrTest:
    ... 중략 ...

    def test_cleanup(self):
        """ 
        cleanup함수가 정상적으로 동작하는지 확인한다.   
        """ 
        test_cls = SrTest() 

        # 폴더가 없으면 경우 FileNotFoundError 예외 발생 
        with pytest.raises(FileNotFoundError): 
            test_cls.cleanup(self.test_dst_folder) 

        # 파일이 없으면 경우 FileNotFoundError 예외 발생 
        with pytest.raises(FileNotFoundError): 
            test_cls.cleanup(self.test_conf_file) 

        # 테스트를 위한 데이터 생성 
        test_cls.init(self.base_conf_file, self.test_conf_file) 

        # cleanup 함수 동작 테스트 (파일 삭제 여부 확인) 
        test_cls.cleanup(self.test_conf_file) 
        assert os.path.exists(self.test_conf_file) is False 
        assert os.path.exists(self.test_dst_folder) is False 

        # 테스트를 위해 생성한 폴더 삭제 
        if os.path.exists(self.test_dst_folder): 
            shutil.rmtree(self.test_dst_folder) 
            print("test_cleanup cleanup is completed!")
 

패키지 정의

전체적인 내용은 파이썬 예제 코드의 패키지 정의 항목을 참고한다.
아래 내용은 변경 사항 위주로 정리한다.
 
 

requirement.txt

해당 패키지가 동작하는데 필요한 외부 라이브러리 패키지를 정의한다.
pip로 whl 파일을 설치할 때 해당 패키지가 함께 설치 된다.
--no-deps 옵션을 사용하면 requirement.txt에 정의 된 패키지들을 설치하지 않는다.
 
Docker 이미지가 AirFlow 2.2.3 버전을 사용하기 때문에 apache-airflow 2.2.3 버전을 사용한다.
해당 패키지는 AirFlow의 DAG과 Operator를 정의하기 위해 사용한다.
 
또한 날짜 정보를 변환하기 위한 arrow를 사용한다.
개발 당시 사용한 버전이 1.2.2 버전이었고, 테스트 결과 이상이 없었으므로 해당 버전으로 명시한다. 
apache-airflow==2.2.3
arrow==1.2.2
 

test_requirement.txt

해당 패키지의 단위 (유닛) 테스트를 수행할때 필요한 패키지를 정의한다.
 
배포를 위한 빌드 파이프라인을 구성할 때 유닛 테스트하기 전에 설치한다.
또는 로컬에서 개발 환경을 구성할 때도 사용한다.
requirement.txt와 동일하고 마지막에 pytest와 pytest-runner가 추가되었다.
apache-airflow==2.2.3
arrow==1.2.2
pytest-runner==6.0.0
pytest==7.1.1
 

테스트

로컬 테스트와 패키지 테스트를 진행한다.
 
로컬 테스트는 개발자 환경에서 정의한 기능이 테스트 코드를 통해 정상적으로 수행되는지 확인한다.
패키지 테스트는 로컬 환경에서 whl 포맷의 패키지 생성이 잘 되고 설치가 되는지 확인한다.
 
자세한 내용은 아래 포스팅의 테스트 항목을 참고한다.
 

형상 관리

작성한 코드는 GitLab과 같은 형상 관리 시스템에서 관리해야 한다.
 
프로젝트를 생성하고, .gitignore와 README.md 파일을 작성해서 생성한 저장소에 커밋해야 한다.
 
자세한 내용은 아래 포스팅의 형상 관리 항목을 참고한다.
GitLab의 사용 방법은 아래 포스팅을 참고한다.
 

빌드와 배포

형상 관리 시스템에 커밋한 코드는 패키지 파일로 빌드하여 저장소에 배포한다.
이전에 구성한 GitLab, Jenkins, Nexus를 사용하여 빌드 파이프라인을 구성한다.
그리고 해당 파이프라인을 통해 빌드하여 배포한다.
 
자세한 방법은 아래 포스팅을 참고한다.