13. Spring Batch Integration
많은 스프링 배치 사용자는 스프링 인테그레이션(Spring Integration)을 사용하면 효율적이고 간단하게 구현가능한 요구 사항이 있다. 반대로, 스프링 인테그레이션(Spring Integration) 사용자는 스프링 배치를 사용해야할 요구 사항이 있을 수 있으며 두 프레임워크를 효율적으로 통합하는 방법이 필요할 수 있다. 이러한 맥락에서 여러 패턴과 사례가 나타나고 스프링 배치 인테그레이션((Spring Batch Integration))은 이러한 요구 사항을 해결한다.
스프링 배치와 스프링 인테그레이션 사이의 경계가 항상 명확하지 않지만, 세분화에 대해 생각하고 공통 패턴을 적용하는 방법에는 두 가지 조언이 도움이 될 수 있다: 이 장에서는 이러한 일반적인 패턴 중 일부를 설명한다.
배치 프로세스에 메시징을 추가하면 운영을 자동화하고 주요 관심사를 분리 및 전략화할 수 있다. 예를 들어, 메시지는 잡 실행을 트리거할 수 있으며, 메시지 전송은 다양한 방식으로 노출될 수 있다. 또는 잡이 완료되거나 실패하면 해당 이벤트가 메시지 전송을 트리거할 수 있으며 해당 메시지의 소비자는 애플리케이션 자체와 관련이 없는 운영 문제(예: 채널을 통해 처리할 아이템 읽기 또는 쓰기)를 가질 수 있다. 원격 분할 및 원격 청킹은 여러 워커에게 작업 부하를 분산하는 방법을 제공한다.
이 장에서는 다음과 같은 개념을 다룬다.
- Namespace Support
- Launching Batch Jobs through Messages
- Providing Feedback with Informational Messages
- Asynchronous Processors
- Externalizing Batch Process Execution
13.1. Namespace Support
더 쉬운 구성을 제공하기 위해 버전 1.3의 스프링 배치 인테그레이션에 전용 XML 네임스페이스가 추가됐다. 네임스페이스를 사용하려면 스프링 XML 애플리케이션 컨텍스트 파일에 다음 네임스페이스 선언을 추가하자:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">
...
</beans>
다음 예제는 스프링 배치 인테그레이션을 위해 완전히 구성된 스프링 XML 애플리케이션 컨텍스트 파일을 보여준다:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd">
...
</beans>
참조된 XSD 파일에 버전 번호를 추가하는 것도 가능하다. 그러나 버전 없는 선언은 항상 최신 스키마를 사용하므로 일반적으로 XSD명에 버전 번호를 추가하지 않는 것이 좋다. 버전 번호를 추가하면 최신 버전의 XML 스키마가 필요할 수 있으므로 스프링 배치 인테그레이션 의존성을 업데이트할 때 문제가 발생할 수 있다.
13.2. Launching Batch Jobs through Messages
코어(core) 스프링 배치 API를 사용하여 배치 잡을 시작할 때 기본적으로 두 가지 옵션이 있다:
커맨트라인잡러너(CommandLineJobRunner)
로 명령줄 사용JobOperator.start()
또는JobLauncher.run()
을 사용한 프로그래밍 방식
예를 들어, 쉘 스크립트를 사용하여 배치 잡을 호출할 때, 커맨드라인잡러너(CommandLineJobRunner)
를 사용할 수 있다. 또는 잡오퍼레이터(JobOperator)
를 직접 사용(예: 스프링 배치를 웹 애플리케이션의 일부로 사용하는 경우)할 수 있다.
그러나, 더 복잡한 사례는 어떨까? 배치 잡 대한 데이터를 검색하기 위해 원격 (S)FTP 서버를 폴링해야 하거나 애플리케이션이 여러 다른 데이터 소스를 동시에 지원해야 할 수도 있다. 예를 들어, 웹뿐만 아니라 FTP 및 기타 소스에서도 데이터 파일을 받을 수 있다. 스프링 배치를 호출하기 전에 입력 파일의 추가 변환이 필요할 수 있다. 따라서 스프링 인테그레이션과 그에 따른 수많은 어댑터를 사용하여 배치 잡을 실행하는 것이 훨씬 더 강력할 수 있다. 예를 들어 파일 인바운드 채널 어댑터(File Inbound Channel Adapter)를 사용하여 파일 시스템의 디렉토리를 모니터링하고 입력 파일이 도착하자마자 배치 잡을 시작할 수 있다. 또한, 여러 어댑터를 사용하는 스프링 인테그레이션 플로우(flow)를 사용하여 동시에 여러 소스에서 배치 잡에 대한 데이터를 쉽게 수집할 수 있다. 잡런처(JobLauncher)
의 분리(decoupled) 및 이벤트 드리븐(event-driven)을 허용하기 때문에 스프링 인테그레이션으로 이런 상황을 구현하는 것은 쉽다.
스프링 배치 인테그레이션은 배치 잡을 시작하는 데 사용할 수 있는 잡런칭메세지핸들러(JobLaunchingMessageHandler)
클래스를 제공한다. 잡런칭메세지핸들러(JobLaunchingMessageHandler)
에 대한 입력은 잡런치리퀘스트(JobLaunchRequest)
타입의 페이로드가 있는 스프링 인테그레이션 메시지가 제공한다. 이 클래스는 배치 잡 및 잡을 시작하는 데 필요한 잡파라미터(JobParameters) 주변의 래퍼(wrapper)이다.
다음 이미지는 배치 잡을 시작하는 데 필요한 일반적인 스프링 인테그레이션 메시지 플로우(flow)을 보여준다. EIP(Enterprise Integration Patterns) 웹사이트는 메시징 아이콘과 해당 설명에 대한 전체 개요를 제공한다.
이미지 28. 배치 잡 실행
13.2.1. Transforming a File into a JobLaunchRequest
다음 예제에서 파일을 잡런치리퀘스트(JobLaunchRequest)
로 변환한다:
package io.spring.sbi;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
13.2.2. The JobExecution Response
배치 잡이 실행되면 잡익스큐션(JobExecution)
인스턴스가 반환된다. 이 인스턴스를 사용하여 실행 상태를 확인할 수 있다. 잡익스큐션(JobExecution)
을 성공적으로 생성하면, 실제 실행 성공 여부에 관계없이 항상 반환된다.
잡익스큐션(JobExecution)
인스턴스가 반환되는 방식에 대한 정확한 동작은 제공된 태스크익스큐터(TaskExecutor)
에 따라 다르다. 동기(승글 스레드) 태스트익스큐터(TaskExecutor)
구현체가 사용되는 경우 잡익스큐션(JobExecution)
응답은 작업이 완료된 후에만 반환된다. 비동기 태스크익스큐터(TaskExecutor)
를 사용하면, 잡익스큐션(JobExecution)
인스턴스가 즉시 반환된다. 그런 다음 잡익스큐션(JobExecution)
인스턴스의 ID(JobExecution.getJobId() 사용)를 가져오고 잡익스플로러(JobExplorer)
를 사용하여 잡리포지터리(JobRepository)
에서 잡의 업데이트된 상태를 쿼리할 수 있다. 자세한 내용은 리포지터리 쿼리를 참고하자.
13.2.3. Spring Batch Integration Configuration
제공된 디렉토리에서 CSV 파일을 수신하고, 변환기(파일메세지투잡리퀘스트(FileMessageToJobRequest)
)에 전달하고, 잡 런칭 게이트웨이(job launching gateway)를 통해 잡을 실행하고, 로깅 채널 어댑터(logging-channel-adapter)
를 사용하여 잡익스큐션(JobExecution)
의 출력을 기록하고, 출력을 기록하기 위해 누군가 인바운드 채널 어댑터(inbound-channel-adapter)
파일을 생성해야 하는 경우를 생각해 보자.
다음 예제는 XML에서 일반적인 사례 구성 방법을 보여준다:
XML 구성
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel" output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel" reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
다음 예제는 자바에서 일반적인 사례 구성 방법을 보여준다:
자바 구성
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles"))
.filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}
13.2.4. Example ItemReader Configuration
이제 파일을 폴링하고 잡을 시작할 것이므로, 다음 빈(Bean) 구성과 같이 잡파라미터에 정의된 “input.file.name” 위치에 있는 파일을 사용하도록 스프링 배치 아이템리더(ItemReader)
를 구성해야 한다:
다음 예제는 XML에서 필요한 빈 구성을 보여준다:
XML 구성
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
다음 예제는 자바에서 필요한 빈 구성을 보여준다:
자바 구성
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
위 예제에서 주요 관심사는 #{jobParameters['input.file.name']}
값을 리소스(Resource) 프로퍼티 값으로 주입하고 아이템리더(ItemReader)
빈이 스텝 스코프를 갖도록 설정하는 것이다. 스텝 스코프를 갖도록 빈을 설정하면, jobParameters
변수에 대한 접근할 수 있도록 레이트 바인딩(late binding)을 지원한다.
13.3. Available Attributes of the Job-Launching Gateway
잡 런칭 게이트웨이(The job-launching gateway)에는 잡을 제어하기 위해 설정할 수 있는 애트리뷰트가 있다:
id
: 다음 중 하나의 인스턴스인 기본 스프링 빈을 식별한다.이벤트드리븐컨슈머(EventDrivenConsumer)
폴링컨슈머(PollingConsumer)
(정확한 구현체는 컴포넌트의의 입력 채널이섭스크라이버블채널(SubscribableChannel)
인지폴러블채널(PollableChannel)
인지에 따라 달라진다.)
auto-startup
: 엔드포인트가 시작 시 자동으로 시작되어야 함을 나타내는 불 플래그이다. 기본값은true
이다.request-channel
: 엔트포인트의 입력메세지채널(MessageChannel)
이다.reply-channel
: 결과잡익스큐션(JobExecution)
페이로드가 전송되는메세지채널(MessageChannel)
reply-timeout
: 예외가 발생하기 전 이 게이트웨의 응답 메시지(reply message)가 응답 채널(reply channel)에 성공적으로 전송될 때까지 기다리는 시간(밀리초)을 지정할 수 있다. 이 애트리뷰트는 채널이 차단(예: 현재 가득 찬 제한된 큐 채널(queue channel)을 사용하려는 경우)될 수 있는 경우에만 적용된다. 또한,다이렉트채널(DirectChannel)
로 전송할 때 발신자의 스레드에서 호출이 발생한다는 점을 명심하자. 따라서 전송 작업의 실패는 추가 다운스트림(downstream)의 다른 컴포넌트로 인해 발생할 수 있다. 응답reply-timeout
애트리뷰트는 기본메세징템플릿(MessagingTemplate)
인스턴스의sendTimeout
프로퍼티에 매핑된다. 지정하지 않으면 애트리뷰트의의 기본값은 -1이며, 이는 기본적으로 게이트웨이가 무기한 대기함을 의미한다.job-launcher
: 선택 값이다. 커스텀잡런처(JobLauncher)
빈을 참조할 수있다. 지정하지 않으면 어댑터는jobLauncher
ID로 등록된 인스턴스를 재사용한다. 기본 인스턴스가 없으면 예외가 발생한다.order
: 이 엔드포인트가섭스크라이버블채널(SubscribableChannel)
의 구독자(subscriber)로 연결될 때 구체적인 호출 순서를 지정한다.
13.4. Sub-elements
이 게이트웨이(Gateway)
가 폴러블채널(PollableChannel)
에서 메시지를 수신하는 경우, 글로벌 디폴트 폴러
(global default Poller
)를 제공하거나 잡 런칭 게이트웨이(Job Launching Gateway)
에 폴러(Poller)
하위 엘리먼트(sub-element)를 제공해야 한다.
다음 예제는 XML에서 폴러(poller)를 제공하는 방법을 보여준다:
XML 구성
<batch-int:job-launching-gateway request-channel="queueChannel" reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
다음 예제는 자바에서 폴러(poller)를 제공하는 방법을 보여준다:
자바 구성
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
13.4.1. Providing Feedback with Informational Messages
스프링 배치 잡은 오랫동안 실행될 수 있으므로, 진행 정보를 제공하는 것이 중요할 수 있다. 예를 들어, 이해관계자는 배치 잡의 일부 또는 전부가 실패한 경우 알림을 받기를 원할 수 있다. 스프링 배치는 다음을 통해 수집되는 이 정보에 대한 지원을 제공한다:
- Active polling
- Event-driven listeners
스프링 배치 잡을 비동기식(asynchronously)으로 시작할 때(예: 잡 런칭 게이트웨이(Job Launching Gateway)
사용), 잡익스큐션(JobExecution)
인스턴스가 반환된다. 따라서 JobExecution.getJobId()
를 사용하면 잡익스플로러(JobExplorer)
를 사용하여, 잡리포지터리(JobRepository)
에서 잡익스큐션(JobExecution)
의 업데이트된 인스턴스를 검색하여 상태 업데이트를 지속적으로 폴링할 수 있다. 그러나 이는 차선책(sub-optimal)으로 간주되며 이벤트 기반(event-driven) 접근 방식이 선호된다.
따라서, 스프링 배치는 가장 일반적으로 사용되는 세 가지 리스너를 제공한다:
스텝리스너(StepListener)
청크리스너(ChunkListener)
잡익스큐션리스너(JobExecutionListener)
다음 이미지에 표시된 예시에서 스프링 배치 잡이 스텝익스큐션리스너(StepExecutionListener)
로 구성됐다. 따라서 스프링 인테그레이션(Spring Integration)
은 이벤트 전후의 모든 스텝를 수신하고 처리한다. 예를 들어, 라우터(Router)
를 사용하여 수신된 스텝익스큐션(StepExecution)
을 검사할 수 있다. 해당 검사 결과에 따라, 다양한 일이 발생할 수 있으므로(예: 메일 아웃바운드 채널 어댑터(mail outbound channel adapter)로 메시지 라우팅), 일부 조건에 따라 이메일 알림이 전송될 수 있다.
이미지 29. 정보 메시지 처리
두 부분으로 구성된 다음 예시에서 스텝익스큐션(StepExecution)
이벤트를 게이트웨이(Gateway)
에 메시지를 보내고, 해당 출력을 로깅 채널 어댑터(logging-channel-adapter)
에 기록하도록 리스너를 구성하는 방법을 보여준다.
첫째, 노티피케이션 인테그레이션 빈(notification integration bean)을 만든다.
다음 예제는 XML에서 노티피케이션 인테그레이션 빈을 생성하는 방법을 보여준다:
XML 구성
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
다음 예제는 자바에서 노티피케이션 인테그레이션 빈을 생성하는 방법을 보여준다:
자바 구성
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
구성에
@IntegrationComponentScan
어노테이션을 추가해야 한다.
둘째, 잡을 수정하여 스텝 레벨 리스너를 추가한다.
다음 예는 XML에서 스텝 레벨 리스너를 추가하는 방법을 보여준다:
XML 구성
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
다음 예는 자바에서 스텝 레벨 리스너를 추가하는 방법을 보여준다:
자바 구성
public Job importPaymentsJob(JobRepository jobRepository) {
return new JobBuilder("importPayments", jobRepository)
.start(
stepBuilderFactory.get("step1")
.chunk(200)
.listener(notificationExecutionsListener())
...
)
}
13.4.2. Asynchronous Processors
비동기 프로세서는 아이템 처리의 규모를 조정하는 데 도움이 된다. 비동기 프로세서 사례에서 에이싱크아이템프로세서(AsyncItemProcessor)
는 디스패처 역할을 하며 새 스레드의 아이템에 대한 아이템프로세서(ItemProcessor)
로직를 실행한다. 아이템이 완료되면 퓨처(Future)
가 에싱크아이템라이터(AsynchItemWriter)
에 전달 후 작성된다.
따라서, 에이싱크로너스 아이템 프로세싱를 사용하여, 기본적으로 포크 조인(fork-join) 시나리오를 구현함으로써 성능을 향상시킬 수 있다. 에이싱크아이템라이터(AsyncItemWriter)
는 결과를 수집하고 모든 결과가 사용 가능해지면 즉시 청크를 재작성한다.
다음 예제는 XML에서 에이싱크아이템프로세서(AsyncItemProcessor)
를 구성하는 방법을 보여준다:
XML 구성
<bean id="processor" class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
다음 예제는 자바에서 에이싱크아이템프로세서(AsyncItemProcessor)
를 구성하는 방법을 보여준다:
자바 구성
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
delegate
프로퍼티는 아이템프로세서(ItemProcessor)
빈을 참조하고, taskExecutor
프로퍼티는은 택한 태스크익스큐터(TaskExecutor)
를 참조합니다.
다음 예제는 XML에서 에이싱크아이템라이터(AsyncItemWriter)
를 구성하는 방법을 보여준다:
XML 구성
<bean id="itemWriter" class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
다음 예제는 자바에서 에이싱크아이템라이터(AsyncItemWriter)
를 구성하는 방법을 보여준다:
자바 구성
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
다시 말하지만, delegate
프로퍼티는 실제로 아이템라이터(ItemWriter)
빈에 대한 참조이다.
13.4.3. Externalizing Batch Process Execution
지금까지 논의된 통합(integration) 접근 방식은 스프링 인테그레이션이 외부 셸처럼 스프링 배치를 래핑하는 사례를 보여줬다. 그러나 스프링 배치는 내부적으로 스프링 인테그레이션을 사용할 수도 있다. 이 접근 방식을 사용하면 스프링 배치 사용자는 아이템 처리 또는 청크 처리를 외부 프로세스에 위임할 수 있다.
이를 통해 복잡한 처리에 대한 부담을 덜어줄 수 있다. 스프링 배치 인테그레이션(Spring Batch Integration)은 다음을 제공한다:
- Remote Chunking
- Remote Partitioning
Remote Chunking
다음 이미지는 스프링 인테그레이션과 함께 스프링 배치를 사용할 때 원격 청킹이 작동하는 한 가지 방법을 보여준다:
이미지 30. 원격 청킹
한 단계 더 나아가 아이템을 전송하고, 결과를 수집하는 청크메세지채널아이템라이터(ChunkMessageChannelItemWriter)
(스프링 배치 인테그레이션에서 제공)를 사용하여 청크 처리를 외부화할 수도 있다. 일단 전송되면, 스프링 배치는 결과를 기다리지 않고, 아이템을 읽고 그룹화하는 프로세스를 계속한다. 오히려 결과를 수집하고, 이를 스프링 배치 프로세스에 재통합하는 것은 청크메세지채널아이템라이터(ChunkMessageChannelItemWriter)
의 책임이다. 스프링 인테그레이션을 사용하면 프로세스의 동시성을 완전히 제어할 수 있다(예: 다이렉트채널(DirectChannel)
대신 큐채널(QueueChannel)
사용). 또한 스프링 인테그레이션의 풍부한 채널 어댑터 컬렉션(예: JMS 및 AMQP)을 사용하여, 배치 작업 청크를 외부 시스템에 배포하여 처리할 수 있다.
XML에서 원격 청크 스텝이 있는 잡은 다음같은 구성을 가질 수 있다:
XML 구성
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
자바에서 원격 청크 스텝이 있는 잡은 다음같은 구성을 가질 수 있다:
자바 구성
public Job chunkJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(
stepBuilderFactory.get("step1")
.<Person, Person>chunk(200)
.reader(itemReader())
.writer(itemWriter())
.build()
)
.build();
}
아이템리더(ItemReader)
참조는 매니저에서 데이터를 읽는 데 사용하려는 빈을 가리킨다. 아이템라이터(ItemWriter)
참조는 앞서 설명한 대로 청크메세지채널아이템라이터(ChunkMessageChannelItemWriter)
라고 하는 특별한 아이템라이터(ItemWriter)
를 가리킨다. 프로세서가 있는 경우 워커 구성 대로 매니저 구성에서 제외된다. 사례를 구현할 때 스로틀 제한(throttle limits) 등과 같은 추가 컴포넌트 프로퍼티를 확인해야 한다.
XML에서 기본 매니저 설정을 보여준다:
XML 구성
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
자바 구성
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* 아웃바운드 플로우 구성(워커에게 전달되는 요청(requests))
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* 인바운드 플로우 구성(워커로부터 오는 응답)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(
}
/*
* 청크메세지채널아이템라이터(ChunkMessageChannelItemWriter) 구성
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
위 구성은 여러 개의 빈을 제공한다. ActiveMQ와 스프링 인테그레이션에서 제공하는 인바운드 및 아웃바운드 JMS 어댑터를 사용하여 메시징 미들웨어를 구성한다. 표시된 대로 잡 스텝에서 참조되는 아이템라이터(itemWriter)
빈은 청크메세지채널아이템라이터(ChunkMessageChannelItemWriter)
를 사용하여 구성된 미들웨어에 청크를 쓴다(write).
이제 다음 예제와 같이 워커 구성으로 이동할 수 있다:
다음 예제는 XML에서 워커 구성을 보여준다:
XML 구성
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler" class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
다음 예제는 자바에서 워커 구성을 보여준다:
자바 구성
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* 인바운드 플로우 구성(매니저로부터 오는 요청(request))
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* 아웃바운드 플로우 구성(매니저에게 응답)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* 청크프로세서청크핸들러(ChunkProcessorChunkHandler) 구성
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
이러한 설정은 대부분 매니저 구성에서 친숙하게 보일 것이다. 워커는 스프링 배치 잡리포지터리(JobRepository)
나 실제 잡 구성 파일에 접근할 필요가 없다. 관심 있는 주요 빈은 청크프로세서청크핸들러(ChunkProcessorChunkHandler)
이다. 청크프로세서청크핸들러(ChunkProcessorChunkHandler)
의 청크프로세서(ChunkProcessor)
프로퍼티는 매니저로부터 청크를 받을 때 워커에서 실행될 아이템라이터(ItemWriter)
(및 옵셔널하게 아이템프로세서(ItemProcessor)
)에 대한 레퍼런스를 제공하는 심플청크프로세서(SimpleChunkProcessor)
를 사용한다.
자세한 내용은 “확장성(Scalability)” 장의 원격 청킹(Remote Chunking) 절을 참고하자.
버전 4.1부터 스프링 배치 인테그레이션(Spring Batch Integration)은 원격 청킹 설정을 단순화하는 데 사용할 수 있는 @EnableBatchIntegration
어노테이션을 도입했다. 이 어노테이션은 애플리케이션 컨텍스트에서 오토와이어드(autowire)할 수 있는 두 개의 빈을 제공한다:
리모트청킹매니저스텝빌더팩토리(RemoteChunkingManagerStepBuilderFactory)
: 매니저 스텝 구성리모트청킹워커빌더(RemoteChunkingWorkerBuilder)
: 원격 워커 인테그레이션 플로우 구성
이러한 API는 다음 다이어그램에 표시된 것처럼 다양한 컴포넌트 구성을 관리한다:
이미지 31. 원격 청킹 구성
매니저에서는, 리모트청킹매니저스텝빌더팩토리(RemoteChunkingManagerStepBuilderFactory)
를 사용하여 다음을 선언하여 매니저 스텝을 구성할 수 있다:
- 아이템을 읽고 워커에게 보내는 아이템 리더
- 워커에게 요청을 보내는 출력 채널(“보내는 요청(request)”)
- 워커로부터 응답을 수신하기 위한 입력 채널(“수신 응답(replies)”)
청크메세지채널아이템라디터(ChunkMessageChannelItemWriter)
및 메세징템플릿(MessagingTemplate)
을 명시적으로 구성할 필요는 없다. 구성 해야 하는 이유가 있으면 명시적으로 구성할 수 있다.
워커 측에서는 리모트청킹워커빌더(RemoteChunkingWorkerBuilder)
를 사용하여 워커를 구성할 수 있다:
- 입력 채널에서 매니저가 보낸 요청(“들어오는 요청(request”)”)을 듣는다.
- 구성된
아이템프로세서(ItemProcessor)
및아이템라이터(ItemWriter)
를 사용하여 각 요청에 대해청크프로세서청크핸들러(ChunkProcessorChunkHandler)
의handlerChunk
메서드를 호출한다. - 출력 채널의 응답(“보내는 응답”)을 매니저에게 보낸다.
심플청크프로세서(SimpleChunkProcessor)
및 청크프로세서청크핸들러(ChunkProcessorChunkHandler)
를 명시적으로 구성할 필요는 없다. 그렇게 해야 하는 이유가 있다면 명시적으로 구성할 수 있다.
다음 예제에서 이러한 API를 사용하는 방법을 보여준다:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // 워커에게 전송된 요청(request)
.inputChannel(replies()) // 워커로부터 받은 답변(replies)
.build();
}
// 미들웨어 빈 설정 생략
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // 매니저로부터 받은 요청
.outputChannel(replies()) // 매니저에게 답장을 보냄
.build();
}
// 미들웨어 빈 설정 생략
}
}
여기에서 원격 청킹 잡의 전체 예를 찾을 수 있다.
Remote Partitioning
다음 이미지는 일반적인 원격 파티셔닝 상황을 보여준다:
이미지 32. 원격 파티셔닝
반면, 원격 파티셔닝은 병목 현상(bottleneck)을 일으키는 것이 아이템 처리가 아니라 I/O일 때 유용하다. 원격 파티셔닝을 사용하면 전체 스프링 배치 스텝을 실행하는 워커에게 작업을 보낼 수 있다. 따라서, 각 워커에는 자체 아이템리더(ItemReader)
, 아이템프로세서(ItemProcessor)
및 아이템라이터(ItemWriter)
가 있다. 이를 위해 스프링 배치 인테그레이션(Spring Batch Integration)은 메세지채널파티션핸들러(MessageChannelPartitionHandler)
를 제공한다.
파티션핸들러(PartitionHandler)
인터페이스의 이 구현체는 메세지채널(MessageChannel)
인스턴스를 사용하여 원격 워커에게 지침(instruction)을 보내고 응답을 받는다. 이는 원격 워커와 통신하는 데 사용되는 전송(예: JMS 및 AMQP)에서 추상화를 제공한다.
“확장성(Scalability)” 장의 원격 파티셔닝 절에서는 원격 파티셔닝을 구성하는 데 필요한 개념과 컴포넌트에 대한 개요를 제공하고 기본 태스크익스큐터파티션핸들러(TaskExecutorPartitionHandler)
를 사용하여 별도의 로컬 실행 스레드로 분할하는 예를 보여준다.
여러 JVM으로 원격 파티셔닝을 수행하려면 두 개의 추가 컴포넌트가 필요하다:
- 원격 패브릭(fabric) 또는 그리드(grid) 환경
- 원하는 원격 패브릭 또는 그리드 환경을 지원하는
파티션핸들러(PartitionHandler)
구현
원격 청킹과 유사하게 JMS를 “원격 패브릭”으로 사용할 수 있다. 이 경우 앞서 설명한 대로 메세지채널파티션핸들러(MessageChannelPartitionHandler)
인스턴스를 파티션핸들러(PartitionHandler)
구현체로 사용한다.
다음 예제는 XML에서 기존 파티션 잡이 있다고 가정하고 메세지채널파티션핸들러(MessageChannelPartitionHandler)
및 JMS 구성에 중점을 둔다:
XML 구성
<bean id="partitionHandler" class= "org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue" channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue" channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests" output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue" channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue" channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging" output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator" class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
다음 예제는 자바에서 기존 파티션 잡이 있다고 가정하고 메세지채널파티션핸들러(MessageChannelPartitionHandler)
및 JMS 구성에 중점을 둔다:
자바 구성
/*
* 매니저 측 구성
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* 워커 측 구성
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(
Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue")
).get();
}
또한 파티션 핸들러
(partition handler
) 애트리뷰트가 partitionHandler
빈에 매핑되는지 확인해야 한다.
다음 예제는 XML에서 파티션 핸들러
(partition handler
) 애트리뷰트를 partitionHandler
에 매핑한다:
XML 구성
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
다음 예제는 자바에서 파티션 핸들러
(partition handler
) 애트리뷰트를 partitionHandler
에 매핑한다:
자바 구성
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(
stepBuilderFactory.get("step1.manager")
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build()
).build();
}
여기에서 원격 파티셔닝 작업의 전체 예시를 볼 수 있다.
@EnableBatchIntegration 어노테이션을 사용하여 원격 파티셔닝 설정을 단순화할 수 있다. 이 어노테이션은 원격 파티셔닝에 유용한 두 개의 빈을 제공한다:
리모트파티셔닝매니저스텝빌더팩토리(RemotePartitioningManagerStepBuilderFactory)
: 매니저 스텝 구성리모트파티셔닝워커스텝빌더팩토리(RemotePartitioningWorkerStepBuilderFactory)
: 워커 스텝 구성
이러한 API는 다음 다이어그램에 표시된 것처럼 다양한 컴포넌트 구성을 처리한다:
이미지 33. 원격 파티셔닝 구성(잡 리포지터리 폴링 포함)
이미지 34. 원격 파티셔닝 구성(응답 집계 포함)
매니저 측에서, 리모트파티셔닝매니저스텝빌더팩토리(RemotePartitioningManagerStepBuilderFactory)
를 사용하고 다음 내용을 선언하여 매니저 스텝를 구성할 수 있다.
- 데이터를 파티셔닝하는 데 사용되는 파티셔너
- 워커에게 요청을 보내는 출력 채널(“발신 요청(request)”)
- 워커로부터 응답을 수신할 입력 채널(“들어오는 응답”)(응답 집계(replies aggregation) 구성 시)
- 폴링 간격(poll interval) 및 시간 초과(timeout) 파라미터(잡 리포지터리 폴링을 구성하는 경우)
메세지채널파티션해들러(MessageChannelPartitionHandler)
및 메세징템플릿(MessagingTemplate)
을 명시적으로 구성할 필요는 없다. 그렇게 해야 하는 이유를 찾으면 명시적으로 구성할 수 있다.
워커 측에서, 리모트파티셔닝워커스텝빌더팩토리(RemotePartitioningWorkerStepBuilderFactory)
를 사용하여 워커를 다음과 같이 구성할 수 있다:
- 입력 채널에서 매니저가 보낸 요청(“들어오는 요청”)을 듣는다.
- 각 요청에 대해
스텝익스큐션리퀘스트핸들러(StepExecutionRequestHandler)
의handle
메서드를 호출한다. - 출력 채널의 응답(“보내는 응답”)을 매니저에게 보낸다.
스텝익스큐션리퀘스트핸들러(StepExecutionRequestHandler)
를 명시적으로 구성할 필요는 없다. 그렇게 해야 하는 이유를 찾으면 명시적으로 구성할 수 있다.
다음 예제에서 이러한 API를 사용하는 방법을 보여준다:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// 미들웨어 빈 설정이 생략됨
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// 미들웨어 빈 설정이 생략됨
}
}