::: 데이터 분석 :::

Spark Application 패키지 작성하기 (5) - 테스트 작성과 빌드, 배포

곰탱이푸우 2023. 9. 18. 08:20
기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다.
 
앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다.
기존에 Hadoop 기반의 Spark 클러스터를 구축했으므로, Spark Application은 해당 환경에서 실행할 수 있는 형태로 작성해야 한다.
 
따라서 Spark Application 작성을 위한 기본 형태 (Skeleton) 제공을 위한 예제 프로그램을 재작성한다.
참고로 예전에 실무에서 활용했던 Spark Application 코드에서 재사용 또는 공개가 가능한 코드만 활용하여 정리하였다.
 
전체적인 내용은 아래 문서를 참고하고, 변경 사항을 중심으로 정리한다.
전체 내용은 아래와 같은 순서로 작성한다.
  • 전체 구조와 기본 설정
  • 헬퍼 (Helper) 기능 작성
  • 데이터 처리 로직 작성
  • 비즈니스 로직 작성
  • 테스트 작성과 빌드, 배포
 
이전 단계 내용들은 아래 문서를 참고한다.
 

테스트 작성

기능 정의가 모두 완료되면 정의한 기능들이 정상적으로 동작하는지 확인한다.
  • 데이터 처리 기능 로직이 정상적으로 동작하는지 확인한다.
  • 부가기능 (헬퍼) 코드는 확인이 필요하다고 판단되면 테스트 코드를 작성한다.
 
테스트에 사용할 미니 데이터를 준비해서 resource 폴더에 저장한다.
데이터  처리 기능 로직은 해당 테스트 데이터를 이용하여 테스트를 진행한다.
 

테스트 데이터 (log.json)

테스트 데이터는 간단한 예제 코드 만들기에 사용했던 log.json 파일을 활용한다.
간단한 Spark 예제 코드 만들기는 아래 문서를 참고한다.
대신 파일 경로를 src/resources 하위 폴더에서 test/resources/data/log/ 폴더로 변경한다.
최종 데이터 경로는 test/resources/data/log/log.json 이다.
 
log.json 파일은 아래와 같이 JSON 형태의 임의 로그 데이터이다.
{"id":"2688425498","type":"buy","created_at":"2013-08-13T22:58:08Z"}
{"id":"2688425496","type":"view","created_at":"2013-08-13T22:59:08Z"}
{"id":"2688425492","type":"view","created_at":"2013-08-13T23:01:08Z"}
{"id":"2688425491","type":"buy","created_at":"2013-08-13T22:02:08Z"}
{"id":"2688425493","type":"view","created_at":"2013-08-13T22:03:08Z"}
{"id":"2688425494","type":"view","created_at":"2013-08-13T22:06:08Z"}
{"id":"2688425495","type":"view","created_at":"2013-08-13T22:08:08Z"}
 
 

SparkExampleTest.scala

실제 Spark 데이터 처리 로직을 정의한 SparkExample 클래스의 테스트를 정의한다.
클래스에 포함 된 기능 메소드에 대해 아래 사항을 테스트한다.
  • 해당  메소드의 정상 동작 여부
  • 의도한 예외의 발생 여부
 
요구 조건을 충족하는 최소한의 미니 데이터를 사용하여 실제 기능을 호출하는 방식으로 테스트한다.
보통 해당 Application이 실행 될 HDFS의 실제 데이터 경로와 동일한 구조로 테스트 데이터 또는 환경을 구성한다.
 
테스트의 결과 확인은 아래 사항을 중심으로 확인한다.
  • 결과 파일의 생성 여부
  • 결과 데이터의 정상 여부 (컬럼, 타입, 레코드 수 등)
  • 예외 발생 여부
 

라이브러리 임포트

굳이 손으로 타이핑하지 않아도 된다.
아래 설명할 코드들을 작성할 때 IntelliJ에서 Alt + Enter로 손쉽게 추가할 수 있다.
 
