배치 작업을 활용한 외부 API의 성능 문제 극복
Spring Batch와 Kubernetes를 통해 배치 작업 구현하기
2024-01-25
서비스를 개발하다 보면 외부 API(Application Programming Interface)를 사용하는 경우가 빈번하게 존재합니다.
물론 서비스마다 외부 API를 사용하는 빈도는 다 다르지만, 제가 개발하는 서비스에서는 외부 API 통신이 많은 편이었습니다.
문제는 제가 사용하는 외부 API인 학교 API가 지연 시간이 긴 편이며 페이지네이션(Pagination) 등의 기능을 제공하지 않는다는 점인데요.
이렇게 되면 제가 성능 최적화를 수행해도 학교 API에 의존하는 이상 의미가 없어지겠다고 생각했습니다.


그래서 주기적으로 학교 API로부터 데이터를 가져와 데이터베이스에 저장하는 배치 작업을 구현하기로 했습니다..
이러면 기존의 학교 API를 사용하는 부분을 지연 시간이 짧은 데이터베이스를 사용해 대체할 수 있게 됩니다.

Batch

배치 작업을 구현하기 위해 Spring Batch와 Kubernetes의 CronJob을 사용하기로 했습니다.
외부 API로부터 가져온 데이터들을 데이터베이스를 저장하는 작업을 Spring Batch를 통해 구현하고, 해당 작업을 Kubernetes의 CronJob을 통해 주기적으로 수행하면 됩니다.

Spring Batch

application.yaml
spring:
  datasource:
    url: jdbc:h2:mem:batch
    username: sa
    password:
    driverClassName: org.h2.Driver
우선 Spring Batch를 사용하기 위해서는 배치 작업을 관리하는 메타 데이터를 저장할 공간이 필요합니다.
이에 대해서는 임시적으로 인메모리(In-memory) DB인 H2를 사용하기로 했습니다.

Step

StepJob을 구성하는 하나의 단계로, 실제로 수행되는 작업을 정의합니다.
이때, Step은 청크(Chunk)나 Tasklet을 기반으로 구현됩니다.
Tasklet 기반 배치는 Step을 하나의 메서드로 한번에 처리하는 반면, 청크 기반 배치는 Stepread(), process(), write()로 구성해 설정한 chunk 수만큼 데이터를 묶어 처리합니다.
즉, chunk가 5인 경우 ItemReaderItemProcessor가 데이터를 각각 1개씩 읽고 처리한 후 5개를 모아서 ItemWriter에 한번에 전달합니다.


