[Spring] 1. Spring Batch의 기본 개념
January 30, 2020
Spring Batch Guide 시리즈
- Spring Batch의 기본 개념
- Spring Batch의 ItemReader - File Item Reader
- Spring Batch의 ItemReader - DB Item Reader
Chapter 1에서는 Spring Batch의 구조와 동작 방식의 대한 이야기를 다룹니다
Spring의 DI,AOP,서비스 + Accenture의 Batch 노하우
해당 포스팅의 모든 소스는 Github에서 확인하실 수 있습니다
Batch 를 통한 일괄처리
Spring Batch의 특성
- 자동화 - 개입 없이
자동 실행
- 견고성 -
충돌/중단
없는 안전한 처리 - 신뢰성 - 이슈 처리를 추적 할
로깅/알림
기능 - 성능 - 처리 완료와 독립적 수행의 대한
퍼포먼스
확보
Data Reader
와 Writer
를 지원
DataSource | 기술 | 설명 |
---|---|---|
Database | JDBC | 페이징, 커서, 일괄 업데이트등 |
Database | Hibernate | 페이징, 커서 |
Database | JPA | 페이징 (커서 기능 삭제됨) |
File | Flat file | 지정한 구분자로 파싱 |
File | XML | XML 파싱 |
Ibatis 모듈은 현재 삭재 되었고 JDBC ItemReader로 교체를 추천
Spring Batch 기본 구조
Spring Batch 실행을 위해서는 다음과 같이 SpringApplication에 annotation을 선언해 주는것 부터 시작합니다
@EnableBatchProcessing @SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
다음으로 Spring Batch의 구조를 확인하겠습니다
Spring Batch는 크게 계층형 구조를 이루고 있습니다
Job > Step > Task
Source : Spring Batch Architecture
위에 그림에서 처럼 Job은 Step을 가지고 있고 Step은 Tasklet 인터페이스를 통해 수행 작업들을 가지고 있습니다
Task는 기본적인 사용자 정의 형식과 read / process / write (RPW) 형식이 존재합니다
그럼 각각의 요소들을 정리해 보겠습니다
1. Job
-
Batch Job의 한 단위
@Bean public Job jobBean(){ return jobBuilderFactory.get("{Job Name}") .start(stepBean()) .build(); }
2. Step
- Job내부에서 수행될 1개의 Step
-
Tasklet에 Step에서 수행할 기능으로 써 2가지의 형태가 존재
- User 커스텀
- Reader & Processor & Writer(RPW)가 한 묶음으로 존재
@Bean public Step stepBean(){ return stepBuilderFactory.get("{Step Name}") .tasklet(((contribution, chunkContext) -> { log.info("[========= This is Step ==========]"); return RepeatStatus.FINISHED; })) .build(); }
파라미터로 Job 실행 지정하기
Spring Batch는 자체적으로 스케줄 처리를 하는 기능이 없습니다
그래서 외부에서 Crontab / Jenkins 등을 사용해서 Job을 실행하게 됩니다
이때 program argument를 통해 job name을 전달해서 원하는 job을 실행 하실 수 있습니다
@Bean
public Job jobBean(){
return jobBuilderFactory.get("jpaItemListWriterJob")
.start(stepBean())
.build();
}
spring:
batch:
job:
names: ${job.name:NONE}
원하는 job을 외부 파라미터로 실행할 경우 다음과 같이 properties로 지정해 줍니다
properties를 다음과 같이 지정하므로 모든 job이 실행되는 것을 미연에 방지할 수 있습니다
그리고 다음으로 program argument 파라미터를 지정해 주시면 됩니다
예를 들어 jpaItemListWriterJob 이라는 job을 실행하고 싶을 경우 다음과 같이 해주시면 됩니다
program argument : --job.name=jpaItemListWriterJob
Srping Boot Meta Data
Spring Batch에서는 DB를 통해 완료/실패
와 같은 상태관리
를 합니다
크게 4가지의 상태를 DB에 저장하는데
- 이전 실행 Job History
- 실패한 Batch와 Parameter / 성공한 Job
- 실행 재개 지점
- Job 기준 Step현황과 성공/실패 여부
Source : Spring_Batch_Doc
해당 Table들은 Spring Batch 동작에 꼭 필요하며 H2 DB
사용시 자동으로 생성되지만
그외 DB들은 직접 생성해 주어야합니다
DB DDL
쿼리는 org.springframework.batch.core에 포함되어 있고 탐색 및 schema 검색으로 확인할 수 있습니다
-
BATCH_JOB_INSTANCE
- job이 실행 되는 단위
- job의 name/key/version 등의 정보를 가지고 있습니다
-
BATCH_JOB_EXCUTION_PARAMS
- job과 1:1의 관계를 갖는 parameters 입니다
- job과 1:1의 속성때문에 param이 다르면 job_instance가 새롭게 생성됩니다
- Map타입으로 지정데이터를 job에 넘길 수 있습니다
@Bean
public Job jobBean() {
return jobBuilderFactory.get("testJob")
.start(stepBean(null))
.build();
}
@Bean
@JobScope
public Step stepBean(@Value("#{jobParameters[requestDate]}") String requestDate) {
return stepBuilderFactory.get("testStep")
.tasklet(((contribution, chunkContext) -> {
log.info("[========= This is Step ==========]");
log.info("[========= requestDate {} ==========]", requestDate);
return RepeatStatus.FINISHED;
}))
.build();
}
Spring Boot Batch Instance
Spring Boot Batch Param
-
동일
Parameter
로 실행시 이미Instance
가 존재해서 에러가 납니다
Parameter Exist Error
-
BATCH_JOB_EXECUTION
- batch_job_instance와 대응되면서
성공/실패
내역을 갖고 있습니다 - process는 해당 table을 조회해서 재수행이 필요한 job만 처리 합니다
- batch_job_instance와 대응되면서
BATCHJOBEXECUTION
이외의 Step과 관련된 Meta Table은 생략하겠습니다
Job과 비슷하므로 한번 실행해 보시면 바로 이해 가능하실껍니다
추가적으로 Batch에서 Meta Table을 사용하지 않고 RunIdIncrementer을 설정해서 실행도 가능합니다
@Bean
public Job autoIncrementJob() {
return jobBuilderFactory.get("autoIncrementJob")
.incrementer(new RunIdIncrementer())
.start(autoIncrementStep())
.build();
}
⚠ 하지만 이 방식은 기존의 Spring Batch을 상태관리의 장점을 지워버리므로 추천드리지 않습니다
Spring Batch Flow
다음은 Batch으 흐름제어 입니다
여기서는 간단하게 실패와 성공 등등의 여부의 흐름만을 설명합니다
이후 skip
과 같은 예외사항의 대한 흐름제어는 3부에서 설명하겠습니다
여기서는 기본적인 Job의 흐름제어는 상태를 저장하고 그 상태의 대한 조건으로 흐름을 제어합니다
기본적은 상태 저장
contribution.setExitStatus(ExitStatus.FAILED); //setExitStatus로 상태를 저장 할 수 있다
흐름 제어 메소드
- on - 이전 step의 status에 대한 다음 행동
- to - on과 연결된 다음 행동
- end - 반환 / build 종료 2가지가 존재 맺음 메소드
- from - on과 end 이외 추가전 이벤트 캐치 사용에 사용
@Slf4j
@Configuration
@AllArgsConstructor
public class JobSecondConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job jobSecondBean(){
return jobBuilderFactory.get("testJob2")
.start(jobStep1()) .on("FAILED") //FAILED 이면 .to(jobStep3()) //step3 실행 .on("*") //to와 관계없이 .end() //반환 종료 .from(jobStep1()) //on을 사용한후 추가적 이벤트 캐치 .on("*") //FAILED외의 모든 것 .to(jobStep2()) //step2 실행 .next(jobStep3()) //정상 종료되면 step3 실행 .on("*") //step3 결과 상관없이 .end() //반환 종료 .end() //build 종료 .build(); }
@Bean
public Step jobStep1(){
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("[========= This is Step1 ==========]");
contribution.setExitStatus(ExitStatus.FAILED); //Status Failed return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step jobStep2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
log.info("[========= This is Step2 ==========]");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step jobStep3(){
return stepBuilderFactory.get("step3")
.tasklet((contribution, chunkContext) -> {
log.info("[========= This is Step3 ==========]");
contribution.setExitStatus(ExitStatus.FAILED); //Status Failed
return RepeatStatus.FINISHED;
})
.build();
}
}
위에서 하이라이팅
된 부분을 보시면 어떤 형식으로 흐름제어를 하는지 한눈에 들어 오실 겁니다
그리고 실행해보면 아래와 같은 결과를 얻으실 수 있습니다
Fail Flow
다음은 정상적으로 FINISHED
로 끝난 코드입니다
@Bean
public Step jobStep1(){
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("[========= This is Step1 ==========]");
/* contribution.setExitStatus(ExitStatus.FAILED); */ return RepeatStatus.FINISHED;
})
.build();
}
Standard Flow
BATCH_STEP_EXECUTION 해당 table에서는 각각의 Step에 대한 성공 실패 여부가 기록됩니다
BATCHSTEPEXECUTION
하지만 위의 코드에서는 문제점이 있습니다
다음으로는 그 문제점들을 알아보고 해결해 보도록 하겠습니다
위의 코드의 문제점
Step
이 Flow랑 Process처리라는2가지
의 역할을 수행 합니다- ExitStatus로는 다양한
Flow 처리
에 번거로움이 있습니다
JobExecutionDecider 를 통한 Flow 처리
Spring Batch에서는 이를 위해 JobExecutionDecider
인터페이스를 구현하는 흐름처리를 제공합니다
이렇게 Step과 Flow 처리를 분리하므로 보다 결합성 낮은 코드를 구현할 수 있습니다
@Bean
public Job jobDeciderBean() {
return jobBuilderFactory.get("deTestJob")
.start(startStep())
.next(decider()) //JobExecutionDecider()로직 처리
.from(decider()) //JobExecutionDecider()결과 확인
.on("testDecide")
.to(startStep2())
.end()
.build();
}
@Bean
public JobExecutionDecider decider() {
return new jobDecider();
}
public static class jobDecider implements JobExecutionDecider{
@Override //조건 정의 method
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
return new FlowExecutionStatus("testDecide");
}
}
JobExecutionDecider로 Flow를 제어하여 역할을 분리하고 유동적으로 많은 조건들을 생성해서 적용 가능합니다
JobParameter & Scope
Process 운영하다 보면 외부의 데이터를 통한 구현이 필요할 때가 있습니다
Spring Batch에서는 Barch Component에서 사용할 수 있게 지원되는 파라미터를 제공합니다
jobParameters
통해 간단하게 값을 받아 올 수 있습니다
또한 기본적으로 4가지의 Double
, Long
, Date
, String
형식만을 지원합니다
jobParameter 기본형식
@Value("#{jobParameters[parameterName]}")
⚠ JobParameters를 사용시에는 구현부에 다음과 같은 annotation
들이 강제 됩니다
JobParameter Scope
- JobScope : Step 사용시 사용
- StepScope : Tasklet 사용시 사용
간단한 예제를 통해 동작 방식을 알아 보겠습니다
실행 시 parameter 지정
program argument : --job.name=jobScope requestDate=20200203
실행은 다음과 같이 jobScope라는 job을 실행하며 뒤에 requestDate라는 파라미터를 주게 됩니다
@Slf4j
@Configuration
@AllArgsConstructor
public class JobParameterConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job jobScopeBean() {
return jobBuilderFactory.get("jobScope")
.start(scopeStep1(null)) .next(scopeStep2())
.build();
}
@Bean
@JobScope //Step에 대해서 설정 public Step scopeStep1(@Value("#{jobParameters[requestDate]}") String requestDate) { return stepBuilderFactory.get("scopeStep1")
.tasklet((contribution, chunkContext) -> {
log.info("[========= This is scopeStep1 ==========]");
log.info("{}",requestDate);
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step scopeStep2(){
return stepBuilderFactory.get("scopeStep2")
.tasklet(scopeStep2Tasklet(null))
.build();
}
@Bean
@StepScope //Tasklet에 대해서 설정
public Tasklet scopeStep2Tasklet(@Value("#{jobParameters[requestDate]}") String requestDate){
return (contribution, chunkContext) -> {
log.info("[========= This is scopeStep2 ==========]");
log.info("{}",requestDate);
return RepeatStatus.FINISHED;
};
}
}
위의 하이라이팅된 부분을 보시면 Step
의 처음에는 null로 입력하지만
이후 실행에서는 정상적으로 20200203
라는 값을 얻을 수 있습니다
근데 여기서 의문을 가지시는 분들이 계실 수 있습니다
Spring의 Bean은 기본적으로 singleton
scope로 구현되어 집니다
하지만 이렇게 실행되어 지면 외부에서 가져오는 값을 통한 jobParameters 매핑이 어려울 수 있습니다
그에 대한 해답이 @StepScope
와 @JobScope
입니다
@Bean에 @StepScope와 @JobScope를 같이 사용하면 Step의 시작과 종료시 생성/삭제가 이루어지게 됩니다
즉 해당 annotaion들을 통해서 singleton이 아닌 prototype
으로 scope가 생성되어 집니다
거기다.. proxy로요..
prototype
으로 구현되어 얻을수 있는 장점은 크게 2가지가 있습니다
- JobParameter의 Late Binding
일반적인 Bean 생성시점이 아닌 지점에서 생성 되므로
Controller
와Service
와 같은 비지니스 로직 처리단계에서 Job Parameter를 할당 할 수 있게 됩니다 - Component Parallel Processing 👉 Race Condition 문제
일반적인Singleton
처럼 생성되면 각각의 Step에서 Tasklet의 멤버변수등의 상태를 수정하는 일이 생기면서 데이터가 덮어써지게 됩니다
@StepScope
로 각각의 Step별로 별도의 Tasklet를 생성하고 관리하게 하므로써 이러한 문제를 해결 할 수 있습니다
JobParameter 주의 사항
위에서의 설명을 요약하면 이렇습니다
- JobParameter는
@Value
를 통해서만 값을 할당 받을 수 있습니다 @JobScope
와@StepScope
로 Bean을 생성할때만 Jobparameter가 생성되어 사용 할 수 있습니다
만약 이를 어기게 되면 바로 다음과 같은 Error를 보실 수 있습니다
Job Parameter Bean
Job Tasklet
Result
만약 @StepScope
가 없는 Bean에 Jobparameter를 지정하게 되면 생성시점
때문에 error가 납니다
JobParameter를 사용하는 이유
-
Late Binding (Command Line실행외의 다른 실행이 어려워 진다)
다음과 같이 동적 parameter의 대한 대응을 할 수 없습니다@Slf4j @RestController public class JobController { //단순 예제 private final JobLauncher jobLauncher; private final Job job; public JobController(JobLauncher jobLauncher, @Qualifier("JobParameterBean") Job job) { this.jobLauncher = jobLauncher; this.job = job; } @GetMapping("/launchjob") public String handle(@RequestParam("requestDate") String requestDate) { try { JobParameters jobParameters = new JobParametersBuilder() .addString("requestDate", requestDate) .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(job, jobParameters); } catch (Exception e) { log.error("Error : {}", e.getMessage(), e); } return "Deon"; } }
Spring Batch에서는 웹서버로 Batch를 관리하기를권장하지 않습니다
⚠
- Meta Table의 활용성 (Job과 Parameter의
1:1
관계)
이 처럼 Meta Table을 활용하여 Job을 1번만 실행하는지의 여부 확인이 어려워 집니다
@Bean과 @StepScope를 같이 사용할 때 주의사항
기억보단 기록을 블로그의 StepScope 사용시 주의사항을 정리해서 요약했습니다
Spring Batch에서 ItemReader를 구현시 @Bean
을 사용해서 구현하곤 합니다
문제는 @Bean
과 JobParameter를 같이 써야 하면서 @StepScope
를 같이 추가하면서 일어납니다
@Bean
@StepScope
public ItemReader<Test> reader(@Value("#{jobParameters[parameter]}") String parameter){
...
JpaPagingItemReader<Test> reader = new JpaPagingItemReader<>();
...
return reader;
}
해당 코드와 같이 사용하고 일반적으로 JpaPagingItemReader
를 사용하게 되면 ItemStream
인터페이스를 가지고 있지 않아서 Error가 납니다
이유인 즉 @StepScope
가 선언됨에 따라 Bean이 Proxy
로 설정 되면서 실제 생성한것과 다르게 ItemReader로 return 되기 때문입니다
@Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StepScope {}
Scope의 관해서 자세히 알고 싶으시면 제 블로그의 다른 글 👉 Spring Scope의 관해서을 확인 하시기 바랍니다
Chunk 지향 처리
Spring Batch의 가장 큰 장점 중 하나는 Chunk
지향 처리입니다
Chunk
지향처리란 한 번에 하나씩의 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미합니다
그래서 트랜잭션을 수행시 Chunk
단위로 수행하기 때문에 Chunk 만큼만 롤백 됩니다
Source : Spring Batch Docs Chunk-oriented
위의 그림은 Spring Batch에서 Item단위로 처리 프로세스가 어떻게 이루어지는지 나타냅니다
ItemReader와 ItemProcessor를 처리한뒤 마지막에 ItemWriter으로 전달 합니다
Source : 기억보단 기록을 Chunk 지향처리
위의 그림은 이해를 돕기 위한 그림입니다
Reader와 Processor가 Item을 1건씩 처리하고 Chunk단위 만큼 처리가 끝나면 Writer쪽으로 전달되어 일괄 처리 됩니다
Page Size 와 Chunk Size
Spring Batch에서 일반적으로 Reader로 PagingItemReader
를 많이들 사용합니다
일반적으로 Page Size와 Chunk Size를 같은 의미로 생각할 수 있습니다
하지만 Page Size와 Chunk Size는 서로 의미하는 바가 다릅니다
1. Chunk Size는 한번에 처리될 트랜잭션 단위
2. Page Size는 한번에 조회할 Item의 양
아래 코드로 ItemReader에서 Read가 이루어지는 경우를 한눈에 볼 수 있습니다
@Override
protected T doRead() throws Exception {
synchronized (lock) {
//results가 없거나, index가 pageSize를 초과한 경우 if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
//신규 Item을 읽어서 results list에 추가한다
doReadPage(); page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++; if (next < results.size()) { return results.get(next); }
else {
return null;
}
}
}
다음과 같이 Read는 Page Size
단위로 이루어지며 쿼리 실행시 Page의 Size를 지정하기 위한 용도입니다
Chunk는 Item이 처리되는 단위이며 이 때문에 Chunk Size와 Page Size가 다를 경우 불필요한 Read가 발생할 수 있습니다
즉 Chunk Size가 Page Size보다 클경우 Chunk Size를 만족할 만큼의 Page Read가 이루어 집니다
그래서 JpaPagingItemReader에서는 다음과 같이 주석으로 알려주고 있습니다
/*
* <p>
* Setting a fairly large page size and using a commit interval that matches the * page size should provide better performance. * </p>
*/
public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {
...
}
상당히 큰 페이지 크기를 설정하고 페이지 크기와 일치하는 커미트 간격을 사용하면 성능이 향상됩니다
즉 Chunk Size와 Page Size를 일치 시키는게 보편적으로 좋은 방법입니다
- JPA에서의 영속성 컨텍스트가 깨지는 문제도 있을 수 있다고 합니다
관련 정리 👉 영속성 컨텍스트 문제