当前位置 博文首页 > 无欲则刚的博客:RabbitMQ学习笔记2

    无欲则刚的博客:RabbitMQ学习笔记2

    作者:[db:作者] 时间:2021-09-10 18:59

    公共参数说明

    队列参数

    channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除

    参数名称描述Features
    x-message-ttl队列中的消息的生存周期,单位毫秒TTL
    x-expires队列在指定的时间内没有被使用(访问)就会被删除Exp
    x-max-length设置队列最大长度(先进先丢)Lim
    x-max-length-bytes队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃Lim B
    x-overflow队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息Ovfl
    x-single-active-consumer一次只能有一个消费者消费消息SAC
    x-dead-letter-exchange设置当前队列的死信交换机DLX
    x-dead-letter-routing-key设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列DLK
    x-max-priority队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息Pri
    x-queue-mode懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。Args
    x-queue-master-locatormaster queue host 的分配策略:min-masters、client-local和random

    消息参数

    参数名称描述
    content_type消息内容的类型
    content_encoding消息内容的编码格式
    priority消息的优先级
    correlation_id用于将RPC响应与请求相关联
    reply_to回调队列
    expiration消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期
    message_id消息id
    timestamp消息的时间戳
    type类型
    user_id用户id
    app_id应用程序id
    cluster_id集群id

    消息手动签收

    • 签收异常,没有调用basic.ack;当前会话处于连接状态时,消息转变为unacked状态,其他消费者消费不到,当前会话断开,unacked的消息会重新变为ready状态,其他消费者才能够重新消费

    • 签收正常,成功调用basic.ack,队列中立即删除消息

    • basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列

    • basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到

    • basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己

    消费者和生产者时间依赖关系

    • 消费者和生产者已知,消费者和生产者之间没有时间依赖关系

    • 生产者已知,消费者未知,需要消费者订阅后才能接收消息

    消费端获取消息模式

    • **推模式:**消息中间件主动将消息推送给消费者,消费者需要设置一个缓冲区缓存消息,效率高,但缓冲区可能会溢出

    • **拉模式:**消费者主动从消息中间件拉取消息,网络开销会增加消息延迟,降低系统吞吐量

      拉模式适用场景

      • 消费者在某个条件成立时才能消费消息

      • 需要批量拉取消息进行处理,连续调用basicGet方法拉取多条消息,处理完毕一次性返回ACK

    解决重复消费问题

    • 利用数据库主键去重

    • 利用Redis的原子性去实现

    redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)

    • 使用全局ID区分消息,解决幂等性

    生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID

    死信队列

    • 消息被拒(basicreject or basicnack)并且没有重新入队(requeue=false);

    • 当前队列中的消息数量已经超过最大长度

    • 消息在队列中过期

    配置死信队列

    ????public?static?void?SendMessage()
    ????????{
    ????????????var?exchangeA?=?"exchange";
    ????????????var?routeA?=?"routekey";
    ????????????var?queueA?=?"queue";
    
    ????????????var?exchangeD?=?"dlx.exchange";
    ????????????var?routeD?=?"dlx.route";
    ????????????var?queueD?=?"dlx.queue";
    
    ????????????var?connection?=?RabbitMQHelper.GetConnection();
    ????????????{
    ????????????????var?channel?=?connection.CreateModel();
    ????????????????{
    ????????????????????//?创建死信交换机
    ????????????????????channel.ExchangeDeclare(exchangeD,?type:?"fanout",?durable:?true,?autoDelete:?false);
    ????????????????????//?创建死信队列
    ????????????????????channel.QueueDeclare(queueD,?durable:?true,?exclusive:?false,?autoDelete:?false);
    ????????????????????//?绑定死信交换机和队列
    ????????????????????channel.QueueBind(queueD,?exchangeD,?routeD);
    
    ????????????????????channel.ExchangeDeclare(exchangeA,?type:?"fanout",?durable:?true,?autoDelete:?false);
    ????????????????????channel.QueueDeclare(queueA,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?
    ????????????????????????????????????????new?Dictionary<string,?object>?{
    ?????????????????????????????????????????????{?"x-dead-letter-exchange",exchangeD},?//设置当前队列的DLX
    ?????????????????????????????????????????????{?"x-dead-letter-routing-key",routeD},?//设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
    ???????????????????????????????????????????//??{?"x-message-ttl",10000},?//设置消息的存活时间,即过期时间
    ????????????????????????????????????????????{?"x-max-length",5}//设置队列最大长度
    ?????????????????????????????????????????});
    ????????????????????channel.QueueBind(queueA,?exchangeA,?routeA);
    
    
    ????????????????????var?properties?=?channel.CreateBasicProperties();
    ????????????????????properties.Persistent?=?true;
    ????????????????????//发布消息
    ????????????????????channel.BasicPublish(exchange:?exchangeA,
    ?????????????????????????????????????????routingKey:?routeA,
    ?????????????????????????????????????????basicProperties:?properties,
    ?????????????????????????????????????????body:?Encoding.UTF8.GetBytes("hello?rabbitmq?message"));
    ????????????????}
    ????????????}
    ????????????
    ????????}?
    

    重试失败特定次数后放入死信队列

    ?private?static?string?_exchangeNormal?=?"Exchange.Normal";??//定义一个用于接收?正常?消息的交换机
    ????????private?static?string?_exchangeRetry?=?"Exchange.Retry";????//定义一个用于接收?重试?消息的交换机
    ????????private?static?string?_exchangeFail?=?"Exchange.Fail";??????//定义一个用于接收?失败?消息的交换机
    ????????private?static?string?_queueNormal?=?"Queue.Noraml";????????//定义一个用于接收?正常?消息的队列
    ????????private?static?string?_queueRetry?=?"Queue.Retry";??????????//定义一个用于接收?重试?消息的队列
    ????????private?static?string?_queueFail?=?"Queue.Fail";????????????//定义一个用于接收?失败?消息的队列
    
    ????????public?static?void?Test()
    ????????{
    ????????????var?connection?=?RabbitMQHelper.GetConnection();
    ????????????var?channel?=?connection.CreateModel();
    
    ????????????//声明交换机
    ????????????channel.ExchangeDeclare(_exchangeNormal,?"topic",?true,?false,?null);
    ????????????channel.ExchangeDeclare(_exchangeRetry,?"topic",?true,?false,?null);
    ????????????channel.ExchangeDeclare(_exchangeFail,?"topic",?true,?false,?null);
    
    ????????????//定义队列参数
    ????????????var?queueNormalArgs?=?new?Dictionary<string,?object>();
    ????????????{
    ????????????????queueNormalArgs.Add("x-dead-letter-exchange",?_exchangeFail);???//指定死信交换机,用于将?Normal?队列中失败的消息投递给?Fail?交换机
    ????????????}
    ????????????var?queueRetryArgs?=?new?Dictionary<string,?object>();
    ????????????{
    ????????????????queueRetryArgs.Add("x-dead-letter-exchange",?_exchangeNormal);??//指定死信交换机,用于将?Retry?队列中超时的消息投递给?Normal?交换机
    ????????????????queueRetryArgs.Add("x-message-ttl",?6000);??????????????????????//定义 queueRetry 的消息最大停留时间?(原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机)?????????????????????????????????????????????????????????????????????????????//定义最大停留时间为防止一些?待重新投递?的消息、没有定义重试时间而导致内存溢出
    ????????????}
    ????????????var?queueFailArgs?=?new?Dictionary<string,?object>();
    ????????????{
    ????????????}
    
    ????????????//声明队列
    ????????????channel.QueueDeclare(queue:?_queueNormal,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueNormalArgs);
    ????????????channel.QueueDeclare(queue:?_queueRetry,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueRetryArgs);
    ????????????channel.QueueDeclare(queue:?_queueFail,?durable:?true,?exclusive:?false,?autoDelete:?false,?arguments:?queueFailArgs);
    
    ????????????//为队列绑定交换机
    ????????????channel.QueueBind(queue:?_queueNormal,?exchange:?_exchangeNormal,?routingKey:?"#");
    ????????????channel.QueueBind(queue:?_queueRetry,?exchange:?_exchangeRetry,?routingKey:?"#");
    ????????????channel.QueueBind(queue:?_queueFail,?exchange:?_exchangeFail,?routingKey:?"#");
    
    ????????????#region?创建一个普通消息消费者
    ????????????{
    ????????????????var?consumer?=?new?EventingBasicConsumer(channel);
    
    ????????????????consumer.Received?+=?(sender,?e)?=>
    ????????????????{
    ????????????????????var?_sender?=?(EventingBasicConsumer)sender;????????????//消息传送者
    ????????????????????var?_channel?=?_sender.Model;???????????????????????????//消息传送通道
    ????????????????????var?_message?=?(BasicDeliverEventArgs)e;????????????????//消息传送参数
    ????????????????????var?_headers?=?_message.BasicProperties.Headers;????????//消息头
    ????????????????????var?_content?=?Encoding.UTF8.GetString(_message.Body.ToArray());??//消息内容
    ????????????????????var?_death?=?default(Dictionary<string,?object>);???????//死信参数
    
    ????????????????????if?(_headers?!=?null?&&?_headers.ContainsKey("x-death"))
    ????????????????????????_death?=?(Dictionary<string,?object>)(_headers["x-death"]?as?List<object>)[0];
    
    ????????????????????try
    ????????????????????#region?消息处理
    ????????????????????{
    ????????????????????????Console.WriteLine();
    ????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.0)消息接收:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");
    
    ????????????????????????throw?new?Exception("模拟消息处理失败效果。");
    
    ????????????????????????//处理成功时
    ????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.1)处理成功:\r\n\t[deliveryTag={_message.DeliveryTag}]");
    
    ????????????????????????//消息确认?(销毁当前消息)
    ????????????????????????_channel.BasicAck(deliveryTag:?_message.DeliveryTag,?multiple:?false);
    ????????????????????}
    ????????????????????#endregion
    ????????????????????catch?(Exception?ex)
    ????????????????????#region?消息处理失败时
    ????????????????????{
    ????????????????????????var?retryCount?=?(long)(_death?["count"]????default(long));?//查询当前消息被重新投递的次数?(首次则为0)
    
    ????????????????????????Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.2)处理失败:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[retryCount={retryCount}]");
    
    ????????????????????????if?(retryCount?>=?2)
    ????????????????????????#region?投递第3次还没消费成功时,就转发给?exchangeFail?交换机
    ????????????????????????{
    ????????????????????????????//消息拒绝(投递给死信交换机,也就是上边定义的?("x-dead-letter-exchange",?_exchangeFail))
    ????????????????????????????_channel.BasicNack(deliveryTag:?_message.DeliveryTag,?multiple:?false,?requeue:?false);
    ????????????????????????}
    ????????????????????????#endregion
    ????????????????????????else
    ????????????????????????#region?否则转发给?exchangeRetry?交换机
    ????????????????????????{
    ????????????????????????????var?interval?=?(retryCount?+?1)?*?10;?//定义下一次投递的间隔时间?(单位:秒)
    
    ????????????????????????????//定义下一次投递的间隔时间?(单位:毫秒)
    ????????????????????????????_message.BasicProperties.Expiration?=?(interval?*?1000).ToString();
    
    ????????????????????????????//将消息投递给?_exchangeRetry?(会自动增加?death?次数)
    ????????????????????????????_channel.BasicPublish(exchange:?_exchangeRetry,?routingKey:?_message.RoutingKey,?basicProperties:?_message.BasicProperties,?body:?_message.Body);
    
    ????????????????????????????//消息确认?(销毁当前消息)
    ????????????????????????????_channel.BasicAck(deliveryTag:?_message.DeliveryTag,?multiple:?false);
    ????????????????????????}
    ????????????????????????#endregion
    ????????????????????}
    ????????????????????#endregion
    ????????????????};
    ????????????????channel.BasicConsume(queue:?_queueNormal,?autoAck:?false,?consumer:?consumer);
    ????????????}
    ????????????#endregion
    ????????}
    

    消息延时推送

    • 过期队列+死信交换机

    • RabbitMQ 3.6.x 开始可以使用延迟插件,交换机类型选择x-delayed-message(延迟将数据放入队列)

    ???public?static?void?ConsumerMessage()
    ???{
    ????????????var?connection?=?RabbitMQHelper.GetConnection();
    ????????????var?channel?=?connection.CreateModel();
    
    ????????????var?exchangeArgumets?=?new?Dictionary<string,?object>
    ????????????{
    ????????????????{?"x-delayed-type",?"topic"?}??//延迟交换机的类型
    ????????????};
    ????????????channel.ExchangeDeclare("delay_exchange",?"x-delayed-message",?true,?false,?exchangeArgumets);
    
    ????????????//?创建队列
    ????????????string?queueName1?=?"delay_queue1";
    ????????????channel.QueueDeclare(queueName1,?false,?false,?false,?null);
    ????????????string?queueName2?=?"delay_queue2";
    ????????????channel.QueueDeclare(queueName2,?false,?false,?false,?null);
    ????????????string?queueName3?=?"delay_queue3";
    ????????????channel.QueueDeclare(queueName3,?false,?false,?false,?null);
    ????????????//?绑定到交互机
    ????????????channel.QueueBind(queue:?queueName1,?exchange:?"delay_exchange",?routingKey:?"delayed-direct1");
    ????????????channel.QueueBind(queue:?queueName2,?exchange:?"delay_exchange",?routingKey:?"delayed-direct2");
    ????????????channel.QueueBind(queue:?queueName3,?exchange:?"delay_exchange",?routingKey:?"delayed-direct3");
    
    ????????????var?properties?=?channel.CreateBasicProperties();
    ????????????properties.Persistent?=?true;?//?标记消息持久化
    
    ????????????Console.WriteLine("?[*]?Waiting?for?messages.");
    
    ????????????var?consumer?=?new?EventingBasicConsumer(channel);
    ????????????consumer.Received?+=?(model,?ea)?=>
    ????????????{
    ????????????????var?body?=?ea.Body;
    ????????????????var?message?=?Encoding.UTF8.GetString(body.ToArray());
    ????????????????var?routingKey?=?ea.RoutingKey;
    ????????????????var?_headers?=?ea.BasicProperties.Headers;????????//消息头
    ????????????????int?delay?=?0;
    ????????????????if?(_headers?==?null)
    ????????????????{
    ????????????????????ea.BasicProperties.Headers?=?new?Dictionary<string,?object>();
    ????????????????}
    ????????????????else??if?(?_headers.ContainsKey("x-delay"))
    ????????????????{
    ????????????????????delay?=?Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
    ????????????????????delay?=?delay?+?20000;
    ????????????????}
    ????????????????ea.BasicProperties.Headers["x-delay"]?=?delay;?//消息头设置消息延迟的时间
    ????????????????Console.WriteLine($"?{DateTime.Now}=={delay}");
    ????????????????Console.WriteLine("?[x]?Received?'{0}':'{1}'",?routingKey,?message);
    ???????????????
    ????????????????channel.BasicPublish(ea.Exchange,?ea.RoutingKey,?basicProperties:?ea.BasicProperties,?body);
    ????????????????channel.BasicAck(deliveryTag:?ea.DeliveryTag,?multiple:?false);
    ????????????};
    
    ????????????channel.BasicConsume(queue:?queueName3,
    ?????????????????????????????????autoAck:?false,
    ?????????????????????????????????consumer:?consumer);
    
    ????????????Console.WriteLine("?Press?[enter]?to?exit.");
    ????????????Console.ReadLine();
    ????????}
    cs
    下一篇:没有了