Error Handlers for Kafka Reprocessing
1. CommonErrorHandler
- Implement (override) the error-handler interface yourself and write custom handling logic
2. DefaultErrorHandler
- Configure the default error handler with a backoff policy
Differences
| CommonErrorHandler | DefaultErrorHandler (implements CommonErrorHandler) |
|---|---|
| The error-handling interface used by Spring Kafka listener containers | Spring Kafka's default CommonErrorHandler implementation |
| Complex, fully custom error-handling logic can be implemented | Basic exception handling out of the box (e.g., logging, retries with backoff) |
코드 구현
- 둘다 CommonErrorHandler 를 인터페이스로 가지고 있기 때문에 OCP 패턴을 만족한다.
- 그렇기 때문에 간단하게 @Primary 를 이용해서 쉽게 교체 가능하다.
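The original post does not show how the handler reaches the listener container, so here is a minimal sketch (my own addition, not from the post) of a listener container factory that receives whichever CommonErrorHandler bean is marked @Primary. The ConsumerFactory bean and the KafkaConsumerConfig class name are assumptions; naming the bean kafkaListenerContainerFactory makes it the default factory used by @KafkaListener.
package com.p8labs.kafkadlq.config; // hypothetical package

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;

@Configuration
public class KafkaConsumerConfig {

    // Spring injects the @Primary CommonErrorHandler bean here,
    // so switching implementations never touches this factory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            CommonErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler); // DefaultErrorHandler or the custom handler
        return factory;
    }
}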
1. DefaultErrorHandler
package com.p8labs.kafkadlq.handler;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@RequiredArgsConstructor
@Configuration
public class KafkaDefaultErrorHandler {

    @Value("${spring.kafka.topics.product-like-dlq-topic}")
    private String dlqTopic;

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Primary
    @Bean
    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(1000, 3); // retry every 1 second, up to 3 times
        return new DefaultErrorHandler(
                (record, exception) -> {
                    // recoverer: once retries are exhausted, publish the failed record to the DLQ
                    String key = record.key() != null ? record.key().toString() : "";
                    kafkaTemplate.send(dlqTopic, key, record.value());
                },
                backOff // backoff policy
        );
    }
}
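As a side note (not part of the original post), Spring Kafka also ships a DeadLetterPublishingRecoverer that can replace the hand-written recoverer lambda above. A minimal sketch, assuming the same kafkaTemplate and dlqTopic fields:
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical alternative body for the commonErrorHandler() bean above
@Primary
@Bean
public CommonErrorHandler commonErrorHandler() {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            // route every failed record to the DLQ topic; a negative partition lets Kafka choose one
            (record, exception) -> new TopicPartition(dlqTopic, -1));
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000, 3));
}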
2. CommonErrorHandler
package com.p8labs.kafkadlq.handler;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class KafkaCustomErrorHandler implements CommonErrorHandler {

    @Value("${spring.kafka.topics.product-like-dlq-topic}")
    private String dlqTopic;

    private static final int MAX_RETRIES = 3; // maximum number of retries
    private static final String RETRY_HEADER = "retryCount";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Handles exceptions that occur outside of record processing
    @Override
    public void handleOtherException(
            Exception thrownException,
            Consumer<?, ?> consumer,
            MessageListenerContainer container,
            boolean batchListener) {
        log.error(thrownException.getMessage(), thrownException);
        try {
            ConsumerRecords<?, ?> polled = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
            // send every polled record to the DLQ
            for (ConsumerRecord<?, ?> consumerRecord : polled) {
                sendToDLQ(consumerRecord);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    // Handles a failure for a single record; returning true tells the container the record was handled
    @Override
    public boolean handleOne(Exception thrownException,
                             ConsumerRecord<?, ?> record,
                             Consumer<?, ?> consumer,
                             MessageListenerContainer container) {
        log.error(thrownException.getMessage(), thrownException);

        int retryCount = getRetryCountFromHeaders(record) + 1;
        if (retryCount > MAX_RETRIES) {
            sendToDLQ(record);
            return true;
        }

        if (thrownException.getCause() instanceof TimeoutException) {
            // retryable failure: republish to the original topic with an incremented retry counter
            ProducerRecord<String, Object> newRecord = new ProducerRecord<>(
                    record.topic(),
                    record.key() != null ? record.key().toString() : null,
                    record.value());
            newRecord.headers().add(RETRY_HEADER, String.valueOf(retryCount).getBytes());
            kafkaTemplate.send(newRecord);
        } else {
            // non-retryable failure: send straight to the DLQ
            sendToDLQ(record);
        }
        return true;
    }

    // Reads the retry count from the record headers
    private int getRetryCountFromHeaders(ConsumerRecord<?, ?> record) {
        Header retryHeader = record.headers().lastHeader(RETRY_HEADER);
        if (retryHeader == null) {
            return 0; // first attempt
        }
        return Integer.parseInt(new String(retryHeader.value()));
    }

    // Sends a failed record to the DLQ
    private void sendToDLQ(ConsumerRecord<?, ?> record) {
        String key = record.key() != null ? record.key().toString() : "";
        kafkaTemplate.send(dlqTopic, key, record.value());
    }
}
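A note on swapping (my own addition, not in the original code): because KafkaCustomErrorHandler is itself registered as a CommonErrorHandler bean, making it the active handler only requires moving @Primary onto the class and removing it from the commonErrorHandler() bean above.
// Hypothetical sketch: mark the custom handler as primary instead of the DefaultErrorHandler bean
@Primary // requires import org.springframework.context.annotation.Primary
@Slf4j
@RequiredArgsConstructor
@Configuration
public class KafkaCustomErrorHandler implements CommonErrorHandler {
    // ... body unchanged from the class above
}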
Consumer
package com.p8labs.kafkadlq.service;

import com.p8labs.kafkadlq.dto.ProductEmotionDto;
import com.p8labs.kafkadlq.model.ProductEmotion;
import com.p8labs.kafkadlq.repository.ProductEmotionRepository;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.modelmapper.ModelMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeoutException;

@RequiredArgsConstructor
@Service
public class ProductLikeConsumerService {

    private final ModelMapper modelMapper;
    private final ProductEmotionRepository productEmotionRepository;

    // Consumes like events; any exception thrown here is passed to the registered CommonErrorHandler
    @KafkaListener(topics = "like-topic", groupId = "product_emotion_group")
    public void listen(ConsumerRecord<String, Object> record) throws TimeoutException {
        if (record.value() instanceof ProductEmotionDto) {
            ProductEmotion productEmotion = modelMapper.map(record.value(), ProductEmotion.class);
            productEmotionRepository.save(productEmotion);
        }
    }
}
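For completeness (not shown in the original post), a minimal sketch of the producing side that publishes ProductEmotionDto events to like-topic. The class and method names are made up for illustration, and the KafkaTemplate is assumed to use a JSON serializer/deserializer pair so the instanceof check in the consumer works.
package com.p8labs.kafkadlq.service; // hypothetical

import com.p8labs.kafkadlq.dto.ProductEmotionDto;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@RequiredArgsConstructor
@Service
public class ProductLikeProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Publishes a like event keyed by product id so records for one product stay in order
    public void sendLikeEvent(String productId, ProductEmotionDto emotion) {
        kafkaTemplate.send("like-topic", productId, emotion);
    }
}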