Free Lines Arrow
본문 바로가기
Message Queue/Kafka

[Kafka] 카프카 Error Handler 적용

by skahn1215 2022. 12. 23.
728x90
반응형

ErrorHandler

  • 카프카 컨슈머에서 에러가 발생했을 경우 어떻게 처리를 할까?
  • 컨슈머 설정의 리스너 컨테이너 팩토리(ConcurrentKafkaListenerContainerFactory)에 error handler를 등록해 주면 된다.

 

ErrorHandler

package com.example.demo.handler;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;

import java.util.Collection;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class KafkaErrorHandler implements CommonErrorHandler {


    // 다른 익셉션에 대해서 처리를 한다.
    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
        log.info("error");
        ContainerProperties containerProperties = container.getContainerProperties();
        consumer.seekToEnd(assignedPartitions);
        consumer.assignment();
    }

    // 컨슈머에서 에러 발생시 처리를 한다.
    @Override
    public void handleRecord(
            Exception thrownException,
            ConsumerRecord<?, ?> record,
            Consumer<?, ?> consumer,
            MessageListenerContainer container) {
        log.warn("Global error handler for message: {}", record.value().toString());
    }
}
반응형

ConsumerConfig

package com.example.demo.config;

import com.example.demo.dto.ProductInfo;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the Kafka consumer side: builds the consumer
 * factory and the listener container factory, and registers the shared
 * {@link KafkaErrorHandler} on the latter.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
    /** Broker address list, read from {@code spring.kafka.producer.bootstrap-servers}. */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /** Error handler injected through the Lombok-generated constructor. */
    private final KafkaErrorHandler kafkaErrorHandler;

    /**
     * Creates the consumer factory with a String key deserializer and a JSON
     * value deserializer targeting {@link ProductInfo}.
     *
     * @return the consumer factory for {@code String} keys and {@code ProductInfo} values
     */
    @Bean
    public ConsumerFactory<String, ProductInfo> consumerFactory() {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        final StringDeserializer keyDeserializer = new StringDeserializer();
        final JsonDeserializer<ProductInfo> valueDeserializer = new JsonDeserializer<>(ProductInfo.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps, keyDeserializer, valueDeserializer);
    }


    /**
     * Creates the listener container factory, backed by {@link #consumerFactory()}
     * and using the injected error handler.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, ProductInfo> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setCommonErrorHandler(kafkaErrorHandler);
        return containerFactory;
    }
}
728x90

 

 

Consumer 에서 익셉션 날리기

package com.example.demo.service;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Demo listener for the {@code study} topic that fails on every record, so
 * that the configured error handler can be observed.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    /**
     * Logs the received value and then always throws.
     *
     * @param record the record received from the {@code study} topic
     * @throws IllegalArgumentException always, to trigger the error handler
     */
    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, Object> record) {
        Object value = record.value();
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("Get group1 message!!! :{}", value);
        // Intentional failure: a message makes the cause obvious in the handler's log.
        throw new IllegalArgumentException("Intentional failure to demonstrate the Kafka error handler");
    }
}

 

결과

아래와 같이 에러를 잡아 주었다.

 

 

 

전체코드

https://github.com/rnrl1215/kafka.git

 

GitHub - rnrl1215/kafka: 카프카 연습

카프카 연습. Contribute to rnrl1215/kafka development by creating an account on GitHub.

github.com

 

728x90
반응형

댓글