학교 API에는 강의들을 각 연도와 학기 별로 조회하는 방법 밖에 없으므로 청크 기반 배치가 아닌 Tasklet 기반 배치를 사용해도 큰 성능 차이는 없습니다.
그렇지만 청크 기반 배치가 좀 더 가독성이 좋다고 생각해 청크 기반으로 Step을 구현해 보았습니다.
ItemReader.java
@FunctionalInterface
public interface ItemReader<T> {
	T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemWriter.java
@FunctionalInterface
public interface ItemWriter<T> {
	void write(@NonNull Chunk<? extends T> chunk) throws Exception;
}
우선 Step을 구성할 ItemReaderItemWriter를 구현해야 합니다.
조회 및 저장 외의 복잡한 처리가 필요하지 않으므로 ItemProcessor는 구현하지 않았습니다.
WebClientLecturesReader.kt
@Component
class WebClientLecturesReader(
    private val lectureClient: LectureClient
) : ItemReader<List<Lecture>> {
    private val logger = getLogger()
    private val semesters = enumValues<Semester>()
    private val years = (2020..Year.now().value).toList()
    private val pairs = years.flatMap { year ->
        semesters.map { semester ->
            year to semester
        }
    }
    private var index = 0
 
    override fun read(): List<Lecture>? =
        pairs.getOrNull(index++)
            ?.let { (year, semester) ->
                lectureClient.getLecturesByYearAndSemester(year, semester)
                    .doOnNext { logger.info { "Read: $it" } }
                    .collectList()
                    .block()
            }
}
WebClientLecturesReaderWebClient를 통해 각 연도와 학기를 순회하며 학교 API로부터 강의들을 조회합니다.
ItemReaderread()에서 null을 반환하면 조회 작업을 종료합니다.
MongoLecturesWriter.kt
@Component
class MongoLecturesWriter(
    private val lectureRepository: LectureRepository
) : ItemWriter<List<Lecture>> {
    private val logger = getLogger()
 
    override fun write(chunk: Chunk<out List<Lecture>>) {
        lectureRepository.saveAll(chunk.items.first())
            .doOnNext { logger.info { "Write: $it" } }
            .blockLast()
    }
}
MongoLecturesWriter는 조회한 강의들을 데이터베이스에 저장합니다.
Spring Batch는 아직까진 리액티브 프로그래밍을 지원하지 않으므로 Spring WebFlux 환경에서 Spring Batch를 사용할 때는 Reactor의 block()을 통해 스레드를 블로킹(Blocking)해야 합니다.
그렇지 않으면 스트림을 구독한 채로 배치 스레드가 종료되어 작업이 정상적으로 완료되지 않을 수 있습니다.
LectureBatchConfiguration.kt
@Bean
fun saveLecturesStep(
    reader: WebClientLecturesReader,
    writer: MongoLecturesWriter
): Step =
    StepBuilder("saveLecturesStep", jobRepository)
        .chunk<List<Lecture>, List<Lecture>>(1, transactionManager)
        .reader(reader)
        .writer(writer)
        .build()
앞서 정의한 WebClientLecturesReaderMongoLecturesWriter를 통해 Step을 만들어 Bean으로 등록합니다.

Job

LectureBatchConfiguration.kt
@Bean
fun updateLecturesJob(
    webClientLecturesReader: WebClientLecturesReader,
    mongoLecturesWriter: MongoLecturesWriter
): Job =
    JobBuilder("updateLecturesJob", jobRepository)
        .start(saveLecturesStep(webClientLecturesReader, mongoLecturesWriter))
        .build()
마지막으로 saveLecturesStepStep으로 가지는 updateLecturesJob을 Bean으로 등록합니다.
2024-01-25 13:47:21.851 INFO [main] o.s.b.c.l.s.SimpleJobLauncher: Job: [SimpleJob: [name=updateLecturesJob]] launched with the following parameters: [{}]
2024-01-25 13:47:21.861 INFO [main] o.s.b.c.j.SimpleStepHandler: Executing step: [saveLecturesStep]
2024-01-25 13:47:50.287 INFO [main] o.s.b.c.l.s.SimpleJobLauncher: Job: [SimpleJob: [name=updateLecturesJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 28s426ms
애플리케이션을 시작하면 JobLauncherApplicationRunnerJobLauncher를 통해 updateLecturesJob을 실행하게 됩니다.
application.yaml
spring:
  batch:
    job:
      name: ${JOB}
Bean으로 등록된 배치 작업이 여러 개인 경우, 애플리케이션 시작 시에 수행할 Job을 환경 변수로 정의할 수 있도록 했습니다.
BatchApplication.kt
fun main(args: Array<String>) {
    exitProcess(SpringApplication.exit(runApplication<BatchApplication>(*args)))
}
또한 Kubernetes의 CronJob을 통해 배치 작업을 수행할 예정이므로 배치 작업이 끝나면 종료 코드와 함께 프로세스가 종료되도록 합니다.

CronJob

이제 Kubernetes의 CronJob을 통해 해당 배치 작업을 특정 시간에 주기적으로 수행하도록 하겠습니다.
현재 인프라에서는 Helm과 ArgoCD를 통해 배포를 진행하고 있는데요.
그러므로 따로 존재하는 차트 레포지토리 내에 CronJob에 대한 매니페스트(Manifest)를 작성해야 합니다.
values.yaml
jobs:
- name: updateLecturesJob
  schedule: 0 1 * * *
- name: updateStudentsJob
  schedule: 0 2 * * *
cronjob.yaml
{{- range .Values.jobs }}
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ kebabcase .name }}
spec:
  schedule: {{ .schedule }}
  timeZone: 'Asia/Seoul'
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: {{ $.Values.controller.batch.containerName }}
            image: {{ $.Values.controller.batch.image.name }}:{{ $.Values.controller.batch.image.tag }}
            imagePullPolicy: Always
            env:
            - name: SPRING_PROFILES_ACTIVE
              value: {{ $.Values.env }}
            - name: JOB
              value: {{ .name }}
            envFrom:
            - secretRef:
                name: {{ $.Values.secret.name }}
          imagePullSecrets:
          - name: docker-secret
---
{{- end }}
CronJob에서 schedule을 통해 Cron 표현식으로 해당 작업을 수행할 시간을 정의할 수 있습니다.
저는 반복문을 통해 values.yaml에서 여러 배치 작업들의 Job 이름과 수행할 시간을 받아 CronJob을 생성하도록 구현했습니다.
> kubectl get job -n doyoumate
NAME                                COMPLETIONS   DURATION   AGE
update-lectures-job-28574100   1/1           71s        10h
update-students-job-28574100   1/1           16m        10h
실제로 CronJob에 의해 특정 시간에 배치 작업이 자동으로 수행된 것을 확인할 수 있습니다.