빠르게 둘러보기(Quick Tour)
전제조건: 아파치 카프카를 설치하고 실행해야 한다. 그런 다음 아파치 카프카용 스프링(spring-kafka
) JAR과 모든 의존 항목을 클래스패스에 배치해야 한다. 가장 쉬운 방법은 빌드 도구에서 의존성을 선언하는 것이다.
스프링 부트를 사용하지 않는 경우 프로젝트에서 spring-kafka
jar를 의존성으로 선언하자.
메이븐
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.0</version>
</dependency>
그레이들
compile 'org.springframework.kafka:spring-kafka:3.1.0'
스프링 부트를 사용할 때(그리고 프로젝트를 생성하기 위해
start.spring.io
를 사용하지 않은 경우), 버전을 생략하면 부트가 자동으로 부트 버전과 호환되는 올바른 버전을 가져온다.
메이븐
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
그레이들
implementation 'org.springframework.kafka:spring-kafka'
그러나, 시작하는 가장 빠른 방법은 start.spring.io
(또는 Spring Tool Suits
및 Intellij IDEA
)를 사용하고 ‘Spring for Apache Kafka’를 의존성으로 선택하여 프로젝트를 생성하는 것이다.
호환성(Compatibility)
이 빠르게 둘러보기
는 다음 버전에서 작동한다.
- 아파치 카프카 클라이언트 3.6.x
- 스프링 프레임워크 6.1.x
- 자바 최소 버전 17
시작하기(Getting Started)
시작하는 가장 간단한 방법은 start.spring.io
(또는 Spring Tool Suits
및 Intellij IDEA
)를 사용하고 ‘Spring for Apache Kafka’를 의존성으로 선택하여 프로젝트를 생성하는 것이다. 인프라 빈의 독창적인 자동 구성에 대한 자세한 내용은 스프링 부트 설명서를 참고하자.
다음은 간단한 컨슈머(Consumer) 애플리케이션이다.
스프링 부트 컨슈머 앱(Spring Boot Consumer App)
자바
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
코틀린
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@KafkaListener(id = "myId", topics = ["topic1"])
fun listen(value: String?) {
println(value)
}
}
fun main(args: Array<String>) = runApplication<Application>(*args)
application.properties
spring.kafka.consumer.auto-offset-reset=earliest
NewTopic 빈을 사용하면 브로커에 토픽이 생성된다. 토픽이 이미 존재하는 경우에는 필요하지 않다.
스프링 부트 프로듀서 앱(Spring Boot Producer App)
자바
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
코틀린
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@Bean
fun runner(template: KafkaTemplate<String?, String?>) =
ApplicationRunner { template.send("topic1", "test") }
companion object {
@JvmStatic
fun main(args: Array<String>) = runApplication<Application>(*args)
}
}
자바 구성 사용(스프링 부트 없음)
아파치 카프카용 스프링은 스프링 애플리케이션 컨텍스트에서 사용되도록 설계됐다. 예를 들어 스프링 컨텍스트 외부에서 직접 리스너 컨테이너를 생성하는 경우 컨테이너가 구현하는 …Aware 인터페이스를 모두 충족하지 않으면 모든 기능이 작동하지 않는다.
다음은 스프링 부트를 사용하지 않는 애플리케이션 예제다. 컨슈머와 프로듀서 모두 있다.
Without Spring Boot
자바
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
코틀린
class Sender(private val template: KafkaTemplate<Int, String>) {
fun send(toSend: String, key: Int) {
template.send("topic1", key, toSend)
}
}
class Listener {
@KafkaListener(id = "listen1", topics = ["topic1"])
fun listen1(`in`: String) {
println(`in`)
}
}
@Configuration
@EnableKafka
class Config {
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }
@Bean
fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
)
@Bean
fun sender(template: KafkaTemplate<Int, String>) = Sender(template)
@Bean
fun listener() = Listener()
@Bean
fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)
val senderProps = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ProducerConfig.LINGER_MS_CONFIG to 10,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to IntegerSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)
}
보시다시피, 스프링 부트를 사용하지 않을 때는 여러 인프라 빈을 정의해야 한다.