Free Lines Arrow
본문 바로가기
Message Queue/Kafka

[Kafka] DLQ(Dead Letter Queue)

by skahn1215 2023. 9. 26.
728x90
반응형

DLQ 란

  • DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 큐
    (카프카에서는 단순 토픽이다.)

 

DLQ 를 사용해야 되는 이유

  • 통신 비용 절감
    • 일반 또는 표준 메시지 대기열은 보존 기간이 만료될 때까지 메시지를 계속 처리합니다. 이러한 방식을 통해 지속적인 메시지 처리를 보장하고 대기열이 차단될 가능성을 최소화할 수 있다.
    • 실패한 메시지가 만료될 때까지 해당 메시지 처리를 시도하는 대신, 몇 번의 처리 시도 후에 해당 메시지를 DLQ(Dead Letter Queue)로 이동하는 것이 좋다
  • 문제 해결 개선
    • 잘못된 메시지를 DLQ로 이동시키면 개발자가 오류의 원인을 식별하는데 훨씬 수월하다.
    • 수신자가 메시지를 처리할 수 없는 이유를 조사하고
    • 수정 사항을 적용한 후, 메시지를 전송하는 새로운 시도를 수행할 수 있다.

 

구현 방식

1 . 카프카 커넥트를 쓰는 방법

errors.tolerance = all
errors.deadletterqueue.topic.name=dlq

 

 

카프카 커넥트가 처리하는 방식

1. 카프카 커넥트에서 에러가 발생하는 경우

  • 아래처럼 에러가 발생한 경우 카프카 서버를 끄고 먼저 에러를 처리하고 재부팅을 해줘야 한다.

 

2. 카프카 커넥트에서 DLQ를 적용한 경우

  • 메시지가 실패한 경우 DLQ로 보낸다.
  • 더 이상 에러가 발생한 메시지를 즉시 해결할 필요는 없어진다. 

 

3. DLQ 재처리 방법

  • 에러가 발생한 메시지를 처리하는 커넥트를 하나더 만들어 준다.
  • DLQ 에서 메시지를 들고와 다시 처리 할 수 있다.
  • 아래 예제는 JSON 포맷을 처리하지 못하는 경우임.
    • JSON 을 처리하는 커넥터를 하나더 추가하여 sink 로 데이터를 보낼 수 있다.

 

 

728x90

 

2. Java 코드 처리 방식

  • Error handler를 구축하는 방법
  • Consumer 에서 실패시 DLQ 로 push 해준다.

 

@Slf4j
@Configuration
public class KafkaErrorHandler implements CommonErrorHandler {

    // 다른 익셉션에 대해서 처리를 한다.
    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
        log.info("error");
        // pub DLQ
    }

    @Override
    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        // pub DLQ
    }
}

 

 

Java 예제

Error 발생 예제

package com.example.demo.service;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, Object> record) {
        Object value = record.value();
        log.info("Get group1 message!!! :" + value);
        throw new IllegalArgumentException();
    }
}

 

 

Error handler

package com.example.demo.config;

import com.example.demo.dto.ProductInfo;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    private final KafkaErrorHandler kafkaErrorHandler;

    @Bean
    public ConsumerFactory<String, ProductInfo> consumerFactory() {
        Map<String,Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(),
                new JsonDeserializer<>(ProductInfo.class));
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ProductInfo> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(kafkaErrorHandler);
        return factory;
    }
}

 

반응형

 

DLQ Consumer

package com.example.demo.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class DlqConsumer {
    @KafkaListener(topics = "dlq", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, Object> record) {
        Object value = record.value();
        log.info("Get DLQ message!!! :" + value);
    }
}

 

 

전체 코드

https://github.com/rnrl1215/kafka

 

GitHub - rnrl1215/kafka: 카프카 연습

카프카 연습. Contribute to rnrl1215/kafka development by creating an account on GitHub.

github.com

 

 

 

참고

https://developer.confluent.io/learn/

 

Articles about Apache Kafka

Confluent Developer - Apache Kafka articles

developer.confluent.io

 

728x90
반응형

댓글