SparkExampleTest.scala에서 사용하는 라이브러리들을 선언한다.
  • 테스트를 위해 생성한 임시 디렉토리 삭제를 위한 FileUtils
  • 하둡 경로의 예외 테스트를 위한 FileAlreadyExistsException (이미 존재), PathNotFoundException (없음) 
  • 로컬 경로 생성을 위한 Paths, 폴더 여부 속성의 확인을 위한 Files
  • 테스트를 위한 로컬 SparkSession과 결과 데이터를 읽기 위한 DataFrame
  • 행동 기반의 테스트 구현을 위한 AnyFlatSpec ("테스트 이름" should "만족할 조건" in { 테스트 }" 형태)
  • 기대 결과와 실제 결과를 비교하기 위한 Matchers (should, should not, must, can, in 등)
  • 전체 테스트 시작 전과 종료 후의 초기화, 후처리를 제어하는 BeforeAndAfterAll
  • 테스트 단위의 시작 전과 종료 후의 초기화, 후처리를 제어하는 BeforeAndAfterEach
  • 로컬 경로 제어와 속성 값 활용을 위한 File
  • 테스트의 임시 저장 경로에 포함 된 무작위 값 생성을 위한 randomUUID
  • 특정 기능 수행하면서 정상 실행 (예외 발생) 여부를 확인하기 위한 Try
 
코드는 다음과 같다.
package com.bearpooh.bdp.example.sparkexample.application

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileAlreadyExistsException, PathNotFoundException}

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import java.io.Fi
import java.util.UUID.randomUUID
import scala.util.Try
 
임포트 구문을 소개하는 이유
굳이 손으로 입력하지 않아도 되는 부분임에도 내용을 설명하는 목적은 다음과 같다.
- 사용한 라이브러리 파악 가능
- 해당 코드의 기능을 예측
- 코드 이해를 위한 기반 지식으로 활용

이러한 이유로 코드 분석을 진행할때 Import 부분을 제일 먼저 확인한다.
Scala 뿐만 아니라 모든 언어에 해당하는 부분이다.

 

 

테스트 클래스 선언

SparkExample에 대한 테스트를 진행하는 SparkExampleTest 클래스를 선언한다.
extends와 with로 아래 trait 들을 믹스인한다.
  • AnyFlatSpec - 행동 기반의 테스트 구현 ("테스트 이름" should "만족할 조건" in { 테스트 }" 형태)
  • Matchers - 기대 결과와 실제 결과 비교 (should, should not, must, can, in 등)
  • BeforeAndAfterAll - 전체 테스트 시작 전과 종료 후의 초기화, 후처리 제어
  • BeforeAndAfterEach - 테스트 단위의 시작 전과 종료 후의 초기화, 후처리 제어
 
코드는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..
}
 
object는 상속이 불가능한 싱글톤 (Singleton) 객체이고,  class는 다중 상속이 불가능하다.
따라서 위와 같이 extend 이후에 여러 객체들이 with로 연결되는 경우 trait라고 생각해야 한다.
 

공통 변수 정의

테스트 클래스에서 공통적으로 사용하는 값들을 변수로 할당한다.
아래 값들을 생성한다.
  • SparkSession 인스턴스 - Spark을 로컬 모드로 실행 (로컬 경로의 데이터 사용)
  • SparkExample 인스턴스 - 데이터 처리 클래스의 기능 호출을 위한 인스턴스 생성 (SparkSession 전달)
  • 테스트 기대 결과 상수 변수 - 실제 기능이 실행된 결과 값이 같아야 하는 기댓값을 상수 변수로 정의
  • 테스트 데이터와 결과 저장 경로 - 테스트에 사용할 데이터의 로컬 경로 (읽을 경로, 저장 경로 순)
 
자세한 내용은 코드와 주석을 참고한다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // Spark Session
  private val master: String = "local"
  private val appName: String = "SparkExampleTest"

  lazy val spark: SparkSession = new SparkSession.Builder()
    .appName(appName)
    .master(master)
    .getOrCreate()

  // Instance creation and assignment
  val example: SparkExample = new SparkExample(spark)

  // Test Expected Result
  val NUM_OF_COLUMNS: Int = 3
  val NUM_OF_ROWS: Int = 7
  val NUM_OF_TYPE: Int = 2

  // Test Data Path
  val testSrcPath: String = Paths.get("src", "test", "resources", "data", "log").toString

  val tmpDirectory: String = System.getProperty("java.io.tmpdir")
  val baseDir: String = Paths.get(tmpDirectory, "sparkexample-test-" + randomUUID().toString).toString
  val testDstPath: String = Paths.get(baseDir, "srteam", "test-data", "log-data").toString

  // .. 생략 ..
}
 
