跳至内容

队列

介绍

在构建 Web 应用程序时,您可能有一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。值得庆幸的是,Laravel 允许您轻松创建可以在后台处理的排队作业。通过将耗时的任务移动到队列中,您的应用程序可以快速响应 Web 请求,并为您的客户提供更好的用户体验。

Laravel 队列在各种不同的队列后端(例如 Amazon SQSRedis 甚至关系数据库)上提供统一的排队 API。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及将立即执行作业的同步驱动程序(用于本地开发期间)。还包括一个 null 队列驱动程序,该驱动程序会丢弃排队的作业。

lightbulb

Laravel 现在提供 Horizon,这是一个漂亮的仪表板和 Redis 驱动的队列的配置系统。请查看完整的 Horizon 文档以获取更多信息。

连接与队列

在开始使用 Laravel 队列之前,务必了解“连接”和“队列”之间的区别。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义与后端队列服务(例如 Amazon SQS、Beanstalk 或 Redis)的连接。但是,任何给定的队列连接都可能具有多个“队列”,这些队列可以被视为不同的堆栈或排队作业堆。

请注意,在 queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时,作业将被分派到的默认队列。换句话说,如果您在没有明确定义作业应分派到哪个队列的情况下分派作业,则该作业将被放置在连接配置的 queue 属性中定义的队列中。

use App\Jobs\ProcessPodcast;
 
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
 
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是倾向于使用一个简单的队列。但是,将作业推送到多个队列对于希望优先处理或细分作业处理方式的应用程序特别有用,因为 Laravel 队列工作进程允许您指定它应该按优先级处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个给予它们更高处理优先级的 worker。

php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

为了使用 database 队列驱动程序,您需要一个数据库表来保存作业。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移中;但是,如果您的应用程序不包含此迁移,您可以使用 make:queue-table Artisan 命令来创建它。

php artisan make:queue-table
 
php artisan migrate

Redis

为了使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

exclamation

redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含一个 键哈希标签。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中。

'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],

阻塞

当使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在迭代工作进程循环并重新轮询 Redis 数据库之前应等待作业变为可用的时间。

根据您的队列负载调整此值可能比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将该值设置为 5,以指示驱动程序在等待作业变为可用时应阻塞五秒钟。

'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
exclamation

block_for 设置为 0 将导致队列工作进程无限期阻塞,直到有作业可用。这也会阻止在处理下一个作业之前处理诸如 SIGTERM 之类的信号。

其他驱动程序先决条件

以下列出的队列驱动程序需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装。

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展
  • MongoDB: mongodb/laravel-mongodb

创建作业

生成作业类

默认情况下,应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,则在您运行 make:job Artisan 命令时将创建它。

php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列以异步运行。

lightbulb

可以使用存根发布来自定义作业存根。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在这个例子中,我们假设我们管理一个播客发布服务,并且需要在发布之前处理上传的播客文件。

<?php
 
namespace App\Jobs;
 
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
 
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}

在这个例子中,请注意我们可以将一个 Eloquent 模型直接传递到排队作业的构造函数中。由于该作业正在使用 Queueable trait,因此当作业被处理时,Eloquent 模型及其加载的关系将会被优雅地序列化和反序列化。

如果您的排队作业在其构造函数中接受一个 Eloquent 模型,则只有该模型的标识符会被序列化到队列中。当实际处理该作业时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化的方法允许将更小的作业有效负载发送到您的队列驱动程序。

handle 方法的依赖注入

当作业被队列处理时,将调用 handle 方法。请注意,我们能够在作业的 handle 方法上类型提示依赖项。Laravel 服务容器会自动注入这些依赖项。

如果您想完全控制容器如何将依赖项注入到 handle 方法中,您可以使用容器的 bindMethod 方法。 bindMethod 方法接受一个回调函数,该回调函数接收作业和容器。在回调函数中,您可以随意调用 handle 方法。通常,您应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法。

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
 
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
exclamation

在将二进制数据(例如原始图像内容)传递到排队作业之前,应通过 base64_encode 函数传递。否则,当作业被放置在队列中时,可能无法正确序列化为 JSON。

排队的关系

因为当作业排队时,所有加载的 Eloquent 模型关系也会被序列化,所以序列化的作业字符串有时会变得非常大。此外,当作业被反序列化并且从数据库中重新检索模型关系时,它们将被完整地检索。在作业排队过程中序列化模型之前应用的任何先前的关系约束在作业反序列化时都不会应用。因此,如果您希望使用给定关系的子集,则应在您的排队作业中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时调用模型的 withoutRelations 方法。此方法将返回一个没有其加载关系的模型的实例。

/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}

如果您正在使用 PHP 构造函数属性提升,并且想要指示一个 Eloquent 模型不应序列化其关系,则可以使用 WithoutRelations 属性。

use Illuminate\Queue\Attributes\WithoutRelations;
 
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}

如果一个作业接收到一个 Eloquent 模型的集合或数组,而不是单个模型,则当作业被反序列化和执行时,该集合中的模型将不会恢复其关系。这是为了防止在处理大量模型的作业上过度使用资源。

唯一作业

exclamation

唯一任务需要一个支持的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动都支持原子锁。此外,唯一任务的约束不适用于批处理中的任务。

有时,你可能希望确保在任何时间点,队列中只有一个特定任务的实例。你可以通过在你的任务类上实现 ShouldBeUnique 接口来实现这一点。此接口不需要你在类上定义任何其他方法。

<?php
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}

在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果队列中已经存在该任务的另一个实例且尚未完成处理,则不会分派该任务。

在某些情况下,你可能想要定义一个特定的“键”来使任务唯一,或者你可能想要指定一个超时时间,超过该时间后任务不再保持唯一。为了实现这一点,你可以在你的任务类上定义 uniqueIduniqueFor 属性或方法。

<?php
 
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
 
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
 
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}

