728x90
반응형
DLQ 란
- DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 큐
(카프카에서는 단순 토픽이다.)
DLQ 를 사용해야 되는 이유
- 통신 비용 절감
- 일반 또는 표준 메시지 대기열은 보존 기간이 만료될 때까지 메시지를 계속 처리합니다. 이러한 방식을 통해 지속적인 메시지 처리를 보장하고 대기열이 차단될 가능성을 최소화할 수 있다.
- 실패한 메시지가 만료될 때까지 해당 메시지 처리를 시도하는 대신, 몇 번의 처리 시도 후에 해당 메시지를 DLQ(Dead Letter Queue)로 이동하는 것이 좋다
- 문제 해결 개선
- 잘못된 메시지를 DLQ로 이동시키면 개발자가 오류의 원인을 식별하는데 훨씬 수월하다.
- 수신자가 메시지를 처리할 수 없는 이유를 조사하고
- 수정 사항을 적용한 후, 메시지를 전송하는 새로운 시도를 수행할 수 있다.
구현 방식
1 . 카프카 커넥트를 쓰는 방법
- https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
- 단 어플리케이션 을 통해 데이터를 싱크 하는 부분은 아직 못찾음.
- 보통 stroage 끼리 사용할때 쓰는듯함.
- /distributed-connect 파일에 아래처럼 기입해주면된다.
errors.tolerance = all
errors.deadletterqueue.topic.name=dlq
카프카 커넥트가 처리하는 방식
1. 카프카 커넥트에서 에러가 발생하는 경우
- 아래처럼 에러가 발생한 경우 카프카 서버를 끄고 먼저 에러를 처리하고 재부팅을 해줘야 한다.
2. 카프카 커넥트에서 DLQ를 적용한 경우
- 메시지가 실패한 경우 DLQ로 보낸다.
- 더 이상 에러가 발생한 메시지를 즉시 해결할 필요는 없어진다.
3. DLQ 재처리 방법
- 에러가 발생한 메시지를 처리하는 커넥트를 하나더 만들어 준다.
- DLQ 에서 메시지를 들고와 다시 처리 할 수 있다.
- 아래 예제는 JSON 포맷을 처리하지 못하는 경우임.
- JSON 을 처리하는 커넥터를 하나더 추가하여 sink 로 데이터를 보낼 수 있다.
2. Java 코드 처리 방식
- Error handler를 구축하는 방법
- Consumer 에서 실패시 DLQ 로 push 해준다.
@Slf4j
@Configuration
public class KafkaErrorHandler implements CommonErrorHandler {
// 다른 익셉션에 대해서 처리를 한다.
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
log.info("error");
// pub DLQ
}
@Override
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
// pub DLQ
}
}
Java 예제
Error 발생 예제
package com.example.demo.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {
@KafkaListener(topics = "study", groupId = "study-1")
public void processMessage(ConsumerRecord<String, Object> record) {
Object value = record.value();
log.info("Get group1 message!!! :" + value);
throw new IllegalArgumentException();
}
}
Error handler
package com.example.demo.config;
import com.example.demo.dto.ProductInfo;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
private final KafkaErrorHandler kafkaErrorHandler;
@Bean
public ConsumerFactory<String, ProductInfo> consumerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(),
new JsonDeserializer<>(ProductInfo.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ProductInfo> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(kafkaErrorHandler);
return factory;
}
}
반응형
DLQ Consumer
package com.example.demo.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class DlqConsumer {
@KafkaListener(topics = "dlq", groupId = "study-1")
public void processMessage(ConsumerRecord<String, Object> record) {
Object value = record.value();
log.info("Get DLQ message!!! :" + value);
}
}
전체 코드
https://github.com/rnrl1215/kafka
GitHub - rnrl1215/kafka: 카프카 연습
카프카 연습. Contribute to rnrl1215/kafka development by creating an account on GitHub.
github.com
참고
https://developer.confluent.io/learn/
Articles about Apache Kafka
Confluent Developer - Apache Kafka articles
developer.confluent.io
728x90
반응형
'Message Queue > Kafka' 카테고리의 다른 글
[Kafka] 카프카 재처리 With DLQ1 - 기본구성 (2) | 2024.12.24 |
---|---|
[Kafka] Error Handling Patterns (0) | 2024.06.01 |
[Kafka] docker 카프카 설치 (0) | 2023.08.18 |
[Kafka] 카프카 Error Handler 적용 (0) | 2022.12.23 |
[Kafka] 카프카 Json 데이터 주고 받기 (0) | 2022.12.23 |
댓글