简介
对于很多应用场景,消息队列功能是需要的。例如:
1、未支付订单,2小时后自动取消;
2、10分钟后推送模板消息
3、数据统计延迟执行等
WxBot中的消息队列使用基于think-queue,可配合supervisor使用,保证进程常驻。
注意启动用户要设置成www。
推荐supervisor配置如下(建议直接使用宝塔配置):
[program:wxbot] directory= 项目根目录完全路径 command=php think queue:listen --queue wxbot autorestart=true autostart=true startsecs=1 stderr_logfile=项目根目录完全路径/runtime/log/stderr.log stdout_logfile=项目根目录完全路径/runtime/log/stdout.log
使用
WxBot已经把消息队列这块进行统一封装,先来看看最简单的使用:
/**
* 消息队列测试
* @author: fudaoji<fdj@kuryun.cn>
*/
public function queue(){
controller('common/TaskQueue', 'event')->push([
'delay' => 2, //表示延迟2秒后执行
'params' => [
'do' => ['\\app\\common\\event\\TaskQueue', 'testTask'] //指定了具体的消费者行为
'key1' => 'value1', //其他需要传给消费的参数都可以在此处添加
'key2' => 'value2'
]
]);
echo '任务入队列';
}可以看到,短短几行代码我们就完成了一个任务的入队列操作。
接下来,我们详细分析一下代码的执行流程:
1、我们将消息队列的生产者放在了app\common\event\TaskQueue中定义,如下:
namespace app\common\event;
use ky\ErrorCode;
use think\facade\Log;
use think\Queue;
class TaskQueue extends Base
{
/**
* 任务入队列
* @param array $params
* @author: fudaoji<fdj@kuryun.cn>
*/
public function push($params = []){
$worker = "app\\common\\job\\Task"; //消费者
$queue = config('queue_name'); //队列名称,可在.env中修改
if(empty($params['params']['do'])){
abort(ErrorCode::CatchException, '缺少任务执行者'); //具体的消费者行为
}
if(empty($params['delay'])){
Queue::push($worker, $params['params'], $queue);
}else{
Queue::later($params['delay'], $worker, $params['params'], $queue);
}
}
/**
* 测试消费者
* @param $data 上文中调用处放入的参数
* @author: fudaoji<fdj@kuryun.cn>
*/
public function testTask($data){
echo '测试任务队列执行';
$job = $data['job'];
if ($job->attempts() > 2) {
echo '我要删除任务了';
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
}
}
}2、从app\common\event\TaskQueue中可以看到消费者容器 : app\\common\\job\\Task ,定义如下:
namespace app\common\job;
use think\facade\Log;
use think\queue\Job;
class Task
{
/**
* 任务worker
* @param Job $job
* @param $data
* @author: fudaoji<fdj@kuryun.cn>
*/
public function fire(Job $job, $data){
try {
if(isset($data['do'])){
$data['job'] = $job; //将job抛给开发者
if(is_string($data['do'])){//全局函数
$callback = $data['do'];
}else{ //对象方法
$obj = new $data['do'][0]();
$callback = [$obj, $data['do'][1]];
}
//echo '==================='.json_encode($data).'==================';
call_user_func_array($callback, [$data]);
}else{
$job->delete();
echo(date('Y-m-d H:i:s') . '缺少do参数' . json_encode($data));
}
}catch (\Exception $e){
Log::error($e->getMessage());
$job->delete();
}
}
public function failed($data){
echo "任务执行失败, 参数:".json_encode($data);
}
}这个类其实是任务处理的中转站,通过传入参数指派具体的消费者(某个类中的某个方法)
3、在测试演示中,实际的消费者是: app\common\event\TaskQueue->testTask