토픽 구성(Configuring Topics)

애플리케이션 컨텍스트에서 KafkaAdmin 빈을 정의하면 자동으로 브로커에 토픽를 추가할 수 있다. 이를 위해 각 토픽에 대한 NewTopic @Bean을 애플리케이션 컨텍스트에 추가할 수 있다. 버전 2.3에서는 이러한 빈을 보다 편리하게 생성할 수 있도록 새로운 클래스 TopicBuilder를 도입했다. 다음 예제에서는 그 방법을 보여준다.

자바

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

코틀린

@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

버전 2.6부터 partitions() 및/또는 replicas()를 생략할 수 있으며 해당 프로퍼티에는 브로커 기본값이 적용된다. 이 기능을 지원하려면 브로커 버전이 2.4.0 이상이어야 한다. KIP-464를 참고하자.

자바

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

코틀린

@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

버전 2.7부터, 단일 KafkaAdmin.NewTopics 빈 정의에서 여러 NewTopics를 선언할 수 있다.

자바

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

코틀린

@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)

스프링 부트를 사용할 때 KafkaAdmin 빈이 자동으로 등록되므로 NewTopic(및/또는 NewTopics) @Beans만 필요하다..

기본적으로, 브로커를 사용할 수 없는 경우 메시지가 기록되지만 컨텍스트는 계속 로드된다. 나중에 재시도를 위해 admin의 initialize() 메서드를 호출할 수 있다. 이 조건을 치명적인 것으로 간주하려면 admin의 fatalIfBrokerNotAvailable 프로퍼티를 true로 설정하자. 그러면 컨텍스트가 초기화되지 않는다.

브로커가 이를 지원하는 경우(1.0.0 이상) 기존 토픽에 NewTopic.numPartitions보다 적은 수의 파티션이 있는 것으로 확인되면 관리자는 파티션 수를 늘린다.

버전 2.7부터 KafkaAdmin은 런타임에 토픽을 생성하고 검사하는 메서드를 제공한다.

  • createOrModifyTopics
  • describeTopics

고급 기능을 사용하려면, AdminClient를 직접 사용할 수 있다. 다음 예제에서는 그 방법을 보여준다.

@Autowired
private KafkaAdmin admin;
    ...
    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

버전 2.9.10, 3.0.9부터 특정 NewTopic 빈을 생성 또는 수정해야 하는지 여부를 결정하는 데 사용할 수 있는 Predicate<NewTopic>을 제공할 수 있다. 예를 들어, 서로 다른 클러스터를 가리키는 KafkaAdmin 인스턴스가 여러 개 있고 각 admin이 생성하거나 수정해야 하는 토픽를 선택하려는 경우에 유용하다.