7. Item processing
아이템리더(ItemReader)
및 아이템라이터(ItemWriter)
인터페이스는 둘 다 특정 작업에 매우 유용하지만, 쓰기 전에 동작하는 비즈니스 로직을 넣으려면 어떻게 해야 할까? 읽기와 쓰기 모두 한 가지 옵션은 컴포짓(composite) 패턴을 사용하는 것이다: 다른 아이템라이터(ItemWriter)
를 포함하는 아이템라이터(ItemWriter)
또는 다른 아이템리더(ItemReader)
를 포함하는 아이템리더(ItemReader)
를 만든다. 다음 코드는 예시를 보여준다.
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(Chunk<? extends T> items) throws Exception {
//여기에 비즈니스 로직 추가
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
위 클래스에는 일부 비즈니스 로직을 제공한 후 위임하는 다른 아이템라이터(ItemWriter)
가 포함되어 있다. 이 패턴은 아이템리더(ItemReader)
에도 쉽게 사용할 수 있으며, 메인 아이템리더(ItemReader)
에서 제공한 입력을 기반으로 더 많은 참고 데이터를 얻을 수 있다. 직접 작성(write)
하여 호출을 제어해야 하는 경우 유용하다. 그러나 실제로 쓰기 전에 전달된 아이템을 “변환(“transform”)”하려는 경우에는, 직접 코드를 작성(write)
할 필요가 없다. 아이템을 수정하면 됩니다. 이 경우 스프링 배치는 다음 인터페이스 정의와 같이 아이템프로세서(ItemProcessor)
인터페이스를 제공한다:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
아이템프로세서(ItemProcessor)
는 간단하다. 하나의 객체가 주어지면, 변환(transform)하고 다른 객체를 반환한다. 제공된 객체는 동일한 타입일 수도 있고 아닐 수도 있다. 요점은 비즈니스 로직이 프로세스 내에 적용될 수 있으며, 해당 로직을 생성하는 것은 전적으로 개발자의 몫이라는 것다. 아이템프로세서(ItemProcessor)
를 스텝에 직접 연결할 수 있다. 예를 들어, 아이템리더(ItemReader)
가 Foo 타입의 클래스를 제공하고 쓰기 전에 Bar 타입으로 변환해야 한다고 가정하자. 다음 예는 변환을 수행하는 아이템프로세서(ItemProcessor)
를 보여준다:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
// 간단한 변환 수행, Foo를 Bar로 변환
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
//bars 작성
}
}
위 예제에는 Foo
클래스, Bar
클래스 및 아이템프로세서(ItemProcessor)
인터페이스를 준수하는 FooProcessor
라는 클래스가 있다. 변환은 간단하지만 여기서 모든 타입의 변환을 수행할 수 있다. BarWriter
는 Bar
객체를 작성하고 다른 타입이 제공되면 예외를 발생시킨다. 유사하게, FooProcessor
는 Foo
이외의 것이 제공되면 예외를 발생시킨다. FooProcessor
는 다음 예제와 같이 스텝에 주입될 수 있다:
XML 구성
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
자바 구성
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
아이템프로세서(ItemProcessor)
와 아이템리더(ItemReader)
또는 아이템라이터(ItemWriter)
의 차이점은 아이템프로세서(ItemProcessor)
가 선택적으로 스텝에서 사용될 수 있다는 점이다.
7.1. Chaining ItemProcessors
하나의 변환을 수행하는 것은 많은 상황에서 유용하지만, 여러 아이템프로세서(ItemProcessor)
구현체를 함께 “연결(chain)”하려면 어떻게 해야 할까? 앞에서 언급한 컴포짓(composite) 패턴을 사용면된다. 다음 예제와 같이 Foo는 Bar로 변환된 후 Foobar로 변환되고 기록된다:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//간단한 변환 수행, Foo를 Bar로 변환
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
//items 작성
}
}
FooProcessor
와 BarProcessor
는 다음 예제와 같이 결과 Foobar
를 제공하기 위해 함께 ‘연결(chain)’될 수 있다:
CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
위의 예시와 마찬가지로, 컴포짓(composite) 프로세서를 스텝으로 구성할 수 있다: XML 구성
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer= "foobarWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
자바 구성
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2, transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
7.2. Filtering Records
아이템 프로세서(item processor)의 일반적인 용도는 레코드가 아이템라이터(ItemWriter)
로 전달되기 전에 레코드를 필터링하는 것이다. 필터링은 스킵(skip)과는 별개의 작업이다. 스킵은 레코드가 유효하지 않음을 나타내고, 필터링은 레코드를 쓰지 않아야 함을 나타낸다.
예를 들어, 삽입할 레코드, 업데이트할 레코드 및 삭제할 레코드의 세 가지 다른 타입의 레코드가 포함된 파일을 읽는 배치 잡을 생각해보자. 시스템에서 레코드 삭제를 지원하지 않는 경우, 삭제할 수 있는 레코드를 아이템라이터(ItemWriter)
로 보내지 않아야할 것이다. 그러나 이러한 레코드는 실제로 잘못된 레코드가 아니므로 스킵보다는 필터링하는 것이 좋다. 결과적으로 아이템라이터(ItemWriter)
는 삽입 및 업데이트 가능한 레코드만 받는다.
레코드를 필터링하려면 아이템프로세서(ItemProcessor)
에서 null을 반환할 수 있다. 프레임워크는 결과가 null
임을 감지하고, 아이템라이터(ItemWriter)
에 전달된 레코드 목록에 해당 아이템을 추가하지 않는다. 아이템프로세서(ItemProcessor)
에서 예외가 발생하면 스킵한다.
7.3. Validating Input
아이템리더(ItemReader)
및 아이템라이터(ItemWriter)
장에서는 입력에 대한 파싱을 여러 접근 방식으로 설명한다. 각각의 주요 구현체은 “올바른 구성(well formed)”이 아닌 경우 예외를 발생시킨다. 픽스드렝스토크나이저(FixedLengthTokenizer)
는 데이터 범위가 누락된 경우 예외를 발생시킨다. 마찬가지로, 로우매퍼(RowMapper)
또는 필드셋매퍼(FieldSetMapper)
에 존재하지 않거나 예상과 다른 형식의 인덱스에 접근하려고 하면 예외가 발생한다. 이러한 모든 타입의 예외는 read
반환 전에 발생한다. 그러나, 반환된 아이템의 유효성에 대한 문제는 다루지 않는다. 예를 들어, 필드 중 하나가 나이인 경우 음수가 될 수 없다. 존재하고 숫자이기 때문에, 올바르게 파싱할 수 있고 예외가 발생하지는 않는다. 이미 수많은 유효성 검사 프레임워크가 있기 때문에, 스프링 배치는 그것을 제공하려고 시도하지 않는다. 오히려, 다음 인터페이스와 같이 여러 프레임워크에서 구현할 수 있는 밸리데이터(Validator)
를 제공한다:
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
객체가 유효하지 않으면 validate
메서드가 예외를 발생(throw)시키고 유효하면 정상적으로 반환한다. 스프링 배치는 다음 빈 정의와 같이 밸리데이터아이템프로세서(ValidatingItemProcessor)
를 제공한다: XML 구성
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
자바 구성
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
또한 빈밸리데이팅아이템프로세서(BeanValidatingItemProcessor)
를 사용하여 빈 밸리데이션 API(Bean Validation API(JSR-303)) 어노테이션이 달린 아이템의 유효성을 검사할 수 있다. 예를 들어, 다음 Person
을 생각해보자:
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
애플리케이션 컨텍스트에서 빈밸리데이팅아이템프로세서(BeanValidatingItemProcessor)
빈을 선언하여 아이템의 유효성을 검사하고 청크 지향 스텝에서 프로세서로 등록할 수 있다:
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}
7.4. Fault Tolerance
청크가 롤백되면, 읽는 동안 캐시된 아이템이 다시 처리될 수 있다. 스텝이 내결함성(fault-tolerant)(일반적으로 스킵 또는 재시도 처리 사용)으로 구성된 경우, 사용되는 모든 아이템프로세서(ItemProcessor)
는 멱등적(idempotent)으로 구현되어야 한다. 일반적으로 아이템프로세서(ItemProcessor)
의 입력 아이템을 변경하지 않고 결과 인스턴스만 업데이트하는 것으로 구성된다.