개인적으로 공통적으로 사용하는 테스트의 기댓값 값은 상수 변수로 정의해서 사용하는 것을 선호한다.
단순히 숫자 값을 하드코딩 해놓으면 기댓값의 의미를 파악하기 어렵다.
따라서 간단한 의미 전달이 가능한 상수 변수를 사용하는 것이 좋다.
 
 

기능 테스트 준비와 후처리 정의

각 테스트 단위마다 테스트 전, 후에 수행할 작업들을 정의한다.
아래 기능들을 정의한다.
  • 수행할 작업의 함수 정의 (deleteFileRecursively)
  • 전체 테스트 수행 전, 후에 수행할 작업 (beforeAll, afterAll)
  • 개별 테스트 중간마다 수행할 작업 (afterEach)
 
beforeAll, afterAll, afterEach 기능들은 SparkExampleTest에 믹스인 (Mix-in)한 아래 trait에 의해 제공된다.
  • BeforeAndAfterAll
  • BeforeAndAfterEach
 
코드는 다음과 같다. 임시로 생성한 테스트 폴더를 삭제하는 역할을 수행한다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  // Delete test directory
  def deleteFileRecursively(testPath: String): Unit = {
    if (Files.isDirectory(Paths.get(testPath)))    // 지정 경로가 존재하고 디렉토리이면
      FileUtils.deleteDirectory(new File(testPath))    // 하위 경로 포함해서 삭제
  }

  // Called before test
  override def beforeAll(): Unit = {
    deleteFileRecursively(baseDir)
  }

  // Called between each test
  override def afterEach(): Unit = {
    deleteFileRecursively(baseDir)
  }

  // Called after test
  override def afterAll(): Unit = {
    deleteFileRecursively(baseDir)
  }

  // .. 생략 ..
}
 
 

기능별 테스트 작성

테스트 작성 준비가 완료되면 SparkExample에 구현한 기능의 테스트를 작성한다.
  • 전체 레코드 개수 확인 기능
  • JSON을 Parquet로 저장하는 기능
 
개인적으로 기능의 테스트를 작성할 때에는 두가지 기준으로 작성한다.
  • 정상 실행 테스트 - 오류 없이 기능이 실행되면 의도한 결과와 일치해야 한다.
  • 예외 발생 테스트 - 예외를 직접 발생(throw) 시키는 경우, 특정 조건에서 해당 예외가 발생해야 한다.
 
테스트 작성은 아래 형식으로 작성한다.
  • "테스트명" should "만족조건" in {
      기능 호출
      기대 결과와 일치 여부 확인
    }
  • 테스트 정의에 사용하는 should와 in은 AnyFlatSpec 을 통해 제공된다.
  • 기대 결과와 일치 여부를 확인하는 shouldBe, should be 는 Matchers를 통해 제공된다.
 
전체 레코드 개수 확인 기능 테스트
전체 레코드 개수를 확인하는 countRecords 기능의 테스트를 진행한다.
아래 테스트를 구분하여 작성한다.
  • 정상 실행 테스트
  • 예외 발생 테스트

 

 

정상 실행 테스트
전체 레코드 개수를 확인하는 기능의 테스트를 진행한다.
기능 호출 부분을 Try로 감싸서 예외 발생 여부를 체크한다. 성공하면 true, 실패하면 false가 반환된다.
 
그 아래는 테스트 데이터를 읽고 컬럼수와 레코드수가 기대 값과 같은지 확인한다.
이 부분은 예제 목적으로 작성한 것으로 큰 의미는 없다.
작성한 기능이 데이터프레임 (DataFrame)을 반환하거나 결과 데이터를 저장하는 경우의 테스트 방법으로 참고한다.
 
