기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다.
앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다.
기존에 Hadoop 기반의 Spark 클러스터를 구축했으므로, Spark Application은 해당 환경에서 실행할 수 있는 형태로 작성해야 한다.
따라서 Spark Application 작성을 위한 기본 형태 (Skeleton) 제공을 위한 예제 프로그램을 재작성한다.
참고로 예전에 실무에서 활용했던 Spark Application 코드에서 재사용 또는 공개가 가능한 코드만 활용하여 정리하였다.
전체적인 내용은 아래 문서를 참고하고, 변경 사항을 중심으로 정리한다.
전체 내용은 아래와 같은 순서로 작성한다.
-
전체 구조와 기본 설정
-
헬퍼 (Helper) 기능 작성
-
데이터 처리 로직 작성
-
비즈니스 로직 작성
-
테스트 작성과 빌드, 배포
전체 구조와 기본 설정, 헬퍼 (Helper) 기능 작성은 아래 문서를 참고한다.
기능 구현
실제 기능을 수행하는 코드를 작성한다.
SparkExample.scala
Spark Application에서 실제 데이터 처리를 수행하는 로직을 작성한다.
내부에 포함 된 기능은 다음과 같고, SparkExampleApp.scala에서 전달 된 saveFlag 값에 의해 선택적으로 호출한다.
-
전체 레코드 개수 확인
-
Parquet 포맷으로 저장
라이브러리 임포트
굳이 손으로 타이핑하지 않아도 된다.
아래 설명할 코드들을 작성할 때 IntelliJ에서 Alt + Enter로 손쉽게 추가할 수 있다.
SparkExample.scala에서 사용하는 라이브러리들을 선언한다.
-
로그 출력을 위해 SparkExampleApp에 정의한 log 함수
-
하둡의 데이터 파일 경로를 처리하는 FileSystemHelper
-
하둡 경로의 예외 처리를 위한 FileAlreadyExistsException (이미 존재), PathNotFoundException (없음)
-
FileSystemHelper에 암시적(Implicit)으로 전달하기 위한 SparkContext
-
데이터 처리를 위한 Spark의 고수준 API를 제공하는 SparkSession
코드는 다음과 같다.
package com.bearpooh.bdp.example.sparkexample.application
import com.sr.ocean.sparkexample.SparkExampleApp.log
import com.sr.ocean.sparkexample.util.FileSystemHelper
import org.apache.hadoop.fs.{FileAlreadyExistsException, PathNotFoundException}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
클래스 객체 정의
Spark Application에서 실제 데이터 처리를 수행하기 위한 SparkExample 클래스를 정의한다.
SparkSession 객체를 인자로 전달 받는다. Spark 클러스터 연결과 고수준 API를 사용하여 데이터를 처리한다.
클래스 내부의 상단에는 해당 클래스에서 공통으로 사용할 변수들을 정의한다.
-
SparkContext를 암시적 변수로 선언한다. FileSystemHelper의 기능 메소드에 암시적 파라미터로 전달한다.
-
Spark에서 파일로 저장하는 파티션 개수를 상수 변수로 정의한다. repartition 함수의 파라미터로 전달한다.
코드는 다음과 같다.
class SparkExample(spark: SparkSession) {
// Implicit definition of SparkContext (for FileSystemHelper)
implicit val sc: SparkContext = spark.sparkContext
// defind common values
val MINIMUM_PARTITION_NUM = 1
// .. 생략 ..
}
기능 메소드 구현
SparkExample에서 실제로 데이터를 처리하는 로직을 작성한다.
데이터 처리는 SparkExampleApp.scala에서 전달 된 saveFlag 값에 의해 아래 기능을 선택적으로 호출한다.
-
전체 레코드 개수 확인
-
Parquet 포맷으로 저장
전체 레코드 개수 확인
전체 레코드 개수를 로그로 출력하는 기능이다. 반환하는 값은 없으므로 리턴 타입은 Unit이다.
개인적으로 데이터 처리 로직에서 로그 출력을 하지 않고, 필요한 경우 main 함수에서 출력한다.
데이터 처리 로직에서는 오류가 발생한 경우에 예외를 통해 로그를 출력한다.
그러나 예제 코드 목적상 전체 레코드 개수를 출력한다. (사실 할게 없다..)
코드는 다음과 같다. 자세한 내용은 주석을 참고한다.
class SparkExample(spark: SparkSession) {
// .. 생략 ..
def countRecords(srcPath: String): Unit = { // 사용할 데이터 경로를 인자로 전달
// Check arguments
// 함수 인자의 타입이 Option[String]이 아니므로 None인 경우는 없다.
if(srcPath.isEmpty) // 경로가 빈 문자열인 경우 예외 발생
throw new IllegalArgumentException("The srcPath is empty.")
// Check data path
// 전달 받은 HDFS 경로가 존재하는지 확인
if (!FileSystemHelper.exists(srcPath)) // SparkContext인 sc는 implicit 형태로 전달
throw new PathNotFoundException("The srcPath is not exist: " + srcPath)
// Run Spark Business Logic
val logDF = spark.read.json(srcPath) // JSON 포맷의 데이터를 DataFrame으로 읽고
val logCount = logDF.count() // 레코드의 개수를 얻어 logCount에 저장
log.info("log count: " + logCount) // 전체 레코드 개수를 로그로 출력
logDF.show(false) // DataFrame의 상위 20개 레코드를 로그로 출
}
// .. 생략 ..
}
Parquet 포맷으로 저장
JSON 포맷의 데이터를 읽고 Parquet 포맷으로 저장한다. 반환하는 값은 없으므로 리턴 타입은 Unit이다.
레코드 개수가 적기 때문에 저장하기 전에 파티션을 1개로 줄인다.
파티션 개수를 1개로 줄이는 경우에는 coalesce 대신 repartition을 쓰는게 처리 속도면에서 훨씬 효율이 좋다. (Best Practice)
코드는 다음과 같다. 자세한 내용은 주석을 참고한다.
class SparkExample(spark: SparkSession) {
// .. 생략 ..
// 사용할 데이터와 저장할 경로를 인자로 전달
def convertParquet(srcPath: String, dstPath: String): Unit = {
// Check arguments
// 함수 인자의 타입이 Option[String]이 아니므로 None인 경우는 없다.
if (srcPath.isEmpty) // 경로가 빈 문자열인 경우 예외 발생
throw new IllegalArgumentException("The srcPath is empty.")
if (dstPath.isEmpty) // 경로가 빈 문자열인 경우 예외 발생
throw new IllegalArgumentException("The dstPath is empty.")
// Check data path
if (!FileSystemHelper.exists(srcPath)) // 사용할 데이터의 HDFS 경로가 없는 경우
throw new PathNotFoundException("The srcPath is not exist: " + srcPath)
if (FileSystemHelper.exists(dstPath)) // 저장할 데이터의 HDFS 경로가 있는 경우
throw new FileAlreadyExistsException("The dstPath is already exist: " + dstPath)
// Run Spark Business Logic
spark.read.json(srcPath) // JSON 포맷의 데이터를 DataFrame으로 읽고
.repartition(MINIMUM_PARTITION_NUM) // 파티션을 1개로 줄이고
.write.parquet(dstPath) // 저장 경로에 Parquet 포맷으로 저장
}
}
Docstring 작성
Docstring은 클래스나 함수의 기능을 설명하기 위해 작성한 주석을 의미한다.
-
코드 사용법, 동작 방식, 입력 및 출력 값, 예제 등을 설명한다.
-
코드의 가독성과 유지 보수 향상이 목적이다.
SparkExample.scala는 데이터 처리 요구사항을 구현하는 코드이기 때문에 Docstring을 작성한다.
Scala의 경우 클래스와 메소드 선언 구문의 바로 위에 작성한다. (파이썬은 선언 구문 바로 아래에 작성한다.)
또한 클래스와 메소드에 따라 포맷에 약간 차이가 있다.
Class의 Docstring 작성
클래스의 Docstring은 아래 항목에 대한 설명을 간략하게 작성한다.
-
수행 기능 (간단 요약)
-
전달 인자 (@param 인자명)
클래스의 Docstring을 작성한 예시는 다음과 같다.
/**
* SparkExample은 Spark Application의 예제 패키지를 제공하기 위해
* JSON 형식의 데이터를 읽고 Parquet으로 저장한다.
*
* 아래 기능을 수행한다.
* 1) countRecords - JSON 데이터의 전체 레코드수를 확인한다.
* 2) convertParquet - JSON 데이터를 읽고 Parquet로 저장한다.
*
* @param spark SparkSession이다. 정의된 sparkSession에서 spark 연산을 수행한다.
*/
class SparkExample(spark: SparkSession) {
// .. 생략 ..
}
Method의 Docstring 작성
메소드의 Docstring은 아래 항목에 대한 설명을 간략하게 작성한다.
-
수행 기능 (간단 요약)
-
전달 인자 (@param 인자명 설명 타입)
-
반환 값과 타입 (@return 변수명 설명 타입)
-
직접 예외를 발생시키는 경우 해당 예외 (@throw 예외패키지경로 설명)
만약 반환하는 값이 없어 Unit 타입을 반환하는 경우 @return 항목은 비워놓는다.
메소드의 Docstring을 작성한 예시는 다음과 같다.
class SparkExample(spark: SparkSession) {
// .. 생략 ..
/**
* JSON 데이터의 전체 레코드수를 확인한다.
* 1) 입력 받은 경로를 기반으로 JSON 데이터 읽음
* 2) 읽은 데이터의 전체 레코드 수 확인
*
* @param srcPath 원본 JSON 데이터 경로 (String)
* @return
* @throws java.lang.IllegalArgumentException 함수에 전달한 인자값이 유효하지 않으면 발생한다.
* @throws org.apache.hadoop.fs.PathNotFoundException 원본 데이터가 존재하지 않으면 발생한다.
*/
def countRecords(srcPath: String): Unit = {
// .. 생략 ..
}
// .. 생략 ..
}
비즈니스 로직 작성
비즈니스 로직 작성은 다음 문서를 참고한다.
'::: 데이터 분석 :::' 카테고리의 다른 글
Spark Application 패키지 작성하기 (5) - 테스트 작성과 빌드, 배포 (0) | 2023.09.18 |
---|---|
Spark Application 패키지 작성하기 (4) - 비즈니스 로직 작성 (0) | 2023.09.14 |
Spark Application 패키지 작성하기 (2) - 헬퍼 기능 작성 (1) | 2023.09.07 |
Spark Application 패키지 작성하기 (1) - 전체 구조와 기본 설정 (49) | 2023.09.04 |
간단한 Spark Application의 실행과 패키징 (128) | 2023.08.07 |