본문 바로가기

카프카

카프카 프로듀서 spring boot로 구현하기

카프카 프로듀서 spring boot로 구현하기

카프카 프로듀서를 Spring Boot로 어떻게 구현하는지 설명하도록 하겠습니다.

모든 소스 코드는 아래 링크를 통해 보실 수 있습니다.

https://github.com/kang-seongbeom/kafka-producer-example

 

 

오늘 구현할 것을 그림으로 나타내면 다음과 같습니다.

프로젝트 구조는 다음과 같습니다.

producer
  |_ config
    |_ KafkaProducerConfig.class
    |_ KafkaTopicConfig.class
    |_ SetProperties.class
  |_ controller
    |_ PushRequestController.class
  |_ dto
    |_ PushRequestDto.class
  |_ service
    |_ PushRequestService.class
  |_ ProducerApplication.class

 

1. 의존성 설정

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>     
</dependencies>
  • spring-boot-starter-web
    • SpringMVC를 사용하여 Web 애플리케이션으로 만들 수 있습니다.
  • spring-kafka
    • 카프카 프로듀서를 쉽게 만들기 위한 각종 기능이 있습니다.
  • spring-boot-configuration-processor
    • yml의 값을 매핑하여 java 코드에서 객체로 사용할 수 있게 도와줍니다.

 

2. YML 설정

server:
  servlet:
    context-path: /
    encoding:
      charset: UTF-8
      enabled: true
      force: true
  port: 8050

setting:
  ksb:
    bootstrapServers: host.docker.internal:29092, host.docker.internal:29093, host.docker.internal:29094
    topic: app-push-topic
    partition: 3
    replication: 3
  • port : 8050
    • 애플리케이션을 동작했을 때 접속할 수 있는 포트 번호입니다.
  • bootstrapServers
    • 카프카 브로커 서버 주소입니다.
  • topic
    • 카프카에 전송할 토픽 이름입니다.
  • partition
    • 카프카에서 토픽을 만들 때 설정한 partition 개수입니다.
  • replication
    • 데이터가 복제될 개수입니다.

💡 설정한 topic 이름과 일치하는 토픽이 카프카에 존재하지 않을 때, topic, partition, replication 세 개의 값을 통해 토픽을 만들게됩니다.

 

3. 설정 값 매핑

@Getter
@ConstructorBinding
@ConfigurationProperties(prefix = "setting")
@RequiredArgsConstructor
public class SetProperties {

    private final Ksb ksb;

    @Getter
    @RequiredArgsConstructor
    public static final class Ksb {
        private final List<String> bootstrapServers;
        private final String topic;
        private final int partition;
        private final short replication;
    }
}
  • @ConstructorBinding
    • 생성자로 값을 매핑할 수 있게 합니다.
  • @ConfigurationProperties(prefix = "setting")
    • yml의 값을 가져와서 해당 클래스의 각 값에 매핑합니다.

 

4. 카프카 프로듀서 커스텀 설정

@Configuration
public class KafkaTopicConfig {

    @Autowired
    private SetProperties setProperties;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.setProperties.getKsb().getBootstrapServers());
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic newTopic() {
        String topicName = setProperties.getKsb().getTopic();
        int partition = setProperties.getKsb().getPartition();
        short replication = setProperties.getKsb().getReplication();

        return new NewTopic(topicName, partition, replication);
    }
}
  • kafkaAdmin()
    • 카프카 브로커 서버의 값을 가져와 설정합니다.
  • newTopic()
    • topic, partition, replication의 값을 가져와 카프카에 전송할 토픽 객체를 만듭니다.
@Configuration
public class KafkaProducerConfig {

    @Autowired
    private SetProperties setProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = producerFactoryConfig();
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    private Map<String, Object> producerFactoryConfig() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.setProperties.getKsb().getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configProps;
    }
}
  • kafkaTemplate()
    • 프로듀서 팩토리롤 통해 만든 설정 값을 바탕으로 템플릿을 만듭니다.
  • producerFactory()
    • 설정 값을 통해 팩토리를 만들어 반환합니다.
  • producerFactoryConfig()
    • 팩토리에 설정 된 값을 매핑하여 반환합니다.

 

5. 컨트롤러

@RestController
public class PushRequestController {

    @Autowired
    private PushRequestService service;

    @PostMapping("/push")
    public String push(@RequestBody PushRequestDto dto) {
        service.send(dto);
        return "success";
    }
}
  • push(@RequestBody PushRequestDto dto)
    • 요청을 받아서 서비스 단에 넘깁니다.

 

6. DTO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PushRequestDto {

    private String token;
    public String message;

    @Override
    public String toString() {
        return "PushRequestDto{" +
                "token='" + token + '\\'' +
                ", message='" + message + '\\'' +
                '}';
    }
}

 

7. 서비스

@Service
public class PushRequestService {

    @Autowired
    SetProperties properties;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(PushRequestDto dto) {
        Message<String> message = MessageBuilder
                .withPayload(dto.toString())
                .setHeader(KafkaHeaders.TOPIC, properties.getKsb().getTopic())
                .build();

        // TODO: 비동기로 받은 결과 처리
        kafkaTemplate.send(message);
    }
}
  • send(PushRequestDto dto)
    • 전달받은 값을 카프카에 전송합니다.

 

8. 데이터 전송

값을 전송하면 데이터가 정상적으로 들어가 있는 것을 확인하실 수 있습니다.

  • postman 전송 gif

  • spring boot(producer) 전송 gif

  • kafka 수신 gif