这篇文章主要介绍了RabbitMQ 最常用的三大模式实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
Direct 模式
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DirectProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_direct_exchange"; String routingKey = "item.direct"; //5. 发送 String msg = "this is direct msg"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); System.out.println("Send message : " + msg); //6. 关闭连接 channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class DirectConsumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_direct_exchange"; String queueName = "test_direct_queue"; String routingKey = "item.direct"; channel.exchangeDeclare(exchangeName, "direct", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //一般不用代码绑定,在管理界面手动绑定 channel.queueBind(queueName, exchangeName, routingKey); //5. 创建消费者并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 设置 Channel 消费者绑定队列 channel.basicConsume(queueName, true, consumer); } }