在上面的示例中,UpdateSearchIndex 任务通过产品 ID 保持唯一。因此,任何具有相同产品 ID 的新任务分派都将被忽略,直到现有任务完成处理。此外,如果现有任务在一小时内未处理完成,则唯一锁将被释放,并且可以将另一个具有相同唯一键的任务分派到队列。

exclamation

如果你的应用程序从多个 Web 服务器或容器分派任务,你应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定任务是否唯一。

保持任务在开始处理前唯一

默认情况下,唯一任务在任务完成处理或所有重试尝试失败后会“解锁”。但是,在某些情况下,你可能希望任务在处理之前立即解锁。为了实现这一点,你的任务应该实现 ShouldBeUniqueUntilProcessing 合约,而不是 ShouldBeUnique 合约。

<?php
 
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}

唯一任务锁

在后台,当分派 ShouldBeUnique 任务时,Laravel 会尝试使用 uniqueId 键获取一个 。如果未获取到锁,则不会分派该任务。当任务完成处理或所有重试尝试失败时,此锁将被释放。默认情况下,Laravel 将使用默认的缓存驱动来获取此锁。但是,如果你希望使用另一个驱动来获取锁,你可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动。

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
 
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
lightbulb

如果你只需要限制任务的并发处理,请使用 WithoutOverlapping 任务中间件。

加密作业

Laravel 允许你通过 加密来确保任务数据的隐私性和完整性。要开始使用,只需将 ShouldBeEncrypted 接口添加到任务类。一旦将此接口添加到类中,Laravel 将在将你的任务推送到队列之前自动加密你的任务。

<?php
 
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
 
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}

作业中间件

任务中间件允许你将自定义逻辑包装在队列任务的执行周围,从而减少任务本身的样板代码。例如,考虑以下 handle 方法,该方法利用 Laravel 的 Redis 速率限制功能,允许每五秒钟仅处理一个任务。

use Illuminate\Support\Facades\Redis;
 
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
 
// Handle job...
}, function () {
// Could not obtain lock...
 
return $this->release(5);
});
}

虽然这段代码是有效的,但 handle 方法的实现变得嘈杂,因为它被 Redis 速率限制逻辑所杂乱。此外,对于我们想要进行速率限制的任何其他任务,都必须重复此速率限制逻辑。

我们可以定义一个处理速率限制的任务中间件,而不是在 handle 方法中进行速率限制。Laravel 没有任务中间件的默认位置,因此你可以将任务中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware 目录中。

<?php
 
namespace App\Jobs\Middleware;
 
use Closure;
use Illuminate\Support\Facades\Redis;
 
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
 
$next($job);
}, function () use ($job) {
// Could not obtain lock...
 
$job->release(5);
});
}
}

如你所见,与 路由中间件一样,任务中间件接收正在处理的任务和一个回调,该回调应被调用以继续处理任务。

创建任务中间件后,可以通过从任务的 middleware 方法中返回它们来将其附加到任务。此方法不存在于由 make:job Artisan 命令脚手架生成的任务中,因此你需要手动将其添加到你的任务类中。

use App\Jobs\Middleware\RateLimited;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
lightbulb

任务中间件也可以分配给可排队事件侦听器、邮件和通知。

速率限制

尽管我们刚刚演示了如何编写自己的速率限制任务中间件,但 Laravel 实际上包含一个你可以用来限制任务速率的速率限制中间件。与 路由速率限制器一样,任务速率限制器使用 RateLimiter facade 的 for 方法定义。

例如,你可能希望允许用户每小时备份一次数据,而对高级客户不施加此类限制。为了实现这一点,你可以在 AppServiceProviderboot 方法中定义一个 RateLimiter

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}

在上面的示例中,我们定义了每小时的速率限制;但是,你可以使用 perMinute 方法轻松定义基于分钟的速率限制。此外,你可以将任何你想要的值传递给速率限制的 by 方法;但是,此值最常用于按客户细分速率限制。

return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,你可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将其附加到你的任务。每次任务超出速率限制时,此中间件都会根据速率限制持续时间将任务释放回队列并进行适当的延迟。

use Illuminate\Queue\Middleware\RateLimited;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}

将速率限制的任务释放回队列仍会增加任务的 attempts 总数。你可能希望相应地调整任务类上的 triesmaxExceptions 属性。或者,你可能希望使用 retryUntil 方法来定义任务不应再尝试的时间。

如果你不希望任务在被速率限制时重试,可以使用 dontRelease 方法。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
lightbulb

如果你正在使用 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,该中间件针对 Redis 进行了微调,并且比基本的速率限制中间件更有效。

防止作业重叠

Laravel 包括一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许你根据任意键来防止任务重叠。当排队的任务正在修改一个应该一次只被一个任务修改的资源时,这会很有用。

例如,假设你有一个排队的任务来更新用户的信用评分,并且你希望防止同一用户 ID 的信用评分更新任务重叠。为了实现这一点,你可以从任务的 middleware 方法返回 WithoutOverlapping 中间件。

use Illuminate\Queue\Middleware\WithoutOverlapping;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠任务都将被释放回队列。你还可以指定在释放的任务再次尝试之前必须经过的秒数。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果你希望立即删除任何重叠的任务,使其不会重试,可以使用 dontRelease 方法。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,你的任务可能会意外失败或超时,导致锁无法释放。因此,你可以使用 expireAfter 方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在任务开始处理后三分钟释放 WithoutOverlapping 锁。

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
exclamation

WithoutOverlapping 中间件需要支持的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动支持原子锁。

跨作业类共享锁键

默认情况下,WithoutOverlapping 中间件只会阻止同一类别的作业重叠。因此,即使两个不同的作业类使用相同的锁键,它们也不会被阻止重叠。但是,您可以使用 shared 方法指示 Laravel 将键应用于所有作业类。

use Illuminate\Queue\Middleware\WithoutOverlapping;
 
