消息队列

简介

对于很多应用场景,消息队列功能是需要的。例如:

1、未支付订单,2小时后自动取消;

2、10分钟后推送模板消息

3、数据统计延迟执行等


WxBot中的消息队列使用基于think-queue,可配合supervisor使用,保证进程常驻。

推荐supervisor配置如下(建议直接使用宝塔配置):

[program:wxbot]
directory= 项目根目录完全路径
command=php think queue:listen --queue  wxbot  


autorestart=true
autostart=true

startsecs=1
stderr_logfile=项目根目录完全路径/runtime/log/stderr.log
stdout_logfile=项目根目录完全路径/runtime/log/stdout.log


使用

WxBot已经把消息队列这块进行统一封装,先来看看最简单的使用:

/**
 * 消息队列测试
 * @author: fudaoji<fdj@kuryun.cn>
 */
public function queue(){
    controller('common/TaskQueue', 'event')->push([
        'delay' => 2,  //表示延迟2秒后执行
        'params' => [
            'do' => ['\\app\\common\\event\\TaskQueue', 'testTask']  //指定了具体的消费者行为
            'key1' => 'value1',     //其他需要传给消费的参数都可以在此处添加
            'key2' => 'value2'
        ]
    ]);
    echo '任务入队列';
}

可以看到,短短几行代码我们就完成了一个任务的入队列操作。

接下来,我们详细分析一下代码的执行流程:

1、我们将消息队列的生产者放在了app\common\event\TaskQueue中定义,如下:

namespace app\common\event;
use ky\ErrorCode;
use think\facade\Log;
use think\Queue;

class TaskQueue extends Base
{

    /**
     * 任务入队列
     * @param array $params
     * @author: fudaoji<fdj@kuryun.cn>
     */
    public function push($params = []){
        $worker = "app\\common\\job\\Task";   //消费者
        $queue = config('queue_name');        //队列名称,可在.env中修改
        if(empty($params['params']['do'])){
            abort(ErrorCode::CatchException, '缺少任务执行者');    //具体的消费者行为
        }
        if(empty($params['delay'])){
            Queue::push($worker, $params['params'], $queue);
        }else{
            Queue::later($params['delay'], $worker, $params['params'], $queue);
        }
    }

    /**
     * 测试消费者
     * @param $data  上文中调用处放入的参数
     * @author: fudaoji<fdj@kuryun.cn>
     */
    public function testTask($data){
        echo '测试任务队列执行';
        $job = $data['job'];
        if ($job->attempts() > 2) {
            echo '我要删除任务了';
            //通过这个方法可以检查这个任务已经重试了几次了
            $job->delete();
        }
    }
}

2、从app\common\event\TaskQueue中可以看到消费者容器 : app\\common\\job\\Task ,定义如下:

namespace app\common\job;

use think\facade\Log;
use think\queue\Job;

class Task
{

    /**
     * 任务worker
     * @param Job $job
     * @param $data
     * @author: fudaoji<fdj@kuryun.cn>
     */
    public function fire(Job $job, $data){
        try {
            if(isset($data['do'])){
                $data['job'] = $job; //将job抛给开发者
                if(is_string($data['do'])){//全局函数
                    $callback = $data['do'];
                }else{ //对象方法
                    $obj = new $data['do'][0]();
                    $callback = [$obj, $data['do'][1]];
                }
                //echo '==================='.json_encode($data).'==================';
                call_user_func_array($callback, [$data]);
            }else{
                $job->delete(); 
                echo(date('Y-m-d H:i:s') . '缺少do参数' . json_encode($data));
            }
        }catch (\Exception $e){
            Log::error($e->getMessage());
            $job->delete();
        }
    }

    public function failed($data){
        echo "任务执行失败, 参数:".json_encode($data);
    }

}

这个类其实是任务处理的中转站,通过传入参数指派具体的消费者(某个类中的某个方法)


3、在测试演示中,实际的消费者是: app\common\event\TaskQueue->testTask