Free Lines Arrow
본문 바로가기
Message Queue/Kafka

[Kafka] 카프카 재처리 With DLQ2 - 코드

by skahn1215 2025. 1. 11.
728x90
반응형

카프카 재처리를 위한 에러 핸들러

1. CommonErrorHandler

- 에러 핸들러를 오버라이드 해서 처리하는 방법

2. DefaultErrorHandler

- 기본 에러 핸들러에 backoff 를 설정하여 처리하는 방법

 

차이점

CommonErrorHandler DefaultErrorHandler
(CommonErrorHandler 인터페이스를 구현한 클래스)
Spring Kafka 리스너 컨테이너에서 사용하는 공통 오류 처리 인터페이스 Spring Kafka 가 기본으로 제공하는 오류 처리 구현체
복잡한 오류 처리 로직을 커스터마이즈 가능 기본적인 예외 처리 (예: 로그 기록, 재시도 등)

 

코드 구현

- 둘 다 CommonErrorHandler 인터페이스를 구현하기 때문에 OCP(개방-폐쇄 원칙)를 만족한다.
- 그렇기 때문에 @Primary 를 이용해서 쉽게 교체할 수 있다.

1. DefaultErrorHandler

package com.p8labs.kafkadlq.handler;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Spring configuration that registers a {@link DefaultErrorHandler} as the primary
 * {@link CommonErrorHandler} bean.
 *
 * <p>The handler is built with a fixed back-off and a recoverer that publishes the
 * failed record to the DLQ topic configured under
 * {@code spring.kafka.topics.product-like-dlq-topic}.
 *
 * <p>NOTE(review): this class also declares {@code implements CommonErrorHandler}
 * without overriding anything, so the configuration bean itself is a second
 * {@code CommonErrorHandler} candidate; {@code @Primary} on the factory method is
 * presumably what disambiguates the two — confirm whether the {@code implements}
 * clause is actually needed.
 */
@RequiredArgsConstructor
@Configuration
public class KafkaDefaultErrorHandler implements CommonErrorHandler {

    /** Delay between two delivery attempts, in milliseconds (1 second). */
    private static final long RETRY_INTERVAL_MS = 1000L;

    /** Maximum number of retries before the record is handed to the recoverer. */
    private static final long MAX_RETRY_ATTEMPTS = 3L;

    @Value("${spring.kafka.topics.product-like-dlq-topic}")
    private String dlqTopic;

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Builds the error handler used by the listener containers.
     *
     * @return a {@link DefaultErrorHandler} retrying every {@value #RETRY_INTERVAL_MS} ms,
     *         at most {@value #MAX_RETRY_ATTEMPTS} times, and then sending the record to
     *         the DLQ topic
     */
    @Primary
    @Bean
    public CommonErrorHandler commonErrorHandler() {

        BackOff backOff = new FixedBackOff(RETRY_INTERVAL_MS, MAX_RETRY_ATTEMPTS);
        return new DefaultErrorHandler(
                (record, exception) -> {
                    // Records may legitimately have no key; calling toString() on a null
                    // key would throw and the record would never reach the DLQ.
                    Object key = record.key();
                    String dlqKey = (key == null) ? "" : key.toString();
                    kafkaTemplate.send(dlqTopic, dlqKey, record.value());
                },
                backOff // retry policy applied before the recoverer runs
        );
    }
}

 

 

2. CommonErrorHandler

package com.p8labs.kafkadlq.handler;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class kafkaCustomErrorHandler implements CommonErrorHandler {

    @Value("${spring.kafka.topics.product-like-dlq-topic}")
    private String dlqTopic;

    private static final int MAX_RETRIES = 3; // 최대 재시도 횟수 설정

    private final KafkaTemplate<String, Object> kafkaTemplate;

    private final String RETRY_HEADER = "retryCount";


    // 카프카 예외 외 다른 예외 처리
    @Override
    public void handleOtherException(
            Exception thrownException,
            Consumer<?, ?> consumer,
            MessageListenerContainer container,
            boolean batchListener) {

        log.error(thrownException.getMessage(), thrownException);

        try {
            ConsumerRecords<?, ?> poll = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
            // DLQ 로 보냄
            for (ConsumerRecord<?, ?> consumerRecord : poll) {
                sendToDLQ(consumerRecord);
            }
        } catch (Exception e) {
            log.error(thrownException.getMessage(), thrownException);
        }
    }


    @Override
    public boolean handleOne(Exception thrownException,
                             ConsumerRecord<?, ?> record,
                             Consumer<?, ?> consumer,
                             MessageListenerContainer container) {

        log.error(thrownException.getMessage(), thrownException);


        Integer retryCount = getRetryCountFromHeaders(record);
        retryCount++;

        if (retryCount > MAX_RETRIES) {
            sendToDLQ(record);
            return true;
        }

        if (thrownException.getCause() instanceof TimeoutException) {
            // 재시도 처리
            ProducerRecord<String, Object> newRecord = new ProducerRecord<>(record.topic(), record.value());
            newRecord.headers().add(RETRY_HEADER, String.valueOf(retryCount).getBytes());
            kafkaTemplate.send(newRecord);
        } else {
            // DLQ 로 보냄
            sendToDLQ(record);
        }

        return true;
    }

    // 헤더에서 재시도 횟수를 가져오는 메서드
    private Integer getRetryCountFromHeaders(ConsumerRecord<?, ?> record) {

        Header retryHeader = record.headers().lastHeader(RETRY_HEADER);
        if (retryHeader == null) {
            return 0;  // 초기 재시도 횟수
        }
        return Integer.parseInt(new String(retryHeader.value()));
    }


    // 실패한 메시지를 DLQ로 보내는 메서드
    private void sendToDLQ(ConsumerRecord<?, ?> record) {
        String key = "";
        if (record.key() != null) {
            key = record.key().toString();
        }
        kafkaTemplate.send(dlqTopic, key, record.value());
    }
}

 

 

Consumer 

package com.p8labs.kafkadlq.service;

import com.p8labs.kafkadlq.dto.ProductEmotionDto;
import com.p8labs.kafkadlq.model.ProductEmotion;
import com.p8labs.kafkadlq.repository.ProductEmotionRepository;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.modelmapper.ModelMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeoutException;

/**
 * Kafka listener service for the {@code like-topic} topic.
 *
 * <p>Payloads of type {@link ProductEmotionDto} are mapped to {@link ProductEmotion}
 * and saved through the repository; any other payload is ignored.
 */
@RequiredArgsConstructor
@Service
public class ProductLikeConsumerService {

    private final ModelMapper modelMapper;

    private final ProductEmotionRepository productEmotionRepository;

    /**
     * Consumes one record from {@code like-topic} in consumer group
     * {@code product_emotion_group}.
     *
     * @param record the consumed record
     * @throws TimeoutException declared in the signature; not thrown by this body directly
     */
    @KafkaListener(topics = "like-topic", groupId = "product_emotion_group")
    public void listen(ConsumerRecord<String, Object> record) throws TimeoutException {
        Object payload = record.value();

        // Nothing to persist unless the payload is the expected DTO.
        if (!(payload instanceof ProductEmotionDto)) {
            return;
        }

        productEmotionRepository.save(modelMapper.map(payload, ProductEmotion.class));
    }
}

 

728x90
반응형

댓글