[Spring] Spring Batch 실습해보기

https://spring.io/projects/spring-batch

 

 

Batch와 Spring Batch

 Spring Batch는 배치 작업을 위해서 스프링에서 제공하는 배치 프레임워크이며, 배치(Batch)란 데이터를 실시간으로 처리하지 않고 단발성으로 일괄처리하는 방식을 말한다.

 

 상품 결제를 승인하거나 api요청에 따라 뷰를 띄워주는 등의 작업을 위해서는 클라이언트에게 즉각적인 응답을 제공해야 한다. 하지만 일정 시간을 기준으로 데이터를 취합하여 통계 리포트를 생성하거나, 부하가 큰 대용량의 데이터 작업을 서버 부하가 적은 특정 시간대에 처리하려 할 경우에는 실시간 처리보다는 특정 주기를 두고 처리하는 것이 바람직하다. 이러한 데이터 처리 방식을 배치(Batch)라고 한다.

 

이러한 배치 어플리케이션의 특성에 Spring Framework의 3대 요소인 DI, AOP, PSA(서비스 추상화)를 지원하는 프레임워크가 Spring Batch이다. Spring Batch는 Batch에 관련된 다양한 기능을 정형화시켜 제공하기에 비즈니스 코드 작성에 집중할 수 있으며, 기존 스프링 프레임워크를 사용한 프로젝트에 효과적으로 적용할 수 있다.

 

Spring Batch는 특정 시간대의 데이터 일괄 처리를 목적으로 하기에 보통 스케줄러와 같이 사용된다. 이를 위해 스프링의 @Scheduled 어노테이션을 사용하거나, 보다 복잡한 스케줄링을 위해서는 Quartz와 같은 외부 라이브러리와 조합하여 사용할 수 있다.

 

Spring 프로젝트에 Spring Batch와 함께 스케줄러를 적용하여 일정 주기별로 데이터를 처리해보자.

 

 

Spring Batch는 간단하게 위와 같은 구조를 지니며, 각 단계는 다음과 같은 역할을 한다.

 

JobLauncher Spring Application에서 설정된 Job을 실행한다.
Job 하나의 배치 작업 단위를 말하며, 세부 내용은 Step에서 처리된다. 하나의 Job은 여러개의 Step으로 구성될 수 있다.
Step Job에서 실제로 수행되는 개별 작업 단위를 말하며, 각 Step은 읽기, 처리, 쓰기의 3단계로 나누어진다. 이 각각의 작업은 ItemReader, ItemProcessor, ItemWriter가 각각 담당한다.
1개의 Job에 1개 이상의 Step이 존재할 수 있다.
JobRepository 배치가 수행될 때 메타데이터를 관리하고, 시작시간, 종료시간, Job의 상태 등 배치 수행에 관련된 데이터가 저장된다. JobRepository를 이용하여 실행 상태를 저장하여 실패 작업을 복구하거나, 관련 정보를 확인할 수 있다.

 

 

 

프로젝트 실습

// spring batch (latest: 5.x)
implementation 'org.springframework.boot:spring-boot-starter-batch'
spring:
  batch:
    jdbc:
      initialize-schema: always
    job:
      name: testJob # job 이름 설정
      
  datasource:
    hikari:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://localhost:3306/${DB_NAME}?serverTimezone=Asia/Seoul
      username: ${DB_USERNAME}
      password: ${DB_PASSWORD}

 

가장 먼저 dependencies와 application.yml에 관련 설정을 추가해준다.

 

 

Spring Batch 5.0 설정하기

현재 SpringBoot 3.x버전에서는 Spring Batch 5.0을 사용하는 것을 권장하고 있는데, 기존 4.x버전과는 차이가 존재한다. 

 

이전에는 @EnableBatchProcessing이라는 어노테이션을 써 주어야 BatchAutoConfiguration을 사용하여 @Configuration에 설정된 빈들을 자동으로 Job으로 등록해주었지만, 이제는 사용하지 않아도 된다. 대신 DefaultBatchConfiguration이 존재하지 않는 경우에만 BatchAutoConfiguration이 활성화된다는 점은 기억해두자

 

 

