메세지 리스너 컨테이너(Message Listener Containers)
두 가지 MessageListenerContainer
구현체가 제공된다.
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
는 단일 스레드의 모든 토픽 또는 파티션에서 모든 메시지를 수신한다. ConcurrentMessageListenerContainer
는 다중 스레드 소비를 제공하기 위해 하나 이상의 KafkaMessageListenerContainer
인스턴스에 위임한다.
버전 2.2.7부터 리스너 컨테이너에 RecordInterceptor
를 추가할 수 있다. 레코드 검사 또는 수정을 허용하는 리스너 실행 전 호출된다. 인터셉터가 null
을 반환하면 리스너가 호출되지 않는다. 버전 2.7부터 리스너가 종료된 후(일반적으로 또는 예외를 발생시켜) 호출되는 추가 메서드가 있다. 또한 버전 2.7부터 배치 리스너에 유사한 기능을 제공하는 BatchInterceptor
가 있다. 또한 ConsumerAwareRecordInterceptor
(및 BatchInterceptor
)는 Consumer<?, ?>
에 대한 접근을 제공한다. 예를 들어 인터셉터의 컨슈머 메트릭에 접근하는 데 사용될 수 있다.
컨슈머의 위치 및/또는 이러한 인터셉터의 커밋된 오프셋에 영향을 미치는 메서드를 실행해서는 안 된다. 컨테이너는 그러한 정보를 관리해야 한다.
인터셉터가 레코드를 변경하는 경우(새 레코드를 생성하여) 레코드 손실과 같은 예상치 못한 부작용을 피하기 위해 토픽, 파티션 및 오프셋을 동일하게 유지해야 한다.
CompositeRecordInterceptor
및 CompositeBatchInterceptor
를 사용하여 여러 인터셉터를 호출할 수 있다.
기본적으로, 버전 2.8부터 트랜잭션을 사용할 때 인터셉터는 트랜잭션이 시작되기 전 호출된다. 대신 트랜잭션이 시작된 후 인터셉터를 호출하려면 리스너 컨테이너의 InterceptBeforeTx
프로퍼티를 false
로 설정할 수 있다. 버전 2.9부터 이는 KafkaAwareTransactionManagers
뿐 아니라 모든 트랜잭션 매니저에 적용된다. 예를 들어, 이를 통해 인터셉터는 컨테이너에 의해 시작된 JDBC 트랜잭션에 참여할 수 있다.
버전 2.3.8, 2.4.6부터 ConcurrentMessageListenerContainer
는 이제 동시성이 1보다 큰 경우 정적 멤버십(Membership)을 지원한다. group.instance.id
에는 1부터 시작하는 n과 함께 -n 접미사가 붙는다. 예를 들어, 이는 증가된 session.timeout.ms
와 함께 애플리케이션 인스턴스가 재시작될 때 재조정 이벤트를 줄이는 데 사용할 수 있다.
KafkaMessageListenerContainer 사용(Using KafkaMessageListenerContainer)
다음 생성자를 사용할 수 있다.
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
ContainerProperties
객체에서 ConsumerFactory
와 토픽, 파티션, 기타 구성에 대한 정보를 수신한다. ContainerProperties
에는 다음과 같은 생성자가 있다.
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
첫 번째 생성자는 TopicPartitionOffset
아규먼트 배열을 사용하여 (컨슈머 assign()
메서드 사용) 사용할 파티션과 옵셔널 초기 오프셋을 컨테이너에 명시적으로 설명한다. 양수 값은 기본적으로 오프셋 절대값이다. 음수 값은 기본적으로 파티션 내의 마지막 오프셋을 기준으로 한다. 추가 부울(boolean)
아규먼트를 받는 TopicPartitionOffset
의 생성자가 제공된다. 이것이 true
인 경우, 초기 오프셋(양수 또는 음수)은 이 컨슈머의 현재 위치를 기준으로 한다. 컨테이너가 시작될 때 오프셋이 적용된다. 두 번째는 토픽 배열을 받고 카프카는 group.id
프로퍼티를 기반으로 파티션을 할당한다. 즉, 그룹 전체에 파티션을 배포한다. 세 번째는 정규식표현식 패턴을 사용하여 토픽을 선택한다.
MessageListener
를 컨테이너에 할당하려면 컨테이너를 생성할 때 ContainerProps.setMessageListener
메서드를 사용할 수 있다. 다음 예제에서는 그 방법을 보여준다.
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(
new MessageListener<Integer, String>() {
...
}
);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
DefaultKafkaConsumerFactory
를 생성할 때, 위와 같이 프로퍼티만 받는 생성자를 사용하면 키 및 값 Deserializer
클래스가 구성 시 선택된다는 의미다. 또는 Deserializer
인스턴스가 키 및/또는 값에 대한 DefaultKafkaConsumerFactory
생성자에 전달될 수 있다. 이 경우 모든 컨슈머는 동일한 인스턴스를 공유한다. 또 다른 옵션은 각 Consumer
에 대해 별도의 Deserializer
인스턴스를 얻는 데 사용되는 Supplier<Deserializer>
s(버전 2.3부터)를 제공하는 것이다.
DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
설정할 수 있는 다양한 프로퍼티에 대한 자세한 내용은 Javadoc for ContainerProperties
를 참고하자.
버전 2.1.1부터 logContainerConfig
라는 새로운 프로퍼티를 사용할 수 있다. true
이고 INFO
로깅이 활성화되면 각 리스너 컨테이너는 해당 구성 프로퍼티를 요약하는 로그 메시지를 작성한다.
기본적으로, 토픽 오프셋 커밋의 로깅은 DEBUG
로깅 레벨에서 수행된다. 버전 2.1.2부터 commitLogLevel
이라는 ContainerProperties
의 프로퍼티를 사용하면 이러한 메시지에 대한 로그 레벨을 지정할 수 있다. 예를 들어 로그 레벨을 INFO
로 변경하려면 ContainerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
을 사용할 수 있다.
버전 2.2부터 missingTopicsFatal
이라는 새 컨테이너 프로퍼티가 추가됐다(2.3.4부터 기본값: false
). 이렇게 하면 구성된 토픽 중 하나라도 브로커에 없으면 컨테이너가 시작되지 않는다. 컨테이너가 토픽 정규표현식 패턴(regex)을 수신하도록 구성된 경우에는 적용되지 않는다. 이전에는 많은 메시지를 기록하는 동안 토픽이 나타날 때까지 기다리는 Consumer.poll()
메서드 내에서 컨테이너 스레드가 반복됐다. 로그 외에는 문제가 있다는 징후가 없었다.
버전 2.8부터 새로운 컨테이너 프로퍼티 authExceptionRetryInterval
이 도입됐다. 이로 인해 컨테이너는 KafkaConsumer
에서 AuthenticationException
또는 AuthorizationException
을 가져온 후 메시지 가져오기를 재시도한다. 예를 들어, 구성된 사용자가 특정 토픽을 읽을 수 있는 접근이 거부되었거나 자격 증명이 잘못된 경우 이러한 상황이 발생할 수 있다. authExceptionRetryInterval
을 정의하면 적절한 권한이 부여될 때 컨테이너가 복구될 수 있다.
기본적으로 인터벌(interval)은 구성되지 않는다. 인증 및 권한 부여 오류는 치명적인 것으로 간주되어 컨테이너가 중지된다.
버전 2.8부터 컨슈머 팩토리를 생성할 때 디시리얼라이저를 객체로 제공하는 경우(생성자에서 또는 setter를 통해) 팩토리는 프로퍼티로 구성하기 위해 configure()
메서드를 호출한다.
ConcurrentMessageListenerContainer 사용(Using ConcurrentMessageListenerContainer)
단일 생성자는 KafkaListenerContainer
생성자와 유사하다. 다음 목록은 생성자의 시그니처(signature)를 보여준다.
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
동시성(concurrency) 프로퍼티도 있다. 예를 들어, Container.setConcurrency(3)
는 KafkaMessageListenerContainer
인스턴스 3개를 생성한다.
첫 번째 생성자의 경우 카프카는 그룹 관리 기능을 사용하여 컨슈머에게 파티션을 제공한다.
important
여러 토픽을 청취할 때 기본 파티션 분포가 예상한 것과 다를 수 있다. 예를 들어, 각각 5개의 파티션이 있는 3개의 토픽이 있고 concurrency=15
를 사용하려는 경우, 각 토픽에서 하나의 파티션이 할당된 5개는 활성 컨슈머로 표시되고 나머지 10개의 컨슈머는 유휴 상태가 된다. 이는 기본 카프카 PartitionAssignor
가 RangeAssignor
(Javadoc 참고)이기 때문이다. 이 시나리오에서는 대신 모든 컨슈머에게 파티션을 배포하는 RoundRobinAssignor
사용을 고려할 수 있다. 그런 다음 각 컨슈머에게 하나의 토픽 또는 파티션이 할당된다. PartitionAssignor
를 변경하려면 DefaultKafkaConsumerFactory
에 제공된 프로퍼티에서 partition.location.strategy
컨슈머 프로퍼티(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)을 설정할 수 있다.
스프링 부트를 사용할 때 다음과 같이 전략을 할당할 수 있다.
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
컨테이너 프로퍼티가 TopicPartitionOffsets
로 구성되면 ConcurrentMessageListenerContainer
는 위임 KafkaMessageListenerContainer
인스턴스 전체에 TopicPartitionOffset
인스턴스를 배포한다.
예를 들어, 6개의 TopicPartitionOffset
인스턴스가 제공되고 concurrency
는 3
이다. 각 컨테이너는 두 개의 파티션을 갖는다. 5개의 TopicPartitionOffset
인스턴스인 경우, 두 개의 컨테이너는 두 개의 파티션을 얻고 세 번째 컨테이너는 하나를 얻는다. concurrency
이 TopicPartitions
수보다 크면 각 컨테이너가 하나의 파티션을 갖도록 concurrency
가 조정된다.
client.id
프로퍼티(설정된 경우)에는 -n
이 추가된다. 여기서 n
은 동시성(concurrency)에 해당하는 컨슈머 인스턴스다. 이는 JMX가 활성화된 경우 MBean에 고유한 이름을 제공하는 데 필요하다.
버전 1.3부터 MessageListenerContainer
는 기본 KafkaConsumer
메트릭에 대한 접근을 제공한다. ConcurrentMessageListenerContainer
의 경우 metrics()
메소드는 모든 대상 KafkaMessageListenerContainer
인스턴스에 대한 지표를 반환한다. 메트릭은 기본 KafkaConsumer
에 제공된 client-id
Map<MetricName, ? extends Metric>
로 그룹화된다.
2.3부터 ContainerProperties
는 리스너 컨테이너의 기본 루프가 KafkaConsumer.poll()
호출 사이에 휴면 상태가 변하는 버전 BetweenPolls
옵션을 제공한다. 실제 슬립 인터벌(sleep interva) 처리는 옵션의 최소 값과 max.poll.interval.ms
컨슈머 구성과 현재 처리 시간차이로 선택된다.
오프셋 커밋(Committing Offsets)
오프셋 커밋을 위한 여러 옵션이 제공된다. enable.auto.commit
컨슈머 프로퍼티가 true
인 경우 카프카는 해당 구성에 따라 오프셋을 자동 커밋한다. false
인 경우 컨테이너는 여러 AckMode
설정을 지원한다(다음 목록에 설명되어 있음). 기본 AckMode
는 BATCH
다. 버전 2.3부터 프레임워크는 구성에 명시적으로 설정되지 않는 한 enable.auto.commit
을 false
로 설정한다. 이전에는 프로퍼티가 설정되지 않은 경우 카프카 기본값(true
)이 사용됐다.
컨슈머 poll()
메서드는 하나 이상의 ConsumerRecord
를 반환한다. MessageListener
는 각 레코드에 대해 호출된다. 다음 목록은 각 AckMode
에 대해 컨테이너가 수행하는 작업을 설명한다(트랜잭션이 사용되지 않는 경우).
RECORD
: 레코드 처리 후 리스너가 반환되면 오프셋을 커밋한다.BATCH
:poll()
에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.TIME
: 마지막 커밋 이후ackTime
이 초과된 동안poll()
에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.COUNT
: 마지막 커밋 이후ackCount
레코드가 수신된 경우poll()
에서 반환된 모든 레코드가 처리되면 오프셋을 커밋한다.COUNT_TIME
:TIME
및COUNT
와 유사하지만 두 조건 중 하나가true
인 경우 커밋이 수행된다.MANUAL
: 메시지 리스너는Acknowledgement
에acknowledge()
를 담당한다. 이후에는BATCH
와 동일한 의미가 적용됩니다.MANUAL_IMMEDIATE
: 리스너가Acknowledgement.acknowledge()
메서드를 호출하면 즉시 오프셋을 커밋한다.
트랜잭션을 사용할 때, 오프셋은 트랜잭션으로 전송되며 의미 체계는 리스너 타입(레코드 또는 일괄 처리)에 따라 RECORD
또는 BATCH
와 동일하다.
MANUAL
및 MANUAL_IMMEDIATE
에서는 리스너가 AcknowledgeingMessageListener
또는 BatchAcknowledgeingMessageListener
여야 한다. 메시지 리스너를 참고하자.
syncCommits
컨테이너 프로퍼티에 따라 컨슈머의 commitSync()
또는 commitAsync()
메서드가 사용된다. syncCommits
는 기본적으로 true
다. setSyncCommitTimeout
도 참고하자. 비동기 커밋 결과를 얻으려면 setCommitCallback
을 참고하자. 기본 콜백은 오류(및 디버그 수준의 성공)를 기록하는 LoggingCommitCallback
다.
리스너 컨테이너에는 오프셋 커밋을 위한 자체 메커니즘이 있으므로 카프카 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
가 false
인 것을 선호한다. 버전 2.3부터는 컨슈머 팩토리에서 특별히 설정하거나 컨테이너의 컨슈머 프로퍼티를 오버라이드하지 않는 한 무조건 false
로 설정된다.
Acknowledgment
에는 다음과 같은 메소드가 있다.
public interface Acknowledgment {
void acknowledge();
}
이 방법을 사용하면 오프셋이 커밋되는 시점을 리스너가 제어할 수 있다.
버전 2.3부터 Acknowledgement
인터페이스에는 nack(long sleep)
및 nack(int index, long sleep)
두 가지 추가 메서드가 있다. 첫 번째는 레코드 리스너와 함께 사용되고 두 번째는 배치 리스너와 함께 사용된다. 리스너 타입에 대해 잘못된 메소드를 호출하면 IllegalStateException
이 발생한다.
nack()
을 사용하여 부분 배치(partial batch)를 커밋하려면, 트랜잭션을 사용할 때 AckMode
를 MANUAL
로 설정하자. nack()
를 호출하면 성공적으로 처리된 레코드의 오프셋이 트랜잭션에 전송된다.
nack()
은 리스너를 호출하는 컨슈머 스레드에서만 호출할 수 있다.
Out of Order Commit
을 사용할 때는 nack()
이 허용되지 않는다.
레코드 리스너를 사용하면, nack()
가 호출될 때, 보류 중인 모든 오프셋이 커밋되고, 마지막 폴링의 나머지 레코드가 삭제되며, 실패한 레코드와 처리되지 않은 레코드가 다음 poll()
시 다시 전달되도록 해당 파티션에서 검색이 수행된다. sleep
아규먼트를 설정하여, 재전송 전에 컨슈머를 일시 중지할 수 있다. 이는 컨테이너가 DefaultErrorHandler
로 구성될 때 예외를 발생시키는 것과 유사한 기능이다.
nack()
은 할당된 모든 파티션을 포함하여 지정된 슬립 기간 동안 전체 리스너를 일시 중지한다.
배치 리스너를 사용할 때, 배치 내에서 실패가 발생한 인덱스를 지정할 수 있다. nack()
이 호출되면 실패하고 폐기된 레코드에 대한 파티션에서 인덱스 및 검색이 수행되기 전에 레코드에 대해 오프셋이 커밋되어 다음 poll()
에서 다시 전달된다.
자세한 내용은 컨테이너 오류 핸들러를 참고하자.
컨슈머는 슬립 중 일시 중단되므로 컨슈머를 계속 유지하기 위해 브로커를 계속 폴링한다. 실제 슬립 시간과 해당 resolution
은 기본값이 5초인 컨테이너의 pollTimeout
에 따라 다르다. 최소 슬립 시간은 pollTimeout
과 동일하며 모든 슬립 시간은 이것의 배수가 된다. 짧은 슬립 시간의 경우 또는 정확도를 높이려면 컨테이너의 pollTimeout
을 줄이는 것이 좋다.
버전 3.0.10부터, 배치 리스너는 Acknowledgement
아규먼트에 대한 acknowledge(index)
를 사용하여, 배치 부분의 오프셋을 커밋할 수 있다. 이 메소드가 호출되면, 인덱스에 있는 레코드(및 모든 이전 레코드)의 오프셋이 커밋된다. 부분 배치 커밋이 수행된 후 acknowledge()
을 호출하면 배치의 나머지 부분에 대한 오프셋이 커밋된다. 다음 제한 사항이 적용된다.
- AckMode.MANUAL_IMMEDIATE가 필요하다.
- 메서드는 리스너 스레드에서 호출되어야 한다.
- 리스너는 원시 ConsumerRecords가 아닌 목록을 사용해야 한다.
- 인덱스느 목록의 엘리먼트 범위 내에 있어야 한다.
- 인덱스는 이전 호출에 사용된 인덱스보다 커야 한다.
이러한 제한 사항은 적용되며 메서드는 위반 사항에 따라 IllegalArgumentException
또는 IllegalStateException
을 발생시킨다.