Free Lines Arrow
본문 바로가기
Message Queue/Kafka

[Kafka] 카프카 Json 데이터 주고 받기

by skahn1215 2022. 12. 23.
728x90
반응형

ProductInfo

package com.example.demo.dto;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * Simple data holder describing a product by name and price.
 *
 * <p>Lombok supplies the getters, {@code toString()} and a protected no-args
 * constructor. NOTE(review): the no-args constructor is presumably there for
 * JSON deserialization — confirm before removing it.
 */
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ProductInfo {
    /** Name of the product. */
    private String productName;

    /** Price of the product; the unit is not defined by this class. */
    private Integer price;

    /**
     * Creates a product with both fields populated.
     *
     * @param productName name of the product
     * @param price       price of the product
     */
    public ProductInfo(String productName, Integer price) {
        this.price = price;
        this.productName = productName;
    }
}

 

KafkaProducerConfig

package com.example.demo.config;

import com.example.demo.dto.ProductInfo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the Kafka producer side.
 *
 * <p>Registers a {@link ProducerFactory} and a {@link KafkaTemplate} that send
 * {@link ProductInfo} values with {@code String} keys.
 */
@Configuration
public class KafkaProducerConfig {
    /** Broker address list, read from {@code spring.kafka.producer.bootstrap-servers}. */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the producer factory: keys are written with {@link StringSerializer},
     * values with {@link JsonSerializer}.
     *
     * @return a producer factory typed for {@code String} keys and {@link ProductInfo} values
     */
    @Bean
    public ProducerFactory<String, ProductInfo> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Diamond instead of the raw type, so the compiler checks the generic
        // arguments against the declared return type (no unchecked conversion).
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * Builds the template used by application code to publish {@link ProductInfo} messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, ProductInfo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

 

Producer

package com.example.demo.service;

import com.example.demo.dto.ProductInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Service that publishes {@link ProductInfo} messages to the {@code study} topic.
 */
@Service
@RequiredArgsConstructor
public class Producer {
    /** Name of the topic every message is sent to. */
    private static final String TOPIC = "study";

    /** Template used to publish messages; injected through the Lombok-generated constructor. */
    private final KafkaTemplate<String, ProductInfo> kafkaTemplate;

    /**
     * Sends the given product to the topic without a record key.
     * The result returned by the template is not inspected.
     *
     * @param productInfo payload to publish
     */
    public void sendMessage(ProductInfo productInfo) {
        kafkaTemplate.send(TOPIC, productInfo);
    }
}

 

KafkaConsumerConfig

package com.example.demo.config;

import com.example.demo.dto.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the Kafka consumer side.
 *
 * <p>Registers a {@link ConsumerFactory} and the listener container factory
 * for records with {@code String} keys and {@link ProductInfo} values.
 */
@Slf4j
@Configuration
public class KafkaConsumerConfig {
    /**
     * Broker address list. NOTE(review): this reads the <em>producer</em> property
     * {@code spring.kafka.producer.bootstrap-servers} — confirm that is intended.
     */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the consumer factory with explicit deserializer instances:
     * {@link StringDeserializer} for keys and a {@link JsonDeserializer}
     * targeting {@link ProductInfo} for values.
     *
     * @return a consumer factory for {@code String} keys and {@link ProductInfo} values
     */
    @Bean
    public ConsumerFactory<String, ProductInfo> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<ProductInfo> valueDeserializer = new JsonDeserializer<>(ProductInfo.class);
        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
    }

    /**
     * Builds the listener container factory on top of {@link #consumerFactory()}.
     *
     * @return the container factory for {@code String}/{@link ProductInfo} records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductInfo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ProductInfo> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}

 

Consumer

package com.example.demo.service;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Kafka listener service that logs the records it receives from the {@code study} topic.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    /**
     * Logs the value of each record consumed from topic {@code study}
     * as a member of consumer group {@code study-1}.
     *
     * @param record the consumed record; only its value is read
     */
    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, Object> record) {
        // Parameterized logging instead of string concatenation: the message text
        // is the same, but it is only formatted when INFO is enabled.
        log.info("Get group1 message!!! :{}", record.value());
    }
}

 

Controller

package com.example.demo.controller;

import com.example.demo.dto.ProductInfo;
import com.example.demo.service.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint under {@code message} that triggers publishing of a sample product.
 */
@RestController
@RequestMapping("message")
@RequiredArgsConstructor
public class KafkaController {
    /** Service that publishes the message; injected through the Lombok-generated constructor. */
    private final Producer producer;

    /**
     * Handles {@code GET message} by publishing a fixed sample product
     * ({@code "MacBookPro"}, price {@code 1000}) through the producer.
     */
    @GetMapping
    public void sendMessage() {
        producer.sendMessage(new ProductInfo("MacBookPro", 1000));
    }
}

 

결과

스트링뿐만 아니라 오브젝트도 Json 형식으로 받아 처리하였다.

 

 

 

전체코드

https://github.com/rnrl1215/kafka.git

 

GitHub - rnrl1215/kafka: 카프카 연습

카프카 연습. Contribute to rnrl1215/kafka development by creating an account on GitHub.

github.com

 

728x90
반응형

'Message Queue > Kafka' 카테고리의 다른 글

[Kafka] DLQ(Dead Letter Queue)  (0) 2023.09.26
[Kafka] docker 카프카 설치  (0) 2023.08.18
[Kafka] 카프카 Error Handler 적용  (0) 2022.12.23
[Kafka] 카프카 String 데이터 주고 받기  (0) 2022.12.23
[kafka] EC2 에 kafka 설치  (0) 2022.07.10

댓글