当前位置 主页 > 网站技术 > 代码类 >

    Java搭建RabbitMq消息中间件过程详解

    栏目:代码类 时间:2019-12-23 12:09

    这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    前言

    当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

    名词

    exchange: 交换机 routingkey: 路由key queue:队列

    控制台端口:15672

      exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

    使用场景

    1.技能订单3分钟自动取消,改变状态

    2.直播开始前15分钟提醒

    3.直播状态自动结束

    流程

      生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

      —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

    第一步:在pom文件中添加

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    第二步:在application.properties文件中添加

    spring.rabbitmq.host=172.xx.xx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=rabbit
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

    第三步:配置 OrderQueueConfig

    package com.tuohang.platform.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
     * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
     * 
     * 
     * @author Administrator
     * @version 1.0
     * @Date 2018年9月18日
     */
    @Configuration
    public class OrderQueueConfig {
    
      /**
       * 订单缓冲交换机名称
       */
      public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";
    
      /**
       * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
       */
      public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";
    
      /**
       * 订单的交换机DLX 名字
       */
      final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";
    
      /**
       * 订单message时间过期后进入的队列,也就是订单实际的消费队列
       */
      public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";
    
      /**
       * 订单在缓冲队列过期时间(毫秒)30分钟
       */
      public final static int ORDER_QUEUE_EXPIRATION = 1800000;
    
      /**
       * 订单缓冲交换机
       * 
       * @return
       */
      @Bean
      public DirectExchange preOrderExange() {
        return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
      }
    
      /**
       * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
       * 
       * @return
       */
      @Bean
      public Queue delayQueuePerOrderTTLQueue() {
        return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
            .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
            .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
            .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
            .build();
      }
    
      /**
       * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
       *
       * @param delayQueuePerOrderTTLQueue
       * @param preOrderExange
       * @return
       */
      @Bean
      public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
        return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
      }
    
      /**
       * 创建订单的DLX exchange
       *
       * @return
       */
      @Bean
      public DirectExchange delayOrderExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
      }
    
      /**
       * 创建order_delay_process_queue队列,也就是订单实际消费队列
       *
       * @return
       */
      @Bean
      public Queue delayProcessOrderQueue() {
        return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
      }
    
      /**
       * 将DLX绑定到实际消费队列
       *
       * @param delayProcessOrderQueue
       * @param delayExchange
       * @return
       */
      @Bean
      public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
        return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
      }
    
      /**
       * 监听订单实际消费者队列order_delay_process_queue
       * 
       * @param connectionFactory
       * @param processReceiver
       * @return
       */
      @Bean
      public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
          OrderProcessReceiver processReceiver) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
        container.setMessageListener(new MessageListenerAdapter(processReceiver));
        return container;
      }
    }