실제 코드는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  "countRecords Function Test" should "not raise Exception" in {
    val countResult: Try[Unit] = Try(example.countRecords(testSrcPath))

    countResult.isSuccess shouldBe true

    // 아래 테스트는 예제 목적으로 작성한 것으로 큰 의미는 없다.
    val testSrcDF: DataFrame = spark.read.json(testSrcPath)

    testSrcDF.columns.length shouldBe NUM_OF_COLUMNS
    testSrcDF.count shouldBe NUM_OF_ROWS
  }

  // .. 생략 ..
}

 

예외 발생 테스트
countRecords 기능에서 throw new 형태로 발생시킨 예외가 정상 동작하는지 확인한다.
예외 발생 조건의 함수 인자를 전달하고, 의도한 예외가 발생하는지 확인한다.
 
아래와 같은 형태로 작성한다.
  • an [예외명] sholud be thrownBy {
       기능클래스.함수명(함수인자)
    }
  • 코드 실행 중 발생한 예외가 테스트에 작성한 "예외명"과 동일하면 Pass한다.
  • 보통 전달한 인자의 값이 유효하지 않은 경우, 경로가 없거나 이미 존재하는 경우 등을 체크한다.
 
실제 코드는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  "Parameter Exception Test for countRecords" should "raise Exception" in {
    an [IllegalArgumentException] should be thrownBy {
      example.countRecords("")
    }

    an [PathNotFoundException] should be thrownBy {
      example.countRecords(testDstPath)
    }
  }

  // .. 생략 ..
}

 

 
Parquet 포맷으로 저장 기능 테스트
Parquet 포맷으로 저장하는 convertParquet 기능의 테스트를 진행한다.
아래 테스트를 구분하여 작성한다.
  • 정상 실행 테스트
  • 예외 발생 테스트
 
정상 실행 테스트
JSON 파일을 읽고 Parquet 포맷으로 저장하는 기능의 테스트를 진행한다.
기능 호출 부분을 Try로 감싸서 예외 발생 여부를 체크한다. 성공하면 true, 실패하면 false가 반환된다.
 
그리고 아래 사항들을 확인한다.
  • 생성 된 경로의 디렉토리 여부, Parquet 파일 생성 여부
  • 결과 데이터를 읽고 컬럼수, 레코드수, 특정 컬럼의 고유 값 개수가 기대 값과 같은지 확인
 
실제 코드는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {

  // .. 생략 ..

  "convertParquet Function Test" should "not raise Exception" in {
    val convertResult: Try[Unit] = Try(example.convertParquet(testSrcPath, testDstPath))

    convertResult.isSuccess shouldBe true

    // 생성된 데이터 파일의 존재 여부와 개수 확인
    new File(testDstPath).isDirectory shouldBe true
    new File(testDstPath).listFiles.count(f => f.isFile && f.getName.endsWith(".parquet")) should be > 0

    // 생성된 데이터를 읽고 결과 확인
    val testDstDF: DataFrame = spark.read.parquet(testDstPath)

    testDstDF.columns.length shouldBe NUM_OF_COLUMNS
    testDstDF.count shouldBe NUM_OF_ROWS

    // 생성 된 데이터의 특정 컬럼 결과 확인
    val typeCount: Long = testDstDF.select("type").distinct.count
    typeCount shouldBe NUM_OF_TYPE
    typeCount should be < NUM_OF_ROWS.toLong
  }
  // .. 생략 ..
}
 
예외 발생 테스트
convertParquet 기능에서 throw new 형태로 발생시킨 예외가 정상 동작하는지 확인한다.
예외 발생 조건의 함수 인자를 전달하고, 의도한 예외가 발생하는지 확인한다.
 
아래와 같은 형태로 작성한다.
  • an [예외명] sholud be thrownBy {
       기능클래스.함수명(함수인자)
    }
  • 코드 실행 중 발생한 예외가 테스트에 작성한 "예외명"과 동일하면 Pass한다.
  • 보통 전달한 인자의 값이 유효하지 않은 경우, 경로가 없거나 이미 존재하는 경우 등을 체크한다.
 
