카프카 프로듀서 spring boot로 구현하기
카프카 프로듀서를 Spring Boot로 어떻게 구현하는지 설명하도록 하겠습니다.
모든 소스 코드는 아래 링크를 통해 보실 수 있습니다.
https://github.com/kang-seongbeom/kafka-producer-example
오늘 구현할 것을 그림으로 나타내면 다음과 같습니다.
프로젝트 구조는 다음과 같습니다.
producer
|_ config
|_ KafkaProducerConfig.class
|_ KafkaTopicConfig.class
|_ SetProperties.class
|_ controller
|_ PushRequestController.class
|_ dto
|_ PushRequestDto.class
|_ service
|_ PushRequestService.class
|_ ProducerApplication.class
1. 의존성 설정
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
- spring-boot-starter-web
- SpringMVC를 사용하여 Web 애플리케이션으로 만들 수 있습니다.
- spring-kafka
- 카프카 프로듀서를 쉽게 만들기 위한 각종 기능이 있습니다.
- spring-boot-configuration-processor
- yml의 값을 매핑하여 java 코드에서 객체로 사용할 수 있게 도와줍니다.
2. YML 설정
server:
servlet:
context-path: /
encoding:
charset: UTF-8
enabled: true
force: true
port: 8050
setting:
ksb:
bootstrapServers: host.docker.internal:29092, host.docker.internal:29093, host.docker.internal:29094
topic: app-push-topic
partition: 3
replication: 3
- port : 8050
- 애플리케이션을 동작했을 때 접속할 수 있는 포트 번호입니다.
- bootstrapServers
- 카프카 브로커 서버 주소입니다.
- topic
- 카프카에 전송할 토픽 이름입니다.
- partition
- 카프카에서 토픽을 만들 때 설정한 partition 개수입니다.
- replication
- 데이터가 복제될 개수입니다.
💡 설정한 topic 이름과 일치하는 토픽이 카프카에 존재하지 않을 때, topic, partition, replication 세 개의 값을 통해 토픽을 만들게됩니다.
3. 설정 값 매핑
@Getter
@ConstructorBinding
@ConfigurationProperties(prefix = "setting")
@RequiredArgsConstructor
public class SetProperties {
private final Ksb ksb;
@Getter
@RequiredArgsConstructor
public static final class Ksb {
private final List<String> bootstrapServers;
private final String topic;
private final int partition;
private final short replication;
}
}
- @ConstructorBinding
- 생성자로 값을 매핑할 수 있게 합니다.
- @ConfigurationProperties(prefix = "setting")
- yml의 값을 가져와서 해당 클래스의 각 값에 매핑합니다.
4. 카프카 프로듀서 커스텀 설정
@Configuration
public class KafkaTopicConfig {
@Autowired
private SetProperties setProperties;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
this.setProperties.getKsb().getBootstrapServers());
return new KafkaAdmin(config);
}
@Bean
public NewTopic newTopic() {
String topicName = setProperties.getKsb().getTopic();
int partition = setProperties.getKsb().getPartition();
short replication = setProperties.getKsb().getReplication();
return new NewTopic(topicName, partition, replication);
}
}
- kafkaAdmin()
- 카프카 브로커 서버의 값을 가져와 설정합니다.
- newTopic()
- topic, partition, replication의 값을 가져와 카프카에 전송할 토픽 객체를 만듭니다.
@Configuration
public class KafkaProducerConfig {
@Autowired
private SetProperties setProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = producerFactoryConfig();
return new DefaultKafkaProducerFactory<>(configProps);
}
private Map<String, Object> producerFactoryConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.setProperties.getKsb().getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configProps;
}
}
- kafkaTemplate()
- 프로듀서 팩토리롤 통해 만든 설정 값을 바탕으로 템플릿을 만듭니다.
- producerFactory()
- 설정 값을 통해 팩토리를 만들어 반환합니다.
- producerFactoryConfig()
- 팩토리에 설정 된 값을 매핑하여 반환합니다.
5. 컨트롤러
@RestController
public class PushRequestController {
@Autowired
private PushRequestService service;
@PostMapping("/push")
public String push(@RequestBody PushRequestDto dto) {
service.send(dto);
return "success";
}
}
- push(@RequestBody PushRequestDto dto)
- 요청을 받아서 서비스 단에 넘깁니다.
6. DTO
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PushRequestDto {
private String token;
public String message;
@Override
public String toString() {
return "PushRequestDto{" +
"token='" + token + '\\'' +
", message='" + message + '\\'' +
'}';
}
}
7. 서비스
@Service
public class PushRequestService {
@Autowired
SetProperties properties;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(PushRequestDto dto) {
Message<String> message = MessageBuilder
.withPayload(dto.toString())
.setHeader(KafkaHeaders.TOPIC, properties.getKsb().getTopic())
.build();
// TODO: 비동기로 받은 결과 처리
kafkaTemplate.send(message);
}
}
- send(PushRequestDto dto)
- 전달받은 값을 카프카에 전송합니다.
8. 데이터 전송
값을 전송하면 데이터가 정상적으로 들어가 있는 것을 확인하실 수 있습니다.
- postman 전송 gif
- spring boot(producer) 전송 gif
- kafka 수신 gif
'카프카' 카테고리의 다른 글
카프카 컨슈머 spring boot로 구현하기 (0) | 2023.01.12 |
---|---|
카프카 프로듀서와 컨슈머 docker shell 에서 테스트하기 (0) | 2023.01.05 |
카프카 클러스터 docker compose로 구축하기 (0) | 2023.01.05 |
카프카란 무엇인가? - 2 (0) | 2022.12.29 |
카프카란 무엇인가? - 1 (2) | 2022.12.21 |