728x90
반응형
카프카를 사용하여 스트링을 주고 받아 보자
yaml
# Kafka settings for the String send/receive example.
spring:
  kafka:
    producer:
      # Read by KafkaProducerConfig via ${spring.kafka.producer.bootstrap-servers}.
      bootstrap-servers: localhost:9092
      acks: all
    listener:
      ack-mode: MANUAL_IMMEDIATE
      type: SINGLE
    consumer:
      bootstrap-servers: localhost:9092
ProducerConfig
package com.example.demo.config;
import com.example.demo.dto.MessageTemplate;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer-side Kafka wiring for messages whose key and value are both plain {@code String}s.
 *
 * <p>Only the three settings placed in the map below (broker list, key serializer, value
 * serializer) are passed to the factory by this class.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address list, injected from {@code spring.kafka.producer.bootstrap-servers}. */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Creates the factory that builds String/String Kafka producers.
     *
     * @return a producer factory configured with the injected broker list and
     *         {@link StringSerializer} for both key and value
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Diamond instead of the raw type so the generic parameters are checked by the compiler.
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * Exposes the template that application code uses to send String messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Producer
package com.example.demo.service;
import com.example.demo.dto.MessageTemplate;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Service that publishes plain-text messages to the {@code study} topic.
 */
@Service
public class Producer {

    /** Name of the topic every message is sent to. */
    private static final String TOPIC = "study";

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send String messages
     */
    public Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the given text to the fixed topic without a key.
     * The value returned by the template's {@code send} call is not inspected.
     *
     * @param message payload to publish
     */
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
ConsumerConfig
package com.example.demo.config;
import com.example.demo.dto.MessageTemplate;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer-side Kafka wiring for records whose key and value are both plain {@code String}s.
 *
 * <p>NOTE(review): the container factory below is built by hand and only receives the consumer
 * factory; confirm whether the {@code spring.kafka.listener.*} properties are expected to apply
 * to it.
 */
@Slf4j
@Configuration
public class KafkaConsumerConfig {

    /**
     * Broker address list for consumers. Reads {@code spring.kafka.consumer.bootstrap-servers}
     * and falls back to the producer key so configurations that only define the producer
     * property keep working.
     */
    @Value("${spring.kafka.consumer.bootstrap-servers:${spring.kafka.producer.bootstrap-servers}}")
    private String bootstrapServers;

    /**
     * Creates the factory that builds String/String Kafka consumers.
     *
     * @return a consumer factory configured with the injected broker list and
     *         {@link StringDeserializer} for both key and value
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * Creates the listener container factory used by {@code @KafkaListener} methods.
     *
     * @return a container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Consumer
package com.example.demo.service;
import com.example.demo.dto.MessageTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Listener service that logs every record received from the {@code study} topic.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    /**
     * Logs the value of a record consumed from {@code study} as group {@code study-1}.
     *
     * @param record the consumed record; only its value is used
     */
    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, String> record) {
        // Parameterized logging: same rendered text, but formatting is left to the logger.
        log.info("Get message!!! :{}", record.value());
    }
}
Controller
- 간단하게 Test 를 위해 GetMapping 으로 해두었다.
package com.example.demo.controller;
import com.example.demo.service.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST entry point under {@code message} that triggers sending a test message.
 */
@RestController
@RequestMapping("message")
public class KafkaController {

    private final Producer producer;

    /**
     * @param producer service that publishes the message
     */
    public KafkaController(Producer producer) {
        this.producer = producer;
    }

    /**
     * Handles GET requests on the base path by sending the fixed payload {@code "TEST"}.
     */
    @GetMapping
    public void sendMessage() {
        producer.sendMessage("TEST");
    }
}
결과
- localhost:8080/message 로 접속한다. (Controller 가 "message" 경로에 매핑되어 있다.)
- Controller 에서 Producer.sendMessage 를 호출하여 메시지를 발송한다.
- Consumer 에서 보낸 레코드를 가져온다.
전체코드
https://github.com/rnrl1215/kafka.git
728x90
반응형
'Message Queue > Kafka' 카테고리의 다른 글
[Kafka] DLQ(Dead Letter Queue) (0) | 2023.09.26 |
---|---|
[Kafka] docker 카프카 설치 (0) | 2023.08.18 |
[Kafka] 카프카 Error Handler 적용 (0) | 2022.12.23 |
[Kafka] 카프카 Json 데이터 주고 받기 (0) | 2022.12.23 |
[kafka] EC2 에 kafka 설치 (0) | 2022.07.10 |
댓글