메세지 보내기(Sending Messages)

이 장에서는 메시지를 보내는 방법을 다룬다.

KafkaTemplate 사용(Using KafkaTemplate)

이 장에서는 KafkaTemplate을 사용하여 메시지를 보내는 방법을 다룬다.

개요(Overview)

KafkaTemplate은 프로듀서를 래핑하고 카프카 토픽에 데이터를 보내는 편리한 방법을 제공한다. 다음 목록은 KafkaTemplate의 관련 메서드를 보여준다.

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

자세한 내용은 Javadoc을 참고하자.

버전 3.0에서는, 이전에 ListenableFuture를 반환했던 메서드가 CompletableFuture를 반환하도록 변경됐다. 마이그레이션을 용이하게 하기 위해 2.9 버전에는 CompletableFuture 반환 타입과 동일한 메서드를 제공하는 CompletableFuture()를 사용하는 메서드가 추가됐다. 이 방법은 더 이상 사용할 수 없다.

sendDefault API를 사용하려면 기본 토픽이 템플릿에 제공되어야 한다.

API는 타임스탬프(timestamp)를 파라미터로 사용하고 이 타임스탬프를 레코드에 저장한다. 사용자가 제공한 타임스탬프가 저장되는 방식은 카프카 토픽에 구성된 타임스탬프 타입에 따라 다르다. CREATE_TIME을 사용하도록 토픽가 구성된 경우 사용자가 지정한 타임스탬프가 기록된다(또는 지정되지 않은 경우 생성된다). LOG_APPEND_TIME을 사용하도록 토픽을 구성한 경우 사용자가 지정한 타임스탬프는 무시되고 브로커는 로컬 브로커 시간을 추가한다.

metricspartitionsFor 메서드는 기본 프로듀서의 동일한 메서드에 위임한다. execute 메소드는 기본 프로듀서에 대한 직접 접근을 제공한다.

템플릿을 사용하려면, 프로듀서 팩토리를 구성하고 이를 템플릿 생성자에 제공하면 된다. 다음 예제에서 그 방법을 보여준다.

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 더 많은 프로퍼티 정보는 다음을 참고하자. https://kafka.apache.org/documentation/#producerconfigs
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

버전 2.5부터, 이제 팩토리의 ProducerConfig 프로퍼티를 오버라이드하여 동일한 팩토리의 다양한 프로듀서 구성으로 템플릿을 생성할 수 있다.

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

ProducerFactory<?, ?> 타입(예: 스프링 부트에 의해 자동 구성된 빈)의 빈은 다른 일반 타입으로 참조될 수 있다.

표준 <bean/> 정의를 사용하여 템플릿을 구성할 수도 있다.

그 다음 템플릿을 사용하려면, 해당 메서드 중 하나를 호출하면 된다.

When you use the methods with a Message<?> parameter, the topic, partition, key and timestamp information is provided in a message header that includes the following items:

Message<?> 파라미터와 함께 메소드를 사용하면 토픽, 파티션, 키 및 타임스탬프 정보가 다음 항목을 포함하는 메시지 헤더에 제공된다.

  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP

메시지 페이로드는 데이터다.

선택적으로, Future가 완료될 때까지 기다리는 대신 전송 결과(성공 또는 실패)와 함께 비동기(asynchronous) 콜백을 가져오도록 ProducerListener를 사용하여 KafkaTemplate을 구성할 수 있다. 다음 목록은 ProducerListener 인터페이스의 정의를 보여준다.

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception);

}

기본적으로, 템플릿은 오류를 기록하고 전송이 성공하면 아무 작업도 수행하지 않는 LoggingProducerListener로 구성된다.

편의를 위해 메소드 중 하나만 구현하려는 경우 기본 메소드 구현이 제공된다.

send 메소드는 CompletableFuture<SendResult>를 반환한다. 전송 결과를 비동기적으로 수신하기 위해 리스너에 콜백을 등록할 수 있다. 다음 예제에서는 그 방법을 보여준다.

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult에는 ProducerRecordRecordMetadata라는 두 가지 프로퍼티가 있다. 해당 객체에 대한 자세한 내용은 카프카 API 설명서를 참고하자.

