简介
对于很多应用场景,消息队列功能是需要的。例如:
1、未支付订单,2小时后自动取消;
2、10分钟后推送模板消息
3、数据统计延迟执行等
WxBot中的消息队列使用基于think-queue,可配合supervisor使用,保证进程常驻。
注意启动用户要设置成www。
推荐supervisor配置如下(建议直接使用宝塔配置):
[program:wxbot] directory= 项目根目录完全路径 command=php think queue:listen --queue wxbot autorestart=true autostart=true startsecs=1 stderr_logfile=项目根目录完全路径/runtime/log/stderr.log stdout_logfile=项目根目录完全路径/runtime/log/stdout.log
使用
WxBot已经把消息队列这块进行统一封装,先来看看最简单的使用:
/** * 消息队列测试 * @author: fudaoji<fdj@kuryun.cn> */ public function queue(){ controller('common/TaskQueue', 'event')->push([ 'delay' => 2, //表示延迟2秒后执行 'params' => [ 'do' => ['\\app\\common\\event\\TaskQueue', 'testTask'] //指定了具体的消费者行为 'key1' => 'value1', //其他需要传给消费的参数都可以在此处添加 'key2' => 'value2' ] ]); echo '任务入队列'; }
可以看到,短短几行代码我们就完成了一个任务的入队列操作。
接下来,我们详细分析一下代码的执行流程:
1、我们将消息队列的生产者放在了app\common\event\TaskQueue中定义,如下:
namespace app\common\event; use ky\ErrorCode; use think\facade\Log; use think\Queue; class TaskQueue extends Base { /** * 任务入队列 * @param array $params * @author: fudaoji<fdj@kuryun.cn> */ public function push($params = []){ $worker = "app\\common\\job\\Task"; //消费者 $queue = config('queue_name'); //队列名称,可在.env中修改 if(empty($params['params']['do'])){ abort(ErrorCode::CatchException, '缺少任务执行者'); //具体的消费者行为 } if(empty($params['delay'])){ Queue::push($worker, $params['params'], $queue); }else{ Queue::later($params['delay'], $worker, $params['params'], $queue); } } /** * 测试消费者 * @param $data 上文中调用处放入的参数 * @author: fudaoji<fdj@kuryun.cn> */ public function testTask($data){ echo '测试任务队列执行'; $job = $data['job']; if ($job->attempts() > 2) { echo '我要删除任务了'; //通过这个方法可以检查这个任务已经重试了几次了 $job->delete(); } } }
2、从app\common\event\TaskQueue中可以看到消费者容器 : app\\common\\job\\Task ,定义如下:
namespace app\common\job; use think\facade\Log; use think\queue\Job; class Task { /** * 任务worker * @param Job $job * @param $data * @author: fudaoji<fdj@kuryun.cn> */ public function fire(Job $job, $data){ try { if(isset($data['do'])){ $data['job'] = $job; //将job抛给开发者 if(is_string($data['do'])){//全局函数 $callback = $data['do']; }else{ //对象方法 $obj = new $data['do'][0](); $callback = [$obj, $data['do'][1]]; } //echo '==================='.json_encode($data).'=================='; call_user_func_array($callback, [$data]); }else{ $job->delete(); echo(date('Y-m-d H:i:s') . '缺少do参数' . json_encode($data)); } }catch (\Exception $e){ Log::error($e->getMessage()); $job->delete(); } } public function failed($data){ echo "任务执行失败, 参数:".json_encode($data); } }
这个类其实是任务处理的中转站,通过传入参数指派具体的消费者(某个类中的某个方法)
3、在测试演示中,实际的消费者是: app\common\event\TaskQueue->testTask