class ProviderIsDown
{
// ...
 
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
 
class ProviderIsUp
{
// ...
 
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}

限制异常

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您限制异常的发生频率。一旦作业抛出给定数量的异常,所有进一步执行该作业的尝试将被延迟,直到指定的时间间隔过去。对于与不稳定的第三方服务交互的作业,此中间件特别有用。

例如,假设有一个队列作业与开始抛出异常的第三方 API 交互。要限制异常,您可以从作业的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现基于时间的尝试的作业配对使用。

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
 
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}

中间件接受的第一个构造函数参数是作业在被限制前可以抛出的异常数量,而第二个构造函数参数是在作业被限制后再次尝试之前应该经过的秒数。在上面的代码示例中,如果作业连续抛出 10 个异常,我们将等待 5 分钟后再尝试该作业,并受 30 分钟的时间限制。

当作业抛出异常但尚未达到异常阈值时,作业通常会立即重试。但是,您可以在将中间件附加到作业时通过调用 backoff 方法来指定此类作业应延迟的分钟数。

use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且作业的类名用作缓存“键”。您可以在将中间件附加到作业时通过调用 by 方法来覆盖此键。如果您有多个作业与同一第三方服务交互,并且希望它们共享一个通用的限制“桶”,这可能很有用。

use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,此中间件将限制每个异常。您可以通过在将中间件附加到作业时调用 when 方法来修改此行为。仅当提供给 when 方法的闭包返回 true 时,才会限制异常。

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}

如果您希望将受限制的异常报告给应用程序的异常处理程序,您可以通过在将中间件附加到作业时调用 report 方法来实现。可选地,您可以为 report 方法提供一个闭包,并且仅当给定的闭包返回 true 时才会报告异常。

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
 
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
lightbulb

如果您正在使用 Redis,您可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,该中间件针对 Redis 进行了微调,并且比基本的异常限制中间件更有效。

跳过作业

Skip 中间件允许您指定应跳过/删除作业,而无需修改作业的逻辑。如果给定条件求值为 true,则 Skip::when 方法将删除作业,而如果条件求值为 false,则 Skip::unless 方法将删除作业。

use Illuminate\Queue\Middleware\Skip;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($someCondition),
];
}

您还可以将 Closure 传递给 whenunless 方法以进行更复杂的条件评估。

use Illuminate\Queue\Middleware\Skip;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}

调度作业

编写完作业类后,可以使用作业本身的 dispatch 方法来分发它。传递给 dispatch 方法的参数将传递给作业的构造函数。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// ...
 
ProcessPodcast::dispatch($podcast);
 
return redirect('/podcasts');
}
}

如果您想有条件地分发作业,可以使用 dispatchIfdispatchUnless 方法。

ProcessPodcast::dispatchIf($accountActive, $podcast);
 
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台中同步执行作业,这在本地开发期间通常很方便。如果您想实际开始将作业排队以进行后台处理,您可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动程序。

延迟调度

如果您想指定作业不应立即由队列工作程序处理,您可以在分发作业时使用 delay 方法。例如,让我们指定一个作业在分发后 10 分钟内不可用于处理。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// ...
 
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
 
return redirect('/podcasts');
}
}

在某些情况下,作业可能配置了默认延迟。如果您需要绕过此延迟并立即分发作业进行处理,您可以使用 withoutDelay 方法。

ProcessPodcast::dispatch($podcast)->withoutDelay();
exclamation

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在将响应发送到浏览器后分发

或者,如果您的 Web 服务器正在使用 FastCGI,则 dispatchAfterResponse 方法会将作业的分发延迟到 HTTP 响应发送到用户浏览器之后。即使排队的作业仍在执行,这仍然允许用户开始使用应用程序。这通常只应用于需要大约一秒钟的作业,例如发送电子邮件。由于它们是在当前的 HTTP 请求中处理的,因此以这种方式分发的作业不需要运行队列工作程序才能进行处理。

use App\Jobs\SendNotification;
 
SendNotification::dispatchAfterResponse();

您还可以 dispatch 一个闭包,并将 afterResponse 方法链接到 dispatch 助手,以便在将 HTTP 响应发送到浏览器后执行一个闭包。

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
 
dispatch(function () {
Mail::to('[email protected]')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您想立即(同步)分发作业,可以使用 dispatchSync 方法。使用此方法时,作业将不会排队,并且将立即在当前进程中执行。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatchSync($podcast);
 
return redirect('/podcasts');
}
}

作业和数据库事务

虽然在数据库事务中分发作业完全没问题,但您应该特别注意确保您的作业实际上能够成功执行。在事务中分发作业时,作业可能在父事务提交之前由工作程序处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能都不存在于数据库中。

值得庆幸的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项。

'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],

after_commit 选项为 true 时,您可以在数据库事务中分发作业;但是,Laravel 将等待,直到打开的父数据库事务已提交,然后再实际分发作业。当然,如果当前没有打开的数据库事务,则会立即分发作业。

如果由于事务期间发生的异常而回滚事务,则在该事务期间分发的作业将被丢弃。

lightbulb

after_commit 配置选项设置为 true 还会导致所有排队的事件侦听器、邮件、通知和广播事件在所有打开的数据库事务都已提交后才分发。

内联指定提交分发行为

如果您没有将 after_commit 队列连接配置选项设置为 true,您仍然可以指示特定作业应在所有打开的数据库事务都已提交后分发。要实现此目的,您可以将 afterCommit 方法链接到您的分发操作。

use App\Jobs\ProcessPodcast;
 
ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,您可以指定一个特定的任务应该立即分发,而无需等待任何打开的数据库事务提交。

ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

任务链允许您指定一个队列任务列表,这些任务应该在主任务成功执行后按顺序运行。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,您可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是一个较低级别的组件,排队任务分发构建在其之上。

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
 
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();

除了链接任务类实例外,您还可以链接闭包。

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
exclamation

在任务中使用 $this->delete() 方法删除任务不会阻止链式任务被处理。只有当链中的任务失败时,链才会停止执行。

链连接和队列

如果您想指定链式任务应该使用的连接和队列,您可以使用 onConnectiononQueue 方法。这些方法指定了应该使用的队列连接和队列名称,除非排队的任务被明确分配了不同的连接/队列。

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链添加任务