ThrowableKafkaProducerException으로 캐스팅할 수 있다. failedProducerRecord 프로퍼티에는 실패한 레코드가 포함되어 있다.

결과를 기다리기 위해 전송 스레드를 차단하려면 futureget() 메서드를 호출할 수 있다. 제한 시간이 있는 방법을 사용하는 것이 좋다. linger.ms를 설정한 경우 기다리기 전에 flush()를 호출할 수 있다. 또는 편의를 위해 템플릿에는 보낼 때마다 템플릿이 flush()되도록 하는 autoFlush 파라미터가 있는 생성자가 있다. flushlinger.ms 프로듀서 프로퍼티을 설정하고 부분 배치를 즉시 보내려는 경우에만 필요하다.

예제(Examples)

이 절에서는 카프카에 메시지를 보내는 예제를 보여준다.

예제 1. 논 블록킹 (Async)

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}

예제 2. 블로킹 (Sync)

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

ExecutionException의 원인은 failedProducerRecord 프로퍼티가 있는 KafkaProducerException이다.

RoutingKafkaTemplate 사용(Using RoutingKafkaTemplate)

버전 2.5부터, RoutingKafkaTemplate을 사용하여 대상 토픽명을 기반으로 런타임 시 프로듀서를 선택할 수 있다.

라우팅 템플릿은 transactions, execute, flush 또는 metrics 작업에 대한 토픽이 알려져 있지 않기 때문에 해당 작업을 지원하지 않는다.

템플릿에는 java.util.regex.PatternProducerFactory<Object, Object> 인스턴스 매핑이 필요하다. 이 맵은 순서대로 탐색되므로 순서가 지정되어야 한다(예: LinkedHashMap). 처음에는 좀 더 구체적인 패턴을 추가해야 한다.

다음의 간단한 스프링 부트 애플리케이션은 동일한 템플릿과 다른 시리얼라이저를 사용하여 다양한 토픽으로 보내는 방법에 대한 예제를 제공한다.

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory<Object, Object> pf) {
        // 다른 시리얼라이저를 사용하여, PF를 복제하고 종료를 위해 스프링에 등록한다.
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // 스트링 시리얼라이저와 기본 PF
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

이 예제에 해당하는 @KafkaListeners는 어노테이션 프로퍼티에 표시된다.

비슷한 결과를 얻기 위한 다른 기술을 사용하되 동일한 토픽에 다른 타입을 보내는 추가 기능을 보려면 시리얼라이저 및 디시리얼라이저 위임을 참고하자.

DefaultKafkaProducerFactory 사용(Using DefaultKafkaProducerFactory)

카프카템플릿(KafkaTemplate) 사용에서 볼 수 있듯이, 프로듀서팩토리(ProducerFactory)를 사용하여 프로듀서를 생성한다.

트랜잭션을 사용하지 않는 경우 기본적으로 DefaultKafkaProducerFactoryKafkaProducer 자바독(JavaDocs)에서 권장하는 대로, 모든 클라이언트에서 사용하는 싱글톤 프로듀서를 생성한다. 그러나, 템플릿에서 flush()를 호출하면 동일한 프로듀서를 사용하는 다른 스레드에 대한 지연이 발생할 수 있다. 버전 2.3부터 DefaultKafkaProducerFactory에는 producerPerThread라는 새로운 프로퍼티가 있다. true로 설정하면 팩토리는 이 문제를 방지하기 위해 각 스레드에 대해 별도의 프로듀서를 생성(및 캐시)한다.

producerPerThreadtrue인 경우, 프로듀서가 더 이상 필요하지 않을 때, 사용자 코드는 팩토리에서 closeThreadBoundProducer()를 호출한다. 그러면 프로듀서가 물리적으로 닫히고 ThreadLocal에서 제거된다. reset() 또는 destroy()를 호출해도 이러한 프로듀서는 정리되지 않는다.

KafkaTemplate 트랜잭션 및 비트랜잭션 게시도 참고하자.

DefaultKafkaProducerFactory를 생성할 때, 프로퍼티 맵만 가져오는 프로듀서를 호출하여 키 및/또는 값 Serializer 클래스를 구성에서 선택하거나(KafkaTemplate 사용의 예제 참고) Serializer 인스턴스를 DefaultKafkaProducerFactory 생성자에 전달할 수 있다( 이 경우 모든 프로듀서 는 동일한 인스턴스를 공유한다.) 또는 각 프로듀서에 대해 별도 Serializer 인스턴스를 얻는 데 사용되는 Manufacturer<Serializer>(버전 2.3부터)를 제공할 수 있다.

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

버전 2.5.10부터 이제 팩토리가 생성된 후 프로듀서 프로퍼티를 업데이트할 수 있다. 예를 들어 자격 증명이 변경된 후 SSL 키/신뢰 리포지터리 위치를 업데이트해야 하는 경우 유용할 수 있다. 변경 사항은 기존 프로듀서 인스턴스에 영향을 미치지 않는다. 새로운 프로퍼티를 사용하여 새 프로듀서가 생성되도록 reset()을 호출하여 기존 프로듀서를 모두 닫는다. 노트: 트랜젝셔널 프로듀서 팩토리를 논트랜잭셔널 프로듀서 팩토리로 변경할 수 없으며 그 반대의 경우도 마찬가지다.

이제 두 가지 새로운 방법이 제공된다.

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

버전 2.8부터 시리얼라이저를 객체로 제공하는 경우(생성자(constructor)에서 또는 설정자(setters)를 통해) 팩토리는 구성 프로퍼티로 구성하기 위해 configure() 메서드를 호출합니다.

ReplyingKafkaTemplate 사용(Using ReplyingKafkaTemplate)

버전 2.1.3에서는 요청/응답 의미 체계를 제공하기 위해 KafkaTemplate의 하위 클래스를 도입했다. 클래스명은 ReplyingKafkaTemplate이며 두 가지 추가 메서드가 있다. 다음은 메서드 서명을 보여준다.

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout);

