728x90
반응형
ProductInfo
package com.example.demo.dto;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Data-transfer object carrying a product's name and price.
 *
 * <p>Lombok generates the getters, {@code toString()} and a protected
 * no-argument constructor for this class.
 * NOTE(review): the protected no-arg constructor is presumably there for the
 * JSON deserializer; confirm before removing it.
 */
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ProductInfo {

    /** Name of the product. */
    private String productName;

    /** Price of the product; the unit/currency is not specified here. */
    private Integer price;

    /**
     * Creates a fully populated product description.
     *
     * @param productName name of the product
     * @param price       price of the product
     */
    public ProductInfo(final String productName, final Integer price) {
        this.price = price;
        this.productName = productName;
    }
}
ProducerConfig
package com.example.demo.config;
import com.example.demo.dto.ProductInfo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires the Kafka producer side: a
 * {@link ProducerFactory} and a {@link KafkaTemplate} for sending
 * {@link ProductInfo} values with {@code String} keys.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address list, read from {@code spring.kafka.producer.bootstrap-servers}. */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the producer factory with a {@link StringSerializer} for keys and
     * a {@link JsonSerializer} for values.
     *
     * @return producer factory typed for {@code String} keys and {@link ProductInfo} values
     */
    @Bean
    public ProducerFactory<String, ProductInfo> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Diamond operator instead of the raw type: keeps the factory generic
        // and removes the unchecked-conversion warning on the return.
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * Creates the template used by application code to publish messages.
     *
     * @return a {@link KafkaTemplate} built on {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, ProductInfo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Producer
package com.example.demo.service;
import com.example.demo.dto.ProductInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Service that publishes {@link ProductInfo} messages to the {@code "study"}
 * Kafka topic.
 */
@RequiredArgsConstructor
@Service
public class Producer {

    /** Name of the topic every message is sent to. */
    private static final String TOPIC = "study";

    /** Template used to send messages; injected through the Lombok-generated constructor. */
    private final KafkaTemplate<String, ProductInfo> kafkaTemplate;

    /**
     * Sends the given product to the topic without a record key.
     * The result returned by {@code send} is not inspected.
     *
     * @param productInfo payload to publish
     */
    public void sendMessage(final ProductInfo productInfo) {
        this.kafkaTemplate.send(TOPIC, productInfo);
    }
}
ConsumerConfig
package com.example.demo.config;
import com.example.demo.dto.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires the Kafka consumer side: a
 * {@link ConsumerFactory} and the listener container factory for
 * {@code String} keys and {@link ProductInfo} values.
 */
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    // NOTE(review): the consumer reads the *producer* bootstrap-servers property.
    // This works when both sides share one cluster; confirm it is intentional.
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the consumer factory with explicit deserializer instances:
     * {@link StringDeserializer} for keys and a {@link JsonDeserializer}
     * targeting {@link ProductInfo} for values.
     *
     * @return consumer factory for {@code String} keys and {@link ProductInfo} values
     */
    @Bean
    public ConsumerFactory<String, ProductInfo> consumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        final StringDeserializer keyDeserializer = new StringDeserializer();
        final JsonDeserializer<ProductInfo> valueDeserializer =
                new JsonDeserializer<>(ProductInfo.class);

        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates the container factory and gives it the consumer factory from
     * {@link #consumerFactory()}.
     *
     * @return listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, ProductInfo> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
Consumer
package com.example.demo.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listener service that logs every record received from the
 * {@code "study"} topic as consumer group {@code "study-1"}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    /**
     * Logs the value of each incoming record.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, Object> record) {
        Object value = record.value();
        // Parameterized logging: the message is only formatted when INFO is
        // enabled, and the rendered text is identical to the concatenated form.
        log.info("Get group1 message!!! :{}", value);
    }
}
Controller
package com.example.demo.controller;
import com.example.demo.dto.ProductInfo;
import com.example.demo.service.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller exposing {@code GET /message}, which publishes a fixed
 * sample {@link ProductInfo} through the {@link Producer} service.
 */
@RequestMapping("message")
@RequiredArgsConstructor
@RestController
public class KafkaController {

    /** Service that publishes the message; injected through the Lombok-generated constructor. */
    private final Producer producer;

    /**
     * Publishes a hard-coded sample product ({@code "MacBookPro"}, {@code 1000}).
     * Returns no response body.
     */
    @GetMapping
    public void sendMessage() {
        this.producer.sendMessage(new ProductInfo("MacBookPro", 1000));
    }
}
결과
스트링뿐만 아니라 객체(Object)도 JSON 형식으로 주고받아 처리하였다.
전체코드
https://github.com/rnrl1215/kafka.git
728x90
반응형
'Message Queue > Kafka' 카테고리의 다른 글
[Kafka] DLQ(Dead Letter Queue) (0) | 2023.09.26 |
---|---|
[Kafka] docker 카프카 설치 (0) | 2023.08.18 |
[Kafka] 카프카 Error Handler 적용 (0) | 2022.12.23 |
[Kafka] 카프카 String 데이터 주고 받기 (0) | 2022.12.23 |
[kafka] EC2 에 kafka 설치 (0) | 2022.07.10 |
댓글