有时,您可能需要在链中的另一个任务中向现有任务链的前面或后面添加一个任务。您可以使用 prependToChainappendToChain 方法来完成此操作。

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
 
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}

链失败

在链接任务时,您可以使用 catch 方法指定一个闭包,该闭包应在链中的任务失败时被调用。给定的回调将接收导致任务失败的 Throwable 实例。

use Illuminate\Support\Facades\Bus;
use Throwable;
 
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
exclamation

由于链回调是由 Laravel 队列序列化并在稍后执行的,因此您不应在链回调中使用 $this 变量。

自定义队列和连接

分发到特定队列

通过将任务推送到不同的队列,您可以“分类”您的排队任务,甚至可以优先处理您分配给各个队列的工作人员数量。请记住,这不会将任务推送到您的队列配置文件定义的不同的队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在分发任务时使用 onQueue 方法。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatch($podcast)->onQueue('processing');
 
return redirect('/podcasts');
}
}

或者,您可以在任务的构造函数中调用 onQueue 方法来指定任务的队列。

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}

分发到特定连接

如果您的应用程序与多个队列连接交互,您可以使用 onConnection 方法指定将任务推送到哪个连接。

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
 
// Create podcast...
 
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
 
return redirect('/podcasts');
}
}

您可以将 onConnectiononQueue 方法链接在一起,以指定任务的连接和队列。

ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');

或者,您可以在任务的构造函数中调用 onConnection 方法来指定任务的连接。

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}

指定最大作业尝试次数 / 超时值

最大尝试次数

如果您的某个排队任务遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方式来指定任务可以尝试的次数或时长。

指定任务可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将应用于工作人员处理的所有任务,除非正在处理的任务指定了它可以尝试的次数。

php artisan queue:work --tries=3

如果任务超过了其最大尝试次数,则该任务将被视为“失败”的任务。有关处理失败任务的更多信息,请参阅失败任务文档。如果为 queue:work 命令提供了 --tries=0,则该任务将无限期地重试。

您可以通过在任务类本身上定义任务可以尝试的最大次数来采取更细粒度的方法。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值。

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}

如果您需要动态控制特定任务的最大尝试次数,您可以在任务上定义一个 tries 方法。

/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}

基于时间的尝试

作为定义任务在失败之前可以尝试多少次的替代方法,您可以定义任务应该不再尝试的时间。这允许任务在给定的时间范围内尝试任意次数。要定义任务应该不再尝试的时间,请向您的任务类添加 retryUntil 方法。此方法应返回一个 DateTime 实例。

use DateTime;
 
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
lightbulb

您还可以在您的排队事件监听器上定义 tries 属性或 retryUntil 方法。

最大异常数

有时,您可能希望指定任务可以尝试多次,但如果重试是由给定数量的未处理异常触发(而不是由 release 方法直接释放)则应失败。要实现此目的,您可以在您的任务类上定义 maxExceptions 属性。

<?php
 
namespace App\Jobs;
 
use Illuminate\Support\Facades\Redis;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
 
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
 
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}

在此示例中,如果应用程序无法获得 Redis 锁,则任务将被释放十秒钟,并且将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,则该任务将失败。

超时

通常,您大致了解您的排队任务需要多长时间。因此,Laravel 允许您指定一个“超时”值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,则处理该任务的工作人员将以错误退出。通常,工作人员将由服务器上配置的进程管理器自动重启。

可以使用 Artisan 命令行上的 --timeout 开关指定任务可以运行的最大秒数。

php artisan queue:work --timeout=30

如果任务因不断超时而超过其最大尝试次数,则该任务将被标记为失败。

您还可以在任务类本身上定义允许任务运行的最大秒数。如果在任务上指定了超时,它将优先于命令行上指定的任何超时。

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}

有时,诸如套接字或传出 HTTP 连接之类的 IO 阻塞进程可能不遵守您指定的超时。因此,当使用这些功能时,您还应始终尝试使用它们的 API 指定超时。例如,当使用 Guzzle 时,您应始终指定连接和请求超时值。

exclamation

必须安装 pcntl PHP 扩展才能指定任务超时。此外,任务的“超时”值应始终小于其“重试后”值。否则,任务可能会在实际完成执行或超时之前被重新尝试。

超时时失败

如果您想指示任务在超时时应标记为失败,您可以在任务类上定义 $failOnTimeout 属性。

/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;

错误处理

如果作业在处理过程中抛出异常,该作业将自动释放回队列,以便再次尝试。该作业将持续释放,直到尝试次数达到应用程序允许的最大次数。最大尝试次数由在 queue:work Artisan 命令中使用的 --tries 开关定义。或者,最大尝试次数可以在作业类本身中定义。有关运行队列工作程序的更多信息可以在下面找到

手动释放作业

有时您可能希望手动将作业释放回队列,以便稍后再次尝试。您可以通过调用 release 方法来实现此目的

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
$this->release();
}

默认情况下,release 方法会将作业释放回队列以立即处理。但是,您可以通过将整数或日期实例传递给 release 方法,指示队列在给定秒数过去之前不要使作业可用于处理

$this->release(10);
 
$this->release(now()->addSeconds(10));

手动失败作业

有时您可能需要手动将作业标记为“失败”。为此,您可以调用 fail 方法

/**
* Execute the job.
*/
public function handle(): void
{
// ...
 
$this->fail();
}

如果您想因为捕获到的异常而将作业标记为失败,您可以将异常传递给 fail 方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将被转换为异常

$this->fail($exception);
 
$this->fail('Something went wrong.');
lightbulb

有关失败作业的更多信息,请查看有关处理作业失败的文档

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在该批作业执行完成后执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表,该表将包含有关您的作业批次的元信息,例如它们的完成百分比。可以使用 make:queue-batches-table Artisan 命令生成此迁移

php artisan make:queue-batches-table
 
php artisan migrate

定义可批处理的作业

