카프카에 연결(Connecting to Kafka)
KafkaAdmin
- 토픽 구성을 참고하자.
ProducerFactory
- 메세지 전송을 참고하자.
ConsumerFactory
- 메세지 받기를 참고하자.
버전 2.5부터 이들 각각은 KafkaResourceFactory
를 상속한다. 이를 통해 구성에 Supplier<String>
을 추가하여 런타임 시 부트스트랩 서버를 변경할 수 있다. setBootstrapServersSupplier(() -> …)
. 이는 서버 목록을 가져오기 위해 모든 새 커넥션에 대해 호출된다. 컨슈머와 프로듀서는 일반적으로 수명이 길다. 기존 프로듀서를 닫으려면 DefaultKafkaProducerFactory
에서 reset()
을 호출하자. 기존 컨슈머를 닫으려면 KafkaListenerEndpointRegistry
에서 stop()
(그런 다음 start()
)을 호출하거나 다른 리스너 컨테이너 빈에서 stop()
및 start()
를 호출한다.
편의를 위해, 프레임워크는 두 세트의 부트스트랩 서버를 지원하는 ABSwitchCluster
도 제공한다. 그 중 하나는 언제든지 활성화된다. setBootstrapServersSupplier()
를 호출하여 ABSwitchCluster
를 구성하고 프로듀서 및 컨슈머 팩토리와 KafkaAdmin
를 추가한다. 전환하려는 경우, primary()
또는 secondary()
를 호출하고 프로듀서 팩토리에서 reset()
을 호출하여 새 커넥션을 설정한다. 컨슈머의 경우 모든 리스너 컨테이너를 stop()
및 start()
한다. @KafkaListeners
를 사용하는 경우 KafkaListenerEndpointRegistry
빈을 stop()
및 start()
한다.
자세한 내용은 자바독(Javadoc)을 참고하자.
팩토리 리스너(Factory Listeners)
버전 2.5부터 프로듀서 또는 컨슈머가 생성되거나 닫힐 때마다 알림을 받도록 DefaultKafkaProducerFactory
및 DefaultKafkaConsumerFactory
를 리스너로 구성할 수 있다.
Producer Factory Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
Consumer Factory Listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
각 경우에, ID는 client-id
프로퍼티(생성 후 metrics()
에서 가져옴)을 팩토리 beanName
프로퍼티에 ..로 구분하고 추가하여 생성된다.
예를 들어, 이러한 리스너는 새 클라이언트가 생성될 때 마이크로미터 KafkaClientMetrics
인스턴스를 생성하고 바인딩하는 데 사용할 수 있다(클라이언트가 닫힐 때 닫는다).
프레임워크는 정확히 이를 수행하는 리스너를 제공한다. 마이크로미터 네이티브 메트릭을 참고하자.