728x90
반응형
ErrorHandler
- 카프카 컨슈머에서 에러가 발생했을경우 어떻게 처리를 할까?
- consumer config 에 error handler를 적용해 주면 된다.
ErrorHandler
package com.example.demo.handler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import java.util.Collection;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class KafkaErrorHandler implements CommonErrorHandler {
// 다른 익셉션에 대해서 처리를 한다.
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
log.info("error");
ContainerProperties containerProperties = container.getContainerProperties();
consumer.seekToEnd(assignedPartitions);
consumer.assignment();
}
// 컨슈머에서 에러 발생시 처리를 한다.
@Override
public void handleRecord(
Exception thrownException,
ConsumerRecord<?, ?> record,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
log.warn("Global error handler for message: {}", record.value().toString());
}
}
반응형
ConsumerConfig
package com.example.demo.config;
import com.example.demo.dto.ProductInfo;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
private final KafkaErrorHandler kafkaErrorHandler;
@Bean
public ConsumerFactory<String, ProductInfo> consumerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(),
new JsonDeserializer<>(ProductInfo.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ProductInfo> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(kafkaErrorHandler);
return factory;
}
}
Consumer 에서 익셉션 날리기
package com.example.demo.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {
@KafkaListener(topics = "study", groupId = "study-1")
public void processMessage(ConsumerRecord<String, Object> record) {
Object value = record.value();
log.info("Get group1 message!!! :" + value);
throw new IllegalArgumentException();
}
}
결과
아래와 같이 에러를 잡아 주었다.
전체코드
https://github.com/rnrl1215/kafka.git
728x90
반응형
'Message Queue > Kafka' 카테고리의 다른 글
[Kafka] DLQ(Dead Letter Queue) (0) | 2023.09.26 |
---|---|
[Kafka] docker 카프카 설치 (0) | 2023.08.18 |
[Kafka] 카프카 Json 데이터 주고 받기 (0) | 2022.12.23 |
[Kafka] 카프카 String 데이터 주고 받기 (0) | 2022.12.23 |
[kafka] EC2 에 kafka 설치 (0) | 2022.07.10 |
댓글