要定义可批处理的作业,您应该像平常一样创建可排队的作业;但是,您应该将 Illuminate\Bus\Batchable 特性添加到作业类中。此特性提供了对 batch 方法的访问,该方法可用于检索作业正在其中执行的当前批次

<?php
 
namespace App\Jobs;
 
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
 
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
 
return;
}
 
// Import a portion of the CSV file...
}
}

调度批处理

要调度一批作业,您应该使用 Bus facade 的 batch 方法。当然,批处理主要在与完成回调结合使用时才有用。因此,您可以使用 thencatchfinally 方法来定义批次的完成回调。每个回调在调用时都会收到一个 Illuminate\Bus\Batch 实例。在这个例子中,我们假设我们正在排队一批作业,每个作业都处理来自 CSV 文件的给定行数

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
 
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
 
return $batch->id;

批次的 ID,可以通过 $batch->id 属性访问,可用于查询 Laravel 命令总线,以获取有关已调度批次的信息。

exclamation

由于批处理回调被序列化并在稍后由 Laravel 队列执行,因此您不应在回调中使用 $this 变量。此外,由于批处理作业被包装在数据库事务中,因此不应在作业中执行触发隐式提交的数据库语句。

命名批次

如果批次被命名,某些工具(如 Laravel Horizon 和 Laravel Telescope)可能会为批次提供更友好的调试信息。要为批次分配任意名称,您可以在定义批次时调用 name 方法

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();

批次连接和队列

如果您想指定用于批处理作业的连接和队列,您可以使用 onConnectiononQueue 方法。所有批处理作业必须在相同的连接和队列中执行

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

链和批处理

您可以通过将链接的作业放置在数组中来定义批次中的一组链式作业。例如,我们可以并行执行两个作业链,并在两个作业链都完成处理后执行回调

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
 
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();

相反,您可以通过在链中定义批次来在中运行批次作业。例如,您可以先运行一批作业来发布多个播客,然后运行一批作业来发送发布通知

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
 
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();

向批处理中添加作业

有时,从批处理作业中向批次添加其他作业可能很有用。当您需要批处理数千个可能需要很长时间才能在 Web 请求期间调度的作业时,此模式非常有用。因此,您可能希望调度一个初始的“加载器”作业批次,以使用更多作业来填充批次

$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 作业来使用其他作业填充批次。为此,我们可以使用批次实例上的 add 方法,该实例可以通过作业的 batch 方法访问

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
 
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
 
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
exclamation

您只能从属于同一批次的作业中向批次添加作业。

检查批处理

提供给批次完成回调的 Illuminate\Bus\Batch 实例具有各种属性和方法,可帮助您与给定的作业批次进行交互和检查

// The UUID of the batch...
$batch->id;
 
// The name of the batch (if applicable)...
$batch->name;
 
// The number of jobs assigned to the batch...
$batch->totalJobs;
 
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
 
// The number of jobs that have failed...
$batch->failedJobs;
 
// The number of jobs that have been processed thus far...
$batch->processedJobs();
 
// The completion percentage of the batch (0-100)...
$batch->progress();
 
// Indicates if the batch has finished executing...
$batch->finished();
 
// Cancel the execution of the batch...
$batch->cancel();
 
// Indicates if the batch has been cancelled...
$batch->cancelled();

从路由返回批次

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由之一返回它们,以检索包含有关批次信息的 JSON 有效负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批次完成进度的信息非常方便。

要按其 ID 检索批次,您可以使用 Bus facade 的 findBatch 方法

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
 
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});

取消批处理

有时您可能需要取消给定批次的执行。可以通过调用 Illuminate\Bus\Batch 实例上的 cancel 方法来实现此目的

/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
 
if ($this->batch()->cancelled()) {
return;
}
}

正如您在前面的示例中可能已经注意到的那样,批处理作业通常应确定其相应的批次是否已取消,然后再继续执行。但是,为了方便起见,您可以将 SkipIfBatchCancelled 中间件 分配给作业。顾名思义,此中间件将指示 Laravel 如果其相应的批次已取消,则不要处理该作业

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
 
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}

批处理失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅对批次中失败的第一个作业调用。

允许失败

当批次中的作业失败时,Laravel 会自动将批次标记为“已取消”。如果您愿意,可以禁用此行为,以便作业失败不会自动将批次标记为已取消。这可以通过在调度批次时调用 allowFailures 方法来实现

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,可让您轻松重试给定批次的所有失败作业。queue:retry-batch 命令接受应重试其失败作业的批次的 UUID

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不进行修剪,job_batches 表可能会非常快速地累积记录。为了缓解这种情况,您应该计划每天运行 queue:prune-batches Artisan 命令

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批次都将被修剪。您可以在调用命令时使用 hours 选项来确定要保留批次数据的时间。例如,以下命令将删除所有超过 48 小时完成的批次

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48')->daily();

有时,您的 jobs_batches 表可能会累积一些未成功完成的批处理记录,例如作业失败且未成功重试的批处理记录。您可以使用 unfinished 选项指示 queue:prune-batches 命令修剪这些未完成的批处理记录。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的 jobs_batches 表也可能会累积已取消批处理的记录。您可以使用 cancelled 选项指示 queue:prune-batches 命令修剪这些已取消的批处理记录。

use Illuminate\Support\Facades\Schedule;
 
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批处理

Laravel 还支持将批处理元信息存储在 DynamoDB 中,而不是关系数据库中。但是,您需要手动创建一个 DynamoDB 表来存储所有批处理记录。

通常,此表应命名为 job_batches,但您应根据应用程序 queue 配置文件中 queue.batching.table 配置的值来命名该表。

DynamoDB 批处理表配置

job_batches 表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含您应用程序的名称,该名称由应用程序 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一个表来存储多个 Laravel 应用程序的作业批处理。

此外,如果您想利用自动批处理修剪,则可以为您的表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信

composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您还应在 batching 配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动程序时,queue.batching.database 配置选项是不必要的。

'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],

在 DynamoDB 中修剪批处理

