当前位置 博文首页 > RtxTitanV的博客:SpringBoot2.x 集成 RabbitMQ

    RtxTitanV的博客:SpringBoot2.x 集成 RabbitMQ

    作者:[db:作者] 时间:2021-06-20 12:20

    本文主要对SpringBoot2.x集成RabbitMQ进行简单总结,其中SpringBoot使用的2.4.5版本。

    一、集成RabbitMQ

    通过maven新建一个名为springboot-rabbitmq-producer的项目作为生产者发送消息,新建一个名为springboot-rabbitmq-consumer的项目作为消费者接收消息。

    1.引入依赖

    除了SpringBoot的依赖外,生产者和消费者都还需引入amqp的依赖:

    <!-- amqp起步依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- 单元测试依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    

    2.编写配置文件

    在生产者和消费者的application.yml中都进行如下配置:

    spring:
      rabbitmq:
        # 配置RabbitMq的连接信息
        host: 这里填RabbitMq服务器ip
        port: 5672
        username: admin
        password: admin
        virtual-host: /
    

    3.创建配置类

    在消费者中创建一个用于配置交换机和队列的配置类:

    package com.rtxtitanv.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author rtxtitanv
     * @version 1.0.0
     * @name com.rtxtitanv.config.RabbitMqConfig
     * @description 配置交换机和队列
     * @date 2021/4/17 16:48
     */
    @Configuration
    public class RabbitMqConfig {
    }
    

    4.创建测试类

    在生产者中创建一个用于发送消息的测试类进行测试:

    package com.rtxtitanv;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    /**
     * @author rtxtitanv
     * @version 1.0.0
     * @name com.rtxtitanv.RabbitMqTest
     * @description 用于生产者发送消息的测试类
     * @date 2021/4/17 17:02
     */
    // SpringBoot2.4.0开始使用Junit5,不需要加@Runwith
    @SpringBootTest(classes = RabbitMqProducerApplication.class)
    class RabbitMqTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    }
    

    5.创建接收消息的类

    在消费者中创建一个接收消息的类:

    package com.rtxtitanv.listener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author rtxtitanv
     * @version 1.0.0
     * @name com.rtxtitanv.listener.RabbitMqListener
     * @description 用于监听队列
     * @date 2021/4/17 17:47
     */
    @Component
    public class RabbitMqListener {
    
        private static Logger logger = LoggerFactory.getLogger(RabbitMqListener.class);
    }
    

    二、RabbitMQ常见的工作模式

    1.简单模式

    1
    在配置类中进行如下配置:

    /**
     * 简单模式,声明队列
     *
     * @return Queue对象
     */
    @Bean(name = "simpleQueue")
    public Queue simpleQueue() {
        /*
         * 参数明细
         * 1.name:队列名称
         * 2.durable:是否持久化队列,true表示持久化,false表示不持久化
         * 3.exclusive:是否独占此队列,true表示是,false表示否
         * 4.autoDelete:队列不用是否自动删除,true表示是,false表示否
         */
        return new Queue("simple-queue", true, false, false);
    }
    

    生产者发送消息:

    /**
     * 简单模式,发送消息测试
     */
    @Test
    void simpleSendTest() {
        /*
         * 发送消息
         * 参数明细
         * 1.exchange:交换机名称,如果没有指定,则使用Default Exchange
         * 由于没指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
         * 示绑定或解除绑定
         * 2.routingKey:路由键,由于使用默认交换机,指定路由键为队列名
         * 3.object:发送的消息
         */
        rabbitTemplate.convertAndSend("", "simple-queue", "Hello World");
    }
    

    消费者接收消息:

    @RabbitListener(queues = "simple-queue")
    public void simpleQueue(String msg) {
        logger.info("监听simple-queue队列的消费者接收到的消息:" + msg);
    }
    

    测试结果:
    2

    2.工作队列模式

    3
    在配置类中进行如下配置:

    /**
     * 工作队列模式,声明队列
     *
     * @return Queue对象
     */
    @Bean(name = "workQueue")
    public Queue workQueue() {
        return new Queue("work-queue", true, false, false);
    }
    

    生产者发送消息:

    /**
     * 工作队列模式,发送消息测试
     */
    @Test
    void workSendTest() {
        for (int i = 1; i <= 10; i++) {
            rabbitTemplate.convertAndSend("", "work-queue", "message" + i);
        }
    }
    

    消费者接收消息:

    @RabbitListener(queues = "work-queue")
    public void workQueue1(String msg) {
        logger.info("监听work-queue队列的消费者1接收到的消息:" + msg);
    }
    
    @RabbitListener(queues = "work-queue")
    public void workQueue2(String msg) {
        logger.info("监听work-queue队列的消费者2接收到的消息:" + msg);
    }
    

    测试结果:
    4

    3.发布订阅模式

    5
    在配置类中进行如下配置:

    /**
     * 发布订阅模式,声明交换机
     *
     * @return FanoutExchange对象
     */
    @Bean(name = "publishExchange")
    public FanoutExchange publishExchange() {
        // 创建一个fanout类型的交换机
        return new FanoutExchange("publish-exchange");
    }
    
    /**
     * 发布订阅模式,声明队列1
     *
     * @return Queue对象
     */
    @Bean(name = "publishQueue1")
    public Queue publishQueue1() {
        return new Queue("publish-queue1", true, false, false);
    }
    
    /**
     * 发布订阅模式,声明队列2
     *
     * @return Queue对象
     */
    @Bean(name = "publishQueue2")
    public Queue publishQueue2() {
        return new Queue("publish-queue2", true, false, false);
    }
    
    /**
     * 将队列publish-queue1与交换机publish-exchange绑定
     *
     * @param queue          队列
     * @param fanoutExchange fanout类型交换机
     * @return Binding对象
     */
    @Bean(name = "bindingQueue1ToFanoutExchange")
    public Binding bindingQueue1ToFanoutExchange(@Qualifier("publishQueue1") Queue queue,
        FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    
    /**
     * 将队列publish-queue2与交换机publish-exchange绑定
     *
     * @param queue          队列
     * @param fanoutExchange fanout类型交换机
     * @return Binding对象
     */
    @Bean(name = "bindingQueue2ToFanoutExchange")
    public Binding bindingQueue2ToFanoutExchange(@Qualifier("publishQueue2") Queue queue,
        FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    

    生产者发送消息:

    /**
     * 发布订阅模式,发送消息测试
     */
    @Test
    void publishSendTest() {
        rabbitTemplate.convertAndSend("publish-exchange", "", "Hello World");
    }
    

    消费者接收消息:

    @RabbitListener(queues = "publish-queue1")
    public void publishQueue1(String msg)
    
    下一篇:没有了