php - Laravel5.2队列驱动expire参数设置带来的重复执行问题 数据库驱动 - 个人文章 - SegmentFault 思否


本站和网页 https://segmentfault.com/a/1190000009325149 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

php - Laravel5.2队列驱动expire参数设置带来的重复执行问题 数据库驱动 - 个人文章 - SegmentFault 思否注册登录问答专栏标签招聘活动发现✓使用“Bing”搜本站使用“Google”搜本站使用“百度”搜本站站内搜索注册登录Laravel5.2队列驱动expire参数设置带来的重复执行问题 数据库驱动大尾狼603关注作者首页专栏php文章详情3Laravel5.2队列驱动expire参数设置带来的重复执行问题 数据库驱动大尾狼603发布于2017-05-07  
'connections' => [
....
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'expire' => 180,
],
....
],
Laravel5.2队列驱动config/queue.php配置文件,“database”和“redis”有个expire参数,手册上解释是“队列任务过期时间(秒)”,默认为60秒。
(注:5.2和之后的配置文件发生了变化,改为'retry_after' 参数,具体见手册)
网上搜了一下这个配置,没有太多说明,但是实际使用的过程中,发现对于执行时间超过expire设置时间的队列进程,还有使用队列进行分布式程序部署,这个参数和这种设计模式是个大坑。。。
发现这个问题是想使用分布式程序部署处理队列,两台服务器部署Laravel框架artisan脚本,连接一个MYSQL数据库,使用一张jobs队列表。
部署的后,分别启动两台服务器的脚本,发现后执行的脚本,在队列驱动中取数据,如MYSQL的jobs表,遇到先执行的脚本队列数据时不会跳过,而是把这条数据视为Failed,储存一条新数据到failed_jobs表(Laravel队列失败时会将队列数据储存到failed_jobs表),造成数据重复。
之前在一台服务器启动3个进程执行脚本,并不会发生这种错误,后执行的脚本不会取得前一个进程的队列数据,更不会判断成Failed,多服务处理时是什么原因造成队列驱动中的数据错误呢?
根据队列执行的流程,程序执行时,队列到队列驱动中取任务,获得任务的过程队列驱动应该做事物处理,这样第二个进程取任务会跳过正在执行的队列数据。
查了一些资料,了解了Laravel队列的原理,最后还得看Queue的源码。
Laravel的Queue的源码都在IlluminateQueue目录下。
先分析以MYSQL为驱动的jobs表:
CREATE TABLE `jobs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
`payload` longtext COLLATE utf8_unicode_ci NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`),
KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
手册中主要介绍了队列任务的保存,payload字段储存的是序列化后的任务,Laravel队列可以将数据模型序列化,执行时候队列系统会自动从数据库中获取整个模型实例,具体说明见手册。
但是其他几个状态和时间字段才是保证队列事物处理的关键字段。
“attempts”执行次数,“reserved”执行的状态,“reserved_at”执行开始时间,‘available_at’预订执行时间,‘created_at’是队列创建时间。
监听事件的脚本有Listener.php和Worker.php两个脚本,看源码说明Listener可以处理指定的队列,connection参数,但是实际上最后都是通过work来处理队列的。Laravel5.4已经取消了queue:listen参数,都用queue:work来执行。不过我这里说的是Laravel5.2的问题,不知道是不是下面的原因使Laravel优化去掉了listen。
继续分析队列处理的Worker类的源码,取队列数据时用pop方法,这个方法会根据传递的驱动类型如database或redis,调用该驱动的pop方法。
$connection = $this->manager->connection($connectionName);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (! is_null($job)) {
return $this->process(
$this->manager->getName($connectionName), $job, $maxTries, $delay
);
下面是DatabaseQueue.php的pop方法。
/**
* Pop the next job off of the queue.
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
$queue = $this->getQueue($queue);
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);
$this->database->commit();
return new DatabaseJob(
$this->container, $this, $job, $queue
);
$this->database->commit();
取数据的过程事物处理已经打开。
取队列数据的核心还是$this->getNextAvailableJob($queue)。
打开sql日志,看看队列数据是如何查询出来的。
/**
* Get the next available job for the queue.
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
$this->database->enableQueryLog();
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
var_dump($this->database->getQueryLog());
return $job ? (object) $job : null;
array(1) {
[0] =>
array(3) {
'query' =>
string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update"
'bindings' =>
array(5) {
[0] =>
string(7) "default"
[1] =>
int(0)
[2] =>
int(1493634233)
[3] =>
int(1)
[4] =>
int(1493634173)
'time' =>
double(1.55)
从sql语句中可以看出,取队列数据有两个条件
reserved为0时,available_at时间小于当前时间,这个条件是待执行的队列;reserved为1时,reserved_at执行开始时间小于计算出的时间($this->isReservedButExpired),即当前时间减去超时秒Carbon::now()->subSeconds($this->expire)->getTimestamp(),这个条件是判断队列任务是否过期。
整个select过程是 “for update”的,有排他锁。
取得符合条件的队列后
/**
* Mark the given job ID as reserved.
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($job)
$job->reserved = 1;
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->getTime();
$this->database->table($this->table)->where('id', $job->id)->update([
'reserved' => $job->reserved,
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);
return $job;
程序会更新该条数据,并且更新完后即commit。
同一服务器,第二个进程取数据时候遇到悲观锁,需要等第一个进程取数据更新reserved和时间后执行。也就是说Laravel队列使用database时,并发的进程并不是同时取多条数据,而是取同一条数据等待其中一个进程update数据状态和执行时间,队列取得数据成功后第一个操作就是更新,所以第二个进程不会取到第一进程的同样数据,除非是队列过期。
在DatabaseQueue.php的pop方法中,取得队列数据后,“$this->database->commit(); ”前 sleep(10),会很明显的看到第二队列没有获取其他队列数据,说明“for update”只是update级排他锁,不会排斥select。
Laravel使用database队列有时候会有阻塞现象,不知道是不是这个原因造成的。
如果执行时间过长,超过‘expire’参数设置时间,第二队列会取得第一个队列数据,判断超时,这时候就会根据设置的最大执行次数tries来判断是插入新队列数据继续尝试执行,还是插入到错误队列“failed_jobs”表判断队列执行失败。
以上就是Laravel使用mysql执行队列的逻辑,之前提到的两台服务器部署Laravel框架执行artisan脚本,一个jobs表队列Failed的问题就是服务器时间不一致的原因,后一台服务器执行时候将前一队列数据判断为超时而插入到“failed_jobs”一条新数据,已经达到最大失败次数的情况,否则还会插入新的数据继续尝试。
所以queue:listen的执行时间参数 --timeout=60,一定要设置小于队列任务过期时间expire参数!
还有,Laravel5.2的queue:work并没有--timeout=60这个参数。。。。。
最后是队列执行完的处理逻辑。
如果队列执行成功会删除jobs的数据,这没什么问题。如果失败,包括超时、异常等,会根据设置的最大失败次数判断是否插入一条新数据,或者插入一条Failed数据到“failed_jobs”表。
出现错误时,handleJobException的异常处理调用DatabaseQueue.php的release方法,$job->release($delay),最终是pushToDatabase实现。
插入新数据时候,attempts是失败次数,reserved为0,available_at为当前时间戳加上延时时间参数,这样整个队列处理就形成了完整的数据逻辑操作。
Laravel5.4对队列功能进行了很大的修改,手册中的提示
任务过期和超时
任务执行时间
在配置文件 config/queue.php 中,每个连接都定义了 retry_after 项。该配置项的目的是定义任务在执行以后多少秒后释放回队列。如果retry_after 设定的值为 90, 任务在运行 90 秒后还未完成,那么将被释放回队列而不是删除掉。毫无疑问,你需要把 retry_after 的值设定为任务执行时间的最大可能值。
Laravel5.4去掉了queue的listen命令,work也增加了超时参数。Laravel5.5出来的时候应该升级上去。
附录:Laravel5.2测试的脚本,之前网上搜出来的都比较早,还是把job写成命令的方式,其实5.2以后job使用非常简单。
jobs下定义job任务,handle可以增加一些测试方案,比如我这种抛出异常,直接Failed的
class MyJob extends Job implements ShouldQueue
use InteractsWithQueue, SerializesModels;
private $key;
private $value;
/**
* Create a new job instance.
* @return void
*/
public function __construct($key, $value)
$this->key = $key;
$this->value = $value;
/**
* Execute the job.
* @return void
*/
public function handle()
for($i=0;$i<20;$i++){
echo "$i\n";
sleep(1);
echo "sss\t".$this->key."\t".date("Y-m-d H:i:s")."\n";
throw new \Exception("测试\n");
// Redis::hset('queue.test', $this->key, $this->value);
public function failed()
dump('failed');
控制器访问设置任务队列,key和value之前用了测试redis插入的,可以按自己的测试方案设置job参数。
for($i = 0; $i < 5; $i ++) {
echo "$i";
$job = (new MyJob($i, $i))->delay(20);
$this->dispatch($job);
我的例子设置了5个队列,开启多个shell并发执行artisan测试吧。
本来想将redis队列代码读完,一起发出来的,最近事情太多,redis代码也没怎么看。
redis驱动可以参考 http://www.cnblogs.com/z12987... 这篇文章对Laravel队列redis驱动逻辑介绍的很详细了,redis驱动使用的list和zset结构储存队列,执行过程会移除转存队列,没有数据库的“for update” 操作,所以应该不是存在队列阻塞的情况。
BUT队列任务过期时间设置和数据库驱动是一样的,所以同样
queue:listen的执行时间参数 --timeout=60,一定要设置小于队列任务过期时间expire参数!
终于写完了。。。
phplaravel队列阅读 4.7k发布于 2017-05-07 赞3收藏3分享本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议大尾狼60 声望3 粉丝关注作者0 条评论得票最新提交评论评论支持部分 Markdown 语法:**粗体** _斜体_ [链接](http://example.com) `代码` - 列表 > 引用。你还可以使用 @ 来通知其他用户。推荐阅读Laravel框架Mysql连接多库时事物处理需要指定库如题其实这不是Laravel框架的原因,网上有查到java程序,使用Mysql多库进行事物处理,也需要指定库。Laravel框架项目中使用了多个库,将业务分库储存,Model中指定了连接的库。 protected $connection = 'mysql_b...大尾狼阅读 1.2kOne 一个简洁的博客、微博客系统代码:[链接]文档:[链接]系统预览首页:微博列表:微博详细:文章列表:文章详细:归档:搜索,目前只能依据分类、标签搜索😀:管理后台:Eyeswap赞 45阅读 2.2k评论 1怎样用 PHP 来实现枚举?在数学和计算机科学理论中,一个集的枚举是列出某些有穷序列集的所有成员的程序,或者是一种特定类型对象的计数。这两种类型经常(但不总是)重叠。枚举是一个被命名的整型常数的集合,枚举在日常生活中很常见,...唯一丶赞 25阅读 6.1k评论 4PHP 性能终极 Debug - 生成火焰图2012 年刚开始学习 PHP,那个时候的 PHP 应用很简单,没有太多复杂的设计模式,像依赖注入,工厂模式这些还几乎没有,Reflection API 那时也才刚出来,一个 PHP 应用就是一些包了前端代码的脚本文件,正是因为 PH...路易港赞 5阅读 3.4kLaravel : Syntax error or access violation: 1055 Error?上面这样一段代码, 测试服务器很好, 上线后报错了.Syntax error or access violation: 1055 Error : MySQL : isn't in GROUP BY云云小金子赞 7阅读 12k评论 4golang实现php里的serialize()和unserialize()序列和反序列方法Golang 实现 PHP里的 serialize() 、 unserialize()安装 {代码...} 用法 {代码...} github地址:[链接]JonLee赞 2阅读 6.6k郑方方打怪升级日记 — 初识HTML5与CSS3任务名称:响应式砸蛋页面任务背景前辈:方方啊,最近项目也没什么事情,你看这个砸蛋页面不是很好看,要不你做一个响应式砸蛋页面吧?系统:郑方方接下前辈的任务 - 郑方方自动解析任务步骤任务:响应式砸蛋页面HTML5与C...郑方方赞 1阅读 3.1k评论 3大尾狼60 声望3 粉丝关注作者宣传栏文章目录跟随▲33产品热门问答热门专栏热门课程最新活动翻译酷工作课程Java 开发课程PHP 开发课程Python 开发课程前端开发课程移动开发课程资源每周精选用户排行榜帮助中心建议反馈合作关于我们广告投放职位发布讲师招募联系我们合作伙伴关注产品技术日志社区运营日志市场运营日志团队日志社区访谈条款服务协议隐私政策下载 AppCopyright © 2011-2022 SegmentFault. 当前呈现版本 22.12.19浙ICP备15005796号-2浙公网安备33010602002000号ICP 经营许可 浙B2-20201554杭州堆栈科技有限公司版权所有