当使用 DynamoDB 存储作业批处理信息时,用于修剪存储在关系数据库中的批处理的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能来自动删除旧批处理的记录。

如果您使用 ttl 属性定义了 DynamoDB 表,则可以定义配置参数以指示 Laravel 如何修剪批处理记录。queue.batching.ttl_attribute 配置值定义保存 TTL 的属性的名称,而 queue.batching.ttl 配置值定义自上次更新记录后,批处理记录可以从 DynamoDB 表中删除的秒数。

'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

排队闭包

除了将作业类分派到队列外,您还可以分派闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分派到队列时,闭包的代码内容会经过加密签名,以便在传输过程中无法修改。

$podcast = App\Podcast::find(1);
 
dispatch(function () use ($podcast) {
$podcast->publish();
});

使用 catch 方法,您可以提供一个闭包,如果排队的闭包在耗尽队列的所有配置的重试尝试后未能成功完成,则应执行该闭包。

use Throwable;
 
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
exclamation

由于 catch 回调会被序列化并在稍后由 Laravel 队列执行,因此您不应在 catch 回调中使用 $this 变量。

运行队列工作进程

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动一个队列工作进程,并在新作业被推送到队列时对其进行处理。您可以使用 queue:work Artisan 命令运行工作进程。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端。

php artisan queue:work
lightbulb

为了使 queue:work 进程在后台永久运行,您应使用进程监视器(如 Supervisor)来确保队列工作进程不会停止运行。

如果您希望在命令输出中包含已处理的作业 ID,则可以在调用 queue:work 命令时包含 -v 标志。

php artisan queue:work -v

请记住,队列工作进程是长期运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们不会注意到在它们启动后代码库中的更改。因此,在您的部署过程中,请务必重新启动您的队列工作进程。此外,请记住,您的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。

或者,您可以运行 queue:listen 命令。当使用 queue:listen 命令时,您不必在要重新加载更新的代码或重置应用程序状态时手动重新启动工作进程;但是,此命令的效率明显低于 queue:work 命令。

php artisan queue:listen

运行多个队列工作进程

要将多个工作进程分配给一个队列并并发处理作业,您只需启动多个 queue:work 进程。这可以通过终端中的多个选项卡在本地完成,或者使用进程管理器的配置设置在生产环境中完成。 当使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作进程应使用哪个队列连接。传递给 work 命令的连接名称应与您的 config/queue.php 配置文件中定义的连接之一相对应。

php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上默认队列的作业。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作进程。例如,如果您的所有电子邮件都在 redis 队列连接上的 emails 队列中处理,则可以发出以下命令以启动仅处理该队列的工作进程。

php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项来指示工作进程仅处理队列中的一个作业。

php artisan queue:work --once

可以使用 --max-jobs 选项来指示工作进程处理给定数量的作业然后退出。当与 Supervisor 结合使用时,此选项可能会很有用,这样您的工作进程会在处理给定数量的作业后自动重新启动,从而释放它们可能累积的任何内存。

php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

可以使用 --stop-when-empty 选项来指示 worker 处理完所有作业后优雅地退出。当您希望在 Docker 容器内处理 Laravel 队列,并在队列为空后关闭容器时,此选项非常有用。

php artisan queue:work --stop-when-empty

处理指定秒数的作业

可以使用 --max-time 选项来指示 worker 在给定的秒数内处理作业,然后退出。此选项与 Supervisor 结合使用时非常有用,这样您的 worker 在处理一定时间后会自动重启,释放它们可能累积的任何内存。

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

Worker 休眠时长

当队列中有作业可用时,worker 将会持续处理作业,而不会在作业之间有任何延迟。然而,sleep 选项决定了在没有可用作业时,worker 将“休眠”多少秒。当然,在休眠期间,worker 不会处理任何新的作业。

php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于维护模式时,将不会处理任何排队的作业。一旦应用程序退出维护模式,这些作业将继续正常处理。

要强制您的队列 worker 即使在启用维护模式的情况下也处理作业,您可以使用 --force 选项。

php artisan queue:work --force

资源注意事项

守护进程队列 worker 在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何占用大量资源的资源。例如,如果您正在使用 GD 库进行图像处理,您应该在完成图像处理后使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。然而,有时您可能希望将作业推送到 high 优先级队列,如下所示:

dispatch((new Job)->onQueue('high'));

要启动一个 worker,该 worker 会先验证是否处理完 high 队列中的所有作业,然后再继续处理 low 队列中的任何作业,请将以逗号分隔的队列名称列表传递给 work 命令。

php artisan queue:work --queue=high,low

队列工作进程和部署

由于队列 worker 是长期运行的进程,因此如果不重启它们,它们不会注意到代码的变化。因此,使用队列 worker 部署应用程序的最简单方法是在部署过程中重启 worker。您可以通过执行 queue:restart 命令来优雅地重启所有 worker。

php artisan queue:restart

此命令将指示所有队列 worker 在完成当前作业后优雅地退出,从而不会丢失任何现有作业。由于队列 worker 将在执行 queue:restart 命令时退出,因此您应该运行一个进程管理器(例如 Supervisor)来自动重启队列 worker。

lightbulb

队列使用 缓存 来存储重启信号,因此您应该在使用此功能之前验证是否为您的应用程序正确配置了缓存驱动程序。

作业过期和超时

作业过期

在您的 config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果 retry_after 的值设置为 90,则如果作业在没有被释放或删除的情况下已经处理了 90 秒,则该作业将被释放回队列。通常,您应该将 retry_after 值设置为您的作业合理完成处理所需的最长时间。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。

Worker 超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 的值为 60 秒。如果作业的处理时间超过了超时值指定的秒数,则处理该作业的 worker 将会退出并报错。通常,worker 将会自动被 服务器上配置的进程管理器重启。

php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业只会被成功处理一次。

exclamation

--timeout 值应始终比您的 retry_after 配置值短至少几秒。这将确保处理冻结作业的 worker 总是在作业重试之前终止。如果您的 --timeout 选项比您的 retry_after 配置值长,则您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,您需要一种方法来保持 queue:work 进程的运行。queue:work 进程可能会因各种原因停止运行,例如超出 worker 超时或执行 queue:restart 命令。

