카프카에 연결(Connecting to Kafka)
KafkaAdmin - 토픽 구성을 참고하자.
ProducerFactory - 메세지 전송을 참고하자.
ConsumerFactory - 메세지 받기를 참고하자.
버전 2.5부터 이들 각각은 KafkaResourceFactory를 상속한다. 이를 통해 구성에 Supplier<String>을 추가하여 런타임 시 부트스트랩 서버를 변경할 수 있다. setBootstrapServersSupplier(() -> …). 이는 서버 목록을 가져오기 위해 모든 새 커넥션에 대해 호출된다. 컨슈머와 프로듀서는 일반적으로 수명이 길다. 기존 프로듀서를 닫으려면 DefaultKafkaProducerFactory에서 reset()을 호출하자. 기존 컨슈머를 닫으려면 KafkaListenerEndpointRegistry에서 stop()(그런 다음 start())을 호출하거나 다른 리스너 컨테이너 빈에서 stop() 및 start()를 호출한다.
편의를 위해, 프레임워크는 두 세트의 부트스트랩 서버를 지원하는 ABSwitchCluster도 제공한다. 그 중 하나는 언제든지 활성화된다. setBootstrapServersSupplier()를 호출하여 ABSwitchCluster를 구성하고 프로듀서 및 컨슈머 팩토리와 KafkaAdmin를 추가한다. 전환하려는 경우, primary() 또는 secondary()를 호출하고 프로듀서 팩토리에서 reset()을 호출하여 새 커넥션을 설정한다. 컨슈머의 경우 모든 리스너 컨테이너를 stop() 및 start()한다. @KafkaListeners를 사용하는 경우 KafkaListenerEndpointRegistry 빈을 stop() 및 start()한다.
자세한 내용은 자바독(Javadoc)을 참고하자.
팩토리 리스너(Factory Listeners)
버전 2.5부터 프로듀서 또는 컨슈머가 생성되거나 닫힐 때마다 알림을 받도록 DefaultKafkaProducerFactory 및 DefaultKafkaConsumerFactory를 리스너로 구성할 수 있다.
Producer Factory Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
Consumer Factory Listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
각 경우에, ID는 client-id 프로퍼티(생성 후 metrics()에서 가져옴)을 팩토리 beanName 프로퍼티에 ..로 구분하고 추가하여 생성된다.
예를 들어, 이러한 리스너는 새 클라이언트가 생성될 때 마이크로미터 KafkaClientMetrics 인스턴스를 생성하고 바인딩하는 데 사용할 수 있다(클라이언트가 닫힐 때 닫는다).
프레임워크는 정확히 이를 수행하는 리스너를 제공한다. 마이크로미터 네이티브 메트릭을 참고하자.