(Message<?>로 요청/응답 참고)

결과는 결과(또는 제한시간에 대한 예외)로 비동기 CompletableFuture이다. 결과에는 KafkaTemplate.send()를 호출한 결과인 sendFuture 프로퍼티도 있다. 이 future를 사용하여 전송 작업의 결과를 확인할 수 있다.

버전 3.0에서는 이러한 메서드(및 해당 sendFuture 프로퍼티)에 의해 반환된 futureListenableFutures 대신 CompletableFutures로 변경됐다.

첫 번째 방법이 사용되거나, replyTimeout 아규먼트가 null인 경우 템플릿의 defaultReplyTimeout 프로퍼티가 사용된다(기본적으로 5초).

버전 2.8.8부터 템플릿에는 waitForAssignment라는 새로운 메서드가 있다. 이는 응답 컨테이너가 auto.offset.reset=latest로 구성되어 컨테이너가 초기화되기 전에 요청 및 응답 전송을 방지하는 데 유용하다.

수동 파티션 할당(그룹 관리 없음)을 사용하는 경우, 첫 번째 폴링이 완료될 때까지 알림이 전송되지 않으므로 대기 기간(duration for the wait)은 컨테이너의 pollTimeout 프로퍼티보다 커야 한다.

다음 스프링부트 애플리케이션은 이 기능을 사용하는 방법의 예제를 보여준다.

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer
    ) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory
    ) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }
}

스프링 부트의 자동 구성된 컨테이너 팩토리를 사용하여 응답(reply) 컨테이너를 만들 수 있다.

응답에 중요한 디시리얼라이저를 사용하는 경우, 구성된 디시리얼라이저에 위임하는 ErrorHandlingDeserializer를 사용하는 것이 좋다. 이렇게 구성하면 RequestReplyFuture가 예외를 처리하며 완료되고 cause 프로퍼티에 DeserializationException이 있는 ExecutionException을 포착할 수 있다.

버전 2.6.7부터, 템플릿은 DeserializationExceptions를 감지하는 것 외에도 제공된 경우 replyErrorChecker 함수를 호출한다. 예외를 반환하면 future는 예외를 처리하며 완료된다.

