队列
介绍
在构建您的 Web 应用程序时,您可能有一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。幸运的是,Laravel 允许您轻松创建可在后台处理的排队作业。通过将耗时任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为您的客户提供更好的用户体验。
Laravel 队列在各种不同的队列后端(如 Amazon SQS、Redis 甚至关系型数据库)中提供统一的队列 API。
Laravel 的队列配置选项存储在应用程序的 config/queue.php
配置文件中。在这个文件中,您将找到每个包含在框架中的队列驱动程序的连接配置,包括数据库、Amazon SQS、Redis 和 Beanstalkd 驱动程序,以及一个将在本地开发期间立即执行作业的同步驱动程序。还包含一个 null
队列驱动程序,它会丢弃排队的作业。
Laravel 现在提供 Horizon,这是一个用于 Redis 支持的队列的精美仪表板和配置系统。查看完整的 Horizon 文档 以了解更多信息。
连接与队列
在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别非常重要。在您的 config/queue.php
配置文件中,有一个 connections
配置数组。此选项定义了到后端队列服务的连接,例如 Amazon SQS、Beanstalk 或 Redis。但是,任何给定的队列连接都可以有多个“队列”,可以将其视为不同的堆栈或排队的作业堆。
请注意,queue
配置文件中的每个连接配置示例都包含一个 queue
属性。这是当作业发送到给定连接时将要调度的默认队列。换句话说,如果您调度一个作业,但没有明确定义它应该调度的队列,则该作业将被放置在连接配置的 queue
属性中定义的队列中。
use App\Jobs\ProcessPodcast; // This job is sent to the default connection's default queue...ProcessPodcast::dispatch(); // This job is sent to the default connection's "emails" queue...ProcessPodcast::dispatch()->onQueue('emails');
某些应用程序可能不需要将作业推送到多个队列中,而是更愿意使用一个简单的队列。但是,将作业推送到多个队列对于希望优先处理或细分作业处理方式的应用程序特别有用,因为 Laravel 队列工作器允许您指定它应该按优先级处理哪些队列。例如,如果您将作业推送到 high
队列,则可以运行一个工作器,使其具有更高的处理优先级。
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
为了使用 database
队列驱动程序,您需要一个数据库表来保存作业。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php
数据库迁移 中;但是,如果您的应用程序不包含此迁移,则可以使用 make:queue-table
Artisan 命令来创建它。
php artisan make:queue-table php artisan migrate
Redis
为了使用 redis
队列驱动程序,您应该在 config/database.php
配置文件中配置 Redis 数据库连接。
serializer
和 compression
Redis 选项不受 redis
队列驱动程序支持。
Redis 集群
如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含 键哈希标签。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中。
'redis' => [ 'driver' => 'redis', 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'), 'queue' => env('REDIS_QUEUE', '{default}'), 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90), 'block_for' => null, 'after_commit' => false,],
阻塞
使用 Redis 队列时,您可以使用 block_for
配置选项来指定驱动程序应该等待多长时间以使作业可用,然后再遍历工作器循环并重新轮询 Redis 数据库。
根据您的队列负载调整此值可能比不断轮询 Redis 数据库以查找新作业效率更高。例如,您可以将该值设置为 5
,表示驱动程序在等待作业可用时应该阻塞五秒钟。
'redis' => [ 'driver' => 'redis', 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'), 'queue' => env('REDIS_QUEUE', 'default'), 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90), 'block_for' => 5, 'after_commit' => false,],
将 block_for
设置为 0
将导致队列工作器无限期地阻塞,直到作业可用。这还将阻止处理诸如 SIGTERM
之类的信号,直到下一个作业处理完毕。
其他驱动程序先决条件
列出的队列驱动程序需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装。
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
或 phpredis PHP 扩展
创建作业
生成作业类
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,则在运行 make:job
Artisan 命令时会创建它。
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 该作业应该推送到队列中以异步运行。
作业存根可以使用 存根发布 来自定义。
类结构
作业类非常简单,通常只包含一个 handle
方法,该方法在作业由队列处理时调用。为了开始,让我们看一下作业类的示例。在这个示例中,我们假设我们管理一个播客发布服务,需要在发布前处理上传的播客文件。
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct( public Podcast $podcast, ) {} /** * Execute the job. */ public function handle(AudioProcessor $processor): void { // Process uploaded podcast... }}
在这个示例中,请注意我们能够将一个 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 Queueable
特性,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。
如果您的排队作业在构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统会自动从数据库中重新获取完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业有效负载发送到您的队列驱动程序。
handle
方法依赖注入
当作业由队列处理时,会调用 handle
方法。请注意,我们可以在作业的 handle
方法上使用类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。
如果您想完全控制容器如何将依赖项注入 handle
方法,您可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调函数,该回调函数接收作业和容器。在回调函数中,您可以随意调用 handle
方法。通常,您应该从 App\Providers\AppServiceProvider
服务提供者 的 boot
方法中调用此方法。
use App\Jobs\ProcessPodcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Foundation\Application; $this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) { return $job->handle($app->make(AudioProcessor::class));});
二进制数据(例如原始图像内容)应在传递到排队作业之前通过 base64_encode
函数传递。否则,作业在放入队列时可能无法正确序列化为 JSON。
排队关系
因为所有加载的 Eloquent 模型关系也会在作业排队时被序列化,所以序列化的作业字符串有时会变得非常大。此外,当作业被反序列化并且模型关系从数据库中重新获取时,它们将被完整地获取。在作业排队过程中模型被序列化之前应用的任何先前的关系约束都不会在作业反序列化时应用。因此,如果您希望使用给定关系的子集,您应该在排队作业中重新约束该关系。
或者,要防止关系被序列化,您可以在设置属性值时在模型上调用 withoutRelations
方法。此方法将返回没有加载关系的模型实例。
/** * Create a new job instance. */public function __construct( Podcast $podcast,) { $this->podcast = $podcast->withoutRelations();}
如果您正在使用 PHP 构造函数属性提升,并且希望指示 Eloquent 模型不应序列化其关系,您可以使用 WithoutRelations
属性。
use Illuminate\Queue\Attributes\WithoutRelations; /** * Create a new job instance. */public function __construct( #[WithoutRelations] public Podcast $podcast,) {}
如果作业接收的是 Eloquent 模型的集合或数组,而不是单个模型,那么当作业被反序列化和执行时,该集合中的模型不会恢复其关系。这是为了防止在处理大量模型的作业上过度使用资源。
唯一作业
唯一作业需要支持 锁 的缓存驱动程序。目前,memcached
、redis
、dynamodb
、database
、file
和 array
缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批处理中的作业。
有时,您可能希望确保在任何时间点队列中只有一个特定作业的实例。您可以通过在您的作业类上实现 ShouldBeUnique
接口来做到这一点。此接口不要求您在类上定义任何其他方法。
<?php use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ...}
在上面的示例中,UpdateSearchIndex
作业是唯一的。因此,如果另一个作业实例已经在队列中并且尚未完成处理,则不会调度该作业。
在某些情况下,您可能希望定义一个特定的“键”来使作业唯一,或者您可能希望指定一个超时时间,超过该超时时间,作业将不再保持唯一。为此,您可以在作业类上定义 uniqueId
和 uniqueFor
属性或方法。
<?php use App\Models\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ /** * The product instance. * * @var \App\Product */ public $product; /** * The number of seconds after which the job's unique lock will be released. * * @var int */ public $uniqueFor = 3600; /** * Get the unique ID for the job. */ public function uniqueId(): string { return $this->product->id; }}
在上面的示例中,UpdateSearchIndex
作业是通过产品 ID 唯一的。因此,任何具有相同产品 ID 的新作业调度都将被忽略,直到现有作业完成处理。此外,如果现有作业在 1 小时内未处理,则唯一锁将被释放,并且可以将具有相同唯一键的另一个作业调度到队列中。
如果您的应用程序从多个 Web 服务器或容器中调度作业,您应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确地确定作业是否唯一。
在处理开始之前保持作业唯一
默认情况下,唯一作业在作业完成处理或失败所有重试尝试后被“解锁”。但是,在某些情况下,您可能希望您的作业在处理之前立即解锁。为此,您的作业应该实现 ShouldBeUniqueUntilProcessing
合同,而不是 ShouldBeUnique
合同。
<?php use App\Models\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing{ // ...}
唯一作业锁
在后台,当调度 ShouldBeUnique
作业时,Laravel 会尝试使用 uniqueId
键获取 锁。如果无法获取锁,则不会调度作业。当作业完成处理或失败所有重试尝试时,此锁将被释放。默认情况下,Laravel 将使用默认的缓存驱动程序来获取此锁。但是,如果您希望使用另一个驱动程序来获取锁,则可以定义一个 uniqueVia
方法,该方法返回应使用的缓存驱动程序。
use Illuminate\Contracts\Cache\Repository;use Illuminate\Support\Facades\Cache; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ... /** * Get the cache driver for the unique job lock. */ public function uniqueVia(): Repository { return Cache::driver('redis'); }}
如果您只需要限制作业的并发处理,请使用 WithoutOverlapping
作业中间件。
加密作业
Laravel 允许您通过 加密 来确保作业数据的隐私性和完整性。要开始使用,只需将 ShouldBeEncrypted
接口添加到作业类中。一旦此接口被添加到类中,Laravel 将自动加密您的作业,然后将其推送到队列中。
<?php use Illuminate\Contracts\Queue\ShouldBeEncrypted;use Illuminate\Contracts\Queue\ShouldQueue; class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted{ // ...}
作业中间件
作业中间件允许您在排队作业的执行周围包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle
方法,它利用 Laravel 的 Redis 速率限制功能,只允许每 5 秒处理一个作业。
use Illuminate\Support\Facades\Redis; /** * Execute the job. */public function handle(): void{ Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () { info('Lock obtained...'); // Handle job... }, function () { // Could not obtain lock... return $this->release(5); });}
虽然此代码有效,但 handle
方法的实现变得很混乱,因为它充斥着 Redis 速率限制逻辑。此外,对于我们要速率限制的任何其他作业,此速率限制逻辑都必须重复。
与其在 handle 方法中进行速率限制,不如定义一个处理速率限制的作业中间件。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在本例中,我们将中间件放置在 app/Jobs/Middleware
目录中。
<?php namespace App\Jobs\Middleware; use Closure;use Illuminate\Support\Facades\Redis; class RateLimited{ /** * Process the queued job. * * @param \Closure(object): void $next */ public function handle(object $job, Closure $next): void { Redis::throttle('key') ->block(0)->allow(1)->every(5) ->then(function () use ($job, $next) { // Lock obtained... $next($job); }, function () use ($job) { // Could not obtain lock... $job->release(5); }); }}
如您所见,与 路由中间件 一样,作业中间件接收正在处理的作业以及一个应被调用的回调函数,以继续处理作业。
创建作业中间件后,可以通过从作业的 middleware
方法中返回它们来将它们附加到作业。此方法不存在于由 make:job
Artisan 命令构建的作业上,因此您需要手动将其添加到您的作业类中。
use App\Jobs\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new RateLimited];}
作业中间件也可以分配给可排队的事件监听器、邮件和通知。
速率限制
虽然我们只是演示了如何编写自己的速率限制作业中间件,但 Laravel 实际上包含一个速率限制中间件,您可以利用它来速率限制作业。与 路由速率限制器 一样,作业速率限制器是使用 RateLimiter
门面的 for
方法定义的。
例如,您可能希望允许用户每小时备份一次数据,而对高级用户不设置此类限制。为此,您可以在 AppServiceProvider
的 boot
方法中定义一个 RateLimiter
。
use Illuminate\Cache\RateLimiting\Limit;use Illuminate\Support\Facades\RateLimiter; /** * Bootstrap any application services. */public function boot(): void{ RateLimiter::for('backups', function (object $job) { return $job->user->vipCustomer() ? Limit::none() : Limit::perHour(1)->by($job->user->id); });}
在上面的示例中,我们定义了一个每小时的速率限制;但是,您可以轻松地使用 perMinute
方法定义基于分钟的速率限制。此外,您可以将您希望传递给速率限制的 by
方法的任何值;但是,此值最常用于按客户细分速率限制。
return Limit::perMinute(50)->by($job->user->id);
定义速率限制后,您可以使用 Illuminate\Queue\Middleware\RateLimited
中间件将速率限制器附加到您的作业。每次作业超过速率限制时,此中间件都会根据速率限制持续时间,将作业重新释放回队列,并伴随适当的延迟。
use Illuminate\Queue\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new RateLimited('backups')];}
将速率限制的作业重新释放回队列仍然会增加作业的总 attempts
次数。您可能希望相应地调整作业类中的 tries
和 maxExceptions
属性。或者,您可能希望使用 retryUntil
方法 来定义作业应该停止尝试的时间。
如果您不希望作业在速率限制时被重试,您可以使用 dontRelease
方法。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new RateLimited('backups'))->dontRelease()];}
如果您正在使用 Redis,您可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis
中间件,该中间件针对 Redis 进行了微调,并且比基本速率限制中间件更高效。
防止作业重叠
Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping
中间件,它允许您根据任意键来防止作业重叠。当排队作业正在修改一个资源时,此功能非常有用,该资源应该一次只被一个作业修改。
例如,假设您有一个排队作业,它更新用户的信用评分,并且您想防止对同一个用户 ID 的信用评分更新作业重叠。为此,您可以从作业的 middleware
方法中返回 WithoutOverlapping
中间件。
use Illuminate\Queue\Middleware\WithoutOverlapping; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new WithoutOverlapping($this->user->id)];}
任何类型的重叠作业都将被重新释放回队列。您也可以指定在重新释放的作业再次尝试之前必须经过的秒数。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];}
如果您希望立即删除所有重叠作业,以便它们不会被重试,您可以使用 dontRelease
方法。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->dontRelease()];}
WithoutOverlapping
中间件由 Laravel 的原子锁功能提供支持。有时,您的作业可能会以意外的方式失败或超时,从而导致锁未释放。因此,您可以使用 expireAfter
方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在作业开始处理 3 分钟后释放 WithoutOverlapping
锁。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];}
WithoutOverlapping
中间件需要支持 锁 的缓存驱动程序。目前,memcached
、redis
、dynamodb
、database
、file
和 array
缓存驱动程序支持原子锁。
在作业类之间共享锁键
默认情况下,WithoutOverlapping
中间件只会防止相同类的作业重叠。因此,虽然两个不同的作业类可能使用相同的锁键,但不会阻止它们重叠。但是,您可以指示 Laravel 使用 shared
方法在作业类之间应用键。
use Illuminate\Queue\Middleware\WithoutOverlapping; class ProviderIsDown{ // ... public function middleware(): array { return [ (new WithoutOverlapping("status:{$this->provider}"))->shared(), ]; }} class ProviderIsUp{ // ... public function middleware(): array { return [ (new WithoutOverlapping("status:{$this->provider}"))->shared(), ]; }}
限制异常
Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions
中间件,它允许您限制异常。一旦作业抛出给定数量的异常,所有进一步尝试执行该作业的尝试都会被延迟,直到指定的时段过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。
例如,假设您有一个排队作业,它与一个开始抛出异常的第三方 API 交互。为了限制异常,您可以从作业的 middleware
方法中返回 ThrottlesExceptions
中间件。通常,此中间件应该与实现 基于时间的尝试 的作业配对。
use DateTime;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new ThrottlesExceptions(10, 5 * 60)];} /** * Determine the time at which the job should timeout. */public function retryUntil(): DateTime{ return now()->addMinutes(30);}
中间件接受的第一个构造函数参数是作业在被限制之前可以抛出的异常数量,而第二个构造函数参数是作业被限制后,在再次尝试该作业之前应该经过的秒数。在上面的代码示例中,如果作业连续抛出 10 个异常,我们将等待 5 分钟,然后再尝试该作业,受 30 分钟时间限制的约束。
当作业抛出异常但尚未达到异常阈值时,该作业通常会立即被重试。但是,您可以指定这样的作业应该被延迟的分钟数,方法是在将中间件附加到作业时调用 backoff
方法。
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];}
在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且作业的类名用作缓存的“键”。您可以在将中间件附加到作业时通过调用 `by` 方法来覆盖此键。如果您有多个作业与相同的第三方服务交互,并且您希望它们共享一个通用的节流“桶”,这将很有用。
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];}
默认情况下,此中间件将对所有异常进行节流。您可以通过在将中间件附加到作业时调用 `when` 方法来修改此行为。然后,只有在提供给 `when` 方法的闭包返回 `true` 时,才会对异常进行节流。
use Illuminate\Http\Client\HttpClientException;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->when( fn (Throwable $throwable) => $throwable instanceof HttpClientException )];}
如果您希望将被节流的异常报告给应用程序的异常处理程序,您可以在将中间件附加到作业时通过调用 `report` 方法来实现。或者,您可以在 `report` 方法中提供一个闭包,并且只有在给定闭包返回 `true` 时才会报告异常。
use Illuminate\Http\Client\HttpClientException;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->report( fn (Throwable $throwable) => $throwable instanceof HttpClientException )];}
如果您使用 Redis,您可以使用 `Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis` 中间件,该中间件针对 Redis 进行了微调,并且比基本异常节流中间件更高效。
跳过作业
`Skip` 中间件允许您指定一个作业应该被跳过/删除,而无需修改作业的逻辑。 `Skip::when` 方法将在给定条件计算结果为 `true` 时删除作业,而 `Skip::unless` 方法将在条件计算结果为 `false` 时删除作业。
use Illuminate\Queue\Middleware\Skip; /*** Get the middleware the job should pass through.*/public function middleware(): array{ return [ Skip::when($someCondition), ];}
您还可以将一个 `Closure` 传递给 `when` 和 `unless` 方法,以进行更复杂的条件评估。
use Illuminate\Queue\Middleware\Skip; /*** Get the middleware the job should pass through.*/public function middleware(): array{ return [ Skip::when(function (): bool { return $this->shouldSkip(); }), ];}
调度作业
在编写作业类后,您可以使用作业本身的 `dispatch` 方法来调度它。传递给 `dispatch` 方法的参数将传递给作业的构造函数。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // ... ProcessPodcast::dispatch($podcast); return redirect('/podcasts'); }}
如果您想有条件地调度作业,可以使用 `dispatchIf` 和 `dispatchUnless` 方法。
ProcessPodcast::dispatchIf($accountActive, $podcast); ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新 Laravel 应用程序中, `sync` 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台同步执行作业,这在本地开发期间通常很方便。如果您想真正开始将作业排队以进行后台处理,您可以在应用程序的 `config/queue.php` 配置文件中指定不同的队列驱动程序。
延迟调度
如果您想指定一个作业不应该立即可供队列工作者处理,您可以在调度作业时使用 `delay` 方法。例如,让我们指定一个作业在调度后 10 分钟前不应该可用以进行处理。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // ... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); return redirect('/podcasts'); }}
在某些情况下,作业可能具有默认的延迟配置。如果您需要绕过此延迟并调度一个作业以立即处理,您可以使用 `withoutDelay` 方法。
ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
在将响应发送到浏览器后调度
或者,如果您使用的是 FastCGI, `dispatchAfterResponse` 方法会延迟调度作业,直到将 HTTP 响应发送到用户的浏览器之后。即使排队的作业仍在执行,这仍然允许用户开始使用应用程序。这通常只应该用于大约需要一秒钟的作业,例如发送电子邮件。因为它们是在当前 HTTP 请求中处理的,所以以这种方式调度的作业不需要队列工作者运行才能被处理。
use App\Jobs\SendNotification; SendNotification::dispatchAfterResponse();
您也可以 `dispatch` 一个闭包并将 `afterResponse` 方法链接到 `dispatch` 助手,以便在将 HTTP 响应发送到浏览器后执行闭包。
use App\Mail\WelcomeMessage;use Illuminate\Support\Facades\Mail; dispatch(function () {})->afterResponse();
同步调度
如果您想立即(同步)调度作业,可以使用 `dispatchSync` 方法。使用此方法时,作业不会排队,并且将在当前进程中立即执行。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatchSync($podcast); return redirect('/podcasts'); }}
作业与数据库事务
虽然在数据库事务中调度作业是完全可以的,但您应该特别注意确保您的作业能够成功执行。在事务中调度作业时,作业可能在父事务提交之前由工作者处理。当发生这种情况时,您在数据库事务中对模型或数据库记录进行的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。
值得庆幸的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 `after_commit` 连接选项。
'redis' => [ 'driver' => 'redis', // ... 'after_commit' => true,],
当 `after_commit` 选项为 `true` 时,您可以在数据库事务中调度作业;但是,Laravel 会等到打开的父数据库事务提交后才会真正调度作业。当然,如果当前没有打开数据库事务,作业将立即调度。
如果事务由于事务期间发生的异常而回滚,则在该事务期间调度的作业将被丢弃。
将 `after_commit` 配置选项设置为 `true` 还会导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。
内联指定提交调度行为
如果您没有将 `after_commit` 队列连接配置选项设置为 `true`,您仍然可以指示特定作业应该在所有打开的数据库事务提交后被调度。为此,您可以将 `afterCommit` 方法链接到您的调度操作。
use App\Jobs\ProcessPodcast; ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果 `after_commit` 配置选项设置为 `true`,您也可以指示特定作业应该立即调度,而无需等待任何打开的数据库事务提交。
ProcessPodcast::dispatch($podcast)->beforeCommit();
作业链
作业链接允许您指定一个排队作业列表,这些作业应该在主作业成功执行后按顺序运行。如果序列中的一个作业失败,则不会运行其余的作业。要执行排队作业链,可以使用 `Bus` 门面提供的 `chain` 方法。Laravel 的命令总线是一个更底层的组件,排队作业调度是在其之上构建的。
use App\Jobs\OptimizePodcast;use App\Jobs\ProcessPodcast;use App\Jobs\ReleasePodcast;use Illuminate\Support\Facades\Bus; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->dispatch();
除了链接作业类实例之外,您还可以链接闭包。
Bus::chain([ new ProcessPodcast, new OptimizePodcast, function () { Podcast::update(/* ... */); },])->dispatch();
使用作业中的 `$this->delete()` 方法删除作业不会阻止链式作业被处理。该链将仅在链中的作业失败时停止执行。
链连接和队列
如果您想指定应该用于链式作业的连接和队列,可以使用 `onConnection` 和 `onQueue` 方法。这些方法指定应使用的队列连接和队列名称,除非排队作业明确分配了不同的连接/队列。
Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->onConnection('redis')->onQueue('podcasts')->dispatch();
将作业添加到链中
有时,您可能需要从链中的另一个作业中将作业预置或附加到现有的作业链中。您可以使用 `prependToChain` 和 `appendToChain` 方法来完成此操作。
/** * Execute the job. */public function handle(): void{ // ... // Prepend to the current chain, run job immediately after current job... $this->prependToChain(new TranscribePodcast); // Append to the current chain, run job at end of chain... $this->appendToChain(new TranscribePodcast);}
链失败
在链接作业时,可以使用 `catch` 方法指定一个闭包,该闭包应该在链中的作业失败时调用。给定的回调将接收导致作业失败的 `Throwable` 实例。
use Illuminate\Support\Facades\Bus;use Throwable; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->catch(function (Throwable $e) { // A job within the chain has failed...})->dispatch();
由于链回调是在稍后时间由 Laravel 队列序列化和执行的,因此您不应该在链回调中使用 `$this` 变量。
自定义队列和连接
调度到特定队列
通过将作业推送到不同的队列,您可以“分类”排队的作业,甚至优先考虑分配给各个队列的工作者数量。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而只会推送到单个连接中的特定队列。要指定队列,请在调度作业时使用 `onQueue` 方法。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatch($podcast)->onQueue('processing'); return redirect('/podcasts'); }}
或者,您可以在作业的构造函数中调用 `onQueue` 方法来指定作业的队列。
<?php namespace App\Jobs; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct() { $this->onQueue('processing'); }}
调度到特定连接
如果您的应用程序与多个队列连接交互,可以使用 `onConnection` 方法指定要将作业推送到哪个连接。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); return redirect('/podcasts'); }}
您可以将 `onConnection` 和 `onQueue` 方法链接在一起,以指定作业的连接和队列。
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
或者,您可以在作业的构造函数中调用 `onConnection` 方法来指定作业的连接。
<?php namespace App\Jobs; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct() { $this->onConnection('sqs'); }}
指定最大作业尝试次数/超时值
最大尝试次数
如果排队的作业之一遇到错误,您可能不希望它无限期地不断重试。因此,Laravel 提供了多种方法来指定可以尝试作业的次数或持续时间。
指定可以尝试作业的最大次数的一种方法是通过 Artisan 命令行上的 `--tries` 开关。这将应用于工作者处理的所有作业,除非正在处理的作业指定了它可以尝试的次数。
php artisan queue:work --tries=3
如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请参阅 失败作业文档。如果为 `queue:work` 命令提供了 `--tries=0`,则作业将无限期地重试。
您可以通过在作业类本身中定义可以尝试作业的最大次数来采用更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 `--tries` 值。
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 5;}
如果您需要动态控制特定作业的最大尝试次数,可以在作业上定义一个 `tries` 方法。
/** * Determine number of times the job may be attempted. */public function tries(): int{ return 5;}
基于时间尝试
作为定义作业在失败之前可以尝试多少次的替代方法,您可以定义不再尝试作业的时间。这允许在给定时间范围内对作业进行任意次数的尝试。要定义不再尝试作业的时间,请在作业类中添加 `retryUntil` 方法。此方法应该返回一个 `DateTime` 实例。
use DateTime; /** * Determine the time at which the job should timeout. */public function retryUntil(): DateTime{ return now()->addMinutes(10);}
您也可以在 排队的事件监听器 上定义 `tries` 属性或 `retryUntil` 方法。
最大异常次数
有时您可能希望指定作业可以尝试很多次,但如果重试是由给定数量的未处理异常触发的(而不是直接由 `release` 方法释放的),则应该失败。为此,您可以在作业类上定义一个 `maxExceptions` 属性。
<?php namespace App\Jobs; use Illuminate\Support\Facades\Redis; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 25; /** * The maximum number of unhandled exceptions to allow before failing. * * @var int */ public $maxExceptions = 3; /** * Execute the job. */ public function handle(): void { Redis::throttle('key')->allow(10)->every(60)->then(function () { // Lock obtained, process the podcast... }, function () { // Unable to obtain lock... return $this->release(10); }); }}
在此示例中,如果应用程序无法获得 Redis 锁,则作业将被释放 10 秒,并将继续重试最多 25 次。但是,如果作业抛出三个未处理的异常,则作业将失败。
超时
通常,您大约知道排队的作业需要多长时间。因此,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果作业的处理时间超过超时值指定的秒数,则处理作业的工作者将退出并出现错误。通常,工作者将由 服务器上配置的进程管理器 自动重新启动。
可以使用 Artisan 命令行上的 `--timeout` 开关指定作业可以运行的最大秒数。
php artisan queue:work --timeout=30
如果作业由于不断超时而超过其最大尝试次数,它将被标记为失败。
您也可以在作业类本身定义作业允许运行的最大秒数。如果作业中指定了超时,则优先于命令行中指定的任何超时。
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of seconds the job can run before timing out. * * @var int */ public $timeout = 120;}
有时,套接字或传出 HTTP 连接等 IO 阻塞进程可能不会遵守您指定的超时。因此,在使用这些功能时,您应该始终尝试使用它们的 API 指定超时。例如,在使用 Guzzle 时,您应该始终指定连接和请求超时值。
必须安装 pcntl
PHP 扩展才能指定作业超时。此外,作业的“超时”值应该始终小于其 “重试后” 值。否则,作业可能会在实际完成执行或超时之前重新尝试。
超时失败
如果您想指示作业在超时时应被标记为 失败,您可以在作业类上定义 $failOnTimeout
属性。
/** * Indicate if the job should be marked as failed on timeout. * * @var bool */public $failOnTimeout = true;
错误处理
如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续被释放,直到它被应用程序允许的最大次数尝试。最大尝试次数由 queue:work
Artisan 命令中使用的 --tries
开关定义。或者,最大尝试次数可以在作业类本身定义。有关运行队列工作程序的更多信息,请 参见下文。
手动释放作业
有时您可能希望手动将作业释放回队列,以便稍后再次尝试。您可以通过调用 release
方法来实现这一点。
/** * Execute the job. */public function handle(): void{ // ... $this->release();}
默认情况下,release
方法会将作业释放回队列以立即处理。但是,您可以指示队列不要在给定秒数过去之前使作业可供处理,方法是将整数或日期实例传递给 release
方法。
$this->release(10); $this->release(now()->addSeconds(10));
手动使作业失败
有时您可能需要手动将作业标记为“失败”。为此,您可以调用 fail
方法。
/** * Execute the job. */public function handle(): void{ // ... $this->fail();}
如果您想因为您捕获的异常而将您的作业标记为失败,您可以将异常传递给 fail
方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常。
$this->fail($exception); $this->fail('Something went wrong.');
有关失败作业的更多信息,请查看有关 处理作业失败的文档。
作业批处理
Laravel 的作业批处理功能允许您轻松地执行一批作业,然后在该批作业完成执行后执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表,该表将包含有关您的作业批次的元信息,例如它们的完成百分比。可以使用 make:queue-batches-table
Artisan 命令生成此迁移。
php artisan make:queue-batches-table php artisan migrate
定义可批处理的作业
要定义可批处理的作业,您应该像往常一样 创建一个可排队的作业;但是,您应该将 Illuminate\Bus\Batchable
特性添加到作业类中。此特性提供对 batch
方法的访问,该方法可用于检索作业正在执行的当前批次。
<?php namespace App\Jobs; use Illuminate\Bus\Batchable;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable; class ImportCsv implements ShouldQueue{ use Batchable, Queueable; /** * Execute the job. */ public function handle(): void { if ($this->batch()->cancelled()) { // Determine if the batch has been cancelled... return; } // Import a portion of the CSV file... }}
调度批处理
要调度一批作业,您应该使用 Bus
门面的 batch
方法。当然,批处理主要在与完成回调结合使用时才有用。因此,您可以使用 then
、catch
和 finally
方法来定义批次的完成回调。当这些回调被调用时,它们中的每一个都会收到一个 Illuminate\Bus\Batch
实例。在这个例子中,我们将想象我们正在排队一批作业,这些作业分别处理 CSV 文件中给定数量的行。
use App\Jobs\ImportCsv;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus;use Throwable; $batch = Bus::batch([ new ImportCsv(1, 100), new ImportCsv(101, 200), new ImportCsv(201, 300), new ImportCsv(301, 400), new ImportCsv(401, 500),])->before(function (Batch $batch) { // The batch has been created but no jobs have been added...})->progress(function (Batch $batch) { // A single job has completed successfully...})->then(function (Batch $batch) { // All jobs completed successfully...})->catch(function (Batch $batch, Throwable $e) { // First batch job failure detected...})->finally(function (Batch $batch) { // The batch has finished executing...})->dispatch(); return $batch->id;
可以通过 $batch->id
属性访问的批次的 ID 可以用于 查询 Laravel 命令总线 以获取有关批次的信息,这些信息在它被调度后可用。
由于批处理回调是在以后由 Laravel 队列序列化和执行的,因此您不应该在回调中使用 $this
变量。此外,由于批处理作业被包装在数据库事务中,因此不应该在作业中执行触发隐式提交的数据库语句。
命名批次
某些工具(如 Laravel Horizon 和 Laravel Telescope)如果命名批次,则可能为批次提供更人性化的调试信息。要为批次分配任意名称,您可以在定义批次时调用 name
方法。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import CSV')->dispatch();
批次连接和队列
如果您想指定应为批处理作业使用的连接和队列,您可以使用 onConnection
和 onQueue
方法。所有批处理作业必须在同一个连接和队列中执行。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->onConnection('redis')->onQueue('imports')->dispatch();
链和批处理
您可以在批次中定义一组 链式作业,方法是将链式作业放在数组中。例如,我们可以并行执行两个作业链,并在两个作业链完成处理后执行回调。
use App\Jobs\ReleasePodcast;use App\Jobs\SendPodcastReleaseNotification;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus; Bus::batch([ [ new ReleasePodcast(1), new SendPodcastReleaseNotification(1), ], [ new ReleasePodcast(2), new SendPodcastReleaseNotification(2), ],])->then(function (Batch $batch) { // ...})->dispatch();
相反,您可以在 链 中运行一批作业,方法是在链中定义批次。例如,您可以先运行一批作业来发布多个播客,然后运行一批作业来发送发布通知。
use App\Jobs\FlushPodcastCache;use App\Jobs\ReleasePodcast;use App\Jobs\SendPodcastReleaseNotification;use Illuminate\Support\Facades\Bus; Bus::chain([ new FlushPodcastCache, Bus::batch([ new ReleasePodcast(1), new ReleasePodcast(2), ]), Bus::batch([ new SendPodcastReleaseNotification(1), new SendPodcastReleaseNotification(2), ]),])->dispatch();
将作业添加到批处理中
有时在批处理作业中添加更多作业到批次中可能很有用。当您需要批处理数千个作业(这些作业可能需要很长时间才能在 web 请求期间调度)时,此模式可能很有用。因此,您可能希望调度一批初始的“加载器”作业,这些作业会用更多作业来填充批次。
$batch = Bus::batch([ new LoadImportBatch, new LoadImportBatch, new LoadImportBatch,])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import Contacts')->dispatch();
在这个例子中,我们将使用 LoadImportBatch
作业来用其他作业填充批次。要实现这一点,我们可以使用可以通过作业的 batch
方法访问的批次实例上的 add
方法。
use App\Jobs\ImportContacts;use Illuminate\Support\Collection; /** * Execute the job. */public function handle(): void{ if ($this->batch()->cancelled()) { return; } $this->batch()->add(Collection::times(1000, function () { return new ImportContacts; }));}
您只能从属于同一批次的作业中将作业添加到批次中。
检查批处理
提供给批次完成回调的 Illuminate\Bus\Batch
实例具有一系列属性和方法,可帮助您与给定的作业批次进行交互并检查它。
// The UUID of the batch...$batch->id; // The name of the batch (if applicable)...$batch->name; // The number of jobs assigned to the batch...$batch->totalJobs; // The number of jobs that have not been processed by the queue...$batch->pendingJobs; // The number of jobs that have failed...$batch->failedJobs; // The number of jobs that have been processed thus far...$batch->processedJobs(); // The completion percentage of the batch (0-100)...$batch->progress(); // Indicates if the batch has finished executing...$batch->finished(); // Cancel the execution of the batch...$batch->cancel(); // Indicates if the batch has been cancelled...$batch->cancelled();
从路由返回批次
所有 Illuminate\Bus\Batch
实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由之一返回它们以检索包含有关批次信息的 JSON 负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批次完成进度的信息变得很方便。
要通过 ID 检索批次,您可以使用 Bus
门面的 findBatch
方法。
use Illuminate\Support\Facades\Bus;use Illuminate\Support\Facades\Route; Route::get('/batch/{batchId}', function (string $batchId) { return Bus::findBatch($batchId);});
取消批处理
有时您可能需要取消给定批次的执行。这可以通过调用 Illuminate\Bus\Batch
实例上的 cancel
方法来实现。
/** * Execute the job. */public function handle(): void{ if ($this->user->exceedsImportLimit()) { return $this->batch()->cancel(); } if ($this->batch()->cancelled()) { return; }}
正如您可能在前面的示例中注意到的那样,批处理作业通常应该在继续执行之前确定其相应的批次是否已被取消。但是,为了方便起见,您可以将 SkipIfBatchCancelled
中间件 分配给该作业。顾名思义,此中间件将指示 Laravel 在其相应的批次被取消的情况下不要处理该作业。
use Illuminate\Queue\Middleware\SkipIfBatchCancelled; /** * Get the middleware the job should pass through. */public function middleware(): array{ return [new SkipIfBatchCancelled];}
批处理失败
当批处理作业失败时,将调用 catch
回调(如果已分配)。此回调仅针对批次中第一个失败的作业调用。
允许失败
当批次中的作业失败时,Laravel 会自动将批次标记为“已取消”。如果您愿意,您可以禁用此行为,以便作业失败不会自动将批次标记为已取消。这可以通过在调度批次时调用 allowFailures
方法来实现。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->allowFailures()->dispatch();
重试失败的批处理作业
为了方便起见,Laravel 提供了一个 queue:retry-batch
Artisan 命令,允许您轻松地重试给定批次的所有失败作业。queue:retry-batch
命令接受应重试其失败作业的批次的 UUID。
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批处理
如果没有修剪,job_batches
表可能会非常快地累积记录。为了减轻这种情况,您应该 安排 queue:prune-batches
Artisan 命令每天运行一次。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches')->daily();
默认情况下,所有超过 24 小时的已完成批次将被修剪。您可以在调用命令时使用 hours
选项来确定保留批次数据的时长。例如,以下命令将删除所有超过 48 小时完成的批次。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48')->daily();
有时,您的 jobs_batches
表可能会累积从未成功完成的批次的批次记录,例如,作业失败并且该作业从未成功重试的批次。您可以指示 queue:prune-batches
命令使用 unfinished
选项来修剪这些未完成的批次记录。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,您的 jobs_batches
表也可能会累积已取消批次的批次记录。您可以指示 queue:prune-batches
命令使用 cancelled
选项来修剪这些已取消的批次记录。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
将批处理存储在 DynamoDB 中
Laravel 还支持将批次元信息存储在 DynamoDB 中,而不是关系型数据库中。但是,您需要手动创建一个 DynamoDB 表来存储所有批次记录。
通常,此表应该命名为 job_batches
,但您应该根据应用程序的 queue
配置文件中的 queue.batching.table
配置值的价值来命名该表。
DynamoDB 批次表配置
job_batches
表应该有一个名为 application
的字符串主分区键和一个名为 id
的字符串主排序键。键的 application
部分将包含应用程序的名称,该名称由应用程序的 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一个表来存储多个 Laravel 应用程序的作业批次。
此外,如果您想利用 自动批次修剪,则可以为您的表定义 ttl
属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信。
composer require aws/aws-sdk-php
然后,将 queue.batching.driver
配置选项的值设置为 dynamodb
。此外,您应该在 batching
配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于对 AWS 进行身份验证。当使用 dynamodb
驱动程序时,queue.batching.database
配置选项是不必要的。
'batching' => [ 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'job_batches',],
在 DynamoDB 中修剪批次
当利用 DynamoDB 来存储作业批次信息时,用于修剪存储在关系型数据库中的批次的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能 来自动删除旧批次的记录。
如果你在定义 DynamoDB 表时使用了 `ttl` 属性,你可以定义配置参数来指示 Laravel 如何清理批量记录。`queue.batching.ttl_attribute` 配置值定义了保存 TTL 的属性名称,而 `queue.batching.ttl` 配置值定义了从 DynamoDB 表中移除批量记录的时间,相对于最后一次更新记录的时间。
'batching' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'job_batches', 'ttl_attribute' => 'ttl', 'ttl' => 60 * 60 * 24 * 7, // 7 days...],
排队闭包
除了将作业类分派到队列,你也可以分派一个闭包。这非常适合需要在当前请求周期之外执行的快速简单的任务。当将闭包分派到队列时,闭包的代码内容会进行加密签名,以确保它在传输过程中不会被修改。
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish();});
使用 `catch` 方法,你可以提供一个闭包,如果排队的闭包在耗尽所有队列的 配置的重试次数 后仍未成功完成,则应执行此闭包。
use Throwable; dispatch(function () use ($podcast) { $podcast->publish();})->catch(function (Throwable $e) { // This job has failed...});
由于 `catch` 回调是在稍后的时间被 Laravel 队列序列化和执行的,因此你应该在 `catch` 回调中避免使用 `$this` 变量。
运行队列工作器
queue:work
命令
Laravel 包含一个 Artisan 命令,它将启动一个队列工作进程并处理推送到队列中的新作业。你可以使用 `queue:work` Artisan 命令运行工作进程。请注意,一旦 `queue:work` 命令启动,它将一直运行,直到你手动停止它或关闭终端。
php artisan queue:work
为了让 `queue:work` 进程在后台永久运行,你应该使用进程监控工具(如 Supervisor)来确保队列工作进程不会停止运行。
如果你希望在命令的输出中包含已处理的作业 ID,可以在调用 `queue:work` 命令时包含 `-v` 标志。
php artisan queue:work -v
请记住,队列工作进程是长期运行的进程,并将启动的应用程序状态存储在内存中。因此,在它们启动后,它们不会注意到代码库中的变化。所以,在部署过程中,务必 重新启动队列工作进程。此外,请记住,你的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。
或者,你可以运行 `queue:listen` 命令。使用 `queue:listen` 命令时,你不必在想要重新加载已更新的代码或重置应用程序状态时手动重启工作进程;但是,此命令的效率明显低于 `queue:work` 命令。
php artisan queue:listen
运行多个队列工作进程
要将多个工作进程分配到队列并并发处理作业,只需启动多个 `queue:work` 进程即可。这可以在本地通过终端中的多个标签完成,也可以在生产环境中使用进程管理器的配置设置完成。 使用 Supervisor 时,你可以使用 `numprocs` 配置值。
指定连接和队列
你也可以指定工作进程应该使用哪个队列连接。传递给 `work` 命令的连接名称应与 `config/queue.php` 配置文件中定义的连接之一相对应。
php artisan queue:work redis
默认情况下,`queue:work` 命令只处理给定连接上的默认队列的作业。但是,你可以通过只处理给定连接上的特定队列来进一步定制你的队列工作进程。例如,如果所有电子邮件都在 `redis` 队列连接上的 `emails` 队列中处理,你可以发出以下命令来启动一个只处理该队列的工作进程:
php artisan queue:work redis --queue=emails
处理指定的作业数量
可以使用 `--once` 选项来指示工作进程只从队列中处理一个作业。
php artisan queue:work --once
可以使用 `--max-jobs` 选项来指示工作进程处理给定数量的作业,然后退出。此选项与 Supervisor 结合使用时非常有用,这样你的工作进程可以在处理完给定数量的作业后自动重启,释放它们可能积累的任何内存。
php artisan queue:work --max-jobs=1000
处理所有排队的作业,然后退出
可以使用 `--stop-when-empty` 选项来指示工作进程处理完所有作业后优雅地退出。如果要在 Docker 容器中处理 Laravel 队列,并且希望队列为空后关闭容器,则此选项非常有用。
php artisan queue:work --stop-when-empty
处理给定时间段内的作业
可以使用 `--max-time` 选项来指示工作进程处理给定时间段内的作业,然后退出。此选项与 Supervisor 结合使用时非常有用,这样你的工作进程可以在处理完给定时间段内的作业后自动重启,释放它们可能积累的任何内存。
# Process jobs for one hour and then exit...php artisan queue:work --max-time=3600
工作进程睡眠时长
当队列中有作业可用时,工作进程将持续处理作业,作业之间没有延迟。但是,`sleep` 选项决定了如果没有作业可用,工作进程将“睡眠”多少秒。当然,在睡眠时,工作进程不会处理任何新作业。
php artisan queue:work --sleep=3
维护模式和队列
当你的应用程序处于 维护模式 时,不会处理任何排队的作业。一旦应用程序退出维护模式,作业将继续像往常一样处理。
要强制队列工作进程即使在维护模式下也处理作业,可以使用 `--force` 选项。
php artisan queue:work --force
资源注意事项
守护进程队列工作进程在处理每个作业之前不会“重启”框架。因此,你应该在每个作业完成后释放任何繁重的资源。例如,如果你使用 GD 库进行图像操作,则在完成图像处理后,应该使用 `imagedestroy` 释放内存。
队列优先级
有时你可能希望优先考虑如何处理你的队列。例如,在 `config/queue.php` 配置文件中,你可以将 `redis` 连接的默认 `queue` 设置为 `low`。但是,有时你可能希望将作业推送到 `high` 优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个工作进程,该工作进程会验证所有 `high` 队列作业都在处理完毕后才继续处理 `low` 队列上的任何作业,请将队列名称的逗号分隔列表传递给 `work` 命令:
php artisan queue:work --queue=high,low
队列工作器和部署
由于队列工作进程是长期运行的进程,因此它们不会在没有重启的情况下注意到代码的变化。所以,使用队列工作进程部署应用程序最简单的方法是在部署过程中重启工作进程。你可以通过发出 `queue:restart` 命令来优雅地重启所有工作进程。
php artisan queue:restart
此命令将指示所有队列工作进程在完成当前作业的处理后优雅地退出,这样就不会丢失任何现有的作业。由于在执行 `queue:restart` 命令时队列工作进程将退出,因此你应该运行一个进程管理器(如 Supervisor)来自动重启队列工作进程。
队列使用 缓存 来存储重启信号,因此你应该在使用此功能之前验证应用程序是否为你的应用程序正确配置了缓存驱动。
作业过期和超时
作业过期
在 `config/queue.php` 配置文件中,每个队列连接都定义了一个 `retry_after` 选项。此选项指定队列连接在重试正在处理的作业之前应该等待多少秒。例如,如果 `retry_after` 的值为 `90`,则如果作业在 90 秒内未被释放或删除,则该作业将被释放回队列。通常,你应该将 `retry_after` 值设置为你的作业合理完成处理的最大秒数。
唯一没有包含 `retry_after` 值的队列连接是 Amazon SQS。SQS 将根据 默认可见性超时 重试作业,该超时由 AWS 控制台中管理。
工作进程超时
`queue:work` Artisan 命令公开了 `--timeout` 选项。默认情况下,`--timeout` 值为 60 秒。如果作业处理时间超过超时值指定的秒数,则处理该作业的工作进程将退出并出现错误。通常,工作进程将被服务器上配置的 进程管理器 自动重启。
php artisan queue:work --timeout=60
`retry_after` 配置选项和 `--timeout` CLI 选项是不同的,但它们共同作用,以确保作业不会丢失,并且作业只成功处理一次。
`--timeout` 值应该始终比 `retry_after` 配置值短几秒钟。这将确保在重试作业之前,始终终止处理冻结作业的工作进程。如果你的 `--timeout` 选项长于 `retry_after` 配置值,你的作业可能会被处理两次。
Supervisor 配置
在生产环境中,你需要一种方法来保持 `queue:work` 进程运行。`queue:work` 进程可能由于各种原因停止运行,例如工作进程超时或执行 `queue:restart` 命令。
因此,你需要配置一个进程监控工具,它可以检测到 `queue:work` 进程何时退出并自动重启它们。此外,进程监控工具允许你指定要并发运行多少个 `queue:work` 进程。Supervisor 是 Linux 环境中常用的进程监控工具,我们将在以下文档中讨论如何配置它。
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监控工具,如果 `queue:work` 进程失败,它将自动重启它们。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:
sudo apt-get install supervisor
如果你觉得自己配置和管理 Supervisor 太麻烦,可以考虑使用 Laravel Forge,它会自动为你的生产 Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 `/etc/supervisor/conf.d` 目录中。在这个目录中,你可以创建任意数量的配置文件,这些配置文件指示 Supervisor 如何监控你的进程。例如,让我们创建一个 `laravel-worker.conf` 文件,该文件启动并监控 `queue:work` 进程:
[program:laravel-worker]process_name=%(program_name)s_%(process_num)02dcommand=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600autostart=trueautorestart=truestopasgroup=truekillasgroup=trueuser=forgenumprocs=8redirect_stderr=truestdout_logfile=/home/forge/app.com/worker.logstopwaitsecs=3600
在这个例子中,`numprocs` 指令将指示 Supervisor 运行八个 `queue:work` 进程并监控所有进程,如果它们失败,会自动重启它们。你应该更改配置的 `command` 指令以反映你想要的队列连接和工作进程选项。
你应该确保 `stopwaitsecs` 的值大于你的最长时间运行的作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前杀死它。
启动 Supervisor
创建完配置文件后,可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start "laravel-worker:*"
有关 Supervisor 的更多信息,请参阅 Supervisor 文档。
处理失败的作业
有时候你排队的作业会失败。别担心,事情并不总是按计划进行! Laravel 提供了一种便捷的方法来 指定作业尝试的最大次数。当异步作业超过了这个尝试次数后,它将被插入到 failed_jobs
数据库表中。 同步调度作业 失败后不会存储在此表中,它们的异常会立即由应用程序处理。
在新的 Laravel 应用程序中,通常已经存在一个创建 failed_jobs
表的迁移。但是,如果你的应用程序不包含此表的迁移,你可以使用 make:queue-failed-table
命令来创建迁移
php artisan make:queue-failed-table php artisan migrate
在运行 队列工作进程 时,你可以在 queue:work
命令上使用 --tries
选项来指定作业尝试的最大次数。如果你没有为 --tries
选项指定值,则作业只会被尝试一次,或者尝试的次数由作业类的 $tries
属性指定
php artisan queue:work redis --tries=3
使用 --backoff
选项,你可以指定 Laravel 应该等待多长时间才能重试遇到异常的作业。默认情况下,作业会立即被释放回队列,以便再次尝试
php artisan queue:work redis --tries=3 --backoff=3
如果你想在每个作业的基础上配置 Laravel 应该等待多长时间才能重试遇到异常的作业,你可以在作业类中定义一个 backoff
属性
/** * The number of seconds to wait before retrying the job. * * @var int */public $backoff = 3;
如果你需要更复杂的逻辑来确定作业的回退时间,你可以在作业类中定义一个 backoff
方法
/*** Calculate the number of seconds to wait before retrying the job.*/public function backoff(): int{ return 3;}
你可以通过从 backoff
方法返回一个回退值的数组来轻松配置“指数”回退。在这个例子中,重试延迟对于第一次重试为 1 秒,对于第二次重试为 5 秒,对于第三次重试为 10 秒,并且如果还有更多尝试次数,则每次后续重试都为 10 秒
/*** Calculate the number of seconds to wait before retrying the job.** @return array<int, int>*/public function backoff(): array{ return [1, 5, 10];}
清理失败作业
当某个作业失败时,你可能希望向用户发送警报或撤消作业部分完成的任何操作。为此,你可以在作业类中定义一个 failed
方法。导致作业失败的 Throwable
实例将被传递给 failed
方法
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable;use Throwable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct( public Podcast $podcast, ) {} /** * Execute the job. */ public function handle(AudioProcessor $processor): void { // Process uploaded podcast... } /** * Handle a job failure. */ public function failed(?Throwable $exception): void { // Send user notification of failure, etc... }}
在调用 failed
方法之前会实例化一个新的作业实例;因此,在 handle
方法中可能发生的任何类属性修改都将丢失。
重试失败作业
要查看已插入到 failed_jobs
数据库表中的所有失败作业,你可以使用 queue:failed
Artisan 命令
php artisan queue:failed
queue:failed
命令将列出作业 ID、连接、队列、失败时间以及有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试一个 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失败作业,请执行以下命令
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如果需要,你可以在命令中传递多个 ID
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
你也可以重试特定队列的所有失败作业
php artisan queue:retry --queue=name
要重试所有失败的作业,请执行 queue:retry
命令并将 all
作为 ID 传递
php artisan queue:retry all
如果你想删除一个失败的作业,可以使用 queue:forget
命令
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
当使用 Horizon 时,你应该使用 horizon:forget
命令来删除失败的作业,而不是 queue:forget
命令。
要从 failed_jobs
表中删除所有失败的作业,可以使用 queue:flush
命令
php artisan queue:flush
忽略丢失的模型
当将 Eloquent 模型注入作业时,模型会在被放置到队列之前自动序列化,并在作业被工作进程处理时从数据库中重新检索。但是,如果模型在作业等待被工作进程处理时被删除,你的作业可能会失败并出现 ModelNotFoundException
。
为了方便起见,你可以选择通过将作业的 deleteWhenMissingModels
属性设置为 true
来自动删除缺少模型的作业。当此属性设置为 true
时,Laravel 会静默地丢弃作业,不会引发异常
/** * Delete the job if its models no longer exist. * * @var bool */public $deleteWhenMissingModels = true;
修剪失败作业
你可以通过调用 queue:prune-failed
Artisan 命令来修剪应用程序的 failed_jobs
表中的记录
php artisan queue:prune-failed
默认情况下,所有超过 24 小时的失败作业记录都将被修剪。如果你为命令提供 --hours
选项,则只保留在过去 N 小时内插入的失败作业记录。例如,以下命令将删除所有在 48 小时前插入的失败作业记录
php artisan queue:prune-failed --hours=48
将失败作业存储在 DynamoDB 中
Laravel 还支持将失败的作业记录存储在 DynamoDB 中,而不是关系型数据库表中。但是,你必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应该命名为 failed_jobs
,但你应该根据应用程序 queue
配置文件中的 queue.failed.table
配置值来命名表。
failed_jobs
表应该有一个名为 application
的字符串主分区键和一个名为 uuid
的字符串主排序键。键的 application
部分将包含你的应用程序的名称,该名称由应用程序 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此你可以使用同一个表来存储多个 Laravel 应用程序的失败作业。
此外,请确保安装 AWS SDK,以便你的 Laravel 应用程序能够与 Amazon DynamoDB 通信
composer require aws/aws-sdk-php
接下来,将 queue.failed.driver
配置选项的值设置为 dynamodb
。此外,你应该在失败的作业配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于对 AWS 进行身份验证。当使用 dynamodb
驱动程序时,queue.failed.database
配置选项是不必要的
'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'failed_jobs',],
禁用失败作业存储
你可以指示 Laravel 在不存储失败作业的情况下丢弃它们,方法是将 queue.failed.driver
配置选项的值设置为 null
。通常,这可以通过 QUEUE_FAILED_DRIVER
环境变量来完成
QUEUE_FAILED_DRIVER=null
失败作业事件
如果你想注册一个事件监听器,该监听器将在作业失败时被调用,可以使用 Queue
门面的 failing
方法。例如,我们可以从 Laravel 附带的 AppServiceProvider
的 boot
方法中将一个闭包附加到此事件
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobFailed; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. */ public function register(): void { // ... } /** * Bootstrap any application services. */ public function boot(): void { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); }}
从队列中清除作业
当使用 Horizon 时,你应该使用 horizon:clear
命令来清除队列中的作业,而不是 queue:clear
命令。
如果你想从默认连接的默认队列中删除所有作业,可以使用 queue:clear
Artisan 命令
php artisan queue:clear
你也可以提供 connection
参数和 queue
选项来从特定连接和队列中删除作业
php artisan queue:clear redis --queue=emails
清除队列中的作业只适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。
监控您的队列
如果你的队列突然收到大量作业,它可能会不堪重负,导致作业完成的等待时间很长。如果你愿意,Laravel 可以在你队列作业数超过指定阈值时提醒你。
要开始,你应该将 queue:monitor
命令调度为 每分钟运行一次。该命令接受你要监控的队列的名称以及你想要的作业数量阈值
php artisan queue:monitor redis:default,redis:deployments --max=100
仅调度此命令不足以触发通知,提醒你队列已不堪重负。当命令遇到作业数量超过你设置阈值的队列时,将派发一个 Illuminate\Queue\Events\QueueBusy
事件。你可以在应用程序的 AppServiceProvider
中监听此事件,以便向你或你的开发团队发送通知
use App\Notifications\QueueHasLongWaitTime;use Illuminate\Queue\Events\QueueBusy;use Illuminate\Support\Facades\Event;use Illuminate\Support\Facades\Notification; /** * Bootstrap any application services. */public function boot(): void{ Event::listen(function (QueueBusy $event) { ->notify(new QueueHasLongWaitTime( $event->connection, $event->queue, $event->size )); });}
测试
当测试调度作业的代码时,你可能希望指示 Laravel 不要实际执行作业本身,因为作业的代码可以直接测试,并且与调度它的代码分开测试。当然,要测试作业本身,你可以实例化一个作业实例,并在测试中直接调用 handle
方法。
你可以使用 Queue
门面的 fake
方法来防止排队的作业实际被推送到队列中。在调用 Queue
门面的 fake
方法后,你就可以断言应用程序尝试将作业推送到队列中
<?php use App\Jobs\AnotherJob;use App\Jobs\FinalJob;use App\Jobs\ShipOrder;use Illuminate\Support\Facades\Queue; test('orders can be shipped', function () { Queue::fake(); // Perform order shipping... // Assert that no jobs were pushed... Queue::assertNothingPushed(); // Assert a job was pushed to a given queue... Queue::assertPushedOn('queue-name', ShipOrder::class); // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2); // Assert a job was not pushed... Queue::assertNotPushed(AnotherJob::class); // Assert that a Closure was pushed to the queue... Queue::assertClosurePushed(); // Assert the total number of jobs that were pushed... Queue::assertCount(3);});
<?php namespace Tests\Feature; use App\Jobs\AnotherJob;use App\Jobs\FinalJob;use App\Jobs\ShipOrder;use Illuminate\Support\Facades\Queue;use Tests\TestCase; class ExampleTest extends TestCase{ public function test_orders_can_be_shipped(): void { Queue::fake(); // Perform order shipping... // Assert that no jobs were pushed... Queue::assertNothingPushed(); // Assert a job was pushed to a given queue... Queue::assertPushedOn('queue-name', ShipOrder::class); // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2); // Assert a job was not pushed... Queue::assertNotPushed(AnotherJob::class); // Assert that a Closure was pushed to the queue... Queue::assertClosurePushed(); // Assert the total number of jobs that were pushed... Queue::assertCount(3); }}
你可以将一个闭包传递给 assertPushed
或 assertNotPushed
方法,以断言一个通过了给定“真值测试”的作业被推送到队列中。如果至少有一个通过了给定真值测试的作业被推送到队列中,则断言将成功
Queue::assertPushed(function (ShipOrder $job) use ($order) { return $job->order->id === $order->id;});
模拟部分作业
如果你只需要模拟特定的作业,而让其他作业正常执行,可以将要模拟的作业的类名传递给 fake
方法
test('orders can be shipped', function () { Queue::fake([ ShipOrder::class, ]); // Perform order shipping... // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2);});
public function test_orders_can_be_shipped(): void{ Queue::fake([ ShipOrder::class, ]); // Perform order shipping... // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2);}
你可以使用 except
方法模拟除一组指定作业之外的所有作业
Queue::fake()->except([ ShipOrder::class,]);
测试作业链
要测试作业链,你需要利用 Bus
门面的模拟功能。Bus
门面的 assertChained
方法可用于断言一个 作业链 被调度。assertChained
方法接受一个作业链数组作为它的第一个参数
use App\Jobs\RecordShipment;use App\Jobs\ShipOrder;use App\Jobs\UpdateInventory;use Illuminate\Support\Facades\Bus; Bus::fake(); // ... Bus::assertChained([ ShipOrder::class, RecordShipment::class, UpdateInventory::class]);
正如你在上面的例子中看到的,作业链数组可以是作业类名的数组。但是,你也可以提供实际作业实例的数组。这样做时,Laravel 将确保作业实例属于相同的类,并且具有应用程序调度的作业链的相同属性值
Bus::assertChained([ new ShipOrder, new RecordShipment, new UpdateInventory,]);
你可以使用 assertDispatchedWithoutChain
方法来断言一个作业被推送到队列中,但没有作业链
Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试链修改
如果一个作业链 在现有链的开头或结尾添加作业,你可以使用作业的 assertHasChain
方法来断言该作业具有预期的剩余作业链
$job = new ProcessPodcast; $job->handle(); $job->assertHasChain([ new TranscribePodcast, new OptimizePodcast, new ReleasePodcast,]);
assertDoesntHaveChain
方法可用于断言作业的剩余链为空
$job->assertDoesntHaveChain();
测试链式批处理
如果你的作业链 包含一批作业,你可以通过在你的链断言中插入 Bus::chainedBatch
定义来断言链式批处理符合你的预期
use App\Jobs\ShipOrder;use App\Jobs\UpdateInventory;use Illuminate\Bus\PendingBatch;use Illuminate\Support\Facades\Bus; Bus::assertChained([ new ShipOrder, Bus::chainedBatch(function (PendingBatch $batch) { return $batch->jobs->count() === 3; }), new UpdateInventory,]);
测试作业批处理
Bus
门面的 assertBatched
方法可用于断言一个 作业批处理 被调度。传递给 assertBatched
方法的闭包接收一个 Illuminate\Bus\PendingBatch
实例,该实例可用于检查批处理中的作业
use Illuminate\Bus\PendingBatch;use Illuminate\Support\Facades\Bus; Bus::fake(); // ... Bus::assertBatched(function (PendingBatch $batch) { return $batch->name == 'import-csv' && $batch->jobs->count() === 10;});
你可以使用 assertBatchCount
方法来断言已调度了给定数量的批处理
Bus::assertBatchCount(3);
你可以使用 assertNothingBatched
来断言没有调度任何批处理
Bus::assertNothingBatched();
测试作业/批处理交互
此外,你可能偶尔需要测试单个作业与其底层批处理的交互。例如,你可能需要测试一个作业是否取消了对其批处理的进一步处理。为此,你需要通过 withFakeBatch
方法将一个假批处理分配给作业。withFakeBatch
方法返回一个元组,其中包含作业实例和假批处理
[$job, $batch] = (new ShipOrder)->withFakeBatch(); $job->handle(); $this->assertTrue($batch->cancelled());$this->assertEmpty($batch->added);
测试作业/队列交互
有时候,你可能需要测试一个排队的作业 将自己释放回队列。或者,你可能需要测试作业是否删除了自己。你可以通过实例化作业并调用 withFakeQueueInteractions
方法来测试这些队列交互。
一旦作业的队列交互被模拟,你就可以调用作业上的 handle
方法。调用作业后,可以使用 assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
和 assertNotFailed
方法对作业的队列交互进行断言
use App\Jobs\ProcessPodcast; $job = (new ProcessPodcast)->withFakeQueueInteractions(); $job->handle(); $job->assertReleased(delay: 30);$job->assertDeleted();$job->assertNotDeleted();$job->assertFailed();$job->assertNotFailed();
作业事件
在 Queue
的 外观 上使用 before
和 after
方法,可以指定在排队作业处理之前或之后执行的回调函数。这些回调函数是执行额外日志记录或增加仪表盘统计信息的绝佳机会。通常,应在 服务提供者 的 boot
方法中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider
。
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobProcessed;use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. */ public function register(): void { // ... } /** * Bootstrap any application services. */ public function boot(): void { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); }}
在 Queue
的 外观 上使用 looping
方法,可以指定在 worker 尝试从队列中获取作业之前执行的回调函数。例如,您可以注册一个闭包来回滚先前失败作业留下的任何未完成的事务。
use Illuminate\Support\Facades\DB;use Illuminate\Support\Facades\Queue; Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); }});