메세지 리스너 컨테이너(Message Listener Containers)

두 가지 MessageListenerContainer 구현체가 제공된다.

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer는 단일 스레드의 모든 토픽 또는 파티션에서 모든 메시지를 수신한다. ConcurrentMessageListenerContainer는 다중 스레드 소비를 제공하기 위해 하나 이상의 KafkaMessageListenerContainer 인스턴스에 위임한다.

버전 2.2.7부터 리스너 컨테이너에 RecordInterceptor를 추가할 수 있다. 레코드 검사 또는 수정을 허용하는 리스너 실행 전 호출된다. 인터셉터가 null을 반환하면 리스너가 호출되지 않는다. 버전 2.7부터 리스너가 종료된 후(일반적으로 또는 예외를 발생시켜) 호출되는 추가 메서드가 있다. 또한 버전 2.7부터 배치 리스너에 유사한 기능을 제공하는 BatchInterceptor가 있다. 또한 ConsumerAwareRecordInterceptor(및 BatchInterceptor)는 Consumer<?, ?>에 대한 접근을 제공한다. 예를 들어 인터셉터의 컨슈머 메트릭에 접근하는 데 사용될 수 있다.

컨슈머의 위치 및/또는 이러한 인터셉터의 커밋된 오프셋에 영향을 미치는 메서드를 실행해서는 안 된다. 컨테이너는 그러한 정보를 관리해야 한다.

인터셉터가 레코드를 변경하는 경우(새 레코드를 생성하여) 레코드 손실과 같은 예상치 못한 부작용을 피하기 위해 토픽, 파티션 및 오프셋을 동일하게 유지해야 한다.

CompositeRecordInterceptorCompositeBatchInterceptor를 사용하여 여러 인터셉터를 호출할 수 있다.

기본적으로, 버전 2.8부터 트랜잭션을 사용할 때 인터셉터는 트랜잭션이 시작되기 전 호출된다. 대신 트랜잭션이 시작된 후 인터셉터를 호출하려면 리스너 컨테이너의 InterceptBeforeTx 프로퍼티를 false로 설정할 수 있다. 버전 2.9부터 이는 KafkaAwareTransactionManagers뿐 아니라 모든 트랜잭션 매니저에 적용된다. 예를 들어, 이를 통해 인터셉터는 컨테이너에 의해 시작된 JDBC 트랜잭션에 참여할 수 있다.

버전 2.3.8, 2.4.6부터 ConcurrentMessageListenerContainer는 이제 동시성이 1보다 큰 경우 정적 멤버십(Membership)을 지원한다. group.instance.id에는 1부터 시작하는 n과 함께 -n 접미사가 붙는다. 예를 들어, 이는 증가된 session.timeout.ms와 함께 애플리케이션 인스턴스가 재시작될 때 재조정 이벤트를 줄이는 데 사용할 수 있다.

KafkaMessageListenerContainer 사용(Using KafkaMessageListenerContainer)

다음 생성자를 사용할 수 있다.

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

ContainerProperties 객체에서 ConsumerFactory와 토픽, 파티션, 기타 구성에 대한 정보를 수신한다. ContainerProperties에는 다음과 같은 생성자가 있다.

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

첫 번째 생성자는 TopicPartitionOffset 아규먼트 배열을 사용하여 (컨슈머 assign() 메서드 사용) 사용할 파티션과 옵셔널 초기 오프셋을 컨테이너에 명시적으로 설명한다. 양수 값은 기본적으로 오프셋 절대값이다. 음수 값은 기본적으로 파티션 내의 마지막 오프셋을 기준으로 한다. 추가 부울(boolean) 아규먼트를 받는 TopicPartitionOffset의 생성자가 제공된다. 이것이 true인 경우, 초기 오프셋(양수 또는 음수)은 이 컨슈머의 현재 위치를 기준으로 한다. 컨테이너가 시작될 때 오프셋이 적용된다. 두 번째는 토픽 배열을 받고 카프카는 group.id 프로퍼티를 기반으로 파티션을 할당한다. 즉, 그룹 전체에 파티션을 배포한다. 세 번째는 정규식표현식 패턴을 사용하여 토픽을 선택한다.

MessageListener를 컨테이너에 할당하려면 컨테이너를 생성할 때 ContainerProps.setMessageListener 메서드를 사용할 수 있다. 다음 예제에서는 그 방법을 보여준다.

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(
    new MessageListener<Integer, String>() {
        ...
    }
);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