또한 JobBuilderFactory와 StepBuilderFactory를 의존성 주입받아 사용하던 기존 Spring Batch 4.0과는 다르게, 5.x부터는 아래와 같이 JobBuilder와 StepBuilder를 사용하고, JobRepository와 PlatformTransactionManager를 주입받아서 Job과 Step을 등록한다.

 

@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchJobConfig {

    private final String JOB_NAME = "testJob";
    private final String STEP_NAME = "testStep";

    /**
     * Job 등록
     */
    @Bean
    public Job testJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder(JOB_NAME, jobRepository)
                .incrementer(new RunIdIncrementer()) // sequential id
                .start(testStep(jobRepository, transactionManager)) // step 설정
                .build();
    }

    /**
     * Step 등록
     */
    @Bean
    @JobScope
    public Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder(STEP_NAME, jobRepository)
                .tasklet(testTasklet(), transactionManager) // tasklet 설정
                .build();
    }

    /**
     * Tasklet: Reader-Processor-Writer를 구분하지 않는 단일 step
     */
    @Bean
    @StepScope
    public Tasklet testTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                log.info("Spring Batch Test Success");
                return RepeatStatus.FINISHED; // 작업에 대한 Status 명시
            }
        };
    }
}

 

이렇게 주입받은 Repository와 TransactionManager를 통해서 Spring Batch 작업 단계에서 필요한 트랜잭션을 관리하여 데이터 일관성을 유지할 수 있다. 

 

더보기

 

- 추가로 해당 프로젝트에서 MyBatis와 JPA를 동시에 사용하기 위해서 PlatformTransactionManager를 JpaTransactionManager와 DatasourceTransactionManager를 연결시킨 형태의 ChainedTransactionManager의 형태로 커스텀해서 사용하고 있었는데, 이 때 Job과 Step에 PlatformTransactionManager를 주입하려고 시도하니 isolation level이 다르다고 나오면서 실행되지 않았다.

 

- isolation level을 설정파일에서 일치시킬 수 있는 방법도 찾지 못해서 일단 JpaTransactionManager를 커스텀하여 사용하는 방법으로 대신했는데, 나중에 더 알아보자

 

- 일단 Spring Batch 5.0에서 ChainedTransactionManager가 deprecated된 것과 연관이 있지 않을까 싶다.

 

- PlatformTransactionManager: 모든 트랜잭션 매니저의 상위 인터페이스이다. 트랜잭션 관리를 위한 기본 인터페이스를 제공한다.

- ChainedTransactionManager: 여러개의 다른 트랜잭션 매니저를 연결해서 사용하기 위하여 사용하는 클래스이다.

- AbstractPlatformTransactionManager: 트랜잭션 관리자를 만들기 위한 추상 클래스이다. 이를 JpaTransactionManager, DatasourceTransactionManager, JtaTransactionManager 등으로 구현하여 JPA, JDBC, JTA와 같은 다양한 데이터 액세스 기술을 지원한다.

 

- 트랜잭션 매니저에 관련된 자세한 설명은 https://eckrin.tistory.com/155 에 있다. 

 

 

이후 스프링 부트를 실행하면 위와 같이 Batch가 동작하는 것을 확인할 수 있다. 성공적으로 실행되었다면 datasource에 설정해 둔 데이터베이스에 몇 개의 메타 테이블들이 생성된다.

 

 

 

 

데이터 마이그레이션

 주문(Orders) 테이블에서 정산(Accounts) 테이블로 일정 시간마다 스케줄러를 통해서 데이터를 마이그레이션하는 상황을 가정하였다.

 

