当前位置 博文首页 > 风吹头蛋凉的博客:tp6简单实现消息队列

    风吹头蛋凉的博客:tp6简单实现消息队列

    作者:[db:作者] 时间:2021-06-29 19:27

    tp6实现消息队列

    队列是什么?

    从本质上说消息对列就是一个队列结构的中间件,也就是说消息放入这个中间件之后就可以直接返回,并不需要系统立即处理,而另外会有一个程序读取这些数据,并按顺序进行逐次处理,在处理一些耗时任务时,非常有用。

    当你遇到一个并发特别大并且耗时特别长同时还不需要立即返回处理结果,使用消息队列可以解决这类问题。

    应用场景

    • 耗时较久业务处理
    • 并发量较高接口

    tp6实现消息队列

    • 使用composer安装queue
    composer require topthink/think-queue
    
    • 首先配置/config/queue.php中的redis配置,修改为自己的redis地址
      在这里插入图片描述
      2.修改原生消息队列
      把原来的方法稍作调整,把延迟消息队列和正常的放在一起方便调用
      整理出多任务和单任务的快捷使用方式
    namespace learn\utils;
    use learn\traits\ErrorTrait;
    use think\facade\Config;
    use think\facade\Queue as QueueThink;
    use think\facade\Log;
    /**
    * Class Queue
    * @package learn\utils
    * @method $this do(string $do) 设置任务执行方法
    * @method $this job(string $job) 设置任务执行类名
    * @method $this errorCount(int $errorCount) 执行失败次数
    * @method $this data(...$data) 执行数据
    * @method $this secs(int $secs) 延迟执行秒数
    * @method $this log($log) 记录日志
    */
    class Queue
    {
    
       use ErrorTrait;
       /**
        * 任务执行
        * @var string
        */
       protected $do = 'doJob';
    
       /**
        * 默认任务执行方法名
        * @var string
        */
       protected $defaultDo;
    
       /**
        * 任务类名
        * @var string
        */
       protected $job;
    
       /**
        * 错误次数
        * @var int
        */
       protected $errorCount = 3;
    
       /**
        * 数据
        * @var array|string
        */
       protected $data;
    
       /**
        * 任务名
        * @var null
        */
       protected $queueName = null;
    
       /**
        * 延迟执行秒数
        * @var int
        */
       protected $secs = 0;
    
       /**
        * 记录日志
        * @var string|callable|array
        */
       protected $log;
    
       /**
        * @var array
        */
       protected $rules = ['do', 'data', 'errorCount', 'job', 'secs', 'log'];
    
       /**
        * @var static
        */
       protected static $instance;
    
       /**
        * Queue constructor.
        */
       protected function __construct()
       {
           $this->defaultDo = $this->do;
       }
    
       /**
        * @return static
        */
       public static function instance()
       {
           if (is_null(self::$instance)) {
               self::$instance = new static();
           }
           return self::$instance;
       }
    
       /**
        * 放入消息队列
        * @param array|null $data
        * @return mixed
        */
       public function push(?array $data = null)
       {
           if (!$this->job) {
               return $this->setError('需要执行的队列类必须存在');
           }
           $res = QueueThink::{$this->action()}(...$this->getValues($data));
           if(!$res){
               $res = QueueThink::{$this->action()}(...$this->getValues($data));
               if(!$res){
                   Log::error('加入队列失败,参数:'.json_encode($this->getValues($data)));
               }
           }
           $this->clean();
           return $res;
       }
    
       /**
        * 清除数据
        */
       public function clean()
       {
           $this->secs = 0;
           $this->data = [];
           $this->log = null;
           $this->queueName = null;
           $this->errorCount = 3;
           $this->do = $this->defaultDo;
       }
    
       /**
        * 获取任务方式
        * @return string
        */
       protected function action()
       {
           return $this->secs ? 'later' : 'push';
       }
    
       /**
        * 获取参数
        * @param $data
        * @return array
        */
       protected function getValues($data)
       {
           $jobData['data'] = $data ?: $this->data;
           $jobData['do'] = $this->do;
           $jobData['errorCount'] = $this->errorCount;
           $jobData['log'] = $this->log;
           if ($this->do != $this->defaultDo) {
               $this->job .= '@' . Config::get('queue.prefix', 'eb_') . $this->do;
           }
           if ($this->secs) {
               return [$this->secs, $this->job, $jobData, $this->queueName];
           } else {
               return [$this->job, $jobData, $this->queueName];
           }
       }
    
       /**
        * @param $name
        * @param $arguments
        * @return $this
        */
       public function __call($name, $arguments)
       {
           if (in_array($name, $this->rules)) {
               if ($name === 'data') {
                   $this->{$name} = $arguments;
               } else {
                   $this->{$name} = $arguments[0] ?? null;
               }
               return $this;
           } else {
               throw new \RuntimeException('Method does not exist' . __CLASS__ . '->' . $name . '()');
           }
       }
    
    
    • 调用消息队列,在控制层调用
    Queue::instance()  
               ->do('test')  //具体方法
               ->job(UserCreate::class)   //执行任务类名
               ->data('99')->push();  //参数
    
    • 队列处理逻辑
        public function test(){
         try {
               Log::save('success');
          } catch (\Throwable $e) {
              Log::error('打印失败 原因:' . $e->getMessage());
         };
           return true;
       }
    
    
    • 开启消息队列
     php think queue:work
    

    windows用户可在项目根目录开启命令行输入命令
    Linux推荐使用Supervisor,命令都是一样的
    启动报错,按照提示解禁对应函数就行了
    启动后就行看到队列加入和消费的日志

    下一篇:没有了