여기에 예제가 있다.

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

템플릿은 서버 측에서 에코해야 하는 헤더(기본적으로 KafkaHeaders.CORRELATION_ID라는 이름)를 설정한다.

이 경우 다음 @KafkaListener 애플리케이션이 응답한다.

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // 기본 replyTo 표현식 사용
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // 잭슨이 클래스패스에 있는 경우에는 필요하지 않다.
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 인프라는 상관 관계 ID를 에코하고 응답 토픽을 결정한다.

응답 보내기에 대한 자세한 내용은 @SendTo를 사용하여 리스너 결과 전달을 참고하자. 템플릿은 기본 헤더 KafKaHeaders.REPLY_TOPIC을 사용하여 응답이 전달되는 토픽을 나타낸다.

버전 2.2부터, 템플릿은 구성된 응답 컨테이너에서 응답 토픽 또는 파티션을 감지한다. 컨테이너가 단일 토픽 또는 단일 TopicPartitionOffset을 수신하도록 구성된 경우 응답 헤더를 설정하는 데 사용된다. 컨테이너가 다르게 구성된 경우 사용자는 응답 헤더를 설정해야 한다. 이 경우 초기화 중에 INFO 로그 메시지가 기록된다. 다음 예제에서는 KafkaHeaders.REPLY_TOPIC을 사용한다.

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

단일 응답 TopicPartitionOffset으로 구성하는 경우 각 인스턴스가 서로 다른 파티션을 수신하는 한 여러 템플릿에 대해 동일한 응답 토픽을 사용할 수 있다. 단일 응답 토픽으로 구성하는 경우 각 인스턴스는 서로 다른 group.id를 사용해야 한다. 이 경우 모든 인스턴스는 각 응답을 받지만 요청을 보낸 인스턴스만 상관 관계(correlation) ID를 찾는다. 이는 자동 크기 조정에 유용할 수 있지만 추가 네트워크 트래픽의 오버헤드와 원치 않는 각 응답을 삭제하는 데 드는 비용이 적다. 이 설정을 사용할 때 템플릿의 sharedReplyTopictrue로 설정하는 것이 좋다. 그러면 기본 ERROR 대신 DEBUG에 대한 예기치 않은 응답의 로깅 레벨이 줄어든다.

다음은 동일한 공유 응답 토픽을 사용하도록 응답 컨테이너를 구성하는 예제다.

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // 유니크
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 그러면 새 그룹은 이전 응답을 받지 않게 된다.
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}

여러 클라이언트 인스턴스가 있고 이전 단락에서 설명한 대로 구성하지 않은 경우 각 인스턴스에는 전용 응답 토픽이 필요하다. 대안으로 KafkaHeaders.REPLY_PARTITION을 설정하고 각 인스턴스에 전용 파티션을 사용하는 것이다. 헤더에는 4바이트 int(빅 엔디안)가 포함되어 있다. 서버는 이 헤더를 사용하여 응답을 올바른 파티션으로 라우팅해야 한다(@KafkaListener가 이 작업을 수행함). 하지만 이 경우 회신 컨테이너는 카프카의 그룹 관리 기능을 사용해서는 안 되며 고정 파티션을 수신하도록 구성해야 한다(ContainerProperties 생성자에서 TopicPartitionOffset을 사용하여).

DefaultKafkaHeaderMapper를 사용하려면 Jackson이 클래스패스(@KafkaListener의 경우)에 있어야 한다. 사용할 수 없는 경우 메시지 컨버터에 헤더 매퍼가 없으므로 앞에서 설명한 것처럼 SimpleKafkaHeaderMapper를 사용하여 MessagingMessageConverter를 구성해야 한다.

기본적으로 3개의 헤더가 사용된다.

  • KafkaHeaders.CORRELATION_ID - 요청에 대한 응답을 연관시키는 데 사용된다.
  • KafkaHeaders.REPLY_TOPIC - 서버에게 응답할 위치를 알려주는 데 사용된다.
  • KafkaHeaders.REPLY_PARTITION - (옵셔널) 응답할 파티션을 서버에 알리는 데 사용된다.