DefaultKafkaConsumerFactory를 생성할 때, 위와 같이 프로퍼티만 받는 생성자를 사용하면 키 및 값 Deserializer 클래스가 구성 시 선택된다는 의미다. 또는 Deserializer 인스턴스가 키 및/또는 값에 대한 DefaultKafkaConsumerFactory 생성자에 전달될 수 있다. 이 경우 모든 컨슈머는 동일한 인스턴스를 공유한다. 또 다른 옵션은 각 Consumer에 대해 별도의 Deserializer 인스턴스를 얻는 데 사용되는 Supplier<Deserializer> s(버전 2.3부터)를 제공하는 것이다.

DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

설정할 수 있는 다양한 프로퍼티에 대한 자세한 내용은 Javadoc for ContainerProperties를 참고하자.

버전 2.1.1부터 logContainerConfig라는 새로운 프로퍼티를 사용할 수 있다. true이고 INFO 로깅이 활성화되면 각 리스너 컨테이너는 해당 구성 프로퍼티를 요약하는 로그 메시지를 작성한다.

기본적으로, 토픽 오프셋 커밋의 로깅은 DEBUG 로깅 레벨에서 수행된다. 버전 2.1.2부터 commitLogLevel이라는 ContainerProperties의 프로퍼티를 사용하면 이러한 메시지에 대한 로그 레벨을 지정할 수 있다. 예를 들어 로그 레벨을 INFO로 변경하려면 ContainerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);을 사용할 수 있다.

버전 2.2부터 missingTopicsFatal이라는 새 컨테이너 프로퍼티가 추가됐다(2.3.4부터 기본값: false). 이렇게 하면 구성된 토픽 중 하나라도 브로커에 없으면 컨테이너가 시작되지 않는다. 컨테이너가 토픽 정규표현식 패턴(regex)을 수신하도록 구성된 경우에는 적용되지 않는다. 이전에는 많은 메시지를 기록하는 동안 토픽이 나타날 때까지 기다리는 Consumer.poll() 메서드 내에서 컨테이너 스레드가 반복됐다. 로그 외에는 문제가 있다는 징후가 없었다.

버전 2.8부터 새로운 컨테이너 프로퍼티 authExceptionRetryInterval이 도입됐다. 이로 인해 컨테이너는 KafkaConsumer에서 AuthenticationException 또는 AuthorizationException을 가져온 후 메시지 가져오기를 재시도한다. 예를 들어, 구성된 사용자가 특정 토픽을 읽을 수 있는 접근이 거부되었거나 자격 증명이 잘못된 경우 이러한 상황이 발생할 수 있다. authExceptionRetryInterval을 정의하면 적절한 권한이 부여될 때 컨테이너가 복구될 수 있다.

기본적으로 인터벌(interval)은 구성되지 않는다. 인증 및 권한 부여 오류는 치명적인 것으로 간주되어 컨테이너가 중지된다.

버전 2.8부터 컨슈머 팩토리를 생성할 때 디시리얼라이저를 객체로 제공하는 경우(생성자에서 또는 setter를 통해) 팩토리는 프로퍼티로 구성하기 위해 configure() 메서드를 호출한다.

ConcurrentMessageListenerContainer 사용(Using ConcurrentMessageListenerContainer)

단일 생성자는 KafkaListenerContainer 생성자와 유사하다. 다음 목록은 생성자의 시그니처(signature)를 보여준다.

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

동시성(concurrency) 프로퍼티도 있다. 예를 들어, Container.setConcurrency(3)KafkaMessageListenerContainer 인스턴스 3개를 생성한다.

첫 번째 생성자의 경우 카프카는 그룹 관리 기능을 사용하여 컨슈머에게 파티션을 제공한다.


important

여러 토픽을 청취할 때 기본 파티션 분포가 예상한 것과 다를 수 있다. 예를 들어, 각각 5개의 파티션이 있는 3개의 토픽이 있고 concurrency=15를 사용하려는 경우, 각 토픽에서 하나의 파티션이 할당된 5개는 활성 컨슈머로 표시되고 나머지 10개의 컨슈머는 유휴 상태가 된다. 이는 기본 카프카 PartitionAssignorRangeAssignor(Javadoc 참고)이기 때문이다. 이 시나리오에서는 대신 모든 컨슈머에게 파티션을 배포하는 RoundRobinAssignor 사용을 고려할 수 있다. 그런 다음 각 컨슈머에게 하나의 토픽 또는 파티션이 할당된다. PartitionAssignor를 변경하려면 DefaultKafkaConsumerFactory에 제공된 프로퍼티에서 partition.location.strategy 컨슈머 프로퍼티(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)을 설정할 수 있다.

