목차

  • 애플리케이션 테스트(Testing Applications)
  • 임베디드 카프카 브로커(Embedded Kafka Broker)
  • KafkaTestUtils
  • JUnit
  • 토픽 구성(Configuring Topics)
  • 여러 테스트 클래스에 동일한 브로커 사용(Using the Same Broker(s) for Multiple Test Classes)
  • @EmbeddedKafka 어노테이션(@EmbeddedKafka Annotation)
  • JUnit5를 사용한 @EmbeddedKafka 어노테이션(@EmbeddedKafka Annotation with JUnit5)
  • @SpringBootTest 어노테이션의 임베디드 브로커(Embedded Broker in @SpringBootTest Annotations)
    • JUnit4 Class Rule
    • @EmbeddedKafka 어노테이션 또는 EmbeddedKafkaBroker 빈(@EmbeddedKafka Annotation or EmbeddedKafkaBroker Bean)
  • 햄크레스트 매처스(Hamcrest Matchers)
  • 어썰트제이 컨디션(AssertJ Conditions)
  • 예제
  • 목 컨슈머와 프로듀서(Mock Consumer and Producer)

애플리케이션 테스트(Testing Applications)

spring-kafka-test jar에는 애플리케이션 테스트에 도움이 되는 몇 가지 유용한 유틸리티가 있다.

임베디드 카프카 브로커(Embedded Kafka Broker)

두 가지 구현체가 있다.

  • EmbeddedKafkaZKBroker - 임베디드 주키퍼(Zookeeper) 인스턴스를 시작하는 레거시 구현체다.
  • EmbeddedKafkaKraftBroker - (기본값) 결합된 컨트롤러 및 브로커 모드에서 주키퍼 대신 크래프트(Kraft)를 사용한다 (3.1부터 적용).

다음 절에서 설명하는 것처럼 브로커를 구성하는 몇 가지 기술이 있다.

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils는 레코드를 사용하고, 다양한 레코드 오프셋을 검색하는 등의 다양한 정적 헬퍼 메서드를 제공한다. 자세한 내용은 해당 Javadoc을 참고하자.

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils는 프로듀서 및 컨슈머 프로퍼티를 설정하는 몇 가지 정적 메서드도 제공한다. 다음 목록은 해당 메서드의 시그니처를 보여준다.

