스프링 배치(Spring Batch) 파티셔닝(Partitioning)은 스프링 배치(Spring Batch)의 Scalling 기능 중 하나다. 스프링 배치(Spring Batch)에서 1개로만 배치를 돌리면 적은 데이터에서는 그렇게 차이는 없는데, 데이터가 많아질 경우에는 1개로 배치를 돌리기에는 시간이 너무 많이 걸리고, 그 만큼 대기하는 시간과 프로세스를 처리하는 시간이 길어진다. 그래서 이 단점을 보완하기 위한 방법 중 1개인 스프링 배치(Spring Batch)의 파티셔닝(Partitioning)이다.
스프링 배치(Spring Batch) 파티셔닝(Partitioning)
스프링 배치(Spring Batch) 파티셔닝(Partitioning)은 데이터를 나눠 Chunk라는 단위로 나눠 배치를 돌리는 기능이다.
예를 들어, 100개의 데이터가 있는 경우에 Chunk를 4개로 나눠 각 25개씩 배치를 병렬로 처리하는 것을 말한다. 그래서, 대량의 데이터들을 처리할 때 스프링 배치(Spring Batch) 파티셔닝(Partitioning)을 사용한다.
파티셔닝(Partitioning)은 독립적인 Step들을 설정한 크기에 맞게 구성하여 독립적인 Step들 각각 StepExecution을 갖게 된다. 즉, 아래의 그림과 같이 Step은 ItemReader, ItemProcessor, ItemWriter 각 1개씩 가지게 된다. 그래서, 파티셔닝(Partitioning)은 각 Step을 가지기 때문에 병렬처리가 가능하다.
스프링 배치(Spring Batch) 파티셔닝(Partitioning) 예제
1. partitionChunkJob.xml
- job id는 partitionChunkJob으로, parallelPartitioner를 파티셔너로 사용한다.
- grid-size는 병렬 처리할 Executor의 개수(Thread 개수)
<job id="partitionChunkJob" restartable="true" >
<step id="step01">
<partition step="partitionStep" partitioner="parallelPartitioner">
<handler grid-size="4" task-executor="taskExecutor" />
</partition>
</step>
<job>
2. 각 파티셔너와 Executor를 java 파일로 Bean 주입한다.
<bean id="parallelPartitioner" class="com.batch.util.ParallelPartitioner"/>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
1) ParallelPartitioner.java
package com.batch.util;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class ParallelPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
for (int index = 0; index < gridSize; index++) {
ExecutionContext value = new ExecutionContext();
value.putInt("parallelTotal", gridSize);
value.putInt("parallelIndex", index);
result.put("partition" + index, value);
}
return result;
}
}
3. partitionStep id를 가진 step.
- 1개의 stepReader와 1개의 stepWriter를 가진다.
- reader : 작업대상을 불러드릴 인터페이스
- processor : reader 에서 읽어온 데이터를 ETL(Extract, Transfrom, Load)을 한다. 프로세서 단계는 생략되어도 다음작업이 가능하다.
- writer : 처리된 데이터를 Insert 또는 Update 처리한다.
- commit-interval : 설정된 값만큼 위의 reader, processor, writer 작업을 거친 후에 트랜잭션 처리를 한다. 작업량에 따라 커밋 범위가 설정되어야 한다.
- skip-limit : 예외 발생시 skip count가 증가하며 지정 숫자 이상 발생하면 step은 fail 처리된다.
- skippable-exception-classes : java.lang.Exception 발생시 Skip이 일어나도록 설정되어 있다.
<step id="partitionStep" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="transactionManagerT1">
<chunk reader="stepReader" writer="stepWriter" commit-interval="100"></chunk>
</tasklet>
</step>
4. 각 stepReader와 stepWriter는 개발자가 수정해야 하는 곳이다.
<bean id="stepReader" class="com.batch.tasklet.Reader" scope="step">
<property name="sqlSessionFactory" ref="sqlSessionFactoryT1"/>
<property name="queryId" value="Batch.TRAN_TEST_BAT_CHUNK_LIST"/>
</bean>
<bean id="stepWriter" class="com.batch.tasklet.Writer" scope="step">
<property name="sqlSessionTemplateT1" ref="sqlSessionTemplateT1" />
<property name="sqlSessionTemplateT2" ref="sqlSessionTemplateT2" />
</bean>
최근댓글