스프링 부트를 사용할 때 다음과 같이 전략을 할당할 수 있다.

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

컨테이너 프로퍼티가 TopicPartitionOffsets로 구성되면 ConcurrentMessageListenerContainer는 위임 KafkaMessageListenerContainer 인스턴스 전체에 TopicPartitionOffset 인스턴스를 배포한다.

예를 들어, 6개의 TopicPartitionOffset 인스턴스가 제공되고 concurrency3이다. 각 컨테이너는 두 개의 파티션을 갖는다. 5개의 TopicPartitionOffset 인스턴스인 경우, 두 개의 컨테이너는 두 개의 파티션을 얻고 세 번째 컨테이너는 하나를 얻는다. concurrencyTopicPartitions 수보다 크면 각 컨테이너가 하나의 파티션을 갖도록 concurrency가 조정된다.

client.id 프로퍼티(설정된 경우)에는 -n이 추가된다. 여기서 n은 동시성(concurrency)에 해당하는 컨슈머 인스턴스다. 이는 JMX가 활성화된 경우 MBean에 고유한 이름을 제공하는 데 필요하다.

버전 1.3부터 ​​MessageListenerContainer는 기본 KafkaConsumer 메트릭에 대한 접근을 제공한다. ConcurrentMessageListenerContainer의 경우 metrics() 메소드는 모든 대상 KafkaMessageListenerContainer 인스턴스에 대한 지표를 반환한다. 메트릭은 기본 KafkaConsumer에 제공된 client-id Map<MetricName, ? extends Metric>로 그룹화된다.

2.3부터 ContainerProperties는 리스너 컨테이너의 기본 루프가 KafkaConsumer.poll() 호출 사이에 휴면 상태가 변하는 버전 BetweenPolls 옵션을 제공한다. 실제 슬립 인터벌(sleep interva) 처리는 옵션의 최소 값과 max.poll.interval.ms 컨슈머 구성과 현재 처리 시간차이로 선택된다.

오프셋 커밋(Committing Offsets)

오프셋 커밋을 위한 여러 옵션이 제공된다. enable.auto.commit 컨슈머 프로퍼티가 true인 경우 카프카는 해당 구성에 따라 오프셋을 자동 커밋한다. false인 경우 컨테이너는 여러 AckMode 설정을 지원한다(다음 목록에 설명되어 있음). 기본 AckModeBATCH다. 버전 2.3부터 프레임워크는 구성에 명시적으로 설정되지 않는 한 enable.auto.commitfalse로 설정한다. 이전에는 프로퍼티가 설정되지 않은 경우 카프카 기본값(true)이 사용됐다.

컨슈머 poll() 메서드는 하나 이상의 ConsumerRecord를 반환한다. MessageListener는 각 레코드에 대해 호출된다. 다음 목록은 각 AckMode에 대해 컨테이너가 수행하는 작업을 설명한다(트랜잭션이 사용되지 않는 경우).

  • RECORD: 레코드 처리 후 리스너가 반환되면 오프셋을 커밋한다.
  • BATCH: poll()에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.
  • TIME: 마지막 커밋 이후 ackTime이 초과된 동안 poll()에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.
  • COUNT: 마지막 커밋 이후 ackCount 레코드가 수신된 경우 poll()에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.
  • COUNT_TIME: TIMECOUNT와 유사하지만 두 조건 중 하나가 true인 경우 커밋이 수행된다.
  • MANUAL: 메시지 리스너는 Acknowledgementacknowledge()를 담당한다. 이후에는 BATCH와 동일한 의미가 적용됩니다.
  • MANUAL_IMMEDIATE: 리스너가 Acknowledgement.acknowledge() 메서드를 호출하면 즉시 오프셋을 커밋한다.

트랜잭션을 사용할 때, 오프셋은 트랜잭션으로 전송되며 의미 체계는 리스너 타입(레코드 또는 일괄 처리)에 따라 RECORD 또는 BATCH와 동일하다.

MANUALMANUAL_IMMEDIATE에서는 리스너가 AcknowledgeingMessageListener 또는 BatchAcknowledgeingMessageListener여야 한다. 메시지 리스너를 참고하자.

