카프카에 연결(Connecting to Kafka)

KafkaAdmin - 토픽 구성을 참고하자.

ProducerFactory - 메세지 전송을 참고하자.

ConsumerFactory - 메세지 받기를 참고하자.

버전 2.5부터 이들 각각은 KafkaResourceFactory를 상속한다. 이를 통해 구성에 Supplier<String>을 추가하여 런타임 시 부트스트랩 서버를 변경할 수 있다. setBootstrapServersSupplier(() -> …​). 이는 서버 목록을 가져오기 위해 모든 새 커넥션에 대해 호출된다. 컨슈머와 프로듀서는 일반적으로 수명이 길다. 기존 프로듀서를 닫으려면 DefaultKafkaProducerFactory에서 reset()을 호출하자. 기존 컨슈머를 닫으려면 KafkaListenerEndpointRegistry에서 stop()(그런 다음 start())을 호출하거나 다른 리스너 컨테이너 빈에서 stop()start()를 호출한다.

편의를 위해, 프레임워크는 두 세트의 부트스트랩 서버를 지원하는 ABSwitchCluster도 제공한다. 그 중 하나는 언제든지 활성화된다. setBootstrapServersSupplier()를 호출하여 ABSwitchCluster를 구성하고 프로듀서 및 컨슈머 팩토리와 KafkaAdmin를 추가한다. 전환하려는 경우, primary() 또는 secondary()를 호출하고 프로듀서 팩토리에서 reset()을 호출하여 새 커넥션을 설정한다. 컨슈머의 경우 모든 리스너 컨테이너를 stop()start()한다. @KafkaListeners를 사용하는 경우 KafkaListenerEndpointRegistry 빈을 stop()start()한다.

자세한 내용은 자바독(Javadoc)을 참고하자.

팩토리 리스너(Factory Listeners)

버전 2.5부터 프로듀서 또는 컨슈머가 생성되거나 닫힐 때마다 알림을 받도록 DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory를 리스너로 구성할 수 있다.

Producer Factory Listener

interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}

Consumer Factory Listener

interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

각 경우에, ID는 client-id 프로퍼티(생성 후 metrics()에서 가져옴)을 팩토리 beanName 프로퍼티에 ..로 구분하고 추가하여 생성된다.

예를 들어, 이러한 리스너는 새 클라이언트가 생성될 때 마이크로미터 KafkaClientMetrics 인스턴스를 생성하고 바인딩하는 데 사용할 수 있다(클라이언트가 닫힐 때 닫는다).

프레임워크는 정확히 이를 수행하는 리스너를 제공한다. 마이크로미터 네이티브 메트릭을 참고하자.