Free Lines Arrow
본문 바로가기
Message Queue/Kafka

[Kafka] 카프카 String 데이터 주고 받기

by skahn1215 2022. 12. 23.
728x90
반응형

카프카를 사용하여 스트링을 주고 받아 보자

 

yaml 

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
    listener:
      ack-mode: MANUAL_IMMEDIATE
      type: SINGLE
    consumer:
      bootstrap-servers: localhost:9092

 

ProducerConfig

package com.example.demo.config;

import com.example.demo.dto.MessageTemplate;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String,Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

Producer

package com.example.demo.service;

import com.example.demo.dto.MessageTemplate;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class Producer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "study";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

 

ConsumerConfig

package com.example.demo.config;

import com.example.demo.dto.MessageTemplate;
import com.example.demo.handler.KafkaErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String,Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

Consumer

package com.example.demo.service;


import com.example.demo.dto.MessageTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {

    @KafkaListener(topics = "study", groupId = "study-1")
    public void processMessage(ConsumerRecord<String, String> record) {
        Object value = record.value();
        log.info("Get message!!! :" + value);
    }
}

 

Controller

  • 간단하게 Test 를 위해 GetMapping 으로 해두었다.
package com.example.demo.controller;

import com.example.demo.service.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
@RequestMapping("message")
public class KafkaController {
    private final Producer producer;

    @GetMapping
    public void sendMessage() {
        producer.sendMessage("TEST");
    }
}

 

 

결과

  • localhost:8080 으로 접속한다.
  • Controller 에서 Producer.send 를 호출하여 메시지를 발송한다.
  • Consumer 에서 보낸 레코드를 가져온다.

 

 

전체코드

https://github.com/rnrl1215/kafka.git

 

GitHub - rnrl1215/kafka: 카프카 연습

카프카 연습. Contribute to rnrl1215/kafka development by creating an account on GitHub.

github.com

 

728x90
반응형

'Message Queue > Kafka' 카테고리의 다른 글

[Kafka] DLQ(Dead Letter Queue)  (0) 2023.09.26
[Kafka] docker 카프카 설치  (0) 2023.08.18
[Kafka] 카프카 Error Handler 적용  (0) 2022.12.23
[Kafka] 카프카 Json 데이터 주고 받기  (0) 2022.12.23
[kafka] EC2 에 kafka 설치  (0) 2022.07.10

댓글