/**
 * 주문(order) 테이블 -> 정산(accounts) 테이블 데이터 이관 가정
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class TrMigrationConfig {

    private final String JOB_NAME = "trMigrationJob";
    private final String STEP_NAME = "trMigrationStep";

    private final OrderRepository orderRepository;
    private final AccountRepository accountRepository;

    /**
     * Job 등록
     */
    @Bean
    public Job trMigrationJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder(JOB_NAME, jobRepository)
                .incrementer(new RunIdIncrementer()) // sequential id
                .start(trMigrationStep(jobRepository, transactionManager)) // step 설정
                .build();
    }

    /**
     * Step 등록
     */
    @Bean
    @JobScope
    public Step trMigrationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder(STEP_NAME, jobRepository)
                .<Order, Account>chunk(5, transactionManager) // chunkSize: 몇 개 단위로 데이터를 처리할 것인지 지정. 참고로 파라미터로 받아서 사용할 수는 없다.
                .reader(trOrderReader())
                .processor(trOrderProcessor())
                .writer(trOrderWriter())
                .build();
    }

    /**
     * Account를 Repository의 method를 명시하여 데이터 처리
     */
    @Bean
    @StepScope
    public RepositoryItemWriter<Account> trOrderWriter() {
        return new RepositoryItemWriterBuilder<Account>()
                .repository(accountRepository)
                .methodName("save")
                .build();
    }

    /**
     * Order를 Account로 변경하는 Processor
     */
    @Bean
    @StepScope
    public ItemProcessor<Order, Account> trOrderProcessor() {
        return new ItemProcessor<Order, Account>() {
            @Override
            public Account process(Order item) throws Exception {
                return new Account(item);
            }
        };
    }

    @Bean
    @StepScope
    public RepositoryItemReader<Order> trOrderReader() {
        return new RepositoryItemReaderBuilder<Order>()
                .name("trOrderReader")
                .repository(orderRepository)
                .methodName("findAll")
                .pageSize(5) // chunkSize와 일치하게 설정
                .arguments(List.of())
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .build();

    }


}

 

위에서 작성한 testJob과 비슷하지만, 보다 세부적인 작업을 위해서 tasklet이 아닌 ItemReader, ItemProcessor, ItemWriter를 사용하였다. 

 

reader에서 orderRepository의 findAll() 메소드를 통해서 데이터를 조회한 후 id순 정렬하고, processort에서 Order객체를 Account객체로 변환한 후, 받아온 Item 각각을 writer에서 accountRepository의 save 메소드의 인자로 넘기는 것을 확인할 수 있다.

 

참고로 Chunk란, ItemReader를 통해서 읽은 아이템을 쌓아두는 단위이며 Job 생성시점에 정의된다. Chunk size만큼 데이터가 쌓였다면 ItemWriter로 일괄 전달되며, Chunk 단위로 트랜잭션 작업(커밋)이 이루어진다. 추가로 txOrderReader를 보면 pageSize()라는 메서드가 존재하는데, 이는 읽어올 때 페이징하는 데이터 개수로 chunk와는 다른 개념이다.

 

 

이렇게 설정파일을 세팅하고 스프링부트를 실행시키면 (현재 스케줄러가 없으므로) 위 사진과 같이 Order테이블의 데이터들이 Account 테이블로 넘어간 것을 확인할 수 있다. 또한 스프링부트를 반복 실행시켜서 Job을 여러 차례 동작시켜도 Accounts테이블에는 데이터가 반복 추가되지 않았는데, 아마도 Spring Batch의 메타데이터를 관리하는 JobRepository에서  이미 실행된 Job이라고 판단하여 Job의 반복 실행을 방지하는 것이라고 추측된다.

 

 

 

스케줄러 적용

스케줄링(Scheduling)이란 일정한 시간 간격을 두고 반복적인 작업을 수행하는 것을 말한다. 앞서 말했듯이 배치 작업의 경우 실시간 작업이 아닌, 특정 시간대에 예약 작업으로 진행되는 경우가 많다. 이를 적용하기 위해서 스프링에서 제공하는 @Scheduler 어노테이션이나, spring boot quartz와 같은 스케줄링 라이브러리를 사용할 수 있다. 

 

 

@Component
@RequiredArgsConstructor
public class SampleScheduler {

    private final Job testJob;
    private final JobLauncher jobLauncher; // 스케줄링을 활용하여 Job 실행

    @Scheduled(cron = "0 */1 * * * *") // 1분마다 실행할 수 있게 함
    public void testJobRun() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {

        JobParameters jobParameters = new JobParameters(
                Collections.singletonMap("requestTime", new JobParameter(System.currentTimeMillis(), Long.class))
        );

        jobLauncher.run(testJob, jobParameters);
    }
}

 

@Scheduled 어노테이션을 사용하여 Job을 실행하는 예시이다. application.yml 파일에서 batch의 enabled 속성을 false로 지정하여 Job이 스프링 부트 시작시점에 자동 실행되는 것이 아니라 스케줄러에게 위임되고, JobLauncher를 사용하여 직접 주입받은 Job을 실행한다.