syncCommits 컨테이너 프로퍼티에 따라 컨슈머의 commitSync() 또는 commitAsync() 메서드가 사용된다. syncCommits는 기본적으로 true다. setSyncCommitTimeout도 참고하자. 비동기 커밋 결과를 얻으려면 setCommitCallback을 참고하자. 기본 콜백은 오류(및 디버그 수준의 성공)를 기록하는 LoggingCommitCallback다.

리스너 컨테이너에는 오프셋 커밋을 위한 자체 메커니즘이 있으므로 카프카 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse인 것을 선호한다. 버전 2.3부터는 컨슈머 팩토리에서 특별히 설정하거나 컨테이너의 컨슈머 프로퍼티를 오버라이드하지 않는 한 무조건 false로 설정된다.

Acknowledgment에는 다음과 같은 메소드가 있다.

public interface Acknowledgment {
    void acknowledge();
}

이 방법을 사용하면 오프셋이 커밋되는 시점을 리스너가 제어할 수 있다.

버전 2.3부터 Acknowledgement 인터페이스에는 nack(long sleep)nack(int index, long sleep)두 가지 추가 메서드가 있다. 첫 번째는 레코드 리스너와 함께 사용되고 두 번째는 배치 리스너와 함께 사용된다. 리스너 타입에 대해 잘못된 메소드를 호출하면 IllegalStateException이 발생한다.

nack()을 사용하여 부분 배치(partial batch)를 커밋하려면, 트랜잭션을 사용할 때 AckModeMANUAL로 설정하자. nack()를 호출하면 성공적으로 처리된 레코드의 오프셋이 트랜잭션에 전송된다.

nack()은 리스너를 호출하는 컨슈머 스레드에서만 호출할 수 있다.

Out of Order Commit을 사용할 때는 nack()이 허용되지 않는다.

레코드 리스너를 사용하면, nack()가 호출될 때, 보류 중인 모든 오프셋이 커밋되고, 마지막 폴링의 나머지 레코드가 삭제되며, 실패한 레코드와 처리되지 않은 레코드가 다음 poll() 시 다시 전달되도록 해당 파티션에서 검색이 수행된다. sleep 아규먼트를 설정하여, 재전송 전에 컨슈머를 일시 중지할 수 있다. 이는 컨테이너가 DefaultErrorHandler로 구성될 때 예외를 발생시키는 것과 유사한 기능이다.

nack()은 할당된 모든 파티션을 포함하여 지정된 슬립 기간 동안 전체 리스너를 일시 중지한다.

배치 리스너를 사용할 때, 배치 내에서 실패가 발생한 인덱스를 지정할 수 있다. nack()이 호출되면 실패하고 폐기된 레코드에 대한 파티션에서 인덱스 및 검색이 수행되기 전에 레코드에 대해 오프셋이 커밋되어 다음 poll()에서 다시 전달된다.

자세한 내용은 컨테이너 오류 핸들러를 참고하자.

컨슈머는 슬립 중 일시 중단되므로 컨슈머를 계속 유지하기 위해 브로커를 계속 폴링한다. 실제 슬립 시간과 해당 resolution은 기본값이 5초인 컨테이너의 pollTimeout에 따라 다르다. 최소 슬립 시간은 pollTimeout과 동일하며 모든 슬립 시간은 이것의 배수가 된다. 짧은 슬립 시간의 경우 또는 정확도를 높이려면 컨테이너의 pollTimeout을 줄이는 것이 좋다.

버전 3.0.10부터, 배치 리스너는 Acknowledgement 아규먼트에 대한 acknowledge(index)를 사용하여, 배치 부분의 오프셋을 커밋할 수 있다. 이 메소드가 호출되면, 인덱스에 있는 레코드(및 모든 이전 레코드)의 오프셋이 커밋된다. 부분 배치 커밋이 수행된 후 acknowledge()을 호출하면 배치의 나머지 부분에 대한 오프셋이 커밋된다. 다음 제한 사항이 적용된다.

  • AckMode.MANUAL_IMMEDIATE가 필요하다.
  • 메서드는 리스너 스레드에서 호출되어야 한다.
  • 리스너는 원시 ConsumerRecords가 아닌 목록을 사용해야 한다.
  • 인덱스느 목록의 엘리먼트 범위 내에 있어야 한다.
  • 인덱스는 이전 호출에 사용된 인덱스보다 커야 한다.

이러한 제한 사항은 적용되며 메서드는 위반 사항에 따라 IllegalArgumentException 또는 IllegalStateException을 발생시킨다.