이러한 헤더명은 @KafkaListener 인프라에서 응답을 라우팅하는 데 사용된다.

버전 2.3부터, 헤더명을 커스텀할 수 있다. 템플릿에는 CorrelationHeaderName, replyTopicHeaderNamereplyPartitionHeaderName 3가지 프로퍼티가 있다. 이는 서버가 스프링 애플리케이션이 아닌 경우(또는 @KafkaListener를 사용하지 않는 경우) 유용하다.

반대로, 요청하는 애플리케이션이 스프링 애플리케이션이 아니고 버전 3.0부터 다른 헤더에 상관 관계 정보를 넣는 경우 리스너 컨테이너 팩토리에서 커스텀 CorrelationHeaderName을 구성할 수 있으며 해당 헤더가 에코된다. 이전에는 리스너가 커스텀 상관 관계 헤더를 에코해야 했다.

Message<?>로 요청/응답<?>(Request/Reply with Message<?>s)

버전 2.7에서는 spring-messagingMessage<?> 추상화를 보내고 받기 위해 ReplyingKafkaTemplate에 메서드를 추가했다.

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, ParameterizedTypeReference<P> returnType);

이는 템플릿의 기본 replyTimeout을 사용하며, 메서드 호출에서 제한시간 초과가 발생할 수 있는 오버로드된 버전도 있습니다.

버전 3.0에서는, 이러한 메서드(및 해당 sendFuture 프로퍼티)에 의해 반환된 futureListenableFutures 대신 CompletableFutures로 변경됐다.

컨슈머의 디시리얼라이저(Deserializer) 또는 템플릿의 MessageConverter가 응답 메시지의 구성 또는 타입 메타데이터를 통해 추가 정보 없이 페이로드를 변환할 수 있는 경우 첫 번째 방법을 사용하자.

메시지 컨버터를 지원하기 위해 반환 타입에 대한 타입 정보를 제공해야 하는 경우 두 번째 방법을 사용하자. 또한 서버 측이 스프링 애플리케이션이 아닌 경우와 같이 응답에 타입 메타데이터가 없는 경우에도 동일한 템플릿이 다른 타입을 수신할 수 있다. 다음은 후자의 예제다.

템플릿 빈

자바

@Bean
ReplyingKafkaTemplate<String, String, String> template(
    ProducerFactory<String, String> pf, 
    ConcurrentKafkaListenerContainerFactory<String, String> factory
) {

    ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template = new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}

코틀린

@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}

템플릿 사용

자바

RequestReplyTypedMessageFuture<String, String, Thing> future1 = 
    template.sendAndReceive(
        MessageBuilder.withPayload("getAThing").build(),
        new ParameterizedTypeReference<Thing>() { }
    );

log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 = 
    template.sendAndReceive(
        MessageBuilder.withPayload("getThings").build(), 
        new ParameterizedTypeReference<List<Thing>>() { }
    );

log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));

코틀린

val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(), object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

응답 타입 Message<?>(Reply Type Message<?>)

@KafkaListener가 2.5 이전 버전에서는 Message<?>를 반환할 때, 응답 토픽과 상관 관계(correlation) ID 헤더를 채워야 했다. 이 예제에서는 요청의 응답 토픽(topic) 헤더를 사용한다.

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

또한 응답 레코드에 키를 설정하는 방법도 보여준다.

버전 2.5부터 프레임워크는 이러한 헤더가 누락되었는지 감지하고 이를 토픽(@SendTo 값에서 결정된 토픽 또는 수신 KafkaHeaders.REPLY_TOPIC 헤더(있는 경우))로 채운다. 또한 수신 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION(있는 경우)을 에코한다.

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // 기본 REPLY_TOPIC 헤더
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

다중 응답 집계(Aggregating Multiple Replies)

ReplyingKafkaTemplate 사용 시 템플릿은 단일 요청/응답 시나리오에만 적용된다. 단일 메시지에 여러 수신자가 응답을 반환하는 경우 AggregatingReplyingKafkaTemplate을 사용할 수 있다. 이는 Scatter-Gather 엔터프라이즈 통합 패턴의 클라이언트측 구현체다.