因此,您需要配置一个进程监视器,它可以检测到您的 queue:work 进程何时退出并自动重启它们。此外,进程监视器还可以让您指定您希望同时运行多少个 queue:work 进程。Supervisor 是 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的一个进程监视器,如果您的 queue:work 进程失败,它会自动重启它们。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

sudo apt-get install supervisor
lightbulb

如果您觉得自己配置和管理 Supervisor 过于复杂,请考虑使用 Laravel Forge,它会自动为您的生产 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,这些配置文件指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件来启动和监视 queue:work 进程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视所有这些进程,如果它们失败,则会自动重启它们。您应该更改配置的 command 指令,以反映您想要的队列连接和 worker 选项。

exclamation

您应该确保 stopwaitsecs 的值大于您的最长运行作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其杀死。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

sudo supervisorctl reread
 
sudo supervisorctl update
 
sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时候你的队列任务会失败。别担心,事情并不总会按计划进行!Laravel 提供了一种便捷的方式来指定任务应该尝试的最大次数。当异步任务超过最大尝试次数后,它将被插入到 failed_jobs 数据库表中。同步调度的任务如果失败,则不会存储在此表中,其异常会立即被应用程序处理。

在新的 Laravel 应用程序中,通常已经存在一个用于创建 failed_jobs 表的迁移文件。但是,如果你的应用程序没有此表的迁移文件,你可以使用 make:queue-failed-table 命令来创建迁移文件。

php artisan make:queue-failed-table
 
php artisan migrate

当运行队列工作进程时,你可以使用 queue:work 命令的 --tries 开关来指定任务应该尝试的最大次数。如果你没有为 --tries 选项指定值,任务将只尝试一次,或者按照任务类的 $tries 属性指定的次数进行尝试。

php artisan queue:work redis --tries=3

使用 --backoff 选项,你可以指定 Laravel 在重试遇到异常的任务之前应该等待多少秒。默认情况下,任务会立即被放回队列,以便它可以再次尝试。

php artisan queue:work redis --tries=3 --backoff=3

如果你想为每个任务配置 Laravel 在重试遇到异常的任务之前应该等待多少秒,你可以在你的任务类中定义一个 backoff 属性。

/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;

如果需要更复杂的逻辑来确定任务的退避时间,你可以在你的任务类中定义一个 backoff 方法。

/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}

你可以通过从 backoff 方法返回一个退避值数组来轻松配置“指数”退避。在此示例中,第一次重试的延迟将为 1 秒,第二次重试的延迟将为 5 秒,第三次重试的延迟将为 10 秒,如果还有更多尝试次数,则每次后续重试的延迟都将为 10 秒。

/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}

清理失败作业后

当特定任务失败时,你可能需要向你的用户发送警报或回滚该任务部分完成的任何操作。为了实现这一点,你可以在你的任务类中定义一个 failed 方法。导致任务失败的 Throwable 实例将被传递给 failed 方法。

<?php
 
namespace App\Jobs;
 
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
 
class ProcessPodcast implements ShouldQueue
{
use Queueable;
 
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
 
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
 
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
exclamation

在调用 failed 方法之前,会实例化一个新的任务实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的作业

要查看已插入到你的 failed_jobs 数据库表中的所有失败任务,你可以使用 queue:failed Artisan 命令。

php artisan queue:failed

queue:failed 命令将列出任务 ID、连接、队列、失败时间以及有关任务的其他信息。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败任务,请发出以下命令。

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,你可以将多个 ID 传递给该命令。

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

你也可以重试特定队列的所有失败任务。

php artisan queue:retry --queue=name

要重试所有失败的任务,请执行 queue:retry 命令并将 all 作为 ID 传递。

php artisan queue:retry all

如果你想删除一个失败的任务,你可以使用 queue:forget 命令。

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

当使用 Horizon 时,你应该使用 horizon:forget 命令来删除失败的任务,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的任务,你可以使用 queue:flush 命令。

php artisan queue:flush

忽略丢失的模型

当将 Eloquent 模型注入到任务中时,该模型会在被放入队列之前自动序列化,并在任务被处理时从数据库中重新检索。但是,如果该模型在任务等待被工作进程处理时被删除,则你的任务可能会失败并抛出 ModelNotFoundException 异常。

为了方便起见,你可以通过将任务的 deleteWhenMissingModels 属性设置为 true 来选择自动删除缺少模型的任务。当此属性设置为 true 时,Laravel 将会静默地丢弃该任务,而不会引发异常。

/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;

修剪失败的作业

你可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序 failed_jobs 表中的记录。

php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败任务记录都将被修剪。如果为该命令提供 --hours 选项,则仅保留最近 N 小时内插入的失败任务记录。例如,以下命令将删除所有在 48 小时前插入的失败任务记录。

php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还支持将失败的任务记录存储在 DynamoDB 中,而不是关系数据库表中。但是,你必须手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,此表应命名为 failed_jobs,但你应该根据应用程序 queue 配置文件中 queue.failed.table 配置的值来命名该表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含你的应用程序名称,该名称由应用程序 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此你可以使用同一个表来存储多个 Laravel 应用程序的失败任务。

此外,请确保安装 AWS SDK,以便你的 Laravel 应用程序可以与 Amazon DynamoDB 通信。

composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,你应在失败任务配置数组中定义 keysecretregion 配置选项。这些选项将用于 AWS 的身份验证。当使用 dynamodb 驱动程序时,queue.failed.database 配置选项是不必要的。

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],

禁用失败作业存储

你可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的任务而不存储它们。通常,可以通过 QUEUE_FAILED_DRIVER 环境变量来实现。

QUEUE_FAILED_DRIVER=null

失败的作业事件

如果你想注册一个在任务失败时调用的事件侦听器,你可以使用 Queue Facade 的 failing 方法。例如,我们可以从 Laravel 自带的 AppServiceProviderboot 方法中附加一个闭包到此事件。

