当前位置 博文首页 > 无欲则刚的博客:RabbitMQ学习笔记2
队列参数
channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除
参数名称 | 描述 | Features |
---|---|---|
x-message-ttl | 队列中的消息的生存周期,单位毫秒 | TTL |
x-expires | 队列在指定的时间内没有被使用(访问)就会被删除 | Exp |
x-max-length | 设置队列最大长度(先进先丢) | Lim |
x-max-length-bytes | 队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃 | Lim B |
x-overflow | 队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息 | Ovfl |
x-single-active-consumer | 一次只能有一个消费者消费消息 | SAC |
x-dead-letter-exchange | 设置当前队列的死信交换机 | DLX |
x-dead-letter-routing-key | 设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列 | DLK |
x-max-priority | 队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息 | Pri |
x-queue-mode | 懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。 | Args |
x-queue-master-locator | master queue host 的分配策略:min-masters、client-local和random |
消息参数
参数名称 | 描述 |
---|---|
content_type | 消息内容的类型 |
content_encoding | 消息内容的编码格式 |
priority | 消息的优先级 |
correlation_id | 用于将RPC响应与请求相关联 |
reply_to | 回调队列 |
expiration | 消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期 |
message_id | 消息id |
timestamp | 消息的时间戳 |
type | 类型 |
user_id | 用户id |
app_id | 应用程序id |
cluster_id | 集群id |
签收异常,没有调用basic.ack;当前会话处于连接状态时,消息转变为unacked状态,其他消费者消费不到,当前会话断开,unacked的消息会重新变为ready状态,其他消费者才能够重新消费
签收正常,成功调用basic.ack,队列中立即删除消息
basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列
basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到
basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己
消费者和生产者已知,消费者和生产者之间没有时间依赖关系
生产者已知,消费者未知,需要消费者订阅后才能接收消息
**推模式:**消息中间件主动将消息推送给消费者,消费者需要设置一个缓冲区缓存消息,效率高,但缓冲区可能会溢出
**拉模式:**消费者主动从消息中间件拉取消息,网络开销会增加消息延迟,降低系统吞吐量
拉模式适用场景
消费者在某个条件成立时才能消费消息
需要批量拉取消息进行处理,连续调用basicGet
方法拉取多条消息,处理完毕一次性返回ACK
利用数据库主键去重
利用Redis的原子性去实现
redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)
使用全局ID区分消息,解决幂等性
生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID
消息被拒(basicreject or basicnack)并且没有重新入队(requeue=false);
当前队列中的消息数量已经超过最大长度
消息在队列中过期
配置死信队列
????public?static?void?SendMessage()
????????{
????????????var?exchangeA?=?"exchange";
????????????var?routeA?=?"routekey";
????????????var?queueA?=?"queue";
????????????var?exchangeD?=?"dlx.exchange";
????????????var?routeD?=?"dlx.route";
????????????var?queueD?=?"dlx.queue";
????????????var?connection?=?RabbitMQHelper.GetConnection();
????????????{
????????????????var?channel?=?connection.CreateModel();
????????????????{
????????????????????//?创建死信交换机
????????????????????channel.ExchangeDeclare(exchangeD,?type:?"fanout",?durable:?true,?autoDelete:?false);
????????????????????//?创建死信队列
????????????????????channel.QueueDeclare(queueD,?durable:?true,?exclusive:?false,?autoDelete:?false);
????????????????????//?绑定死信交换机和队列
????????????????????channel.QueueBind(queueD,?exchangeD,?routeD);
????????????????????channel.ExchangeDeclare(exchangeA,?type:?"fanout",?durable:?true,?autoDelete:?false);
????????????????????channel.QueueDeclare(queueA,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?
????????????????????????????????????????new?Dictionary<string,?object>?{
?????????????????????????????????????????????{?"x-dead-letter-exchange",exchangeD},?//设置当前队列的DLX
?????????????????????????????????????????????{?"x-dead-letter-routing-key",routeD},?//设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
???????????????????????????????????????????//??{?"x-message-ttl",10000},?//设置消息的存活时间,即过期时间
????????????????????????????????????????????{?"x-max-length",5}//设置队列最大长度
?????????????????????????????????????????});
????????????????????channel.QueueBind(queueA,?exchangeA,?routeA);
????????????????????var?properties?=?channel.CreateBasicProperties();
????????????????????properties.Persistent?=?true;
????????????????????//发布消息
????????????????????channel.BasicPublish(exchange:?exchangeA,
?????????????????????????????????????????routingKey:?routeA,
?????????????????????????????????????????basicProperties:?properties,
?????????????????????????????????????????body:?Encoding.UTF8.GetBytes("hello?rabbitmq?message"));
????????????????}
????????????}
????????????
????????}?
重试失败特定次数后放入死信队列
?private?static?string?_exchangeNormal?=?"Exchange.Normal";??//定义一个用于接收?正常?消息的交换机
????????private?static?string?_exchangeRetry?=?"Exchange.Retry";????//定义一个用于接收?重试?消息的交换机
????????private?static?string?_exchangeFail?=?"Exchange.Fail";??????//定义一个用于接收?失败?消息的交换机
????????private?static?string?_queueNormal?=?"Queue.Noraml";????????//定义一个用于接收?正常?消息的队列
????????private?static?string?_queueRetry?=?"Queue.Retry";??????????//定义一个用于接收?重试?消息的队列
????????private?static?string?_queueFail?=?"Queue.Fail";????????????//定义一个用于接收?失败?消息的队列
????????public?static?void?Test()
????????{
????????????var?connection?=?RabbitMQHelper.GetConnection();
????????????var?channel?=?connection.CreateModel();
????????????//声明交换机
????????????channel.ExchangeDeclare(_exchangeNormal,?"topic",?true,?false,?null);
????????????channel.ExchangeDeclare(_exchangeRetry,?"topic",?true,?false,?null);
????????????channel.ExchangeDeclare(_exchangeFail,?"topic",?true,?false,?null);
????????????//定义队列参数
????????????var?queueNormalArgs?=?new?Dictionary<string,?object>();
????????????{
????????????????queueNormalArgs.Add("x-dead-letter-exchange",?_exchangeFail);???//指定死信交换机,用于将?Normal?队列中失败的消息投递给?Fail?交换机
????????????}
????????????var?queueRetryArgs?=?new?Dictionary<string,?object>();
????????????{
????????????????queueRetryArgs.Add("x-dead-letter-exchange",?_exchangeNormal);??//指定死信交换机,用于将?Retry?队列中超时的消息投递给?Normal?交换机
????????????????queueRetryArgs.Add("x-message-ttl",?6000);??????????????????????//定义 queueRetry 的消息最大停留时间?(原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机)?????????????????????????????????????????????????????????????????????????????//定义最大停留时间为防止一些?待重新投递?的消息、没有定义重试时间而导致内存溢出
????????????}
????????????var?queueFailArgs?=?new?Dictionary<string,?object>();
????????????{
????????????}
????????????//声明队列
????????????channel.QueueDeclare(queue:?_queueNormal,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueNormalArgs);
????????????channel.QueueDeclare(queue:?_queueRetry,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueRetryArgs);
????????????channel.QueueDeclare(queue:?_queueFail,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueFailArgs);
????????????//为队列绑定交换机
????????????channel.QueueBind(queue:?_queueNormal,?exchange:?_exchangeNormal,?routingKey:?"#");
????????????channel.QueueBind(queue:?_queueRetry,?exchange:?_exchangeRetry,?routingKey:?"#");
????????????channel.QueueBind(queue:?_queueFail,?exchange:?_exchangeFail,?routingKey:?"#");
????????????#region?创建一个普通消息消费者
????????????{
????????????????var?consumer?=?new?EventingBasicConsumer(channel);
????????????????consumer.Received?+=?(sender,?e)?=>
????????????????{
????????????????????var?_sender?=?(EventingBasicConsumer)sender;????????????//消息传送者
????????????????????var?_channel?=?_sender.Model;???????????????????????????//消息传送通道
????????????????????var?_message?=?(BasicDeliverEventArgs)e;????????????????//消息传送参数
????????????????????var?_headers?=?_message.BasicProperties.Headers;????????//消息头
????????????????????var?_content?=?Encoding.UTF8.GetString(_message.Body.ToArray());??//消息内容
????????????????????var?_death?=?default(Dictionary<string,?object>);???????//死信参数
????????????????????if?(_headers?!=?null?&&?_headers.ContainsKey("x-death"))
????????????????????????_death?=?(Dictionary<string,?object>)(_headers["x-death"]?as?List<object>)[0];
????????????????????try
????????????????????#region?消息处理
????????????????????{
????????????????????????Console.WriteLine();
????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.0)消息接收:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");
????????????????????????throw?new?Exception("模拟消息处理失败效果。");
????????????????????????//处理成功时
????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.1)处理成功:\r\n\t[deliveryTag={_message.DeliveryTag}]");
????????????????????????//消息确认?(销毁当前消息)
????????????????????????_channel.BasicAck(deliveryTag:?_message.DeliveryTag,?multiple:?false);
????????????????????}
????????????????????#endregion
????????????????????catch?(Exception?ex)
????????????????????#region?消息处理失败时
????????????????????{
????????????????????????var?retryCount?=?(long)(_death?["count"]????default(long));?//查询当前消息被重新投递的次数?(首次则为0)
????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.2)处理失败:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[retryCount={retryCount}]");
????????????????????????if?(retryCount?>=?2)
????????????????????????#region?投递第3次还没消费成功时,就转发给?exchangeFail?交换机
????????????????????????{
????????????????????????????//消息拒绝(投递给死信交换机,也就是上边定义的?("x-dead-letter-exchange",?_exchangeFail))
????????????????????????????_channel.BasicNack(deliveryTag:?_message.DeliveryTag,?multiple:?false,?requeue:?false);
????????????????????????}
????????????????????????#endregion
????????????????????????else
????????????????????????#region?否则转发给?exchangeRetry?交换机
????????????????????????{
????????????????????????????var?interval?=?(retryCount?+?1)?*?10;?//定义下一次投递的间隔时间?(单位:秒)
????????????????????????????//定义下一次投递的间隔时间?(单位:毫秒)
????????????????????????????_message.BasicProperties.Expiration?=?(interval?*?1000).ToString();
????????????????????????????//将消息投递给?_exchangeRetry?(会自动增加?death?次数)
????????????????????????????_channel.BasicPublish(exchange:?_exchangeRetry,?routingKey:?_message.RoutingKey,?basicProperties:?_message.BasicProperties,?body:?_message.Body);
????????????????????????????//消息确认?(销毁当前消息)
????????????????????????????_channel.BasicAck(deliveryTag:?_message.DeliveryTag,?multiple:?false);
????????????????????????}
????????????????????????#endregion
????????????????????}
????????????????????#endregion
????????????????};
????????????????channel.BasicConsume(queue:?_queueNormal,?autoAck:?false,?consumer:?consumer);
????????????}
????????????#endregion
????????}
过期队列+死信交换机
RabbitMQ 3.6.x 开始可以使用延迟插件,交换机类型选择x-delayed-message(延迟将数据放入队列)
???public?static?void?ConsumerMessage()
???{
????????????var?connection?=?RabbitMQHelper.GetConnection();
????????????var?channel?=?connection.CreateModel();
????????????var?exchangeArgumets?=?new?Dictionary<string,?object>
????????????{
????????????????{?"x-delayed-type",?"topic"?}??//延迟交换机的类型
????????????};
????????????channel.ExchangeDeclare("delay_exchange",?"x-delayed-message",?true,?false,?exchangeArgumets);
????????????//?创建队列
????????????string?queueName1?=?"delay_queue1";
????????????channel.QueueDeclare(queueName1,?false,?false,?false,?null);
????????????string?queueName2?=?"delay_queue2";
????????????channel.QueueDeclare(queueName2,?false,?false,?false,?null);
????????????string?queueName3?=?"delay_queue3";
????????????channel.QueueDeclare(queueName3,?false,?false,?false,?null);
????????????//?绑定到交互机
????????????channel.QueueBind(queue:?queueName1,?exchange:?"delay_exchange",?routingKey:?"delayed-direct1");
????????????channel.QueueBind(queue:?queueName2,?exchange:?"delay_exchange",?routingKey:?"delayed-direct2");
????????????channel.QueueBind(queue:?queueName3,?exchange:?"delay_exchange",?routingKey:?"delayed-direct3");
????????????var?properties?=?channel.CreateBasicProperties();
????????????properties.Persistent?=?true;?//?标记消息持久化
????????????Console.WriteLine("?[*]?Waiting?for?messages.");
????????????var?consumer?=?new?EventingBasicConsumer(channel);
????????????consumer.Received?+=?(model,?ea)?=>
????????????{
????????????????var?body?=?ea.Body;
????????????????var?message?=?Encoding.UTF8.GetString(body.ToArray());
????????????????var?routingKey?=?ea.RoutingKey;
????????????????var?_headers?=?ea.BasicProperties.Headers;????????//消息头
????????????????int?delay?=?0;
????????????????if?(_headers?==?null)
????????????????{
????????????????????ea.BasicProperties.Headers?=?new?Dictionary<string,?object>();
????????????????}
????????????????else??if?(?_headers.ContainsKey("x-delay"))
????????????????{
????????????????????delay?=?Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
????????????????????delay?=?delay?+?20000;
????????????????}
????????????????ea.BasicProperties.Headers["x-delay"]?=?delay;?//消息头设置消息延迟的时间
????????????????Console.WriteLine($"?{DateTime.Now}=={delay}");
????????????????Console.WriteLine("?[x]?Received?'{0}':'{1}'",?routingKey,?message);
???????????????
????????????????channel.BasicPublish(ea.Exchange,?ea.RoutingKey,?basicProperties:?ea.BasicProperties,?body);
????????????????channel.BasicAck(deliveryTag:?ea.DeliveryTag,?multiple:?false);
????????????};
????????????channel.BasicConsume(queue:?queueName3,
?????????????????????????????????autoAck:?false,
?????????????????????????????????consumer:?consumer);
????????????Console.WriteLine("?Press?[enter]?to?exit.");
????????????Console.ReadLine();
????????}
cs