Kafka와 Stomp를 활용한 비동기 작업 처리
이전에 Stomp와 Kafka에 대해서 알아보았고, 이번에는 해당 내용을 바탕으로 실시간 작업 처리기를 만들어볼 예정이다.
Stomp와 Kafka에 대한 내용은 아래 글에서 확인할 수 있다.
[Spring Boot] 실시간 비동기 작업 처리기 만들기 - 기초 개념
요구사항 분석이번에 실시간으로 사용자 입력을 받아서 이를 AI 서버에 전달하여 실시간으로 사용자에게 응답하는 기능을 개발하게 되었다. 따라서 아래와 같은 요소들을 고려해야할 조건으로
kym8821.tistory.com
전체적인 시스템 흐름
서비스의 전체적인 흐름은 기존에 사용했던 흐름을 따른다.

정리하면 아래와 같다.
- /app 경로에 대한 메세지는 메세지 컨트롤러로 라우팅되어 전처리되고, 메세지 브로커로 produce된다.
- 사용자는 /topic 경로에 구독하여 해당 경로의 메세지를 consume할 수 있다.
- StompBrokerRelay는 외부 메세지 브로커(Kafka)와 중간다리 역할을 한다.
이젠, 해당 내용을 기반으로 구현을 진행해보자.
Stomp로 메세지 브로커에 실시간으로 메세지 publish하기
비동기 작업 처리기에서 Stomp는 실시간으로 메세지를 전달받아서 메세지 브로커로 publish하는 기능을 담당한다.
이를 구현하기 위해서 Stomp와 브로커에 관련된 설정과 메세지 컨트롤러를 구현해야 한다.
Stomp와 브로커 경로 설정하기
우선 아래와 같이 Stomp에 대한 설정정보를 작성한다.
@Configuration
@EnableWebSocketMessageBroker
public class SocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/audio")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
코드를 하나씩 살펴보면 아래와 같이 정리할 수 있다.
- registerSompEndpoints : /ws/audio에 대해서 Stomp 연결이 가능하도록 설정
- configureMessageBroker : 메세지 브로커에 대한 설정 진행
- enableSimpleBroker : /queue, /topic 경로를 브로커가 관리하여 사용자가 해당 경로를 구독할 수 있도록 함
- setApplicationDestinationPrefixes : 메세지 컨트롤러로 라우팅되는 경로의 접두사( /app )를 지정함
메세지 컨트롤러 구성하기
다음으로 /app에 대한 메세지 매핑에 사용하는 메세지 컨트롤러를 구현한다.
여기서 메세지 컨트롤러는 클라이언트가 전달한 메세지에 대한 컨트롤러 메서드를 의미한다.
@Slf4j
@RestController
public class AudioController {
private final CustomProvider provider;
public AudioController(CustomProvider provider) {
this.provider = provider;
}
@MessageMapping("/audio/forwarding")
public void sendAudio(AudioMessageDto audio){
log.info("Sending audio message: {}", audio.toString());
provider.produceAudioEvent(audio);
}
}
위 코드에서 CustomProvider는 메세지를 kafka의 특정 토픽으로 produce하는 역할을 한다.
이 부분은 이후 kafka 설정에서 보다 자세히 다루어볼 예정이다.
위와 같이 구현했을 때, 메세지 컨트롤러의 동작 시나리오는 아래와 같다.
- 사용자가 소켓 연결 후 /app/audio/forwarding으로 메세지 전달
- 해당 메세지는 위 메세지 컨트롤러로 매핑되고 로깅 등의 추가 처리 진행
- 추가 처리 후, 해당 메세지를 메세지 브로커로 발행
이제 우리는 추후 kafka 설정을 통해 메세지를 consume하고 이에 대한 추가 처리 및 사용자에게 전달할 것이다.
Kafka로 메세지를 consume하여 클라이언트에게 전달하기
다음으로 Kafka에 publish된 메세지를 consume하여 사용자에게 전달하는 기능을 구현한다.
이 때, 우리는 사용자가 /topic/audio를 구독했다고 가정한다.
해당 경로를 구독했기에 우리가 consume한 메세지를 처리 후 /topic/audio 경로의 브로커로 전달하면, 클라이언트는 전달된 메세지를 실시간으로 consume할 수 있을 것이다.
이를 위해서 우선 Kafka 환경설정과 메세지 발행 및 소비하는 기능을 구현한다.
Kafka 환경설정하기
우선 기초적인 설정 정보는 아래와 같이 application.yml으로 통합 관리하기로 했다.
spring:
application:
name: audio-service
profiles:
active: dev, webclient
kafka:
bootstrap-servers: http://localhost:29092
consumer:
group-id: audio
properties:
spring:
json:
trusted.packages: "*"
type.mapping: "com."
위 설정에서 주목해야하는 점은 크게 두 가지이다.
- kafka.bootstrap-servers : 카프카의 주소를 지정한다. 이를 통해서 카프카의 주소를 지정하고 다른 객체에서 이를 주입받을 수 있도록 한다.
- kafka.consumer.group-id : 카프카는 동일 토픽의 동일 그룹에 대해서 메세지 처리 순서를 보장한다. 따라서 사용할 그룹을 명시하여 추후 환경 설정을 통해 메세지 처리 순서를 보장하도록 한다.
다음으로 Kafka의 토픽 관리를 위해 KafkaAdmin에 대한 환경 설정을 진행했다.
@Configuration
public class KafkaConfig {
public final String bootstrapServer;
public KafkaConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServer){
this.bootstrapServer = bootstrapServer;
}
@Bean
public KafkaAdmin kafkaAdmin(){
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
return new KafkaAdmin(config);
}
@Bean
public NewTopic audioTopic(){
return TopicBuilder.name("audio").build();
}
}
각 빈에 대해서 좀더 자세하게 알아보자.
- KafakAdmin : 카프카의 토픽들을 관리하는 객체
- 현재 bootstrap server( kafka 주소 )를 설정정보로 등록했다.
- audioTopic : kafka에 audio라는 이름의 토픽을 생성한다.
다음으로 데이터를 특정 토픽으로 전달해주는 KafkaProducer에 대한 설정을 진행했다.
@Configuration
public class KafkaProducerConfig{
private final KafkaConfig kafkaConfig;
public KafkaProducerConfig(KafkaConfig kafkaConfig){
this.kafkaConfig = kafkaConfig;
}
@Bean
public ProducerFactory<String, AudioMessageDto> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, AudioMessageDto> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
마찬가지로 각 빈에 대해서 좀더 자세히 살펴보자.
- producerFactory : 메세지를 생산 및 발행하는 kafka producer를 생성하기 위한 빈이다.
- kafkaAdmin와 마찬가지로 kafka의 주소를 등록했다
- 우리는 AudioMessageDto 객체를 kafka에서 관리하므로 value serializer에 JsonSerializer를 설정했다. 이를 통해 객체를 json 문자열로 변환하여 kafka에 전송할 수 있도록 했다.
- KafkaTemplate : kafka producer를 감싸고 있는 객체이다.
- kafka template을 사용하면 kafka producer를 보다 쉽게 사용할 수 있다.
마지막으로 특정 토픽의 메세지를 consume하는 Kafka Consumer에 대한 설정을 진행한다.
@Configuration
public class KafkaConsumerConfig {
private final KafkaConfig kafkaConfig;
private final String groupId;
public KafkaConsumerConfig(KafkaConfig kafkaConfig, @Value("${spring.kafka.consumer.group-id}") String groupId){
this.groupId = groupId;
this.kafkaConfig = kafkaConfig;
}
public JsonDeserializer<AudioMessageDto> getAudioMessageDeserializer() {
JsonDeserializer<AudioMessageDto> deserializer = new JsonDeserializer<>(AudioMessageDto.class, new ObjectMapper());
deserializer.addTrustedPackages("com.capstone.dto");
deserializer.setUseTypeMapperForKey(true);
return deserializer;
}
@Bean
public ConsumerFactory<String, AudioMessageDto> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.bootstrapServer);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), getAudioMessageDeserializer());
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, AudioMessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
마찬가지로 각 빈의 역할을 위주로 좀더 자세히 알아보자.
- consumerFactory : 메세지를 consume하는 kafka consumer를 생성하기 위한 빈이다.
- 이번에는 group id를 설정 정보에 등록했고 추가로 직접 JsonDeseializer를 생성하여 등록한다.
- getAudioMessageDeserializer : 기존에 발생하던 Class is not compatible with JavaType 에러를 수정하기 위해 역직렬화할 타입과 패키지를 명시한 커스텀 JsonDeserializer를 생성한다.
- kafkaListenerContainerFactory : @KafkaListenr를 생성 및 관리하는 빈을 생성한다.
- @KafkaListener는 특정 토픽의 특정 그룹에 대해서 메세지를 consume할 수 있는 어노테이션이다.
Custom Consumer와 Provider 생성하기
이를 통해서 전반적인 kafka 설정을 마무리했다. 다음에는 CustomConsumer와 CustomProvider를 통해 메세지를 consume, produce할 수 있는 객체를 생성한다.
CustomProvider 객체는 KafkaTemplate을 활용하여 특정 토픽으로 메세지를 produce하는 역할을 한다.
@Slf4j
@Service
public class CustomProvider {
private final KafkaTemplate<String, AudioMessageDto> kafkaTemplate;
public CustomProvider(KafkaTemplate<String, AudioMessageDto> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public boolean produceAudioEvent(AudioMessageDto audioMessageDto) {
try {
CompletableFuture<SendResult<String, AudioMessageDto>> future = kafkaTemplate.send("audio", audioMessageDto);
String result = future.handle((res, error) -> {
if(error != null) {
log.error(error.getMessage());
return null;
}
log.info(audioMessageDto.toString());
return audioMessageDto.toString();
}).get();
return result != null;
}catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage());
return false;
}
}
}
해당 객체의 기능을 정리해보자.
- topic 관리와 kafkaTemplate을 사용한 메세지 전송을 분리함
- 메세지 전송 결과와 메세지를 로깅하는 역할을 함
- 메세지 전송 결과를 함수를 실행한 대상에게 전달함
다음으로 @KafkaListener로 메세지를 consume하는 객체를 생성한다.
@Service
public class CustomConsumer {
private final SimpMessagingTemplate messagingTemplate;
private final AudioModelClient audioModelClient;
public CustomConsumer(SimpMessagingTemplate messagingTemplate, AudioModelClient audioModelClient) {
this.messagingTemplate = messagingTemplate;
this.audioModelClient = audioModelClient;
}
@KafkaListener(topics = "audio", groupId = "${spring.kafka.consumer.group-id}")
public void sendAudioConversionResult(@Payload final AudioMessageDto audioMessageDto) {
audioModelClient.testUserExists(audioMessageDto.getMessage())
.switchIfEmpty(Mono.just(new UserResponseDto()))
.doOnNext(res -> {
if(res.getEmail()==null) {
audioMessageDto.setMessage("no such user");
messagingTemplate.convertAndSend("/topic/audio", audioMessageDto);
}
else messagingTemplate.convertAndSend("/topic/audio", audioMessageDto);
})
.subscribe();
}
}
해당 객체의 기능은 아래와 같이 정리할 수 있다.
- @KafkaListener를 활용하여 audio 토픽의 특정 그룹의 메세지를 consume
- WebClient 객체를 사용하여 AI 서버로의 요청을 비동기적으로 처리 가능
- SimpMessagingTemplate의 convertAndSend를 통해 브로커의 /topic/audio로 메세지 전달 가능.
- 해당 경로를 구독하는 구독자가 실시간으로 메세지를 읽어올 수 있음.
Docker를 사용하여 Kafka와 Zookeeper 실행하기
마지막으로 docker-compose를 활용하여 kafka와 zookeeper를 컨테이너로 관리할 수 있도록 했다.
# compose 파일 버전
version: '3'
services:
# 서비스 명
zookeeper:
# 사용할 이미지
image: wurstmeister/zookeeper
# 컨테이너명 설정
container_name: zookeeper
# 접근 포트 설정 (컨테이너 외부:컨테이너 내부)
ports:
- "2181:2181"
# 서비스 명
kafka:
# 사용할 이미지
image: wurstmeister/kafka
# 컨테이너명 설정
container_name: kafka
# 접근 포트 설정 (컨테이너 외부:컨테이너 내부)
ports:
- "9092:9092"
- "29092:29092"
# 환경 변수 설정
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:29092
KAFKA_CREATE_TOPICS: "Topic:1:1"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:29092
# 볼륨 설정
volumes:
- /var/run/docker.sock
# 의존 관계 설정
depends_on:
- zookeeper
이를 통해 테스트/배포 환경에서 kafka 브로커를 쉽게 관리할 수 있도록 했다.
현재 kafka 브로커는 한개 동작하도록 했고, zookeeper도 한개 동작하도록 했다.
kafka의 외부 접근 포트와 내부 접근 포트를 구분했다.
- 외부 접근 포트 : 외부의 서버가 kafka에 메세지를 produce/consume할 수 있는 포트
- 내부 접근 포트 : kafka와 동일 도커 네트워크에 있는 컨테이너가 메세지를 produce/consume할 수 있는 포트
추가로 kafka에 저장되는 데이터가 소실되지 않도록 volume을 지정했고 depends_on으로 컨테이너 실행 순서를 조정했다.
마무리
Kafka와 Stomp를 활용한 비동기 처리 장치는 매우 복잡한 구조로 구성되어 있었다.
이를 모두 이해하는 것은 어려웠고 각 빈과 객체의 기능을 기반으로 대략적으로 이해하는 방식을 채택했다.
kafka는 매우 강력한 기능을 제공하지만, zookeeper를 추가로 사용해야 하고 kafka 자체도 상당히 무겁기 때문에 사용에 큰 고민이 있었다. 메세지 처리 순서 보장 등의 강력한 기능에 이점이 있었기에 kafka를 사용했지만, RabbitMQ와 같은 메세지 브로커에서 해당 기능을 구현하는 방법을 추가로 고려하지 못한 점은 아쉬움이 남는다.
'Spring > 기초 개념' 카테고리의 다른 글
| [Spring Boot] Spring AOP Self Invocation과 @Transactional (0) | 2025.04.07 |
|---|---|
| Spring Data JPA의 영속성 컨텍스트 (0) | 2025.04.02 |
| [Spring Boot] 실시간 비동기 작업 처리기 만들기 - 기초 개념 (0) | 2025.03.19 |
| SpringBoot에서 Mockito로 소셜 로그인 단위 테스트 (0) | 2025.02.14 |
| Access Token와 Refresh Token을 활용한 인증과 인가 (0) | 2025.02.07 |