队列
简介
在构建 Web 应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。 值得庆幸的是,Laravel 允许您轻松创建可以后台处理的队列任务。 通过将耗时的任务移至队列,您的应用程序可以闪电般的速度响应 Web 请求,并为您的客户提供更好的用户体验。
Laravel 队列为各种不同的队列后端(例如 Amazon SQS、Redis 甚至关系数据库)提供了统一的队列 API。
Laravel 的队列配置选项存储在应用程序的 config/queue.php
配置文件中。 在此文件中,您将找到框架包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQS、Redis 和 Beanstalkd 驱动程序,以及一个将立即执行任务的同步驱动程序(用于本地开发期间)。 还包括一个 null
队列驱动程序,它会丢弃排队的任务。
Laravel 现在提供了 Horizon,这是一个漂亮的信息面板和配置系统,用于您的 Redis 驱动的队列。 查看完整的 Horizon 文档 以获取更多信息。
连接 vs. 队列
在开始使用 Laravel 队列之前,重要的是要了解“连接”和“队列”之间的区别。 在您的 config/queue.php
配置文件中,有一个 connections
配置数组。 此选项定义与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。 但是,任何给定的队列连接都可能具有多个“队列”,这些队列可以被认为是不同堆栈或成堆的排队任务。
请注意,queue
配置文件中的每个连接配置示例都包含一个 queue
属性。 这是任务在发送到给定连接时将调度的默认队列。 换句话说,如果您调度任务时没有明确定义应将其调度到哪个队列,则该任务将放置在连接配置的 queue
属性中定义的队列上
1use App\Jobs\ProcessPodcast;2 3// This job is sent to the default connection's default queue...4ProcessPodcast::dispatch();5 6// This job is sent to the default connection's "emails" queue...7ProcessPodcast::dispatch()->onQueue('emails');
某些应用程序可能永远不需要将任务推送到多个队列,而是倾向于使用一个简单的队列。 但是,将任务推送到多个队列对于希望优先处理或细分任务处理方式的应用程序可能特别有用,因为 Laravel 队列工作进程允许您按优先级指定应处理哪些队列。 例如,如果您将任务推送到 high
队列,则可以运行一个工作进程,为其提供更高的处理优先级
1php artisan queue:work --queue=high,default
驱动注意事项和先决条件
数据库
为了使用 database
队列驱动程序,您需要一个数据库表来保存任务。 通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php
数据库迁移 中; 但是,如果您的应用程序不包含此迁移,则可以使用 make:queue-table
Artisan 命令来创建它
1php artisan make:queue-table2 3php artisan migrate
Redis
为了使用 redis
队列驱动程序,您应该在 config/database.php
配置文件中配置 Redis 数据库连接。
redis
队列驱动程序不支持 serializer
和 compression
Redis 选项。
Redis 集群
如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含 键哈希标签。 这是必需的,以确保给定队列的所有 Redis 键都放置在同一哈希槽中
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', '{default}'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => null,7 'after_commit' => false,8],
阻塞
使用 Redis 队列时,您可以使用 block_for
配置选项来指定驱动程序应等待任务变为可用的时间,然后再迭代工作进程循环并重新轮询 Redis 数据库。
根据您的队列负载调整此值可能比持续轮询 Redis 数据库以查找新任务更有效。 例如,您可以将该值设置为 5
,以指示驱动程序应阻塞五秒钟,同时等待任务变为可用
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', 'default'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => 5,7 'after_commit' => false,8],
将 block_for
设置为 0
将导致队列工作进程无限期阻塞,直到任务可用。 这还将阻止处理诸如 SIGTERM
之类的信号,直到处理完下一个任务。
其他驱动程序先决条件
列出的队列驱动程序需要以下依赖项。 这些依赖项可以通过 Composer 包管理器安装
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
或 phpredis PHP 扩展 - MongoDB:
mongodb/laravel-mongodb
创建任务
生成任务类
默认情况下,应用程序的所有可排队任务都存储在 app/Jobs
目录中。 如果 app/Jobs
目录不存在,则在您运行 make:job
Artisan 命令时将创建它
1php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 应将任务推送到队列以异步运行。
任务存根可以使用 存根发布 进行自定义。
类结构
任务类非常简单,通常仅包含一个 handle
方法,该方法在队列处理任务时调用。 为了开始,让我们看一下一个任务类示例。 在此示例中,我们将假设我们管理一个播客发布服务,并且需要在上传的播客文件发布之前对其进行处理
1<?php 2 3namespace App\Jobs; 4 5use App\Models\Podcast; 6use App\Services\AudioProcessor; 7use Illuminate\Contracts\Queue\ShouldQueue; 8use Illuminate\Foundation\Queue\Queueable; 9 10class ProcessPodcast implements ShouldQueue11{12 use Queueable;13 14 /**15 * Create a new job instance.16 */17 public function __construct(18 public Podcast $podcast,19 ) {}20 21 /**22 * Execute the job.23 */24 public function handle(AudioProcessor $processor): void25 {26 // Process uploaded podcast...27 }28}
在此示例中,请注意,我们能够将 Eloquent 模型 直接传递到排队任务的构造函数中。 由于任务正在使用的 Queueable
特性,因此当任务正在处理时,Eloquent 模型及其加载的关系将被优雅地序列化和反序列化。
如果您的排队任务在其构造函数中接受 Eloquent 模型,则只有该模型的标识符将被序列化到队列中。 当实际处理任务时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。 这种模型序列化方法允许将更小的任务负载发送到您的队列驱动程序。
handle
方法依赖注入
当队列处理任务时,将调用 handle
方法。 请注意,我们能够在任务的 handle
方法上类型提示依赖项。 Laravel 服务容器 会自动注入这些依赖项。
如果您想完全控制容器如何将依赖项注入到 handle
方法中,则可以使用容器的 bindMethod
方法。 bindMethod
方法接受一个回调,该回调接收任务和容器。 在回调中,您可以随意调用 handle
方法。 通常,您应该从 App\Providers\AppServiceProvider
服务提供者 的 boot
方法中调用此方法
1use App\Jobs\ProcessPodcast;2use App\Services\AudioProcessor;3use Illuminate\Contracts\Foundation\Application;4 5$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {6 return $job->handle($app->make(AudioProcessor::class));7});
二进制数据(例如原始图像内容)应在传递给排队任务之前通过 base64_encode
函数传递。 否则,当放置在队列中时,任务可能无法正确序列化为 JSON。
队列关系
由于在任务排队时所有加载的 Eloquent 模型关系也会被序列化,因此序列化的任务字符串有时可能会变得很大。 此外,当反序列化任务并从数据库中重新检索模型关系时,将完整地检索它们。 在任务排队过程中模型序列化之前应用的任何先前关系约束在反序列化任务时都不会应用。 因此,如果您希望使用给定关系的子集,则应在排队任务中重新约束该关系。
或者,为了防止关系被序列化,您可以在设置属性值时在模型上调用 withoutRelations
方法。 此方法将返回模型的实例,而不包含其加载的关系
1/**2 * Create a new job instance.3 */4public function __construct(5 Podcast $podcast,6) {7 $this->podcast = $podcast->withoutRelations();8}
如果您正在使用 PHP 构造函数属性提升,并且想要指示不应序列化 Eloquent 模型的关联关系,则可以使用 WithoutRelations
属性
1use Illuminate\Queue\Attributes\WithoutRelations;2 3/**4 * Create a new job instance.5 */6public function __construct(7 #[WithoutRelations]8 public Podcast $podcast,9) {}
如果任务接收到 Eloquent 模型集合或数组而不是单个模型,则当反序列化和执行任务时,该集合中的模型将不会恢复其关系。 这是为了防止在处理大量模型的任务上过度使用资源。
唯一任务
唯一任务需要支持 锁 的缓存驱动程序。 当前,memcached
、redis
、dynamodb
、database
、file
和 array
缓存驱动程序支持原子锁。 此外,唯一任务约束不适用于批次中的任务。
有时,您可能希望确保在任何时间点队列中只有一个特定任务的实例。 您可以通过在任务类上实现 ShouldBeUnique
接口来实现此目的。 此接口不要求您在类上定义任何其他方法
1<?php2 3use Illuminate\Contracts\Queue\ShouldQueue;4use Illuminate\Contracts\Queue\ShouldBeUnique;5 6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique7{8 // ...9}
在上面的示例中,UpdateSearchIndex
任务是唯一的。 因此,如果队列中已存在另一个任务实例且尚未完成处理,则不会调度该任务。
在某些情况下,您可能想要定义一个使任务唯一的特定“键”,或者您可能想要指定一个超时时间,超过该时间后任务不再保持唯一。 为了实现这一点,您可以在任务类上定义 uniqueId
和 uniqueFor
属性或方法
1<?php 2 3use App\Models\Product; 4use Illuminate\Contracts\Queue\ShouldQueue; 5use Illuminate\Contracts\Queue\ShouldBeUnique; 6 7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique 8{ 9 /**10 * The product instance.11 *12 * @var \App\Product13 */14 public $product;15 16 /**17 * The number of seconds after which the job's unique lock will be released.18 *19 * @var int20 */21 public $uniqueFor = 3600;22 23 /**24 * Get the unique ID for the job.25 */26 public function uniqueId(): string27 {28 return $this->product->id;29 }30}
在上面的示例中,UpdateSearchIndex
任务按产品 ID 是唯一的。 因此,在现有任务完成处理之前,将忽略任何具有相同产品 ID 的新任务调度。 此外,如果现有任务在一小时内未处理,则唯一锁将被释放,并且可以将另一个具有相同唯一键的任务调度到队列。
如果您的应用程序从多个 Web 服务器或容器调度任务,则应确保所有服务器都与同一中央缓存服务器通信,以便 Laravel 可以准确确定任务是否唯一。
在处理开始之前保持任务唯一
默认情况下,唯一任务在任务完成处理或所有重试尝试失败后会被“解锁”。 但是,在某些情况下,您可能希望任务在处理之前立即解锁。 为了实现这一点,您的任务应实现 ShouldBeUniqueUntilProcessing
契约而不是 ShouldBeUnique
契约
1<?php 2 3use App\Models\Product; 4use Illuminate\Contracts\Queue\ShouldQueue; 5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; 6 7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing 8{ 9 // ...10}
唯一任务锁
在幕后,当调度 ShouldBeUnique
任务时,Laravel 会尝试使用 uniqueId
键获取 锁。 如果未获取锁,则不会调度该任务。 当任务完成处理或所有重试尝试失败时,此锁将被释放。 默认情况下,Laravel 将使用默认缓存驱动程序来获取此锁。 但是,如果您希望使用另一个驱动程序来获取锁,则可以定义一个 uniqueVia
方法,该方法返回应使用的缓存驱动程序
1use Illuminate\Contracts\Cache\Repository; 2use Illuminate\Support\Facades\Cache; 3 4class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique 5{ 6 // ... 7 8 /** 9 * Get the cache driver for the unique job lock.10 */11 public function uniqueVia(): Repository12 {13 return Cache::driver('redis');14 }15}
如果您只需要限制任务的并发处理,请改用 WithoutOverlapping
任务中间件。
加密任务
Laravel 允许您通过 加密 来确保任务数据的隐私和完整性。 要开始使用,只需将 ShouldBeEncrypted
接口添加到任务类即可。 将此接口添加到类后,Laravel 会在将您的任务推送到队列之前自动对其进行加密
1<?php2 3use Illuminate\Contracts\Queue\ShouldBeEncrypted;4use Illuminate\Contracts\Queue\ShouldQueue;5 6class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted7{8 // ...9}
任务中间件
任务中间件允许您在排队任务的执行周围包装自定义逻辑,从而减少任务本身的样板代码。 例如,考虑以下 handle
方法,该方法利用 Laravel 的 Redis 速率限制功能,以允许每五秒钟仅处理一个任务
1use Illuminate\Support\Facades\Redis; 2 3/** 4 * Execute the job. 5 */ 6public function handle(): void 7{ 8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () { 9 info('Lock obtained...');10 11 // Handle job...12 }, function () {13 // Could not obtain lock...14 15 return $this->release(5);16 });17}
虽然此代码有效,但 handle
方法的实现变得混乱,因为它充满了 Redis 速率限制逻辑。 此外,对于我们要限制速率的任何其他任务,都必须复制此速率限制逻辑。
除了在 handle 方法中进行速率限制之外,我们还可以定义一个处理速率限制的任务中间件。 Laravel 没有任务中间件的默认位置,因此您可以将任务中间件放置在应用程序中的任何位置。 在此示例中,我们将中间件放置在 app/Jobs/Middleware
目录中
1<?php 2 3namespace App\Jobs\Middleware; 4 5use Closure; 6use Illuminate\Support\Facades\Redis; 7 8class RateLimited 9{10 /**11 * Process the queued job.12 *13 * @param \Closure(object): void $next14 */15 public function handle(object $job, Closure $next): void16 {17 Redis::throttle('key')18 ->block(0)->allow(1)->every(5)19 ->then(function () use ($job, $next) {20 // Lock obtained...21 22 $next($job);23 }, function () use ($job) {24 // Could not obtain lock...25 26 $job->release(5);27 });28 }29}
如您所见,与 路由中间件 类似,任务中间件接收正在处理的任务和一个应调用的回调以继续处理任务。
创建任务中间件后,可以通过从任务的 middleware
方法返回它们来将其附加到任务。 此方法在 make:job
Artisan 命令搭建的任务上不存在,因此您需要手动将其添加到您的任务类中
1use App\Jobs\Middleware\RateLimited; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new RateLimited];11}
任务中间件也可以分配给可排队的事件侦听器、邮件和通知。
速率限制
尽管我们刚刚演示了如何编写您自己的速率限制任务中间件,但 Laravel 实际上包含一个速率限制中间件,您可以利用它来限制任务的速率。 与 路由速率限制器 类似,任务速率限制器是使用 RateLimiter
外观模式的 for
方法定义的。
例如,您可能希望允许用户每小时备份一次数据,同时对高级客户不施加此类限制。 为了实现这一点,您可以在 AppServiceProvider
的 boot
方法中定义 RateLimiter
1use Illuminate\Cache\RateLimiting\Limit; 2use Illuminate\Support\Facades\RateLimiter; 3 4/** 5 * Bootstrap any application services. 6 */ 7public function boot(): void 8{ 9 RateLimiter::for('backups', function (object $job) {10 return $job->user->vipCustomer()11 ? Limit::none()12 : Limit::perHour(1)->by($job->user->id);13 });14}
在上面的示例中,我们定义了每小时速率限制; 但是,您可以使用 perMinute
方法轻松定义基于分钟的速率限制。 此外,您可以将任何您希望的值传递给速率限制的 by
方法; 但是,此值最常用于按客户细分速率限制
1return Limit::perMinute(50)->by($job->user->id);
定义速率限制后,您可以使用 Illuminate\Queue\Middleware\RateLimited
中间件将速率限制器附加到您的任务。 每次任务超出速率限制时,此中间件都会根据速率限制持续时间将任务释放回队列并适当延迟。
1use Illuminate\Queue\Middleware\RateLimited; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new RateLimited('backups')];11}
将速率受限的任务释放回队列仍会增加任务的总 attempts
次数。 您可能希望相应地调整任务类上的 tries
和 maxExceptions
属性。 或者,您可能希望使用 retryUntil
方法 来定义任务应不再尝试的时间量。
如果您不希望在任务受到速率限制时重试该任务,则可以使用 dontRelease
方法
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new RateLimited('backups'))->dontRelease()];9}
如果您正在使用 Redis,则可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis
中间件,该中间件针对 Redis 进行了微调,并且比基本的速率限制中间件更有效。
防止任务重叠
Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping
中间件,允许您基于任意键防止任务重叠。 当排队任务正在修改应一次仅由一个任务修改的资源时,这可能很有用。
例如,假设您有一个排队任务,用于更新用户的信用评分,并且您想要防止同一用户 ID 的信用评分更新任务重叠。 为了实现这一点,您可以从任务的 middleware
方法返回 WithoutOverlapping
中间件
1use Illuminate\Queue\Middleware\WithoutOverlapping; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new WithoutOverlapping($this->user->id)];11}
任何相同类型的重叠任务都将释放回队列。 您还可以指定在再次尝试释放的任务之前必须经过的秒数
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];9}
如果您希望立即删除任何重叠的任务,以便不重试它们,则可以使用 dontRelease
方法
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];9}
WithoutOverlapping
中间件由 Laravel 的原子锁功能提供支持。 有时,您的任务可能会意外失败或超时,从而导致锁未释放。 因此,您可以明确定义使用 expireAfter
方法的锁过期时间。 例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping
锁
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];9}
WithoutOverlapping
中间件需要支持 锁 的缓存驱动程序。 当前,memcached
、redis
、dynamodb
、database
、file
和 array
缓存驱动程序支持原子锁。
在任务类之间共享锁键
默认情况下,WithoutOverlapping
中间件将仅防止同一类的重叠任务。 因此,尽管两个不同的任务类可能使用相同的锁键,但它们不会被阻止重叠。 但是,您可以指示 Laravel 使用 shared
方法跨任务类应用键
1use Illuminate\Queue\Middleware\WithoutOverlapping; 2 3class ProviderIsDown 4{ 5 // ... 6 7 public function middleware(): array 8 { 9 return [10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),11 ];12 }13}14 15class ProviderIsUp16{17 // ...18 19 public function middleware(): array20 {21 return [22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),23 ];24 }25}
限制异常
Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions
中间件,允许您限制异常。 一旦任务抛出给定数量的异常,则所有进一步执行该任务的尝试都将延迟,直到指定的时隔过去。 此中间件对于与不稳定的第三方服务交互的任务特别有用。
例如,假设一个排队任务与开始抛出异常的第三方 API 交互。 为了限制异常,您可以从任务的 middleware
方法返回 ThrottlesExceptions
中间件。 通常,此中间件应与实现 基于时间的尝试 的任务配对
1use DateTime; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [new ThrottlesExceptions(10, 5 * 60)];12}13 14/**15 * Determine the time at which the job should timeout.16 */17public function retryUntil(): DateTime18{19 return now()->addMinutes(30);20}
中间件接受的第一个构造函数参数是任务在被限制之前可以抛出的异常数量,而第二个构造函数参数是任务在被限制后再次尝试之前应经过的秒数。 在上面的代码示例中,如果任务连续抛出 10 个异常,我们将等待 5 分钟,然后再尝试再次执行该任务,并受 30 分钟时间限制的约束。
当任务抛出异常但尚未达到异常阈值时,通常会立即重试该任务。 但是,您可以通过在将中间件附加到任务时调用 backoff
方法来指定此类任务应延迟的分钟数
1use Illuminate\Queue\Middleware\ThrottlesExceptions; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];11}
在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且任务的类名用作缓存“键”。 您可以通过在将中间件附加到任务时调用 by
方法来覆盖此键。 如果您有多个任务与同一第三方服务交互,并且您希望它们共享一个通用的限制“存储桶”,则这可能很有用
1use Illuminate\Queue\Middleware\ThrottlesExceptions; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];11}
默认情况下,此中间件将限制每个异常。 您可以通过在将中间件附加到任务时调用 when
方法来修改此行为。 然后,仅当提供给 when
方法的闭包返回 true
时,才会限制异常
1use Illuminate\Http\Client\HttpClientException; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->when(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
如果您希望将受限异常报告给应用程序的异常处理程序,则可以通过在将中间件附加到任务时调用 report
方法来完成此操作。 或者,您可以为 report
方法提供一个闭包,并且仅当给定的闭包返回 true
时才会报告异常
1use Illuminate\Http\Client\HttpClientException; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->report(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
如果您正在使用 Redis,则可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
中间件,该中间件针对 Redis 进行了微调,并且比基本的异常限制中间件更有效。
跳过任务
Skip
中间件允许您指定应跳过/删除任务,而无需修改任务的逻辑。 如果给定条件评估为 true
,则 Skip::when
方法将删除任务,而如果条件评估为 false
,则 Skip::unless
方法将删除任务
1use Illuminate\Queue\Middleware\Skip; 2 3/** 4 * Get the middleware the job should pass through. 5 */ 6public function middleware(): array 7{ 8 return [ 9 Skip::when($someCondition),10 ];11}
您还可以将 Closure
传递给 when
和 unless
方法以进行更复杂的条件评估
1use Illuminate\Queue\Middleware\Skip; 2 3/** 4 * Get the middleware the job should pass through. 5 */ 6public function middleware(): array 7{ 8 return [ 9 Skip::when(function (): bool {10 return $this->shouldSkip();11 }),12 ];13}
调度任务
编写完任务类后,您可以使用任务本身的 dispatch
方法来调度它。传递给 dispatch
方法的参数将提供给任务的构造函数
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // ...21 22 ProcessPodcast::dispatch($podcast);23 24 return redirect('/podcasts');25 }26}
如果您想有条件地调度任务,可以使用 dispatchIf
和 dispatchUnless
方法
1ProcessPodcast::dispatchIf($accountActive, $podcast);2 3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 应用程序中,sync
驱动是默认的队列驱动。此驱动在当前请求的前台中同步执行任务,这在本地开发期间通常很方便。如果您想实际开始将任务排队以进行后台处理,可以在应用程序的 config/queue.php
配置文件中指定不同的队列驱动。
延迟调度
如果您想指定任务不应立即供队列工作进程处理,则可以在调度任务时使用 delay
方法。例如,让我们指定任务在调度后 10 分钟才可用于处理
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // ...21 22 ProcessPodcast::dispatch($podcast)23 ->delay(now()->addMinutes(10));24 25 return redirect('/podcasts');26 }27}
在某些情况下,任务可能配置了默认延迟。如果您需要绕过此延迟并立即调度任务进行处理,可以使用 withoutDelay
方法
1ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
在响应发送到浏览器后调度
或者,如果您的 Web 服务器使用 FastCGI,则 dispatchAfterResponse
方法会将任务调度延迟到 HTTP 响应发送到用户浏览器之后。即使排队任务仍在执行,这也将允许用户开始使用应用程序。这通常只应用于大约需要一秒钟的任务,例如发送电子邮件。由于它们在当前的 HTTP 请求中处理,因此以这种方式调度的任务不需要队列工作进程运行才能被处理
1use App\Jobs\SendNotification;2 3SendNotification::dispatchAfterResponse();
您还可以 dispatch
一个闭包,并将 afterResponse
方法链接到 dispatch
辅助函数上,以便在 HTTP 响应发送到浏览器后执行闭包
1use App\Mail\WelcomeMessage;2use Illuminate\Support\Facades\Mail;3 4dispatch(function () {6})->afterResponse();
同步调度
如果您想立即(同步)调度任务,可以使用 dispatchSync
方法。使用此方法时,任务将不会排队,并将立即在当前进程中执行
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatchSync($podcast);23 24 return redirect('/podcasts');25 }26}
任务 & 数据库事务
虽然在数据库事务中调度任务完全没有问题,但您应特别注意确保您的任务实际上能够成功执行。在事务中调度任务时,任务有可能在父事务提交之前被工作进程处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,事务中创建的任何模型或数据库记录可能不存在于数据库中。
幸运的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit
连接选项
1'redis' => [2 'driver' => 'redis',3 // ...4 'after_commit' => true,5],
当 after_commit
选项为 true
时,您可以在数据库事务中调度任务;但是,Laravel 将等待打开的父数据库事务提交后,再实际调度任务。当然,如果当前没有打开数据库事务,任务将立即被调度。
如果事务由于事务期间发生的异常而回滚,则在该事务期间调度的任务将被丢弃。
将 after_commit
配置选项设置为 true
也会导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。
内联指定提交调度行为
如果您未将 after_commit
队列连接配置选项设置为 true
,您仍然可以指示特定任务应在所有打开的数据库事务提交后调度。为了实现这一点,您可以将 afterCommit
方法链接到您的调度操作上
1use App\Jobs\ProcessPodcast;2 3ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果 after_commit
配置选项设置为 true
,您可以指示特定任务应立即调度,而无需等待任何打开的数据库事务提交
1ProcessPodcast::dispatch($podcast)->beforeCommit();
任务链
任务链允许您指定一系列排队任务,这些任务应在主任务成功执行后按顺序运行。如果序列中的一个任务失败,其余任务将不会运行。要执行排队任务链,您可以使用 Bus
facade 提供的 chain
方法。Laravel 的命令总线是一个较低级别的组件,排队任务调度是构建在其之上的
1use App\Jobs\OptimizePodcast; 2use App\Jobs\ProcessPodcast; 3use App\Jobs\ReleasePodcast; 4use Illuminate\Support\Facades\Bus; 5 6Bus::chain([ 7 new ProcessPodcast, 8 new OptimizePodcast, 9 new ReleasePodcast,10])->dispatch();
除了链接任务类实例之外,您还可以链接闭包
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 function () {5 Podcast::update(/* ... */);6 },7])->dispatch();
在任务中使用 $this->delete()
方法删除任务不会阻止链式任务被处理。只有当链中的任务失败时,链才会停止执行。
链式连接和队列
如果您想指定应用于链式任务的连接和队列,可以使用 onConnection
和 onQueue
方法。这些方法指定了应使用的队列连接和队列名称,除非排队任务被显式分配了不同的连接/队列
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 new ReleasePodcast,5])->onConnection('redis')->onQueue('podcasts')->dispatch();
向链中添加任务
有时,您可能需要从链中的另一个任务中向现有任务链中添加前置或附加任务。您可以使用 prependToChain
和 appendToChain
方法来实现此目的
1/** 2 * Execute the job. 3 */ 4public function handle(): void 5{ 6 // ... 7 8 // Prepend to the current chain, run job immediately after current job... 9 $this->prependToChain(new TranscribePodcast);10 11 // Append to the current chain, run job at end of chain...12 $this->appendToChain(new TranscribePodcast);13}
链式失败
在链接任务时,您可以使用 catch
方法指定一个闭包,如果链中的任务失败,则应调用该闭包。给定的回调将接收导致任务失败的 Throwable
实例
1use Illuminate\Support\Facades\Bus; 2use Throwable; 3 4Bus::chain([ 5 new ProcessPodcast, 6 new OptimizePodcast, 7 new ReleasePodcast, 8])->catch(function (Throwable $e) { 9 // A job within the chain has failed...10})->dispatch();
由于链式回调由 Laravel 队列序列化并在稍后执行,因此您不应在链式回调中使用 $this
变量。
自定义队列和连接
调度到特定队列
通过将任务推送到不同的队列,您可以“分类”您的排队任务,甚至可以确定分配给各个队列的工作进程的数量的优先级。请记住,这不会将任务推送到由您的队列配置文件定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在调度任务时使用 onQueue
方法
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatch($podcast)->onQueue('processing');23 24 return redirect('/podcasts');25 }26}
或者,您可以通过在任务的构造函数中调用 onQueue
方法来指定任务的队列
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Contracts\Queue\ShouldQueue; 6use Illuminate\Foundation\Queue\Queueable; 7 8class ProcessPodcast implements ShouldQueue 9{10 use Queueable;11 12 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onQueue('processing');18 }19}
调度到特定连接
如果您的应用程序与多个队列连接交互,则可以使用 onConnection
方法指定要将任务推送到哪个连接
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatch($podcast)->onConnection('sqs');23 24 return redirect('/podcasts');25 }26}
您可以将 onConnection
和 onQueue
方法链接在一起,以指定任务的连接和队列
1ProcessPodcast::dispatch($podcast)2 ->onConnection('sqs')3 ->onQueue('processing');
或者,您可以通过在任务的构造函数中调用 onConnection
方法来指定任务的连接
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Contracts\Queue\ShouldQueue; 6use Illuminate\Foundation\Queue\Queueable; 7 8class ProcessPodcast implements ShouldQueue 9{10 use Queueable;11 12 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onConnection('sqs');18 }19}
指定最大任务尝试次数 / 超时值
最大尝试次数
如果您的某个排队任务遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定任务可以尝试的次数或时长。
指定任务可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries
开关。除非正在处理的任务指定了它可以尝试的次数,否则这将应用于工作进程处理的所有任务
1php artisan queue:work --tries=3
如果任务超过其最大尝试次数,它将被视为“失败”的任务。有关处理失败任务的更多信息,请参阅 失败任务文档。如果为 queue:work
命令提供了 --tries=0
,则任务将无限期地重试。
您可以通过在任务类本身上定义任务可以尝试的最大次数来采取更精细的方法。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries
值
1<?php 2 3namespace App\Jobs; 4 5class ProcessPodcast implements ShouldQueue 6{ 7 /** 8 * The number of times the job may be attempted. 9 *10 * @var int11 */12 public $tries = 5;13}
如果您需要动态控制特定任务的最大尝试次数,可以在任务上定义 tries
方法
1/**2 * Determine number of times the job may be attempted.3 */4public function tries(): int5{6 return 5;7}
基于时间的尝试
作为定义任务在失败之前可以尝试多少次的替代方法,您可以定义任务不应再尝试的时间。这允许任务在给定的时间范围内尝试任意次数。要定义任务不应再尝试的时间,请向您的任务类添加 retryUntil
方法。此方法应返回 DateTime
实例
1use DateTime;2 3/**4 * Determine the time at which the job should timeout.5 */6public function retryUntil(): DateTime7{8 return now()->addMinutes(10);9}
您还可以在您的 排队事件监听器上定义 tries
属性或 retryUntil
方法。
最大异常数
有时您可能希望指定任务可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是由 release
方法直接释放),则应失败。为了实现这一点,您可以在任务类上定义 maxExceptions
属性
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Support\Facades\Redis; 6 7class ProcessPodcast implements ShouldQueue 8{ 9 /**10 * The number of times the job may be attempted.11 *12 * @var int13 */14 public $tries = 25;15 16 /**17 * The maximum number of unhandled exceptions to allow before failing.18 *19 * @var int20 */21 public $maxExceptions = 3;22 23 /**24 * Execute the job.25 */26 public function handle(): void27 {28 Redis::throttle('key')->allow(10)->every(60)->then(function () {29 // Lock obtained, process the podcast...30 }, function () {31 // Unable to obtain lock...32 return $this->release(10);33 });34 }35}
在此示例中,如果应用程序无法获取 Redis 锁,则任务将被释放 10 秒,并将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,则任务将失败。
超时
通常,您大致知道您期望排队任务花费多长时间。因此,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,则处理任务的工作进程将退出并显示错误。通常,工作进程将由您服务器上配置的 进程管理器 自动重启。
可以使用 Artisan 命令行上的 --timeout
开关指定任务可以运行的最大秒数
1php artisan queue:work --timeout=30
如果任务因持续超时而超过其最大尝试次数,则它将被标记为失败。
您还可以在任务类本身上定义允许任务运行的最大秒数。如果在任务上指定了超时时间,它将优先于命令行上指定的任何超时时间
1<?php 2 3namespace App\Jobs; 4 5class ProcessPodcast implements ShouldQueue 6{ 7 /** 8 * The number of seconds the job can run before timing out. 9 *10 * @var int11 */12 public $timeout = 120;13}
有时,IO 阻塞进程(例如套接字或传出的 HTTP 连接)可能不遵守您指定的超时时间。因此,当使用这些功能时,您应始终尝试也使用它们的 API 指定超时时间。例如,当使用 Guzzle 时,您应始终指定连接和请求超时值。
必须安装 pcntl
PHP 扩展才能指定任务超时。此外,任务的“超时”值应始终小于其 “重试间隔” 值。否则,任务可能在实际完成执行或超时之前被重新尝试。
超时时失败
如果您想指示任务应在超时时标记为 失败,您可以在任务类上定义 $failOnTimeout
属性
1/**2 * Indicate if the job should be marked as failed on timeout.3 *4 * @var bool5 */6public $failOnTimeout = true;
错误处理
如果在任务处理期间抛出异常,任务将自动释放回队列,以便可以再次尝试。任务将继续释放,直到它已尝试了应用程序允许的最大次数。最大尝试次数由 queue:work
Artisan 命令上使用的 --tries
开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作进程的更多信息,可以在下面找到。
手动释放任务
有时您可能希望手动将任务释放回队列,以便稍后可以再次尝试。您可以通过调用 release
方法来实现此目的
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...7 8 $this->release();9}
默认情况下,release
方法会将任务释放回队列以进行立即处理。但是,您可以指示队列在给定的秒数过去后才使任务可用于处理,方法是将整数或日期实例传递给 release
方法
1$this->release(10);2 3$this->release(now()->addSeconds(10));
手动使任务失败
有时您可能需要手动将任务标记为“失败”。为此,您可以调用 fail
方法
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...7 8 $this->fail();9}
如果您想因为您捕获的异常而将任务标记为失败,您可以将异常传递给 fail
方法。或者,为了方便起见,您可以传递字符串错误消息,该消息将为您转换为异常
1$this->fail($exception);2 3$this->fail('Something went wrong.');
有关失败任务的更多信息,请查看 有关处理任务失败的文档。
任务批处理
Laravel 的任务批处理功能允许您轻松执行一批任务,然后在该批任务执行完成后执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表,该表将包含有关您的任务批次的元信息,例如其完成百分比。可以使用 make:queue-batches-table
Artisan 命令生成此迁移
1php artisan make:queue-batches-table2 3php artisan migrate
定义可批处理的任务
要定义可批处理的任务,您应该像往常一样 创建可排队的任务;但是,您应该将 Illuminate\Bus\Batchable
trait 添加到任务类。此 trait 提供了对 batch
方法的访问,该方法可用于检索任务正在其中执行的当前批次
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Bus\Batchable; 6use Illuminate\Contracts\Queue\ShouldQueue; 7use Illuminate\Foundation\Queue\Queueable; 8 9class ImportCsv implements ShouldQueue10{11 use Batchable, Queueable;12 13 /**14 * Execute the job.15 */16 public function handle(): void17 {18 if ($this->batch()->cancelled()) {19 // Determine if the batch has been cancelled...20 21 return;22 }23 24 // Import a portion of the CSV file...25 }26}
调度批次
要调度一批任务,您应该使用 Bus
facade 的 batch
方法。当然,批处理主要在与完成回调结合使用时才有用。因此,您可以使用 then
、catch
和 finally
方法为批次定义完成回调。当调用这些回调时,每个回调都将接收 Illuminate\Bus\Batch
实例。在此示例中,我们将假设我们正在排队一批任务,每个任务都处理 CSV 文件中给定数量的行
1use App\Jobs\ImportCsv; 2use Illuminate\Bus\Batch; 3use Illuminate\Support\Facades\Bus; 4use Throwable; 5 6$batch = Bus::batch([ 7 new ImportCsv(1, 100), 8 new ImportCsv(101, 200), 9 new ImportCsv(201, 300),10 new ImportCsv(301, 400),11 new ImportCsv(401, 500),12])->before(function (Batch $batch) {13 // The batch has been created but no jobs have been added...14})->progress(function (Batch $batch) {15 // A single job has completed successfully...16})->then(function (Batch $batch) {17 // All jobs completed successfully...18})->catch(function (Batch $batch, Throwable $e) {19 // First batch job failure detected...20})->finally(function (Batch $batch) {21 // The batch has finished executing...22})->dispatch();23 24return $batch->id;
批次的 ID(可以通过 $batch->id
属性访问)可用于 查询 Laravel 命令总线 以获取有关已调度批次的信息。
由于批处理回调由 Laravel 队列序列化并在稍后执行,因此您不应在回调中使用 $this
变量。此外,由于批处理任务包装在数据库事务中,因此不应在任务中执行触发隐式提交的数据库语句。
命名批次
某些工具(例如 Laravel Horizon 和 Laravel Telescope)可以在批次被命名时为批次提供更友好的调试信息。要为批次分配任意名称,您可以在定义批次时调用 name
方法
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->name('Import CSV')->dispatch();
批次连接和队列
如果您想指定应用于批处理任务的连接和队列,可以使用 onConnection
和 onQueue
方法。所有批处理任务必须在同一连接和队列中执行
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->onConnection('redis')->onQueue('imports')->dispatch();
链和批次
您可以通过将链式任务放置在数组中来在批次中定义一组 链式任务。例如,我们可以并行执行两个任务链,并在两个任务链都完成处理后执行回调
1use App\Jobs\ReleasePodcast; 2use App\Jobs\SendPodcastReleaseNotification; 3use Illuminate\Bus\Batch; 4use Illuminate\Support\Facades\Bus; 5 6Bus::batch([ 7 [ 8 new ReleasePodcast(1), 9 new SendPodcastReleaseNotification(1),10 ],11 [12 new ReleasePodcast(2),13 new SendPodcastReleaseNotification(2),14 ],15])->then(function (Batch $batch) {16 // ...17})->dispatch();
相反,您可以通过在 链 中定义批次来在链中运行批处理任务。例如,您可以首先运行一批任务以发布多个播客,然后运行一批任务以发送发布通知
1use App\Jobs\FlushPodcastCache; 2use App\Jobs\ReleasePodcast; 3use App\Jobs\SendPodcastReleaseNotification; 4use Illuminate\Support\Facades\Bus; 5 6Bus::chain([ 7 new FlushPodcastCache, 8 Bus::batch([ 9 new ReleasePodcast(1),10 new ReleasePodcast(2),11 ]),12 Bus::batch([13 new SendPodcastReleaseNotification(1),14 new SendPodcastReleaseNotification(2),15 ]),16])->dispatch();
将任务添加到批次
有时,从批处理任务中向批次添加其他任务可能很有用。当您需要批处理数千个任务时,此模式可能很有用,这些任务可能需要很长时间才能在 Web 请求期间调度。因此,您可能希望调度一批初始“加载器”任务,这些任务会使用更多任务来填充批次
1$batch = Bus::batch([2 new LoadImportBatch,3 new LoadImportBatch,4 new LoadImportBatch,5])->then(function (Batch $batch) {6 // All jobs completed successfully...7})->name('Import Contacts')->dispatch();
在此示例中,我们将使用 LoadImportBatch
任务来使用其他任务填充批次。为了实现这一点,我们可以使用批次实例上的 add
方法,该实例可以通过任务的 batch
方法访问
1use App\Jobs\ImportContacts; 2use Illuminate\Support\Collection; 3 4/** 5 * Execute the job. 6 */ 7public function handle(): void 8{ 9 if ($this->batch()->cancelled()) {10 return;11 }12 13 $this->batch()->add(Collection::times(1000, function () {14 return new ImportContacts;15 }));16}
您只能从属于同一批次的任务中向批次添加任务。
检查批次
提供给批次完成回调的 Illuminate\Bus\Batch
实例具有各种属性和方法,可帮助您与给定的任务批次进行交互和检查
1// The UUID of the batch... 2$batch->id; 3 4// The name of the batch (if applicable)... 5$batch->name; 6 7// The number of jobs assigned to the batch... 8$batch->totalJobs; 9 10// The number of jobs that have not been processed by the queue...11$batch->pendingJobs;12 13// The number of jobs that have failed...14$batch->failedJobs;15 16// The number of jobs that have been processed thus far...17$batch->processedJobs();18 19// The completion percentage of the batch (0-100)...20$batch->progress();21 22// Indicates if the batch has finished executing...23$batch->finished();24 25// Cancel the execution of the batch...26$batch->cancel();27 28// Indicates if the batch has been cancelled...29$batch->cancelled();
从路由返回批次
所有 Illuminate\Bus\Batch
实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由之一返回它们,以检索包含有关批次信息的 JSON 有效负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批次完成进度的信息非常方便。
要按 ID 检索批次,您可以使用 Bus
facade 的 findBatch
方法
1use Illuminate\Support\Facades\Bus;2use Illuminate\Support\Facades\Route;3 4Route::get('/batch/{batchId}', function (string $batchId) {5 return Bus::findBatch($batchId);6});
取消批次
有时您可能需要取消给定批次的执行。可以通过调用 Illuminate\Bus\Batch
实例上的 cancel
方法来完成此操作
1/** 2 * Execute the job. 3 */ 4public function handle(): void 5{ 6 if ($this->user->exceedsImportLimit()) { 7 return $this->batch()->cancel(); 8 } 9 10 if ($this->batch()->cancelled()) {11 return;12 }13}
正如您可能在前面的示例中注意到的那样,批处理任务通常应在继续执行之前确定其对应的批次是否已被取消。但是,为了方便起见,您可以将 SkipIfBatchCancelled
中间件 分配给任务。顾名思义,此中间件将指示 Laravel 如果其对应的批次已被取消,则不处理该任务
1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;2 3/**4 * Get the middleware the job should pass through.5 */6public function middleware(): array7{8 return [new SkipIfBatchCancelled];9}
批次失败
当批处理任务失败时,将调用 catch
回调(如果已分配)。此回调仅为批次中第一个失败的任务调用。
允许失败
当批次中的任务失败时,Laravel 将自动将批次标记为“已取消”。如果您愿意,您可以禁用此行为,以便任务失败不会自动将批次标记为已取消。可以通过在调度批次时调用 allowFailures
方法来实现此目的
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->allowFailures()->dispatch();
重试失败的批处理任务
为了方便起见,Laravel 提供了一个 queue:retry-batch
Artisan 命令,使您可以轻松地重试给定批次的所有失败任务。queue:retry-batch
命令接受应重试其失败任务的批次的 UUID
1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批次
如果不进行修剪,job_batches
表可能会非常快速地累积记录。为了缓解这种情况,您应该 计划 每天运行 queue:prune-batches
Artisan 命令
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches')->daily();
默认情况下,所有超过 24 小时的已完成批次都将被修剪。您可以在调用命令时使用 hours
选项来确定保留批次数据的时间长度。例如,以下命令将删除所有超过 48 小时前完成的批次
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48')->daily();
有时,您的 jobs_batches
表可能会累积从未成功完成的批次的批次记录,例如任务失败且该任务从未成功重试的批次。您可以指示 queue:prune-batches
命令使用 unfinished
选项修剪这些未完成的批次记录
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,您的 jobs_batches
表也可能累积已取消批次的批次记录。您可以指示 queue:prune-batches
命令使用 cancelled
选项修剪这些已取消的批次记录
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中存储批次
Laravel 还支持将批次元信息存储在 DynamoDB 中,而不是关系数据库中。但是,您需要手动创建一个 DynamoDB 表来存储所有批次记录。
通常,此表应命名为 job_batches
,但您应根据应用程序的 queue
配置文件中 queue.batching.table
配置值来命名表。
DynamoDB 批次表配置
job_batches
表应具有名为 application
的字符串主分区键和名为 id
的字符串主排序键。键的 application
部分将包含您的应用程序名称,如应用程序的 app
配置文件中的 name
配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一表来存储多个 Laravel 应用程序的任务批次。
此外,如果您想利用 自动批次修剪,可以为您的表定义 ttl
属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信
1composer require aws/aws-sdk-php
然后,将 queue.batching.driver
配置选项的值设置为 dynamodb
。此外,您应该在 batching
配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于向 AWS 进行身份验证。当使用 dynamodb
驱动时,queue.batching.database
配置选项是不必要的
1'batching' => [2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7],
在 DynamoDB 中修剪批次
当使用 DynamoDB 存储任务批次信息时,用于修剪存储在关系数据库中的批次的典型修剪命令将不起作用。相反,您可以使用 DynamoDB 的原生 TTL 功能 来自动删除旧批次的记录。
如果您使用 ttl
属性定义了 DynamoDB 表,则可以定义配置参数来指示 Laravel 如何修剪批次记录。queue.batching.ttl_attribute
配置值定义了保存 TTL 的属性的名称,而 queue.batching.ttl
配置值定义了相对于记录上次更新时间,批次记录可以从 DynamoDB 表中删除的秒数
1'batching' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7 'ttl_attribute' => 'ttl',8 'ttl' => 60 * 60 * 24 * 7, // 7 days...9],
队列闭包
除了将任务类调度到队列之外,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包调度到队列时,闭包的代码内容经过加密签名,以便在传输过程中无法修改
1$podcast = App\Podcast::find(1);2 3dispatch(function () use ($podcast) {4 $podcast->publish();5});
使用 catch
方法,您可以提供一个闭包,如果排队的闭包在耗尽队列的所有 配置的重试尝试 后未能成功完成,则应执行该闭包
1use Throwable;2 3dispatch(function () use ($podcast) {4 $podcast->publish();5})->catch(function (Throwable $e) {6 // This job has failed...7});
由于 catch
回调由 Laravel 队列序列化并在稍后执行,因此您不应在 catch
回调中使用 $this
变量。
运行队列工作进程
queue:work
命令
Laravel 包含一个 Artisan 命令,该命令将启动队列工作进程并在新任务推送到队列时处理它们。您可以使用 queue:work
Artisan 命令运行工作进程。请注意,一旦 queue:work
命令启动,它将继续运行,直到手动停止或关闭终端
1php artisan queue:work
为了使 queue:work
进程在后台永久运行,您应该使用进程监视器(例如 Supervisor)来确保队列工作进程不会停止运行。
如果您希望将处理的任务 ID 包含在命令的输出中,则可以在调用 queue:work
命令时包含 -v
标志
1php artisan queue:work -v
请记住,队列工作进程是长期运行的进程,并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请务必 重启队列工作进程。此外,请记住,您的应用程序创建或修改的任何静态状态都不会在任务之间自动重置。
或者,您可以运行 queue:listen
命令。当使用 queue:listen
命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重启工作进程;但是,此命令的效率远低于 queue:work
命令
1php artisan queue:listen
运行多个队列工作进程
要为队列分配多个工作进程并同时处理任务,您只需启动多个 queue:work
进程即可。这可以通过终端中的多个选项卡在本地完成,也可以使用进程管理器的配置设置在生产环境中完成。当使用 Supervisor 时,您可以使用 numprocs
配置值。
指定连接和队列
您还可以指定工作进程应使用的队列连接。传递给 work
命令的连接名称应与您的 config/queue.php
配置文件中定义的连接之一相对应
1php artisan queue:work redis
默认情况下,queue:work
命令仅处理给定连接上的默认队列的任务。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作进程。例如,如果您的所有电子邮件都在 redis
队列连接上的 emails
队列中处理,您可以发出以下命令来启动仅处理该队列的工作进程
1php artisan queue:work redis --queue=emails
处理指定数量的任务
--once
选项可用于指示工作进程仅处理队列中的单个任务
1php artisan queue:work --once
--max-jobs
选项可用于指示工作进程处理给定数量的任务然后退出。当与 Supervisor 结合使用时,此选项可能很有用,以便您的工作进程在处理给定数量的任务后自动重启,从而释放它们可能累积的任何内存
1php artisan queue:work --max-jobs=1000
处理所有排队任务然后退出
--stop-when-empty
选项可用于指示工作进程处理所有任务然后正常退出。如果您希望在 Docker 容器中处理 Laravel 队列后关闭容器,则此选项非常有用
1php artisan queue:work --stop-when-empty
处理给定秒数的任务
--max-time
选项可用于指示工作进程处理给定秒数的任务然后退出。当与 Supervisor 结合使用时,此选项可能很有用,以便您的工作进程在处理给定时间量的任务后自动重启,从而释放它们可能累积的任何内存
1# Process jobs for one hour and then exit...2php artisan queue:work --max-time=3600
工作进程睡眠持续时间
当队列中有可用任务时,工作进程将继续处理任务,任务之间没有延迟。但是,如果队列中没有可用任务,则 sleep
选项确定工作进程将“睡眠”多少秒。当然,在睡眠期间,工作进程将不处理任何新任务
1php artisan queue:work --sleep=3
维护模式和队列
当您的应用程序处于 维护模式 时,将不会处理任何排队任务。一旦应用程序退出维护模式,任务将继续正常处理。
要强制队列工作进程即使在启用维护模式的情况下也处理任务,您可以使用 --force
选项
1php artisan queue:work --force
资源注意事项
守护程序队列工作进程不会在处理每个任务之前“重启”框架。因此,您应在每个任务完成后释放任何繁重的资源。例如,如果您正在使用 GD 库进行图像处理,则应在使用完图像后使用 imagedestroy
释放内存。
队列优先级
有时您可能希望确定队列的处理优先级。例如,在您的 config/queue.php
配置文件中,您可以将 redis
连接的默认 queue
设置为 low
。但是,有时您可能希望将任务推送到 high
优先级队列,如下所示
1dispatch((new Job)->onQueue('high'));
要启动一个工作进程,该工作进程验证是否已处理所有 high
队列任务,然后再继续处理 low
队列上的任何任务,请将逗号分隔的队列名称列表传递给 work
命令
1php artisan queue:work --queue=high,low
队列工作进程和部署
由于队列工作进程是长期运行的进程,因此它们在不重启的情况下不会注意到代码的更改。因此,使用队列工作进程部署应用程序的最简单方法是在部署过程中重启工作进程。您可以通过发出 queue:restart
命令来正常重启所有工作进程
1php artisan queue:restart
此命令将指示所有队列工作进程在完成处理当前任务后正常退出,以便不会丢失任何现有任务。由于队列工作进程将在执行 queue:restart
命令时退出,因此您应该运行进程管理器(例如 Supervisor)以自动重启队列工作进程。
队列使用 缓存 来存储重启信号,因此您应在使用此功能之前验证是否为您的应用程序正确配置了缓存驱动。
任务过期和超时
任务过期
在您的 config/queue.php
配置文件中,每个队列连接都定义了一个 retry_after
选项。此选项指定队列连接在重试正在处理的任务之前应等待多少秒。例如,如果 retry_after
的值设置为 90
,则如果任务已处理 90 秒而未被释放或删除,则该任务将释放回队列。通常,您应将 retry_after
值设置为您的任务合理完成处理所需的最大秒数。
唯一不包含 retry_after
值的队列连接是 Amazon SQS。SQS 将根据 默认可见性超时 重试任务,该超时在 AWS 控制台中管理。
工作进程超时
queue:work
Artisan 命令公开了一个 --timeout
选项。默认情况下,--timeout
值为 60 秒。如果任务的处理时间超过超时值指定的秒数,则处理任务的工作进程将退出并显示错误。通常,工作进程将由您服务器上配置的 进程管理器 自动重启
1php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保任务不会丢失,并且任务仅成功处理一次。
--timeout
值应始终比您的 retry_after
配置值至少短几秒钟。这将确保在任务重试之前始终终止处理冻结任务的工作进程。如果您的 --timeout
选项比您的 retry_after
配置值长,则您的任务可能会被处理两次。
Supervisor 配置
在生产环境中,您需要一种方法来保持 queue:work
进程运行。queue:work
进程可能会因多种原因停止运行,例如超出工作进程超时或执行 queue:restart
命令。
因此,您需要配置一个进程监视器,该监视器可以检测到您的 queue:work
进程何时退出并自动重启它们。此外,进程监视器还可以让您指定要并发运行多少个 queue:work
进程。Supervisor 是 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。
安装 Supervisor
Supervisor 是 Linux 操作系统的一个进程监视器,如果您的 queue:work
进程失败,它将自动重启它们。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令
1sudo apt-get install supervisor
如果自己配置和管理 Supervisor 听起来令人望而生畏,请考虑使用 Laravel Cloud,它提供了一个完全托管的平台来运行 Laravel 队列工作进程。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,这些文件指示 Supervisor 应如何监视您的进程。例如,让我们创建一个 laravel-worker.conf
文件,该文件启动并监视 queue:work
进程
1[program:laravel-worker] 2process_name=%(program_name)s_%(process_num)02d 3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600 4autostart=true 5autorestart=true 6stopasgroup=true 7killasgroup=true 8user=forge 9numprocs=810redirect_stderr=true11stdout_logfile=/home/forge/app.com/worker.log12stopwaitsecs=3600
在此示例中,numprocs
指令将指示 Supervisor 运行八个 queue:work
进程并监视所有这些进程,并在它们失败时自动重启它们。您应该更改配置的 command
指令以反映您所需的队列连接和工作进程选项。
您应确保 stopwaitsecs
的值大于您的最长运行任务所消耗的秒数。否则,Supervisor 可能会在任务完成处理之前将其杀死。
启动 Supervisor
创建配置文件后,您可以更新 Supervisor 配置并使用以下命令启动进程
1sudo supervisorctl reread2 3sudo supervisorctl update4 5sudo supervisorctl start "laravel-worker:*"
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的任务
有时您的队列任务会失败。别担心,事情并非总是按计划进行!Laravel 提供了一种便捷的方式来 指定任务应尝试的最大次数。异步任务超出此尝试次数后,将被插入到 failed_jobs
数据库表中。同步调度的任务 如果失败,则不会存储在此表中,其异常会立即由应用程序处理。
通常,在新的 Laravel 应用程序中已经存在创建 failed_jobs
表的迁移。但是,如果您的应用程序不包含此表的迁移,您可以使用 make:queue-failed-table
命令来创建迁移
1php artisan make:queue-failed-table2 3php artisan migrate
当运行 队列工作进程 时,您可以使用 queue:work
命令的 --tries
选项来指定任务应尝试的最大次数。如果您没有为 --tries
选项指定值,任务将仅尝试一次,或者尝试 job 类的 $tries
属性指定的次数
1php artisan queue:work redis --tries=3
使用 --backoff
选项,您可以指定 Laravel 在遇到异常后,应等待多少秒再重试任务。默认情况下,任务会立即放回队列,以便可以再次尝试
1php artisan queue:work redis --tries=3 --backoff=3
如果您想配置 Laravel 在遇到异常后,应等待多少秒再重试任务(基于每个任务),您可以通过在您的 job 类上定义 backoff
属性来实现
1/**2 * The number of seconds to wait before retrying the job.3 *4 * @var int5 */6public $backoff = 3;
如果您的任务退避时间需要更复杂的逻辑,您可以在您的 job 类上定义一个 backoff
方法
1/**2 * Calculate the number of seconds to wait before retrying the job.3 */4public function backoff(): int5{6 return 3;7}
您可以通过从 backoff
方法返回一个退避值数组来轻松配置“指数”退避。在本例中,第一次重试的延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,如果还有更多尝试次数,则每次后续重试都为 10 秒
1/**2 * Calculate the number of seconds to wait before retrying the job.3 *4 * @return array<int, int>5 */6public function backoff(): array7{8 return [1, 5, 10];9}
清理失败任务后
当特定任务失败时,您可能希望向用户发送警报或回滚任务部分完成的任何操作。为了实现这一点,您可以在您的 job 类上定义一个 failed
方法。导致任务失败的 Throwable
实例将传递给 failed
方法
1<?php 2 3namespace App\Jobs; 4 5use App\Models\Podcast; 6use App\Services\AudioProcessor; 7use Illuminate\Contracts\Queue\ShouldQueue; 8use Illuminate\Foundation\Queue\Queueable; 9use Throwable;10 11class ProcessPodcast implements ShouldQueue12{13 use Queueable;14 15 /**16 * Create a new job instance.17 */18 public function __construct(19 public Podcast $podcast,20 ) {}21 22 /**23 * Execute the job.24 */25 public function handle(AudioProcessor $processor): void26 {27 // Process uploaded podcast...28 }29 30 /**31 * Handle a job failure.32 */33 public function failed(?Throwable $exception): void34 {35 // Send user notification of failure, etc...36 }37}
在调用 failed
方法之前,会实例化一个新的任务实例;因此,在 handle
方法中可能发生的任何类属性修改都将丢失。
重试失败的任务
要查看已插入到您的 failed_jobs
数据库表中的所有失败任务,您可以使用 queue:failed
Artisan 命令
1php artisan queue:failed
queue:failed
命令将列出任务 ID、连接、队列、失败时间和有关任务的其他信息。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失败任务,请执行以下命令
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如有必要,您可以将多个 ID 传递给命令
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
您还可以重试特定队列的所有失败任务
1php artisan queue:retry --queue=name
要重试所有失败的任务,请执行 queue:retry
命令并将 all
作为 ID 传递
1php artisan queue:retry all
如果您想删除失败的任务,您可以使用 queue:forget
命令
1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
当使用 Horizon 时,您应该使用 horizon:forget
命令来删除失败的任务,而不是 queue:forget
命令。
要从 failed_jobs
表中删除所有失败的任务,您可以使用 queue:flush
命令
1php artisan queue:flush
忽略丢失的模型
当将 Eloquent 模型注入到任务中时,该模型会在放入队列之前自动序列化,并在任务被处理时从数据库中重新检索。但是,如果模型在任务等待工作进程处理期间被删除,您的任务可能会因 ModelNotFoundException
而失败。
为了方便起见,您可以选择通过将任务的 deleteWhenMissingModels
属性设置为 true
来自动删除缺少模型的任务。当此属性设置为 true
时,Laravel 将静默地丢弃任务,而不会引发异常
1/**2 * Delete the job if its models no longer exist.3 *4 * @var bool5 */6public $deleteWhenMissingModels = true;
修剪失败的任务
您可以通过调用 queue:prune-failed
Artisan 命令来清理应用程序 failed_jobs
表中的记录
1php artisan queue:prune-failed
默认情况下,所有超过 24 小时的失败任务记录都将被清理。如果您为命令提供 --hours
选项,则只会保留在过去 N 小时内插入的失败任务记录。例如,以下命令将删除所有在 48 小时前插入的失败任务记录
1php artisan queue:prune-failed --hours=48
在 DynamoDB 中存储失败的任务
Laravel 还支持将失败的任务记录存储在 DynamoDB 中,而不是关系型数据库表中。但是,您必须手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,此表应命名为 failed_jobs
,但您应该根据应用程序 queue
配置文件中 queue.failed.table
配置值来命名表。
failed_jobs
表应具有名为 application
的字符串主分区键和名为 uuid
的字符串主排序键。键的 application
部分将包含您的应用程序名称,该名称由应用程序 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一张表来存储多个 Laravel 应用程序的失败任务。
此外,请确保您安装了 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信
1composer require aws/aws-sdk-php
接下来,将 queue.failed.driver
配置选项的值设置为 dynamodb
。此外,您应该在失败任务配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于向 AWS 验证身份。当使用 dynamodb
驱动程序时,queue.failed.database
配置选项是不必要的
1'failed' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'failed_jobs',7],
禁用失败任务存储
您可以指示 Laravel 丢弃失败的任务而不存储它们,方法是将 queue.failed.driver
配置选项的值设置为 null
。通常,这可以通过 QUEUE_FAILED_DRIVER
环境变量来实现
1QUEUE_FAILED_DRIVER=null
失败任务事件
如果您想注册一个事件监听器,该监听器将在任务失败时被调用,您可以使用 Queue
Facade 的 failing
方法。例如,我们可以从 Laravel 包含的 AppServiceProvider
的 boot
方法中将一个闭包附加到此事件
1<?php 2 3namespace App\Providers; 4 5use Illuminate\Support\Facades\Queue; 6use Illuminate\Support\ServiceProvider; 7use Illuminate\Queue\Events\JobFailed; 8 9class AppServiceProvider extends ServiceProvider10{11 /**12 * Register any application services.13 */14 public function register(): void15 {16 // ...17 }18 19 /**20 * Bootstrap any application services.21 */22 public function boot(): void23 {24 Queue::failing(function (JobFailed $event) {25 // $event->connectionName26 // $event->job27 // $event->exception28 });29 }30}
从队列中清除任务
当使用 Horizon 时,您应该使用 horizon:clear
命令来清除队列中的任务,而不是 queue:clear
命令。
如果您想从默认连接的默认队列中删除所有任务,您可以使用 queue:clear
Artisan 命令
1php artisan queue:clear
您还可以提供 connection
参数和 queue
选项,以从特定连接和队列中删除任务
1php artisan queue:clear redis --queue=emails
清除队列中的任务仅适用于 SQS、Redis 和 database 队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在您清除队列后 60 秒内发送到 SQS 队列的任务也可能被删除。
监控您的队列
如果您的队列收到大量涌入的任务,它可能会变得 overwhelmed,导致任务完成的等待时间过长。如果您愿意,当您的队列任务计数超过指定阈值时,Laravel 可以提醒您。
要开始使用,您应该计划 每分钟运行一次 queue:monitor
命令。该命令接受您希望监控的队列名称以及您期望的任务计数阈值
1php artisan queue:monitor redis:default,redis:deployments --max=100
仅计划此命令不足以触发通知以提醒您队列的 overwhelmed 状态。当命令遇到任务计数超过阈值的队列时,将分发 Illuminate\Queue\Events\QueueBusy
事件。您可以在应用程序的 AppServiceProvider
中监听此事件,以便向您或您的开发团队发送通知
1use App\Notifications\QueueHasLongWaitTime; 2use Illuminate\Queue\Events\QueueBusy; 3use Illuminate\Support\Facades\Event; 4use Illuminate\Support\Facades\Notification; 5 6/** 7 * Bootstrap any application services. 8 */ 9public function boot(): void10{11 Event::listen(function (QueueBusy $event) {13 ->notify(new QueueHasLongWaitTime(14 $event->connection,15 $event->queue,16 $event->size17 ));18 });19}
测试
当测试调度任务的代码时,您可能希望指示 Laravel 实际上不执行任务本身,因为任务的代码可以与调度它的代码分开直接测试。当然,要测试任务本身,您可以实例化一个任务实例并在您的测试中直接调用 handle
方法。
您可以使用 Queue
Facade 的 fake
方法来阻止队列任务实际推送到队列。在调用 Queue
Facade 的 fake
方法后,您可以断言应用程序尝试将任务推送到队列
1<?php 2 3use App\Jobs\AnotherJob; 4use App\Jobs\FinalJob; 5use App\Jobs\ShipOrder; 6use Illuminate\Support\Facades\Queue; 7 8test('orders can be shipped', function () { 9 Queue::fake();10 11 // Perform order shipping...12 13 // Assert that no jobs were pushed...14 Queue::assertNothingPushed();15 16 // Assert a job was pushed to a given queue...17 Queue::assertPushedOn('queue-name', ShipOrder::class);18 19 // Assert a job was pushed twice...20 Queue::assertPushed(ShipOrder::class, 2);21 22 // Assert a job was not pushed...23 Queue::assertNotPushed(AnotherJob::class);24 25 // Assert that a Closure was pushed to the queue...26 Queue::assertClosurePushed();27 28 // Assert the total number of jobs that were pushed...29 Queue::assertCount(3);30});
1<?php 2 3namespace Tests\Feature; 4 5use App\Jobs\AnotherJob; 6use App\Jobs\FinalJob; 7use App\Jobs\ShipOrder; 8use Illuminate\Support\Facades\Queue; 9use Tests\TestCase;10 11class ExampleTest extends TestCase12{13 public function test_orders_can_be_shipped(): void14 {15 Queue::fake();16 17 // Perform order shipping...18 19 // Assert that no jobs were pushed...20 Queue::assertNothingPushed();21 22 // Assert a job was pushed to a given queue...23 Queue::assertPushedOn('queue-name', ShipOrder::class);24 25 // Assert a job was pushed twice...26 Queue::assertPushed(ShipOrder::class, 2);27 28 // Assert a job was not pushed...29 Queue::assertNotPushed(AnotherJob::class);30 31 // Assert that a Closure was pushed to the queue...32 Queue::assertClosurePushed();33 34 // Assert the total number of jobs that were pushed...35 Queue::assertCount(3);36 }37}
您可以将闭包传递给 assertPushed
或 assertNotPushed
方法,以断言已推送的任务通过了给定的“真值测试”。如果至少推送了一个通过给定真值测试的任务,则断言将成功
1Queue::assertPushed(function (ShipOrder $job) use ($order) {2 return $job->order->id === $order->id;3});
伪造部分任务
如果您只需要模拟特定任务,同时允许其他任务正常执行,您可以将应模拟的任务的类名传递给 fake
方法
1test('orders can be shipped', function () { 2 Queue::fake([ 3 ShipOrder::class, 4 ]); 5 6 // Perform order shipping... 7 8 // Assert a job was pushed twice... 9 Queue::assertPushed(ShipOrder::class, 2);10});
1public function test_orders_can_be_shipped(): void 2{ 3 Queue::fake([ 4 ShipOrder::class, 5 ]); 6 7 // Perform order shipping... 8 9 // Assert a job was pushed twice...10 Queue::assertPushed(ShipOrder::class, 2);11}
您可以使用 except
方法来模拟除一组指定任务之外的所有任务
1Queue::fake()->except([2 ShipOrder::class,3]);
测试任务链
要测试任务链,您需要利用 Bus
Facade 的模拟功能。Bus
Facade 的 assertChained
方法可用于断言已调度 任务链。assertChained
方法接受一个链式任务数组作为其第一个参数
1use App\Jobs\RecordShipment; 2use App\Jobs\ShipOrder; 3use App\Jobs\UpdateInventory; 4use Illuminate\Support\Facades\Bus; 5 6Bus::fake(); 7 8// ... 9 10Bus::assertChained([11 ShipOrder::class,12 RecordShipment::class,13 UpdateInventory::class14]);
正如您在上面的示例中看到的,链式任务数组可以是任务类名称的数组。但是,您也可以提供实际任务实例的数组。这样做时,Laravel 将确保任务实例属于同一类,并且具有与您的应用程序调度的链式任务相同的属性值
1Bus::assertChained([2 new ShipOrder,3 new RecordShipment,4 new UpdateInventory,5]);
您可以使用 assertDispatchedWithoutChain
方法来断言已推送的任务没有任务链
1Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试链修改
如果链式任务 在现有链中前置或后置任务,您可以使用任务的 assertHasChain
方法来断言该任务具有预期的剩余任务链
1$job = new ProcessPodcast;2 3$job->handle();4 5$job->assertHasChain([6 new TranscribePodcast,7 new OptimizePodcast,8 new ReleasePodcast,9]);
assertDoesntHaveChain
方法可用于断言任务的剩余链为空
1$job->assertDoesntHaveChain();
测试链式批处理
如果您的任务链 包含一批任务,您可以通过在链断言中插入 Bus::chainedBatch
定义来断言链式批处理符合您的预期
1use App\Jobs\ShipOrder; 2use App\Jobs\UpdateInventory; 3use Illuminate\Bus\PendingBatch; 4use Illuminate\Support\Facades\Bus; 5 6Bus::assertChained([ 7 new ShipOrder, 8 Bus::chainedBatch(function (PendingBatch $batch) { 9 return $batch->jobs->count() === 3;10 }),11 new UpdateInventory,12]);
测试任务批次
Bus
Facade 的 assertBatched
方法可用于断言已调度 一批任务。传递给 assertBatched
方法的闭包接收 Illuminate\Bus\PendingBatch
的实例,该实例可用于检查批处理中的任务
1use Illuminate\Bus\PendingBatch; 2use Illuminate\Support\Facades\Bus; 3 4Bus::fake(); 5 6// ... 7 8Bus::assertBatched(function (PendingBatch $batch) { 9 return $batch->name == 'import-csv' &&10 $batch->jobs->count() === 10;11});
您可以使用 assertBatchCount
方法来断言已调度给定数量的批处理
1Bus::assertBatchCount(3);
您可以使用 assertNothingBatched
来断言没有调度任何批处理
1Bus::assertNothingBatched();
测试任务/批处理交互
此外,您有时可能需要测试单个任务与其底层批处理的交互。例如,您可能需要测试任务是否取消了其批处理的进一步处理。为了实现这一点,您需要通过 withFakeBatch
方法为任务分配一个模拟批处理。withFakeBatch
方法返回一个元组,其中包含任务实例和模拟批处理
1[$job, $batch] = (new ShipOrder)->withFakeBatch();2 3$job->handle();4 5$this->assertTrue($batch->cancelled());6$this->assertEmpty($batch->added);
测试任务 / 队列交互
有时,您可能需要测试排队任务 是否将自己重新释放回队列。或者,您可能需要测试任务是否已删除自身。您可以通过实例化任务并调用 withFakeQueueInteractions
方法来测试这些队列交互。
一旦任务的队列交互被模拟,您就可以调用任务上的 handle
方法。在调用任务后,可以使用 assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
、assertFailedWith
和 assertNotFailed
方法对任务的队列交互进行断言
1use App\Exceptions\CorruptedAudioException; 2use App\Jobs\ProcessPodcast; 3 4$job = (new ProcessPodcast)->withFakeQueueInteractions(); 5 6$job->handle(); 7 8$job->assertReleased(delay: 30); 9$job->assertDeleted();10$job->assertNotDeleted();11$job->assertFailed();12$job->assertFailedWith(CorruptedAudioException::class);13$job->assertNotFailed();
任务事件
使用 Queue
Facade 上的 before
和 after
方法,您可以指定在队列任务处理之前或之后执行的回调。这些回调是执行额外日志记录或增加仪表板统计信息的绝佳机会。通常,您应该从 服务提供者 的 boot
方法中调用这些方法。例如,我们可以使用 Laravel 包含的 AppServiceProvider
1<?php 2 3namespace App\Providers; 4 5use Illuminate\Support\Facades\Queue; 6use Illuminate\Support\ServiceProvider; 7use Illuminate\Queue\Events\JobProcessed; 8use Illuminate\Queue\Events\JobProcessing; 9 10class AppServiceProvider extends ServiceProvider11{12 /**13 * Register any application services.14 */15 public function register(): void16 {17 // ...18 }19 20 /**21 * Bootstrap any application services.22 */23 public function boot(): void24 {25 Queue::before(function (JobProcessing $event) {26 // $event->connectionName27 // $event->job28 // $event->job->payload()29 });30 31 Queue::after(function (JobProcessed $event) {32 // $event->connectionName33 // $event->job34 // $event->job->payload()35 });36 }37}
使用 Queue
Facade 上的 looping
方法,您可以指定在工作进程尝试从队列中获取任务之前执行的回调。例如,您可以注册一个闭包来回滚先前失败的任务留下的任何事务
1use Illuminate\Support\Facades\DB;2use Illuminate\Support\Facades\Queue;3 4Queue::looping(function () {5 while (DB::transactionLevel() > 0) {6 DB::rollBack();7 }8});