<?php
 
namespace App\Providers;
 
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
 
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}

从队列中清除作业

lightbulb

当使用 Horizon 时,你应该使用 horizon:clear 命令从队列中清除任务,而不是 queue:clear 命令。

如果你想从默认连接的默认队列中删除所有任务,你可以使用 queue:clear Artisan 命令。

php artisan queue:clear

你还可以提供 connection 参数和 queue 选项来删除特定连接和队列中的任务。

php artisan queue:clear redis --queue=emails
exclamation

从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后 60 秒内发送到 SQS 队列的任务也可能会被删除。

监控您的队列

如果你的队列突然接收到大量任务,它可能会不堪重负,导致任务完成的等待时间过长。如果需要,当你的队列任务计数超过指定阈值时,Laravel 可以提醒你。

要开始使用,你应该安排 queue:monitor 命令每分钟运行一次。该命令接受你要监视的队列名称以及你所需的任务计数阈值。

php artisan queue:monitor redis:default,redis:deployments --max=100

仅仅调度此命令不足以触发通知,提醒您队列已超负荷。当命令遇到作业计数超过您阈值的队列时,将分发一个 Illuminate\Queue\Events\QueueBusy 事件。您可以在应用程序的 AppServiceProvider 中监听此事件,以便向您或您的开发团队发送通知。

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', '[email protected]')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}

测试

在测试分发作业的代码时,您可能希望指示 Laravel 实际上不执行作业本身,因为作业的代码可以独立于分发它的代码进行直接测试。当然,要测试作业本身,您可以实例化一个作业实例并在测试中直接调用 handle 方法。

您可以使用 Queue facade 的 fake 方法来阻止排队的作业实际推送到队列。调用 Queue facade 的 fake 方法后,您可以断言应用程序尝试将作业推送到队列。

<?php
 
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
 
test('orders can be shipped', function () {
Queue::fake();
 
// Perform order shipping...
 
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
 
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
 
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
 
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
 
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
 
namespace Tests\Feature;
 
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
 
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
 
// Perform order shipping...
 
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
 
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
 
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
 
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
 
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}

您可以将一个闭包传递给 assertPushedassertNotPushed 方法,以便断言推送的作业是否通过给定的“真值测试”。如果至少有一个作业通过了给定的真值测试,则断言将成功。

Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});

伪造作业的子集

如果您只需要模拟特定的作业,同时允许其他作业正常执行,您可以将应该被模拟的作业的类名传递给 fake 方法。

test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
 
// Perform order shipping...
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
 
// Perform order shipping...
 
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}

您可以使用 except 方法来模拟除一组指定作业之外的所有作业。

Queue::fake()->except([
ShipOrder::class,
]);

测试作业链

要测试作业链,您需要利用 Bus facade 的模拟功能。Bus facade 的 assertChained 方法可用于断言已分发 作业链assertChained 方法接受一个链接作业的数组作为其第一个参数。

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
 
Bus::fake();
 
// ...
 
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);

正如您在上面的示例中看到的,链接作业的数组可以是作业类名的数组。但是,您也可以提供实际作业实例的数组。这样做时,Laravel 将确保作业实例属于同一类,并且具有与应用程序分发的链接作业相同的属性值。

Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);

您可以使用 assertDispatchedWithoutChain 方法来断言已推送一个没有作业链的作业。

Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果一个链接作业在现有链中前置或附加作业,您可以使用作业的 assertHasChain 方法来断言该作业具有预期的剩余作业链。

$job = new ProcessPodcast;
 
$job->handle();
 
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);

assertDoesntHaveChain 方法可用于断言作业的剩余链为空。

$job->assertDoesntHaveChain();

测试链接的批处理

如果您的作业链包含一批作业,您可以通过在链断言中插入 Bus::chainedBatch 定义来断言链接的批处理是否符合您的期望。

use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
 
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);

测试作业批处理

Bus facade 的 assertBatched 方法可用于断言已分发一批作业。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 的实例,该实例可用于检查批处理中的作业。

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
 
Bus::fake();
 
// ...
 
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});

您可以使用 assertBatchCount 方法来断言已分发给定数量的批处理。

Bus::assertBatchCount(3);

您可以使用 assertNothingBatched 来断言没有分发任何批处理。

Bus::assertNothingBatched();

测试作业/批处理交互

此外,您有时可能需要测试单个作业与其底层批处理的交互。例如,您可能需要测试一个作业是否取消了其批处理的进一步处理。要实现此目的,您需要通过 withFakeBatch 方法为作业分配一个模拟批处理。withFakeBatch 方法返回一个包含作业实例和模拟批处理的元组。

[$job, $batch] = (new ShipOrder)->withFakeBatch();
 
$job->handle();
 
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

测试作业 / 队列交互

有时,您可能需要测试一个排队的作业将其自身释放回队列。或者,您可能需要测试该作业是否已删除自身。您可以通过实例化作业并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦作业的队列交互被模拟,您可以在作业上调用 handle 方法。调用作业后,可以使用 assertReleasedassertDeletedassertNotDeletedassertFailedassertNotFailed 方法来对作业的队列交互进行断言。

use App\Jobs\ProcessPodcast;
 
$job = (new ProcessPodcast)->withFakeQueueInteractions();
 
$job->handle();
 
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();

作业事件

使用 Queue facade上的 beforeafter 方法,您可以指定在处理排队作业之前或之后执行的回调。这些回调是执行额外日志记录或增加仪表板统计信息的好机会。通常,您应该从服务提供者boot 方法中调用这些方法。例如,我们可以使用 Laravel 自带的 AppServiceProvider

<?php
 
namespace App\Providers;
 
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
 
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
 
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
 
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

使用 Queue facade 上的 looping 方法,您可以指定在 worker 尝试从队列中获取作业之前执行的回调。例如,您可以注册一个闭包来回滚先前失败的作业留下的任何事务。

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
 
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});