토픽 구성(Configuring Topics)
애플리케이션 컨텍스트에서 KafkaAdmin
빈을 정의하면 자동으로 브로커에 토픽를 추가할 수 있다. 이를 위해 각 토픽에 대한 NewTopic
@Bean
을 애플리케이션 컨텍스트에 추가할 수 있다. 버전 2.3에서는 이러한 빈을 보다 편리하게 생성할 수 있도록 새로운 클래스 TopicBuilder
를 도입했다. 다음 예제에서는 그 방법을 보여준다.
자바
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
코틀린
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
버전 2.6부터 partitions()
및/또는 replicas()
를 생략할 수 있으며 해당 프로퍼티에는 브로커 기본값이 적용된다. 이 기능을 지원하려면 브로커 버전이 2.4.0
이상이어야 한다. KIP-464
를 참고하자.
자바
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
코틀린
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
버전 2.7부터, 단일 KafkaAdmin.NewTopics
빈 정의에서 여러 NewTopics
를 선언할 수 있다.
자바
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
코틀린
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
스프링 부트를 사용할 때
KafkaAdmin
빈이 자동으로 등록되므로NewTopic
(및/또는NewTopics
)@Beans
만 필요하다..
기본적으로, 브로커를 사용할 수 없는 경우 메시지가 기록되지만 컨텍스트는 계속 로드된다. 나중에 재시도를 위해 admin의 initialize()
메서드를 호출할 수 있다. 이 조건을 치명적인 것으로 간주하려면 admin의 fatalIfBrokerNotAvailable
프로퍼티를 true
로 설정하자. 그러면 컨텍스트가 초기화되지 않는다.
브로커가 이를 지원하는 경우(1.0.0 이상) 기존 토픽에 NewTopic.numPartitions
보다 적은 수의 파티션이 있는 것으로 확인되면 관리자는 파티션 수를 늘린다.
버전 2.7부터 KafkaAdmin
은 런타임에 토픽을 생성하고 검사하는 메서드를 제공한다.
createOrModifyTopics
describeTopics
고급 기능을 사용하려면, AdminClient
를 직접 사용할 수 있다. 다음 예제에서는 그 방법을 보여준다.
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
버전 2.9.10, 3.0.9부터 특정 NewTopic
빈을 생성 또는 수정해야 하는지 여부를 결정하는 데 사용할 수 있는 Predicate<NewTopic>
을 제공할 수 있다. 예를 들어, 서로 다른 클러스터를 가리키는 KafkaAdmin
인스턴스가 여러 개 있고 각 admin이 생성하거나 수정해야 하는 토픽를 선택하려는 경우에 유용하다.