/**
 * {@code <Integer, String>} 컨슈머에 대한 테스트 프로퍼티를 설정한다.
 * @param group 그룹 id.
 * @param autoCommit 오토커밋.
 * @param embeddedKafka {@link EmbeddedKafkaBroker} 인스턴스.
 * @return 프로퍼티.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * {@code <Integer, String>} 프로듀서에 대한 테스트 프로퍼티를 설정한다.
 * @param embeddedKafka {@link EmbeddedKafkaBroker} 인스턴스.
 * @return 프로퍼티.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

노트 버전 2.5부터, ConsumerProps 메소드는 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest으로 설정한다. 대부분의 경우 컨슈머가 테스트에서 전송된 모든 메시지를 소비하기를 원하기 때문이다. ConsumerConfig 기본값은 latest이다. 그래서, 컨슈머 시작 전, 테스트에서 이미 전송된 메시지는 레코드로 수신하지 않는다. 이전 동작으로 되돌리려면, 메서드를 호출한 후 프로퍼티를 latest으로 설정하자.

임베디드 브로커를 사용할 때 혼선을 방지하기 위해, 일반적으로 각 테스트마다 다른 토픽을 사용하는 것이 가장 좋다. 어떤 이유로 이것이 불가능한 경우, ConsumerFromEmbeddedTopics 메소드의 기본 동작은 할당 후 처음부터 할당된 파티션을 찾다는걸 알아두자. 컨슈머 프로퍼티에 대한 접근 권한이 없기 때문에, 시작 대신 끝까지 seekToEnd 부울 파라미터를 사용하는 오버로드된 메서드를 사용해야 한다. ***

임베디드 카프카 및 임베디드 주키퍼 서버를 생성하기 위해 EmbeddedKafkaZKBroker용 JUnit 4 @Rule 래퍼클래스가 제공된다. (JUnit 5에서 @EmbeddedKafka를 사용하는 방법에 대한 자세한 내용은 @EmbeddedKafka 어노테이션을 참고하자.) 다음 목록은 해당 메서드의 시그니처를 보여준다.

/**
 * 임베디드 카프카 브로커를 생성한다.
 * @param count 브로커의 숫자다.
 * @param controlledShutdown TestUtils.createBrokerConfig에 전달된다.
 * @param topics 생성할 토픽(파티션당 2개).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * 임베디드 카프카 브로커를 생성한다.
 * @param count 브로커의 숫자다.
 * @param controlledShutdown TestUtils.createBrokerConfig에 전달된다.
 * @param partitions 토픽별 파티션
 * @param topics 만들 토픽
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

EmbeddedKafkaKraftBroker는 JUnit4에서 지원되지 않는다.

EmbeddedKafkaBroker 클래스에는 생성된 모든 토픽을 사용할 수 있는 유틸리티 메서드가 있다. 다음 예제는 사용 방법을 보여준다.

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils에는 컨슈머로부터 결과를 가져오는 몇 가지 유틸리티 메서드가 있다. 다음은 해당 메서드 시그니처를 보여준다.

/**
 * 지정된 토픽에 대해 단일 레코드를 예상하여, 소비자를 폴링한다.
 * @param consumer 컨슈머.
 * @param topic 토픽.
 * @return 레코드.
 * @throws org.junit.ComparisonFailure 정확히 하나의 레코드도 수신되지 않은 경우.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * 소비자에게 레코드를 폴링한다.
 * @param consumer 컨슈머.
 * @return 레코드.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

다음 예제는 KafkaTestUtils 사용방법을 보여준다.

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

임베디드 카프카 및 임베디드 주키퍼 서버가 EmbeddedKafkaBroker에 의해 시작되면, spring.embedded.kafka.brokers라는 시스템 프로퍼티가 카프카 브로커의 주소로 설정되고 spring.embedded.zookeeper.connect라는 시스템 프로퍼티가 주키퍼 주소로 설정된다. 이 프로퍼티는 편리한 상수(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)가 제공된다.

기본 spring.embedded.kafka.brokers 시스템 프로퍼티 대신, 임의의 편리한 카프카 브로커의 주소를 프로퍼티에 노출할 수 있다. 이를 위해 임베디드 카프카를 시작하기 전 spring.embedded.kafka.brokers.property(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) 시스템 프로퍼티를 설정해야한다. 예를 들어, 스프링부트를 사용하면 카프카 클라이언트 자동 구성을 위해 spring.kafka.bootstrap-servers 구성 프로퍼티를 각각 설정할것이다. 따라서 임의의 포트에서 임베디드 카프카로 테스트를 실행하기 전에 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers를 시스템 프로퍼티로 설정할 수 있으며, EmbeddedKafkaBroker는 이를 사용하여 브로커 주소를 노출한다. 이는 이제 이 프로퍼티의 기본값이다(버전 3.0.10부터).

EmbeddedKafkaBroker.brokerProperties(Map<String, String>)를 사용하면 카프카 서버에 대한 추가 프로퍼티를 제공할 수 있다. 사용 가능한 브로커 프로퍼티에 대한 자세한 내용은 카프카 구성을 참고하자.

토픽 구성(Configuring Topics)

다음 예제에서 5개의 파티션이 있는 토픽 cathat, 10개의 파티션이 있는 토픽 thing1, 15개의 파티션이 있는 토픽 thing2을 생성한다.

public class MyTests {
    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
            .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }
}

기본적으로, addTopics는 문제가 발생하면(예: 이미 존재하는 토픽 추가 등) 예외를 발생시킨다. 버전 2.6에는 Map<String, Exception>;을 반환하는 해당 메서드의 새 버전이 추가됐다. 키는 토픽명이고 값은 성공한 경우 null이고 실패한 경우 Exception이 발생한다.

여러 테스트 클래스에 동일한 브로커 사용(Using the Same Broker(s) for Multiple Test Classes)

다음과 유사한 여러 테스트 클래스에 동일한 브로커를 사용할 수 있다.

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            } catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

이는 스프링 부트 환경을 가정하고 임베디드 브로커가 부트스트랩 서버 프로퍼티를 대체한다.

그런 다음, 각 테스트 클래스에서 다음과 같이 사용할 수 있다.

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

스프링 부트를 사용하지 않는 경우 Broker.getBrokersAsString()을 사용하여 부트스트랩 서버를 얻을 수 있다.

앞의 예제에서 모든 테스트가 완료되면 브로커를 종료하는 메커니즘을 제공하지 않는다. 예를 들어, 그레이들 데몬에서 테스트를 실행하는 경우 문제가 될 수 있다. 이런 상황에서는 이 기술을 사용해서는 안 된다. 또는 테스트가 완료되면 EmbeddedKafkaBroker에서 destroy()를 호출하는 방법을 사용해야 한다.

버전 3.0부터, 프레임워크는 JUnit 플랫폼용 GlobalEmbeddedKafkaTestExecutionListener를 노출한다. 기본적으로 비활성화되어 있다. 이를 위해서는 JUnit 플랫폼 1.8 이상이 필요하다. 이 리스너의 목적은 전체 테스트 계획에 대해 하나의 글로벌 EmbeddedKafkaBroker를 시작하고 계획이 끝나면 중지하는 것이다. 이 리스너를 활성화하여 프로젝트의 모든 테스트에 대해 단일 전역 임베디드 카프카 클러스터를 가지려면 시스템 프로퍼티 또는 JUnit 플랫폼 구성을 통해 spring.kafka.global.embedded.enabled 프로퍼티를 true로 설정해야 한다. 또한 다음 프로퍼티을 제공할 수 있다.

  • spring.kafka.embedded.count - 관리할 카프카 브로커 수
  • spring.kafka.embedded.ports - 시작할 모든 카프카 브로커에 대한 포트(쉼표로 구분된 값), 임의 포트를 선호하는 경우 0이다. 값의 개수는 위에서 언급한 브로커 수와 같아야 한다.
  • spring.kafka.embedded.topics - 시작된 카프카 클러스터에 생성할 토픽(쉼표로 구분된 값)
  • spring.kafka.embedded.partitions - 생성된 토픽을 프로비저닝할 파티션 수
  • spring.kafka.embedded.broker.properties.location - 추가적인 카프카 브로커 구성 프로퍼티에 대한 파일 위치, 이 프로퍼티의 값은 스프링 리소스 추상화 패턴을 따라야 한다.
  • spring.kafka.embedded.kraft - false인 경우 EmbeddedKafkaKraftBroker 대신 EmbeddedKafkaZKBroker를 사용한다.

기본적으로 이러한 프로퍼티는 @EmbeddedKafka 애트리뷰트 중 일부를 모방한다.

JUnit 5 사용자 가이드에서 구성 프로퍼티와 이를 제공하는 방법에 대한 자세한 내용을 참고하자. 예를 들어, spring.embedded.kafka.brokers.property=my.bootstrap-servers 항목을 테스트 클래스패스의 junit-platform.properties 파일에 추가할 수 있다. 버전 3.0.10부터 브로커는 스프링 부트 애플리케이션 테스트를 위해 기본적으로 이를 spring.kafka.bootstrap-servers로 자동 설정한다.

단일 테스트 스위트(test suite)에 전역 임베디드 카프카와 테스트별 클래스를 결합하지 않는 것이 좋다. 둘 다 동일한 시스템 프로퍼티을 공유하므로 예기치 않은 동작이 발생할 가능성이 매우 높다.

spring-kafka-test에는 junit-jupiter-apijunit-platform-launcher(후자는 글로벌 임베디드 브로커를 지원함)에 대한 전이적 의존성(transitive dependencies)이 있다. 임베디드 브로커를 사용하고 JUnit을 사용하지 않는 경우 이러한 의존성을 제외할 수 있다.

@EmbeddedKafka 어노테이션(@EmbeddedKafka Annotation)

일반적으로 테스트 사이에 브로커 시작 및 중지를 방지하려면 규칙을 @ClassRule로 사용하고, 각 테스트마다 다른 토픽를 사용하는 것이 좋다. 버전 2.0부터 스프링의 테스트 애플리케이션 컨텍스트 캐싱을 사용하는 경우 EmbeddedKafkaBroker 빈을 선언할 수도 있으므로 단일 브로커를 여러 테스트 클래스에서 사용할 수 있다. 편의를 위해 EmbeddedKafkaBroker 빈(Bean)을 등록하기 위해 @EmbeddedKafka라는 테스트 클래스 레벨의 어노테이션을 제공한다. 다음 예제에서는 사용 방법을 보여준다.

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
        topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 
        }
)
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

버전 2.2.4부터 @EmbeddedKafka 어노테이션을 사용하여 카프카 포트 프로퍼티를 지정할 수도 있다.

버전 3.1부터 EmbeddedKafkaKraftBroker 대신 EmbeddedKafkaZKBroker를 사용하려면 kraft 프로퍼티를 false로 설정해야 한다.

다음 예제에서는 @EmbeddedKafka 지원 프로퍼티의 자리표시자로 topics, brokerPropertiesbrokerPropertiesLocation 애트리뷰트를 설정한다.

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(
        topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { 
                            "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" 
        },
        brokerPropertiesLocation = "classpath:/broker.properties"
)

앞의 예제에서 프로퍼티 자리표시자 ${kafka.topics.another-topic}, ${kafka.broker.logs-dir}${kafka.broker.port}는 스프링 Environment에서 확인할 수 있다. 또한 브로커 프로퍼티는 brokerPropertiesLocation에 지정된 broker.properties 클래스패스 리소스에서 로드된다. brokerPropertiesLocation URL 및 리소스에 있는 모든 프로퍼티 자리표시자에 대한 프로퍼티를 확인할 수 있다. brokerProperties에 의해 정의된 프로퍼티는 brokerPropertiesLocation에 있는 프로퍼티를을 오버라이드한다.

JUnit 4 또는 JUnit 5에서 @EmbeddedKafka 어노테이션을 사용할 수 있다.

JUnit5를 사용한 @EmbeddedKafka 어노테이션(@EmbeddedKafka Annotation with JUnit5)

버전 2.3부터 JUnit5와 함께 @EmbeddedKafka 어노테이션을 사용하는 두 가지 방법이 있다. @SpringJunitConfig 어노테이션과 함께 사용하면 임베디드 브로커가 테스트 애플리케이션 컨텍스트에 추가된다. 클래스 또는 메서드 레벨에서 브로커를 테스트에 자동으로 연결하여 브로커 주소 목록을 가져올 수 있다.

스프링 테스트 컨텍스트를 사용하지 않는 경우, EmbdeddedKafkaCondition은 브로커를 생성한다. 컨디션에는 테스트 메서드에서 브로커에 접근할 수 있도록 파라미터 확인자가 포함되어 있다.

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }
}

@EmbeddedKafka 어노테이션이 달린 클래스에 ExtendWith(SpringExtension.class)로 어노테이션을 달거나 메타 어노테이션을 달지 않는 한 독립형 브로커(스프링의 TestContext 외부)가 생성된다. @SpringJunitConfig@SpringBootTest에는 메타 어노테이션이 달려 있으며 해당 어노테이션 중 하나가 있을 때 컨텍스트 기반 브로커가 사용된다.

사용 가능한 스프링 테스트 애플리케이션 컨텍스트가 있는 경우 토픽 및 브로커 프로퍼티에는 프로퍼티 어딘가에 정의되어 있는 프로퍼티 자리 표시자가 사용될 수 있다. 사용 가능한 스프링 컨텍스트가 없으면 이러한 자리 표시자는 사용되지 않는다.

@SpringBootTest 어노테이션의 임베디드 브로커(Embedded Broker in @SpringBootTest Annotations)

이제 스프링 초기화는 테스트 범위의 spring-kafka-test 의존성을 프로젝트 구성에 자동으로 추가한다.


애플리케이션이 spring-cloud-stream에서 카프카 바인더를 사용하고 테스트를 위해 임베디드 브로커를 사용하려는 경우 spring-cloud-stream-test-support 의존성을 제거해야 한다. 왜냐하면 실제 바인더가 테스트 바인더로 대체되기 때문이다. 테스트 케이스의 경우, 일부 테스트에서는 테스트 바인더를 사용하고 일부 테스트에서는 임베디드 브로커를 사용하려는 경우 실제 바인더를 사용하는 테스트에서는 테스트 클래스에서 바인더 자동 구성을 제외하여 테스트 바인더를 비활성화해야 한다. 다음 예제에서는 그 방법을 보여줍니다.

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

스프링 부트 애플리케이션 테스트에서 임베디드 브로커를 사용하는 방법에는 여러 가지가 있다.

여기에는 다음이 포함된다.

  • JUnit4 Class Rule
  • @EmbeddedKafka 어노테이션 또는 EmbeddedKafkaBroker 빈(@EmbeddedKafka Annotation or EmbeddedKafkaBroker Bean)

JUnit4 Class Rule

다음 예에서는 JUnit4 클래스 규칙을 사용하여 임베디드 브로커를 생성하는 방법을 보여준다.

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

이는 스프링부트 애플리케이션이므로 브로커 목록 프로퍼티를 오버라이드하여 스프링부트의 프로퍼티를 설정한다.

@EmbeddedKafka 어노테이션 또는 EmbeddedKafkaBroker 빈(@EmbeddedKafka Annotation or EmbeddedKafkaBroker Bean)

다음 예제에서는 @EmbeddedKafka 어노테이션을 사용하여 임베디드 브로커를 생성하는 방법을 보여준다.

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") // 이 설정은 지금부터 기본값 이다.
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

bootstrapServersProperty는 버전 3.0.10부터 기본적으로 spring.kafka.bootstrap-servers로 자동 설정된다.

햄크레스트 매처스(Hamcrest Matchers)

org.springframework.kafka.test.hamcrest.KafkaMatchers는 다음과 같은 매처(matchers)를 제공한다.

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

어썰트제이 컨디션(AssertJ Conditions)

다음 어썰트제이 컨디션을 사용할 수 있다.

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

예제

다음 예제에서는 이 장 대부분의 주제를 다룬다.

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

앞의 예에서는 햄크레스트(Hamcrest) 매처를 사용한다. 어썰트제이를 사용하면 마지막 부분은 다음 코드와 같다.

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

목 컨슈머와 프로듀서(Mock Consumer and Producer)

kafka-clients 라이브러리는 테스트 목적으로 MockConsumerMockProducer 클래스를 제공한다.

버전 3.0.7부터 각각 리스너 컨테이너 또는 KafkaTemplate을 사용하여 일부 테스트에서 이러한 클래스를 사용하려는 경우 프레임워크는 이제 MockConsumerFactoryMockProducerFactory 구현체를 제공한다.

이러한 팩토리는 실행 중인(또는 임베디드) 브로커가 필요한 기본 팩토리 대신 리스너 컨테이너 및 템플릿에서 사용할 수 있다.

다음은 단일 컨슈머를 반환하는 간단한 구현의 예제다.

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1", new RecordHeaders(), Optional.empty()));
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2", new RecordHeaders(), Optional.empty()));
    });

    return new MockConsumerFactory(() -> consumer);
}

동시성(concurrency)을 테스트하려면, 팩토리 프로듀서의 공급자 람다(Supplier lambda)가 매번 새 인스턴스를 만들어야 한다.

MockProducerFactory에는 두 개의 생성자가 있다. 하나는 간단한 팩토리를 만드는 것이고, 다른 하나는 트랜잭션을 지원하는 팩토리를 만드는 것이다.

여기 예제가 있다.

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() -> new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

두 번째 경우, 람다는 호출자가 트랜잭션 프로듀서를 원하는 경우 첫 번째 파라미터가 trueBiFunction<Boolean, String>이다. 선택한 두 번째 파라미터에는 트랜잭션 id가 포함된다. 이는 기본값(프로듀서에 제공됨)이거나 구성되어 있다면, KafkaTransactionManager(또는 로컬 트랜잭션의 경우 KafkaTemplate)에 의해 오버라이드될 수 있다. 이 값을 기반으로 다른 MockProducer를 사용하려는 경우 트랜잭션 ID가 제공된다.

다중 스레드 환경에서 프로듀서를 사용하는 경우 BiFunction은 여러 프로듀서를 반환해야 한다(아마도 ThreadLocal을 사용하여 스레드 바인딩됨).

트랜잭션용 MockProducerinitTransaction()을 호출하여 트랜잭션에 대해 초기화되어야 한다.

MockProducer를 사용할 때, 각각 전송 후 프로듀서를 닫지 않으려면 슈퍼 클래스에서 close 메서드를 호출하지 않도록, close 메서드를 오버라이드하는 커스텀 MockProducer 구현체를 제공할 수 있다. 이는 동일한 프로듀서에 대한 여러 게시를 닫지 않고 확인할 때 테스트하는 데 편리하다.

예제다.

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}