본문 바로가기

카프카

카프카 컨슈머 spring boot로 구현하기

카프카 컨슈머를 Spring Boot로 어떻게 구현하는지 설명하도록 하겠습니다.

모든 소스 코드는 아래 링크를 통해 보실 수 있습니다.

https://github.com/kang-seongbeom/kafka-consumer-example

 

오늘 구현할 것을 그림으로 나타내면 다음과 같습니다.

 

프로젝트 구조는 다음과 같습니다.

consumer
  |_ config
    |_ KafkaConsumerConfig.class
    |_ TaskExecutorConfig.class
  |_ dto
    |_ PushRequestDto.class
  |_ service
    |_ PushRequestKafkaMessageListner.class
  |_ KafkaConsumerExampleApplication.class

 

1. 의존성 설정

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>     
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
    </dependency>
</dependencies>
  • spring-kafka
    • 카프카 컨슈머를 쉽게 만들기 위한 각종 기능이 있습니다.
  • slf4j-simple
    • 카프카 컨슈머 로그들이 콘솔에 기록되도록 합니다.

 

2. Properties 설정

server.port=8081
server.servlet.context-path=/
server.servlet.encoding.charset=UTF-8
server.servlet.encoding.enabled=true
server.servlet.encoding.force=true

setting.bootstrapServers= 127.0.0.1:29092, 127.0.0.1:29093, 127.0.0.1:29094
setting.ksb.topic= app-push-topic
setting.ksb.group= app-push-group
setting.ksb.enable.auto.commit= false
setting.ksb.earliest= earliest
  • group
    • 컨슈머는 프로듀서와 달리 group이 존재합니다. 그룹핑을 하여 데이터를 어떻게 소비할지 설정할 수 있습니다.

 

3. 카프카 컨슈머 커스텀 설정

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${setting.bootstrapServers}")
    private String bootstrapServers;

    @Value("${setting.ksb.group}")
    private String groupId;

    @Value("${setting.ksb.enable.auto.commit}")
    private boolean autocommit;

    @Value("${setting.ksb.earliest}")
    private String earliest;

    @Autowired
    private TaskExecutorConfig taskExecutorConfig;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaConsumerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // 기본 팩토리 설정
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 하나의 리스너에서 스레드 3개로 처리.

        // 수동 커밋
        // 리스너에서 acknowledgment가 호출될 때 마다, 커밋
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // 컨슈머 스레드 처리
        factory.getContainerProperties().setConsumerTaskExecutor(taskExecutorConfig.executor());

        // retry
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
            System.out.println("consumer retry : " + context.toString());
            return null;
        });
        factory.setErrorHandler(new SeekToCurrentErrorHandler());

        return factory;
    }

    @Bean
    public ConsumerFactory<string, string=""> consumerFactory(){
        JsonDeserializer deserializer = pushDeserializer();
        return new DefaultKafkaConsumerFactory<>(
                consumerFactoryConfig(deserializer), 
                new StringDeserializer(), 
                deserializer);
    }

    private Map<string, object=""> consumerFactoryConfig(JsonDeserializer deserializer) {
        Map<string, object=""> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.autocommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.earliest);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); // poll 요청을 보내고, 다음 poll 요청을 보내는데 까지의 최대 시간 설정
        return props;
    }

    private JsonDeserializer pushDeserializer() {
        JsonDeserializer deserializer = new JsonDeserializer<>(String.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return deserializer;
    }

    // retry관련 내용 블로그
    // <https://wfreud.tistory.com/352>
    // <https://objectpartners.com/2018/11/21/building-resilient-kafka-consumers-with-spring-retry/>
    // <https://blog.leocat.kr/notes/2018/10/10/translation-retrying-consumer-architecture-in-the-apache-kafka>
    // <https://gunju-ko.github.io/kafka/spring-kafka/2018/04/16/Spring-Kafka-Retry.html>
    @Bean
    RetryTemplate retryTemplate(){
        RetryTemplate retryTemplate = new RetryTemplate();

        // retry 정책
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000L);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);

        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }
}
  • retryTemplate()
    • 메시지 수신에 실패했을 때, 재시도록 어떻게 할 지 설정할 수 있습니다.

💡 컨슈머는 데이터를 소비하고, 이를 카프카에 기록하기 위해 커밋을 하게 됩니다. 커밋의 내용은 주제에 벗어나기 때문에 수동 커밋으로 동작한 다는 것만 알고 지나가시면 됩니다.

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(200);
        executor.setQueueCapacity(250);
        executor.setThreadFactory(new CustomizableThreadFactory("kafka-thread")); // 이름 prefix
        return executor;
    }
}

 

4. Dto

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PushRequestDto {

    private String token;
    public String message;

    @Override
    public String toString() {
        return "PushRequestDto{" +
                "token='" + token + '\\'' +
                ", message='" + message + '\\'' +
                '}';
    }
}

 

5. Listener

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class PushRequestKafkaMessageListener
        implements AcknowledgingMessageListener<String, String> {

    @Override
    @KafkaListener(topics = "${setting.ksb.topic}",
            groupId = "${setting.ksb.group}",
            containerFactory = "kafkaConsumerFactory") // containerFactory는 config 파일에서 설정한 bean
    public void onMessage(ConsumerRecord<String, String> consumerRecord,
                          Acknowledgment acknowledgment) {
        try {
            System.out.println(consumerRecord.toString());
            acknowledgment.acknowledge(); // 커밋되는 시기 제어
            Thread.sleep(3000);
        } catch (InterruptedException e){
            System.out.println(e.getMessage());
        }
    }
}
  • @KafkaListener
    • topic, groupId, containerFactory 세 개의 값을 통해 카프카로부터 값을 가져올 수 있습니다.
  • onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment)
    • 카프카로부터 값을 가져와 처리를 합니다.

 

6. 데이터 수신

  • postman 전송 gif

  • spring boot(consumer) 수신 gif