간단한 Spark Application의 실행과 패키징
소스코드 작성이 완료되었으면 실제로 실행해 본다. 그리고 정상적으로 실행되면 sbt로 패키징하여 Jar 파일을 생성한다. 간단한 Spark Application 작성 방법은 아래 포스팅을 참고한다. 간단한 Spark A
www.bearpooh.com
Scala 빌드 파이프라인 설정하기 (빌드 생성)
Jenkins와 연동한 Scala 빌드 머신을 활용하여 실제 Spark Application을 빌드하고 배포한다. Scala 빌드머신을 Jenkins에 연동하는 방법은 아래 문서를 참고한다. Jenkins에 Scala 빌드머신 연동하기 (Jenkins 연
www.bearpooh.com
-
전체 구조와 기본 설정
-
헬퍼 (Helper) 기능 작성
-
데이터 처리 로직 작성
-
비즈니스 로직 작성
-
테스트 작성과 빌드, 배포
Spark Application 패키지 작성하기 (2) - 헬퍼 기능 작성
기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다. 앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다. 기존에 Hadoop 기반의
www.bearpooh.com
Spark Application 패키지 작성하기 (3) - 데이터 처리 로직 작성
기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다. 앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다. 기존에 Hadoop 기반의
www.bearpooh.com
테스트 작성
-
데이터 처리 기능 로직이 정상적으로 동작하는지 확인한다.
-
부가기능 (헬퍼) 코드는 확인이 필요하다고 판단되면 테스트 코드를 작성한다.
테스트 데이터 (log.json)
{"id":"2688425498","type":"buy","created_at":"2013-08-13T22:58:08Z"}
{"id":"2688425496","type":"view","created_at":"2013-08-13T22:59:08Z"}
{"id":"2688425492","type":"view","created_at":"2013-08-13T23:01:08Z"}
{"id":"2688425491","type":"buy","created_at":"2013-08-13T22:02:08Z"}
{"id":"2688425493","type":"view","created_at":"2013-08-13T22:03:08Z"}
{"id":"2688425494","type":"view","created_at":"2013-08-13T22:06:08Z"}
{"id":"2688425495","type":"view","created_at":"2013-08-13T22:08:08Z"}
SparkExampleTest.scala
-
해당 메소드의 정상 동작 여부
-
의도한 예외의 발생 여부
-
결과 파일의 생성 여부
-
결과 데이터의 정상 여부 (컬럼, 타입, 레코드 수 등)
-
예외 발생 여부
라이브러리 임포트
-
테스트를 위해 생성한 임시 디렉토리 삭제를 위한 FileUtils
-
하둡 경로의 예외 테스트를 위한 FileAlreadyExistsException (이미 존재), PathNotFoundException (없음)
-
로컬 경로 생성을 위한 Paths, 폴더 여부 속성의 확인을 위한 Files
-
테스트를 위한 로컬 SparkSession과 결과 데이터를 읽기 위한 DataFrame
-
행동 기반의 테스트 구현을 위한 AnyFlatSpec ("테스트 이름" should "만족할 조건" in { 테스트 }" 형태)
-
기대 결과와 실제 결과를 비교하기 위한 Matchers (should, should not, must, can, in 등)
-
전체 테스트 시작 전과 종료 후의 초기화, 후처리를 제어하는 BeforeAndAfterAll
-
테스트 단위의 시작 전과 종료 후의 초기화, 후처리를 제어하는 BeforeAndAfterEach
-
로컬 경로 제어와 속성 값 활용을 위한 File
-
테스트의 임시 저장 경로에 포함 된 무작위 값 생성을 위한 randomUUID
-
특정 기능 수행하면서 정상 실행 (예외 발생) 여부를 확인하기 위한 Try
package com.bearpooh.bdp.example.sparkexample.application
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileAlreadyExistsException, PathNotFoundException}
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import java.io.Fi
import java.util.UUID.randomUUID
import scala.util.Try
임포트 구문을 소개하는 이유
굳이 손으로 입력하지 않아도 되는 부분임에도 내용을 설명하는 목적은 다음과 같다.
- 사용한 라이브러리 파악 가능
- 해당 코드의 기능을 예측
- 코드 이해를 위한 기반 지식으로 활용
이러한 이유로 코드 분석을 진행할때 Import 부분을 제일 먼저 확인한다.
Scala 뿐만 아니라 모든 언어에 해당하는 부분이다.
테스트 클래스 선언
-
AnyFlatSpec - 행동 기반의 테스트 구현 ("테스트 이름" should "만족할 조건" in { 테스트 }" 형태)
-
Matchers - 기대 결과와 실제 결과 비교 (should, should not, must, can, in 등)
-
BeforeAndAfterAll - 전체 테스트 시작 전과 종료 후의 초기화, 후처리 제어
-
BeforeAndAfterEach - 테스트 단위의 시작 전과 종료 후의 초기화, 후처리 제어
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
}
공통 변수 정의
-
SparkSession 인스턴스 - Spark을 로컬 모드로 실행 (로컬 경로의 데이터 사용)
-
SparkExample 인스턴스 - 데이터 처리 클래스의 기능 호출을 위한 인스턴스 생성 (SparkSession 전달)
-
테스트 기대 결과 상수 변수 - 실제 기능이 실행된 결과 값이 같아야 하는 기댓값을 상수 변수로 정의
-
테스트 데이터와 결과 저장 경로 - 테스트에 사용할 데이터의 로컬 경로 (읽을 경로, 저장 경로 순)
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// Spark Session
private val master: String = "local"
private val appName: String = "SparkExampleTest"
lazy val spark: SparkSession = new SparkSession.Builder()
.appName(appName)
.master(master)
.getOrCreate()
// Instance creation and assignment
val example: SparkExample = new SparkExample(spark)
// Test Expected Result
val NUM_OF_COLUMNS: Int = 3
val NUM_OF_ROWS: Int = 7
val NUM_OF_TYPE: Int = 2
// Test Data Path
val testSrcPath: String = Paths.get("src", "test", "resources", "data", "log").toString
val tmpDirectory: String = System.getProperty("java.io.tmpdir")
val baseDir: String = Paths.get(tmpDirectory, "sparkexample-test-" + randomUUID().toString).toString
val testDstPath: String = Paths.get(baseDir, "srteam", "test-data", "log-data").toString
// .. 생략 ..
}
기능 테스트 준비와 후처리 정의
-
수행할 작업의 함수 정의 (deleteFileRecursively)
-
전체 테스트 수행 전, 후에 수행할 작업 (beforeAll, afterAll)
-
개별 테스트 중간마다 수행할 작업 (afterEach)
- BeforeAndAfterAll
- BeforeAndAfterEach
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
// Delete test directory
def deleteFileRecursively(testPath: String): Unit = {
if (Files.isDirectory(Paths.get(testPath))) // 지정 경로가 존재하고 디렉토리이면
FileUtils.deleteDirectory(new File(testPath)) // 하위 경로 포함해서 삭제
}
// Called before test
override def beforeAll(): Unit = {
deleteFileRecursively(baseDir)
}
// Called between each test
override def afterEach(): Unit = {
deleteFileRecursively(baseDir)
}
// Called after test
override def afterAll(): Unit = {
deleteFileRecursively(baseDir)
}
// .. 생략 ..
}
기능별 테스트 작성
-
전체 레코드 개수 확인 기능
-
JSON을 Parquet로 저장하는 기능
-
정상 실행 테스트 - 오류 없이 기능이 실행되면 의도한 결과와 일치해야 한다.
-
예외 발생 테스트 - 예외를 직접 발생(throw) 시키는 경우, 특정 조건에서 해당 예외가 발생해야 한다.
-
"테스트명" should "만족조건" in {기능 호출기대 결과와 일치 여부 확인}
-
테스트 정의에 사용하는 should와 in은 AnyFlatSpec 을 통해 제공된다.
-
기대 결과와 일치 여부를 확인하는 shouldBe, should be 는 Matchers를 통해 제공된다.
-
정상 실행 테스트
-
예외 발생 테스트
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
"countRecords Function Test" should "not raise Exception" in {
val countResult: Try[Unit] = Try(example.countRecords(testSrcPath))
countResult.isSuccess shouldBe true
// 아래 테스트는 예제 목적으로 작성한 것으로 큰 의미는 없다.
val testSrcDF: DataFrame = spark.read.json(testSrcPath)
testSrcDF.columns.length shouldBe NUM_OF_COLUMNS
testSrcDF.count shouldBe NUM_OF_ROWS
}
// .. 생략 ..
}
-
an [예외명] sholud be thrownBy {기능클래스.함수명(함수인자)}
-
코드 실행 중 발생한 예외가 테스트에 작성한 "예외명"과 동일하면 Pass한다.
-
보통 전달한 인자의 값이 유효하지 않은 경우, 경로가 없거나 이미 존재하는 경우 등을 체크한다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
"Parameter Exception Test for countRecords" should "raise Exception" in {
an [IllegalArgumentException] should be thrownBy {
example.countRecords("")
}
an [PathNotFoundException] should be thrownBy {
example.countRecords(testDstPath)
}
}
// .. 생략 ..
}
-
정상 실행 테스트
-
예외 발생 테스트
-
생성 된 경로의 디렉토리 여부, Parquet 파일 생성 여부
-
결과 데이터를 읽고 컬럼수, 레코드수, 특정 컬럼의 고유 값 개수가 기대 값과 같은지 확인
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
"convertParquet Function Test" should "not raise Exception" in {
val convertResult: Try[Unit] = Try(example.convertParquet(testSrcPath, testDstPath))
convertResult.isSuccess shouldBe true
// 생성된 데이터 파일의 존재 여부와 개수 확인
new File(testDstPath).isDirectory shouldBe true
new File(testDstPath).listFiles.count(f => f.isFile && f.getName.endsWith(".parquet")) should be > 0
// 생성된 데이터를 읽고 결과 확인
val testDstDF: DataFrame = spark.read.parquet(testDstPath)
testDstDF.columns.length shouldBe NUM_OF_COLUMNS
testDstDF.count shouldBe NUM_OF_ROWS
// 생성 된 데이터의 특정 컬럼 결과 확인
val typeCount: Long = testDstDF.select("type").distinct.count
typeCount shouldBe NUM_OF_TYPE
typeCount should be < NUM_OF_ROWS.toLong
}
// .. 생략 ..
}
-
an [예외명] sholud be thrownBy {기능클래스.함수명(함수인자)}
-
코드 실행 중 발생한 예외가 테스트에 작성한 "예외명"과 동일하면 Pass한다.
-
보통 전달한 인자의 값이 유효하지 않은 경우, 경로가 없거나 이미 존재하는 경우 등을 체크한다.
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
"Parameter Exception Test for convertParquet" should "raise Exception" in {
an[IllegalArgumentException] should be thrownBy {
example.convertParquet("", testDstPath)
}
an[IllegalArgumentException] should be thrownBy {
example.convertParquet(testSrcPath, "")
}
an[PathNotFoundException] should be thrownBy {
example.convertParquet(testDstPath, testDstPath)
}
an[FileAlreadyExistsException] should be thrownBy {
example.convertParquet(testSrcPath, testSrcPath)
}
}
}
Docstring 작성
/**
* SparkExample 테스트 클래스다. local spark를 사용한다.
*/
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
}
-
테스트 하려는 기능에 대한 간단한 설명
-
테스트 통과 조건 (결과 기대값 설명)
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
/**
* JSON 데이터를 읽고 Parquet 포맷으로 저장하는 기능에 대한 테스트이다.
* 1) 발생한 Exception이 없다.
* 2) 생성된 데이터 경로가 존재하고 개수가 1개 이상이다.
* 3) Parquet 데이터의 컬럼 개수가 3개이고 레코드수가 7개이다.
* 4) type 컬럼의 중복 제거 개수가 2개이고, 7개 이하이다.
*/
"convertParquet Function Test" should "not raise Exception" in {
// .. 생략 ..
}
// .. 생략 ..
}
-
테스트 하려는 기능에 대한 간단한 설명
-
테스트 통과 조건 (예외 발생 조건과 발생하는 예외 설명)
// 클래스 정의는 코드 길이로 인해 줄바꿈 하였으나 실제로는 한줄에 입력
class SparkExampleTest extends AnyFlatSpec with Matchers
with BeforeAndAfterAll with BeforeAndAfterEach {
// .. 생략 ..
/**
* JSON 데이터를 읽고 Parquet 포맷으로 저장하는 기능에 잘못 된 입력이 전달 된 경우에 대한 테스트이다.
* 예외 발생 조건의 입력 값을 전달하고 정상적으로 예외가 발생하는지 확인한다.
* 1) 데이터의 경로가 전달되지 않으면 IllegalArgumentException이 발생한다.
* 2) 전달 된 경로가 존재하지 않으면 PathNotFoundException이 발생한다.
* 3) 저장 경로에 데이터가 존재하면 FileAlreadyExistsException이 발생한다.
*/
"Parameter Exception Test for convertParquet" should "raise Exception" in {
// .. 생략 ..
}
}
빌드와 배포
Scala 빌드 파이프라인 설정하기 (1) - 빌드 생성
Jenkins와 연동한 Scala 빌드 머신을 활용하여 실제 Spark Application을 빌드하고 배포한다. Scala 빌드머신을 Jenkins에 연동하는 방법은 아래 문서를 참고한다. Jenkins에 Scala 빌드머신 연동하기 (Jenkins 연
www.bearpooh.com
'::: 데이터 분석 :::' 카테고리의 다른 글
Spark Application 패키지 작성하기 (4) - 비즈니스 로직 작성 (0) | 2023.09.14 |
---|---|
Spark Application 패키지 작성하기 (3) - 데이터 처리 로직 작성 (35) | 2023.09.11 |
Spark Application 패키지 작성하기 (2) - 헬퍼 기능 작성 (1) | 2023.09.07 |
Spark Application 패키지 작성하기 (1) - 전체 구조와 기본 설정 (49) | 2023.09.04 |
간단한 Spark Application의 실행과 패키징 (128) | 2023.08.07 |