Free Lines Arrow
본문 바로가기
Message Queue/RabbitMQ

[RbbitMQ] Delay RabbitMQ Spring Publish

by skahn1215 2022. 9. 24.
728x90
반응형

Delay RabbitMQ Spring 연동

  • 스프링 부트를 사용해서 직접 레빗 엠큐에 메세지를 쏴보자
  • 실무에서는 직접 래빗엠큐 정보를 다루는게 편하다
  • 그래서 다른 글과는 다르게 ConnectionFactory 를 이용한다.
    - 사용자 아이디, 주소등 쉽게 다룰수 있다.

 

주의사항

  • 코드를 작성할때 임포트를 맞게 제대로 해줘야 한다.
  • import 를 제대로 확인해보자

 

Gradle 에 라이브러리 추가

implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'

 

 

application.yml 파일 설정

yml 파일 위치

yml 파일 내용

rabbitmq:
  delay:
    host: localhost // rabbitmq 주소
    port: 5672 //rabbit mq 통신 포트는 5672 를 사용한다.
    exchange-name: delayed.message 
    queue-name: message
    username: guest
    password: guest

 

ClassDiagram

 

MessageQueueConfiguration

  • 메세지큐 인터 페이스
package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

public interface MessageQueueConfiguration {
    public Queue queue();
    public CustomExchange customExchange();
    public Binding binding(Queue queue, CustomExchange customExchange);
    public ConnectionFactory connectionFactory();
}

 

 

DelayRabbitMQConfigurationImpl

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayRabbitMQConfigurationImpl implements MessageQueueConfiguration{

    @Value("${rabbitmq.delay.host}")
    private String host;

    @Value("${rabbitmq.delay.port}")
    private Integer port;

    @Value("${rabbitmq.delay.username}")
    private String username;

    @Value("${rabbitmq.delay.password}")
    private String password;

    @Value("${rabbitmq.delay.queue-name}")
    private String queueName;

    @Value("${rabbitmq.delay.exchange-name}")
    private String exchangeName;


    @Bean
    @Override
    public Queue queue() {
        return new Queue(queueName);
    }


    @Bean
    @Override
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delay-type", "direct");
        return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
    }

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setPort(port);
        return factory;
    }

    @Bean
    @Override
    public Binding binding(Queue queue, CustomExchange customExchange) {
        return null;
    }

    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean(name = "rabbitMQTemplate")
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }
}

 

DelayRabbitMQService

package com.example.rabbitmq.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class DelayRabbitMQService {

    private final AmqpTemplate rabbitTemplate;

    public DelayRabbitMQService(@Qualifier("rabbitMQTemplate") AmqpTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Value("${rabbitmq.delay.routing-key}")
    private String routingKey;

    @Value("${rabbitmq.delay.exchange-name}")
    private String exchangeName;

    public void publishMessageWithDelayTime(String message, Long delayTime) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, m->{
            m.getMessageProperties().setHeader("x-delay", delayTime);
            return m;
        });
    }
}

 

 

DelayRabbitMQService Test 해보기

  • 실무에서는 목킹 처리를 하겠지만 실습이니 바로 테스트코드로 실행해 본다.
package com.example.rabbitmq.service;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
class DelayRabbitMQServiceTest {
    @Autowired
    private DelayRabbitMQService delayRabbitMQService;

    @Test
    public void sendMessage () {
        delayRabbitMQService.publishMessageWithDelayTime("test", 1L);
    }
}

 

OverView 모니터링

  • RabbitMq 에 로그인 후 메세지 큐 상태를 확인해 보자
  • 아직 아무런 변화가 없다

 

RabbitMQ 메세지 보낸후 모니터링

  • 아래 처럼 메세지가 들어온 시도가 있다면 Message rate 에서 아래처럼 튀게 된다.

 

728x90
반응형

'Message Queue > RabbitMQ' 카테고리의 다른 글

[RbbitMQ] Delay RabbitMQ Spring Consume  (0) 2022.10.17
[RabbitMQ] Work Queues  (0) 2022.10.03
[RbbitMQ] RabbitMQ 개념 기초  (0) 2022.09.24
[RbbitMQ] Delay RabbitMQ 설치하기  (0) 2022.09.24

댓글