카프카 컨슈머를 Spring Boot로 어떻게 구현하는지 설명하도록 하겠습니다.
모든 소스 코드는 아래 링크를 통해 보실 수 있습니다.
https://github.com/kang-seongbeom/kafka-consumer-example
오늘 구현할 것을 그림으로 나타내면 다음과 같습니다.
프로젝트 구조는 다음과 같습니다.
consumer
|_ config
|_ KafkaConsumerConfig.class
|_ TaskExecutorConfig.class
|_ dto
|_ PushRequestDto.class
|_ service
|_ PushRequestKafkaMessageListner.class
|_ KafkaConsumerExampleApplication.class
1. 의존성 설정
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
- spring-kafka
- 카프카 컨슈머를 쉽게 만들기 위한 각종 기능이 있습니다.
- slf4j-simple
- 카프카 컨슈머 로그들이 콘솔에 기록되도록 합니다.
2. Properties 설정
server.port=8081
server.servlet.context-path=/
server.servlet.encoding.charset=UTF-8
server.servlet.encoding.enabled=true
server.servlet.encoding.force=true
setting.bootstrapServers= 127.0.0.1:29092, 127.0.0.1:29093, 127.0.0.1:29094
setting.ksb.topic= app-push-topic
setting.ksb.group= app-push-group
setting.ksb.enable.auto.commit= false
setting.ksb.earliest= earliest
- group
- 컨슈머는 프로듀서와 달리 group이 존재합니다. 그룹핑을 하여 데이터를 어떻게 소비할지 설정할 수 있습니다.
3. 카프카 컨슈머 커스텀 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${setting.bootstrapServers}")
private String bootstrapServers;
@Value("${setting.ksb.group}")
private String groupId;
@Value("${setting.ksb.enable.auto.commit}")
private boolean autocommit;
@Value("${setting.ksb.earliest}")
private String earliest;
@Autowired
private TaskExecutorConfig taskExecutorConfig;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaConsumerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 기본 팩토리 설정
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 하나의 리스너에서 스레드 3개로 처리.
// 수동 커밋
// 리스너에서 acknowledgment가 호출될 때 마다, 커밋
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 컨슈머 스레드 처리
factory.getContainerProperties().setConsumerTaskExecutor(taskExecutorConfig.executor());
// retry
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(context -> {
System.out.println("consumer retry : " + context.toString());
return null;
});
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
@Bean
public ConsumerFactory<string, string=""> consumerFactory(){
JsonDeserializer deserializer = pushDeserializer();
return new DefaultKafkaConsumerFactory<>(
consumerFactoryConfig(deserializer),
new StringDeserializer(),
deserializer);
}
private Map<string, object=""> consumerFactoryConfig(JsonDeserializer deserializer) {
Map<string, object=""> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.autocommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.earliest);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); // poll 요청을 보내고, 다음 poll 요청을 보내는데 까지의 최대 시간 설정
return props;
}
private JsonDeserializer pushDeserializer() {
JsonDeserializer deserializer = new JsonDeserializer<>(String.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
return deserializer;
}
// retry관련 내용 블로그
// <https://wfreud.tistory.com/352>
// <https://objectpartners.com/2018/11/21/building-resilient-kafka-consumers-with-spring-retry/>
// <https://blog.leocat.kr/notes/2018/10/10/translation-retrying-consumer-architecture-in-the-apache-kafka>
// <https://gunju-ko.github.io/kafka/spring-kafka/2018/04/16/Spring-Kafka-Retry.html>
@Bean
RetryTemplate retryTemplate(){
RetryTemplate retryTemplate = new RetryTemplate();
// retry 정책
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
}
- retryTemplate()
- 메시지 수신에 실패했을 때, 재시도록 어떻게 할 지 설정할 수 있습니다.
💡 컨슈머는 데이터를 소비하고, 이를 카프카에 기록하기 위해 커밋을 하게 됩니다. 커밋의 내용은 주제에 벗어나기 때문에 수동 커밋으로 동작한 다는 것만 알고 지나가시면 됩니다.
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(250);
executor.setThreadFactory(new CustomizableThreadFactory("kafka-thread")); // 이름 prefix
return executor;
}
}
4. Dto
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PushRequestDto {
private String token;
public String message;
@Override
public String toString() {
return "PushRequestDto{" +
"token='" + token + '\\'' +
", message='" + message + '\\'' +
'}';
}
}
5. Listener
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class PushRequestKafkaMessageListener
implements AcknowledgingMessageListener<String, String> {
@Override
@KafkaListener(topics = "${setting.ksb.topic}",
groupId = "${setting.ksb.group}",
containerFactory = "kafkaConsumerFactory") // containerFactory는 config 파일에서 설정한 bean
public void onMessage(ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment) {
try {
System.out.println(consumerRecord.toString());
acknowledgment.acknowledge(); // 커밋되는 시기 제어
Thread.sleep(3000);
} catch (InterruptedException e){
System.out.println(e.getMessage());
}
}
}
- @KafkaListener
- topic, groupId, containerFactory 세 개의 값을 통해 카프카로부터 값을 가져올 수 있습니다.
- onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment)
- 카프카로부터 값을 가져와 처리를 합니다.
6. 데이터 수신
- postman 전송 gif
- spring boot(consumer) 수신 gif
'카프카' 카테고리의 다른 글
카프카 프로듀서 spring boot로 구현하기 (0) | 2023.01.12 |
---|---|
카프카 프로듀서와 컨슈머 docker shell 에서 테스트하기 (0) | 2023.01.05 |
카프카 클러스터 docker compose로 구축하기 (0) | 2023.01.05 |
카프카란 무엇인가? - 2 (0) | 2022.12.29 |
카프카란 무엇인가? - 1 (2) | 2022.12.21 |