ReplyingKafkaTemplate과 마찬가지로 AggregatingReplyingKafkaTemplate 생성자는 응답을 수신하기 위해 생산자 팩토리와 리스너 컨테이너를 사용한다. 여기에는 응답을 받을 때마다 참조되는 세 번째 파라미터 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy가 있다. 조건자(predicate)가 true를 반환하면 ConsumerRecords 컬렉션을 사용하여 sendAndReceive 메서드에서 반환된 Future를 완성한다.

추가 프로퍼티 returnPartialOnTimeout(기본값 false)이 있다. true로 설정되면 KafkaReplyTimeoutException을 사용하여 future를 완료하는 대신, 부분적인 결과가 future를 정상적으로 완료처리 한다(최소 하나의 응답 레코드가 수신된 경우).

버전 2.3.5부터, 조건자(predicate)는 제한시간 후에도 호출된다(returnPartialOnTimeouttrue인 경우). 첫 번째 아규먼트는 현재 레코드 목록이다. 두 번째는 이 호출이 제한시간에 발생한 경우 true다. 조건자(predicate)는 레코드 목록을 수정할 수 있다.

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container, coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // 전송완료
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(30, TimeUnit.SECONDS);

반환 타입은 ConsumerRecords 컬렉션의 값을 가진 ConsumerRecord이다. “외부” ConsumerRecord는 “실제” 레코드가 아니며, 요청에 대해 수신된 실제 응답 레코드의 보유자(holder)로서 템플릿에 의해 합성(synthesized)된다. 일반 릴리스(release)가 발생하면(릴리스 전략이 true를 반환할 경우) 토픽이 AggregatedResults로 설정된다. returnPartialOnTimeouttrue이고 제한시간 초과가 발생하면(그리고 최소한 하나의 응답 레코드가 수신된 경우) 토픽은 partialResultsAfterTimeout으로 설정된다. 템플릿은 이러한 “토픽”명에 대한 상수 정적 변수(constant static variable)를 제공한다.

/**
 * Pseudo: 집계된 "외부" {@link ConsumerRecords}에 대한 토픽명
 * 릴리스 전략에 따라 정상적인 릴리스 이후 해당 값이 발생한다.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo 집계된 "외부" {@link ConsumerRecords}의 토픽명
 * 제한시간 초과 후 값이 반환된다.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

컬렉션의 실제 ConsumerRecords에는 응답이 수신된 실제 토픽이 포함돼있다.

응답에 대한 리스너 컨테이너는 AckMode.MANUAL 또는 AckMode.MANUAL_IMMEDIATE로 구성되어야 한다. 컨슈머 프로퍼티 enable.auto.commitfalse여야 한다(버전 2.3 이후의 기본값). 메시지 손실 가능성을 방지하기 위해, 템플릿은 미해결 요청이 없을 때만 오프셋을 커밋한다. 즉, 마지막 미해결 요청이 릴리스 전략에 의해 릴리스될 때다. 재조정 후에, 중복 응답 전달이 가능하다. 이는 진행 중인 요청에 의해 무시된다. 이미 릴리스된 응답에 대해 중복된 응답을 받은 경우 오류 로그 메시지가 표시될 수 있다.

이 집계(aggregating) 템플릿과 함께 ErrorHandlingDeserializer를 사용하는 경우 프레임워크는 DeserializationExceptions를 자동으로 감지하지 않는다. 대신, 레코드(null 값 포함)는 헤더의 디시리얼라이저 예외와 함께 그대로 반환된다. 애플리케이션에서는 ReplyingKafkaTemplate.checkDeserialization() 유틸리티 메서드를 호출하여 디시리얼라이저 예외가 발생했는지 확인하는 것이 좋다. 자세한 내용은 해당 JavaDocs를 참고하자. 이 집계(aggregating) 템플릿에 대해서는 replyErrorChecker도 호출되지 않는다. 응답의 각 엘리먼트를 확인해야 한다.