当前位置 主页 > 网站技术 > 代码类 >

    RabbitMQ 最常用的三大模式实例解析

    栏目:代码类 时间:2019-12-10 12:05

    这篇文章主要介绍了RabbitMQ 最常用的三大模式实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    Direct 模式

    所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。 Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作。 消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃,

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class DirectProducer {
      public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
    
        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();
    
        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();
    
        //4. 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "item.direct";
    
        //5. 发送
        String msg = "this is direct msg";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        System.out.println("Send message : " + msg);
    
        //6. 关闭连接
        channel.close();
        connection.close();
      }
    }
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class DirectConsumer {
    
      public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);
       
        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();
    
        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();
    
        //4. 声明
        String exchangeName = "test_direct_exchange";
        String queueName = "test_direct_queue";
        String routingKey = "item.direct";
        channel.exchangeDeclare(exchangeName, "direct", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
    
        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);
    
        //5. 创建消费者并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body)
              throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
    
        //6. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);
    
      }
    }