当前位置 博文首页 > RtxTitanV的博客:SpringBoot2.x 集成 RabbitMQ
本文主要对SpringBoot2.x集成RabbitMQ进行简单总结,其中SpringBoot使用的2.4.5
版本。
通过maven新建一个名为springboot-rabbitmq-producer
的项目作为生产者发送消息,新建一个名为springboot-rabbitmq-consumer
的项目作为消费者接收消息。
除了SpringBoot的依赖外,生产者和消费者都还需引入amqp的依赖:
<!-- amqp起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 单元测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
在生产者和消费者的application.yml
中都进行如下配置:
spring:
rabbitmq:
# 配置RabbitMq的连接信息
host: 这里填RabbitMq服务器ip
port: 5672
username: admin
password: admin
virtual-host: /
在消费者中创建一个用于配置交换机和队列的配置类:
package com.rtxtitanv.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author rtxtitanv
* @version 1.0.0
* @name com.rtxtitanv.config.RabbitMqConfig
* @description 配置交换机和队列
* @date 2021/4/17 16:48
*/
@Configuration
public class RabbitMqConfig {
}
在生产者中创建一个用于发送消息的测试类进行测试:
package com.rtxtitanv;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author rtxtitanv
* @version 1.0.0
* @name com.rtxtitanv.RabbitMqTest
* @description 用于生产者发送消息的测试类
* @date 2021/4/17 17:02
*/
// SpringBoot2.4.0开始使用Junit5,不需要加@Runwith
@SpringBootTest(classes = RabbitMqProducerApplication.class)
class RabbitMqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
}
在消费者中创建一个接收消息的类:
package com.rtxtitanv.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author rtxtitanv
* @version 1.0.0
* @name com.rtxtitanv.listener.RabbitMqListener
* @description 用于监听队列
* @date 2021/4/17 17:47
*/
@Component
public class RabbitMqListener {
private static Logger logger = LoggerFactory.getLogger(RabbitMqListener.class);
}
在配置类中进行如下配置:
/**
* 简单模式,声明队列
*
* @return Queue对象
*/
@Bean(name = "simpleQueue")
public Queue simpleQueue() {
/*
* 参数明细
* 1.name:队列名称
* 2.durable:是否持久化队列,true表示持久化,false表示不持久化
* 3.exclusive:是否独占此队列,true表示是,false表示否
* 4.autoDelete:队列不用是否自动删除,true表示是,false表示否
*/
return new Queue("simple-queue", true, false, false);
}
生产者发送消息:
/**
* 简单模式,发送消息测试
*/
@Test
void simpleSendTest() {
/*
* 发送消息
* 参数明细
* 1.exchange:交换机名称,如果没有指定,则使用Default Exchange
* 由于没指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
* 示绑定或解除绑定
* 2.routingKey:路由键,由于使用默认交换机,指定路由键为队列名
* 3.object:发送的消息
*/
rabbitTemplate.convertAndSend("", "simple-queue", "Hello World");
}
消费者接收消息:
@RabbitListener(queues = "simple-queue")
public void simpleQueue(String msg) {
logger.info("监听simple-queue队列的消费者接收到的消息:" + msg);
}
测试结果:
在配置类中进行如下配置:
/**
* 工作队列模式,声明队列
*
* @return Queue对象
*/
@Bean(name = "workQueue")
public Queue workQueue() {
return new Queue("work-queue", true, false, false);
}
生产者发送消息:
/**
* 工作队列模式,发送消息测试
*/
@Test
void workSendTest() {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("", "work-queue", "message" + i);
}
}
消费者接收消息:
@RabbitListener(queues = "work-queue")
public void workQueue1(String msg) {
logger.info("监听work-queue队列的消费者1接收到的消息:" + msg);
}
@RabbitListener(queues = "work-queue")
public void workQueue2(String msg) {
logger.info("监听work-queue队列的消费者2接收到的消息:" + msg);
}
测试结果:
在配置类中进行如下配置:
/**
* 发布订阅模式,声明交换机
*
* @return FanoutExchange对象
*/
@Bean(name = "publishExchange")
public FanoutExchange publishExchange() {
// 创建一个fanout类型的交换机
return new FanoutExchange("publish-exchange");
}
/**
* 发布订阅模式,声明队列1
*
* @return Queue对象
*/
@Bean(name = "publishQueue1")
public Queue publishQueue1() {
return new Queue("publish-queue1", true, false, false);
}
/**
* 发布订阅模式,声明队列2
*
* @return Queue对象
*/
@Bean(name = "publishQueue2")
public Queue publishQueue2() {
return new Queue("publish-queue2", true, false, false);
}
/**
* 将队列publish-queue1与交换机publish-exchange绑定
*
* @param queue 队列
* @param fanoutExchange fanout类型交换机
* @return Binding对象
*/
@Bean(name = "bindingQueue1ToFanoutExchange")
public Binding bindingQueue1ToFanoutExchange(@Qualifier("publishQueue1") Queue queue,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 将队列publish-queue2与交换机publish-exchange绑定
*
* @param queue 队列
* @param fanoutExchange fanout类型交换机
* @return Binding对象
*/
@Bean(name = "bindingQueue2ToFanoutExchange")
public Binding bindingQueue2ToFanoutExchange(@Qualifier("publishQueue2") Queue queue,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
生产者发送消息:
/**
* 发布订阅模式,发送消息测试
*/
@Test
void publishSendTest() {
rabbitTemplate.convertAndSend("publish-exchange", "", "Hello World");
}
消费者接收消息:
@RabbitListener(queues = "publish-queue1")
public void publishQueue1(String msg)