728x90
반응형
Delay RabbitMQ Spring 연동
- 스프링 부트를 사용해서 직접 RabbitMQ에 메시지를 보내 보자.
- 실무에서는 RabbitMQ 접속 정보를 직접 다루는 편이 편하다.
- 그래서 다른 글과는 다르게 ConnectionFactory를 직접 빈으로 등록해서 사용한다.
- 이렇게 하면 사용자 아이디, 주소 등을 쉽게 다룰 수 있다.
주의사항
- 코드를 작성할 때 import를 정확하게 맞춰야 한다.
- 예를 들어 Queue는 java.util.Queue가 아니라 org.springframework.amqp.core.Queue여야 하므로, import를 꼭 확인해 보자.
Gradle 에 라이브러리 추가
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
application.yml 파일 설정
yml 파일 위치
yml 파일 내용
rabbitmq:
  delay:
    host: localhost # rabbitmq 주소
    port: 5672 # rabbit mq 통신 포트는 5672 를 사용한다.
    exchange-name: delayed.message
    queue-name: message
    routing-key: message # DelayRabbitMQService 에서 읽는 라우팅 키
    username: guest
    password: guest
ClassDiagram
MessageQueueConfiguration
- 메시지 큐 설정 인터페이스
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
/**
 * Contract for a message-queue configuration: the queue, the exchange, the
 * binding that connects them, and the connection factory used to reach the broker.
 */
public interface MessageQueueConfiguration {

    /** Returns the queue definition. */
    Queue queue();

    /** Returns the custom exchange definition. */
    CustomExchange customExchange();

    /**
     * Returns the binding between a queue and a custom exchange.
     *
     * @param queue          the queue to bind
     * @param customExchange the exchange the queue is bound to
     */
    Binding binding(Queue queue, CustomExchange customExchange);

    /** Returns the connection factory used to connect to the broker. */
    ConnectionFactory connectionFactory();
}
DelayRabbitMQConfigurationImpl
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for publishing to a RabbitMQ delayed-message exchange.
 *
 * <p>Declares the queue, the {@code x-delayed-message} exchange, the binding
 * between them, a connection factory built from the {@code rabbitmq.delay.*}
 * properties, and a JSON-converting {@link RabbitTemplate}.
 */
@Configuration
public class DelayRabbitMQConfigurationImpl implements MessageQueueConfiguration {

    /** Exchange type provided by the delayed-message exchange plugin. */
    private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";

    /**
     * Declaration argument naming the routing behaviour the delayed exchange
     * applies once the delay has elapsed. The plugin expects exactly this key.
     */
    private static final String DELAYED_TYPE_ARGUMENT = "x-delayed-type";

    @Value("${rabbitmq.delay.host}")
    private String host;

    @Value("${rabbitmq.delay.port}")
    private Integer port;

    @Value("${rabbitmq.delay.username}")
    private String username;

    @Value("${rabbitmq.delay.password}")
    private String password;

    @Value("${rabbitmq.delay.queue-name}")
    private String queueName;

    @Value("${rabbitmq.delay.exchange-name}")
    private String exchangeName;

    // Same property the publishing service reads, so the binding key and the
    // key used when sending always agree.
    @Value("${rabbitmq.delay.routing-key}")
    private String routingKey;

    /**
     * Declares the queue that receives messages once their delay has elapsed.
     *
     * @return a queue named after {@code rabbitmq.delay.queue-name}
     */
    @Bean
    @Override
    public Queue queue() {
        return new Queue(queueName);
    }

    /**
     * Declares the delayed-message exchange (durable, not auto-deleted) that
     * routes like a direct exchange after the delay.
     *
     * @return the delayed exchange named after {@code rabbitmq.delay.exchange-name}
     */
    @Bean
    @Override
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put(DELAYED_TYPE_ARGUMENT, "direct");
        return new CustomExchange(exchangeName, DELAYED_EXCHANGE_TYPE, true, false, args);
    }

    /**
     * Builds the connection factory from the configured host, port and credentials.
     *
     * @return a caching connection factory pointing at the configured broker
     */
    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }

    /**
     * Binds the queue to the delayed exchange under the configured routing key.
     * Without this binding, messages published to the exchange have no
     * destination queue.
     *
     * @param queue          the queue to bind
     * @param customExchange the delayed exchange the queue is bound to
     * @return the binding keyed by {@code rabbitmq.delay.routing-key}
     */
    @Bean
    @Override
    public Binding binding(Queue queue, CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(routingKey).noargs();
    }

    /**
     * Converts message payloads to and from JSON.
     *
     * @return a Jackson-based message converter
     */
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Creates the template used for publishing, registered under the bean name
     * {@code rabbitMQTemplate}.
     *
     * @param connectionFactory the broker connection factory
     * @param messageConverter  the converter applied to outgoing payloads
     * @return a template that publishes via the given factory and converter
     */
    @Bean(name = "rabbitMQTemplate")
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }
}
DelayRabbitMQService
package com.example.rabbitmq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * Publishes messages to the configured exchange with a per-message delay header.
 */
@Slf4j
@Service
public class DelayRabbitMQService {

    @Value("${rabbitmq.delay.exchange-name}")
    private String exchangeName;

    @Value("${rabbitmq.delay.routing-key}")
    private String routingKey;

    private final AmqpTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate the template registered under the bean name
     *                       {@code rabbitMQTemplate}
     */
    public DelayRabbitMQService(@Qualifier("rabbitMQTemplate") AmqpTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Converts and sends {@code message} to the configured exchange and routing
     * key, attaching {@code delayTime} as the {@code x-delay} header.
     *
     * @param message   the payload to publish
     * @param delayTime the value stored in the {@code x-delay} header
     *                  (the delayed-message plugin reads it as milliseconds)
     */
    public void publishMessageWithDelayTime(String message, Long delayTime) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, outgoing -> {
            // The header is set per message, so each publish carries its own delay.
            outgoing.getMessageProperties().setHeader("x-delay", delayTime);
            return outgoing;
        });
    }
}
DelayRabbitMQService Test 해보기
- 실무에서는 목킹 처리를 하겠지만 실습이니 바로 테스트코드로 실행해 본다.
package com.example.rabbitmq.service;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Smoke test that boots the full application context and publishes one delayed
 * message through {@link DelayRabbitMQService}.
 */
@SpringBootTest
class DelayRabbitMQServiceTest {

    @Autowired
    private DelayRabbitMQService delayRabbitMQService;

    /** Publishes a single message with a delay value of 1; passes when no exception is thrown. */
    @Test
    void sendMessage() {
        delayRabbitMQService.publishMessageWithDelayTime("test", 1L);
    }
}
OverView 모니터링
- RabbitMq 에 로그인 후 메세지 큐 상태를 확인해 보자
- 아직 아무런 변화가 없다
RabbitMQ 메세지 보낸후 모니터링
- 메시지가 들어오면 아래처럼 Message rates 그래프가 튀어 오르는 것을 볼 수 있다.
728x90
반응형
'Message Queue > RabbitMQ' 카테고리의 다른 글
[RbbitMQ] Delay RabbitMQ Spring Consume (0) | 2022.10.17 |
---|---|
[RabbitMQ] Work Queues (0) | 2022.10.03 |
[RbbitMQ] RabbitMQ 개념 기초 (0) | 2022.09.24 |
[RbbitMQ] Delay RabbitMQ 설치하기 (0) | 2022.09.24 |
댓글