실제 코드는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  "Parameter Exception Test for convertParquet" should "raise Exception" in {
    an[IllegalArgumentException] should be thrownBy {
      example.convertParquet("", testDstPath)
    }

    an[IllegalArgumentException] should be thrownBy {
      example.convertParquet(testSrcPath, "")
    }

    an[PathNotFoundException] should be thrownBy {
      example.convertParquet(testDstPath, testDstPath)
    }

    an[FileAlreadyExistsException] should be thrownBy {
      example.convertParquet(testSrcPath, testSrcPath)
    }
  }
}

 

 

Docstring 작성

테스트의 Docstring은 테스트 항목에 대한 내용을 설명하기 위한 주석을 의미한다.
따라서 기능 구현에 작성하는 Docstring과 약간의 차이가 있다.
- 테스트 항목과 테스트 조건 등을 설명한다.
- 테스트 코드의 가독성과 유지 보수 향상이 목적이다.
 
SparkExampleTest.scala는 데이터 처리 기능의 정상 동작 여부를 테스트하는 코드이기 때문에 Docstring을 작성한다.
Scala의 경우 클래스와 메소드 선언 구문의 바로 위에 작성한다. (파이썬은 선언 구문 바로 아래에 작성한다.)
 
또한 테스트 클래스와 정상 실행 테스트, 예외 발생 테스트에 따라 포맷에 약간 차이가 있다.
 
테스트 클래스
테스트 클래스에 대한 간략한 설명을 작성한다.
 
테스트 클래스의 Docstring 예시는 다음과 같다.
/**
* SparkExample 테스트 클래스다. local spark를 사용한다.
*/
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..
}
 
정상 실행 테스트
아래 항목에 대한 설명을 간략하게 작성한다.
  • 테스트 하려는 기능에 대한 간단한 설명
  • 테스트 통과 조건 (결과 기대값 설명)
 
기능 Docstring에 작성한 @param, @return, @throw 등은 작성하지 않는다.
 
정상 실행 테스트의 Docstring 예시는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  /**
  * JSON 데이터를 읽고 Parquet 포맷으로 저장하는 기능에 대한 테스트이다.
  * 1) 발생한 Exception이 없다.
  * 2) 생성된 데이터 경로가 존재하고 개수가 1개 이상이다.
  * 3) Parquet 데이터의 컬럼 개수가 3개이고 레코드수가 7개이다.
  * 4) type 컬럼의 중복 제거 개수가 2개이고, 7개 이하이다.
  */
  "convertParquet Function Test" should "not raise Exception" in {
    // .. 생략 .. 
  }

  // .. 생략 ..
}
 
 
예외 발생 테스트
아래 항목에 대한 설명을 간략하게 작성한다.
  • 테스트 하려는 기능에 대한 간단한 설명
  • 테스트 통과 조건 (예외 발생 조건과 발생하는 예외 설명)
 
기능 Docstring에 작성한 @param, @return, @throw 등은 작성하지 않는다.
 
예외 발생 테스트의 Docstring 예시는 다음과 같다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers 
      with BeforeAndAfterAll with BeforeAndAfterEach  {
  // .. 생략 ..

  /**
  * JSON 데이터를 읽고 Parquet 포맷으로 저장하는 기능에 잘못 된 입력이 전달 된 경우에 대한 테스트이다.
  * 예외 발생 조건의 입력 값을 전달하고 정상적으로 예외가 발생하는지 확인한다.
  * 1) 데이터의 경로가 전달되지 않으면 IllegalArgumentException이 발생한다.
  * 2) 전달 된 경로가 존재하지 않으면 PathNotFoundException이 발생한다.
  * 3) 저장 경로에 데이터가 존재하면 FileAlreadyExistsException이 발생한다.
  */
  "Parameter Exception Test for convertParquet" should "raise Exception" in {
    // .. 생략 ..
  }
}
 

빌드와 배포

빌드와 배포는 기존에 작성한 빌드와 배포 문서를 참고한다.
 

Scala 빌드 파이프라인 설정하기 (1) - 빌드 생성

Jenkins와 연동한 Scala 빌드 머신을 활용하여 실제 Spark Application을 빌드하고 배포한다. Scala 빌드머신을 Jenkins에 연동하는 방법은 아래 문서를 참고한다. Jenkins에 Scala 빌드머신 연동하기 (Jenkins 연

www.bearpooh.com