当前位置 博文首页 > WeskyNet:十五、.net core(.NET 6)搭建RabbitMQ消息队列生产

    WeskyNet:十五、.net core(.NET 6)搭建RabbitMQ消息队列生产

    作者:WeskyNet 时间:2021-06-12 18:24

     搭建RabbitMQ简单通用的直连方法

     

    如果还没有MQ环境,可以参考上一篇的博客

    https://www.cnblogs.com/weskynet/p/14877932.html

     

    接下来开始.net core操作Rabbitmq有关的内容。我打算使用比较简单的单机的direct直连模式,来演示一下有关操作,基本套路差不多。

    首先,我在我的package包项目上面,添加对RabbitMQ.Client的引用:

     

     

    Common文件夹下,新建类库项目 Wsk.Core.RabbitMQ,并且引用package项目:

     

     

    在启动项目下的appsettings配置文件里面,新增一个访问RabbitMQ的配置信息:

     

     

    配置部分代码:

    "MQ": [
        {
          "Host": "127.0.0.1", // MQ安装的实际服务器IP地址
          "Port": 5672, // 服务端口号
          "User": "wesky", // 用户名
          "Password": "wesky123", // 密码
          "ExchangeName": "WeskyExchange", // 设定一个Exchange名称,
          "Durable": true // 是否启用持久化
        }
      ]

     

    然后,在实体类项目下,新建实体类MqConfigInfo,用于把读取的配置信息赋值到该实体类下:

     

    实体类代码:

    public class MqConfigInfo
        {
            public string Host { get; set; }
            public int Port { get; set; }
            public string User { get; set; }
            public string Password { get; set; }
            public string ExchangeName { get; set; }
            public bool Durable { get; set; }
        }
    View Code

     

    在刚刚新建的RabbitMQ类库项目下面,引用该实体类库项目,以及APppSettings项目。然后新建一个类,叫做ReadMqConfigHelper,以及它的interface接口,并且提供一个方法,叫ReadMqConfig,用来进行读取配置信息使用:

    读取配置信息类代码:

    public class ReadMqConfigHelper:IReadMqConfigHelper
        {
            private readonly ILogger<ReadMqConfigHelper> _logger;
            public ReadMqConfigHelper(ILogger<ReadMqConfigHelper>  logger)
            {
                _logger = logger;
            }
            public List<MqConfigInfo> ReadMqConfig()
            {
                try
                {
                    List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息
                    if (config.Any())
                    {
                        return config;
                    }
                    _logger.LogError($"获取MQ配置信息失败:没有可用数据集");
                    return null;
                }
                catch (Exception ex)
                {
                    _logger.LogError($"获取MQ配置信息失败:{ex.Message}");
                    return null;
                }
            }
        }
    View Code

     

    接着,新建类MqConnectionHelper以及接口IMqConnectionHelper,用于做MQ连接、创建生产者和消费者等有关操作:

     

     

    然后,新增一系列创建连接所需要的静态变量:

     

     

    然后,设置两个消费者队列,用来测试。以及添加生产者连接有关的配置和操作:

     

     

    然后,创建消费者连接方法:

     

     

    其中,StartListener下面提供了事件,用于手动确认消息接收。如果设置为自动,有可能导致消息丢失:

     

     

    然后,添加消息发布方法:

     

     

    interface接口里面,添加有关的接口,用于等下依赖注入使用:

     

     

    连接类部分的代码:

      public class MqConnectionHelper:IMqConnectionHelper
        {
    
            private readonly ILogger<MqConnectionHelper> _logger;
            public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
            {
                _logger = logger;
    
                _connectionReceiveFactory = new IConnectionFactory[_costomerCount];
                _connectionReceive = new IConnection[_costomerCount];
                _modelReceive = new IModel[_costomerCount];
                _basicConsumer = new EventingBasicConsumer[_costomerCount];
    
            }
    
    
            /*
             备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。
                         当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。
             */
            private static IConnectionFactory _connectionSendFactory;  //RabbitMQ工厂 发送端
            private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工厂 接收端  
    
            private static IConnection _connectionSend; //连接 发送端
            private static IConnection[] _connectionReceive; //连接 消费端
    
            public static List<MqConfigInfo> _mqConfig; // 配置信息
    
            private static IModel _modelSend;  //通道  发送端
            private static IModel[] _modelReceive; //通道  消费端
    
            private static EventingBasicConsumer[] _basicConsumer;  // 事件
    
            /* 设置两个routingKey 和 队列名称,用来做测试使用*/
            public static int _costomerCount = 2;
            public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
            public static string[] _queueName = new string[] { "Queue001", "Queue002" };
    
            /// <summary>
            /// 生产者初始化连接配置
            /// </summary>
            public void SendFactoryConnectionInit()
            {
                _connectionSendFactory = new ConnectionFactory
                {
                    HostName = _mqConfig.FirstOrDefault().Host,
                    Port = _mqConfig.FirstOrDefault().Port,
                    UserName = _mqConfig.FirstOrDefault().User,
                    Password = _mqConfig.FirstOrDefault().Password
                };
            }
    
            /// <summary>
            /// 生产者连接
            /// </summary>
            public void SendFactoryConnection()
            {
    
                if (null != _connectionSend && _connectionSend.IsOpen)
                {
                    return; // 已有连接
                }
                _connectionSend = _connectionSendFactory.CreateConnection(); // 创建生产者连接
    
                if (null != _modelSend && _modelSend.IsOpen)
                {
                    return; // 已有通道
                }
                _modelSend = _connectionSend.CreateModel(); // 创建生产者通道
    
                _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定义交换机名称和类型(direct)
    
            }
    
            /// <summary>
            /// 消费者初始化连接配置
            /// </summary>
            public void ReceiveFactoryConnectionInit()
            {
                var factories = new ConnectionFactory
                {
                    HostName = _mqConfig.FirstOrDefault().Host,
                    Port = _mqConfig.FirstOrDefault().Port,
                    UserName = _mqConfig.FirstOrDefault().User,
                    Password = _mqConfig.FirstOrDefault().Password
                };
    
                for (int i = 0; i < _costomerCount; i++)
                {
                    _connectionReceiveFactory[i] = factories;  // 给每个消费者绑定一个连接工厂
                }
            }
    
            /// <summary>
            /// 消费者连接
            /// </summary>
            /// <param name="consumeIndex"></param>
            /// <param name="exchangeName"></param>
            /// <param name="routeKey"></param>
            /// <param name="queueName"></param>
            public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
            {
                _logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}");
    
                if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
                {
                    return;
                }
                _connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 创建消费者连接
    
                if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
                {
                    return;
                }
                _modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel();  // 创建消费者通道
    
                _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);
    
                _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定义交换机名称和类型  与生产者保持一致
    
                _modelReceive[consumeIndex].QueueDeclare(
                             queue: queueName, //消息队列名称
                             durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改
                             exclusive: false,
                             autoDelete: false,
                             arguments: null
               );  // 定义消费者队列
    
                
                _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 队列绑定给指定的交换机
    
                _modelReceive[consumeIndex].BasicQos(0, 1, false); // 设置消费者每次只接收一条消息
    
                StartListener((model, ea) =>
                {
                    byte[] message = ea.Body.ToArray(); // 接收到的消息
    
                    string msg = Encoding.UTF8.GetString(message);
    
                    _logger.LogInformation($"队列{queueName}接收到消息:{msg}");
                    Thread.Sleep(2000);
    
                    _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
                }, queueName, consumeIndex);
    
            }
    
            /// <summary>
            /// 消费者接收消息的确认机制
            /// </summary>
            /// <param name="basicDeliverEventArgs"></param>
            /// <param name="queueName"></param>
            /// <param name="consumeIndex"></param>