1. Глубже в детали
  2. Очереди

Присоединяйся к нашему Telegram сообществу @webblend!

Здесь ты найдешь сниппеты по Laravel и полезные советы по веб-разработке.

Введение

При создании веб-приложения у вас может возникнуть необходимость выполнять некоторые задачи, такие как разбор и сохранение загруженного файла CSV, которые занимают слишком много времени для выполнения в течение типичного веб-запроса. К счастью, Laravel позволяет легко создавать задачи в очереди, которые могут быть обработаны в фоновом режиме. Перемещая трудоемкие задачи в очередь, ваше приложение может отвечать на веб-запросы с поразительной скоростью и обеспечивать лучший опыт пользователя вашим клиентам.

Очереди Laravel предоставляют унифицированный API для работы с очередями на различных фронтендах очередей, таких как Amazon SQS, Redis или даже реляционная база данных.

Конфигурационные параметры очередей Laravel хранятся в файле конфигурации вашего приложения config/queue.php. В этом файле вы найдете конфигурации подключений для каждого из драйверов очередей, включенных в фреймворк, включая драйверы базы данных, Amazon SQS, Redis и Beanstalkd, а также синхронный драйвер, который будет выполнять задачи немедленно (для использования во время локальной разработки). Также включен драйвер очередей null, который отбрасывает поставленные в очередь задачи.

Примечание Теперь Laravel предлагает Horizon — красивую панель и систему конфигурации для очередей, работающих на Redis. Подробнее ознакомьтесь с документацией по Horizon.

Подключения против очередей

Перед тем как начать работу с очередями Laravel, важно понимать различие между "подключениями" и "очередями". В файле конфигурации config/queue.php вашего приложения есть массив конфигурации connections. Эта опция определяет подключения к фронтальным службам очередей, таким как Amazon SQS, Beanstalk или Redis. Однако каждое конкретное подключение к очереди может иметь несколько "очередей", которые можно рассматривать как разные стеки или кучи поставленных в очередь задач.

Обратите внимание, что каждый пример конфигурации подключения в файле конфигурации queue содержит атрибут queue. Это очередь по умолчанию, на которую будут отправляться задачи, когда они поступают в определенное подключение. Другими словами, если вы отправляете задачу, не определяя явно, на какую очередь ее следует отправить, задача будет помещена в очередь, указанную в атрибуте queue конфигурации подключения:

use App\Jobs\ProcessPodcast;
// Эта задача отправляется в очередь по умолчанию...
ProcessPodcast::dispatch();
// Эта задача отправляется в очередь "emails"...
ProcessPodcast::dispatch()->onQueue('emails');

Некоторым приложениям может не потребоваться постоянно поставлять задачи в несколько очередей, предпочитая вместо этого иметь одну простую очередь. Однако поставка задач в несколько очередей может быть особенно полезной для приложений, которые хотят определять приоритеты или сегментировать обработку задач, поскольку Laravel позволяет вам указать, какие очереди он должен обрабатывать в порядке приоритета. Например, если вы посылаете задачи в очередь high, вы можете запустить рабочий процесс, который будет обрабатывать их с более высоким приоритетом:

php artisan queue:work --queue=high,default

Примечания и предварительные требования драйвера

База данных

Для использования драйвера очереди database вам потребуется таблица в базе данных для хранения задач. Чтобы создать миграцию, создающую эту таблицу, выполните команду Artisan queue:table. После создания миграции вы можете выполнить миграцию базы данных, используя команду migrate:

php artisan queue:table
php artisan migrate

Не забудьте указать вашему приложению использовать драйвер database, обновив переменную QUEUE_CONNECTION в файле .env вашего приложения:

QUEUE_CONNECTION=database

Redis

Для использования драйвера очереди redis настройте соединение с базой данных Redis в файле конфигурации config/database.php.

Внимание Опции Redis serializer и compression не поддерживаются драйвером очереди redis.

Redis Cluster

Если ваше соединение с очередью Redis использует кластер Redis, имена ваших очередей должны содержать хэш-тег ключа. Это необходимо для того, чтобы все ключи Redis для данной очереди попадали в один и тот же хэш-слот:

'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],

Блокировка

При использовании очереди Redis вы можете использовать опцию конфигурации block_for для указания того, как долго драйвер должен ждать, пока задача не станет доступной, прежде чем переходить к циклу рабочего процесса и повторно проверять базу данных Redis.

Настройка этого значения в зависимости от загрузки вашей очереди может быть более эффективной, чем постоянная проверка базы данных Redis на наличие новых задач. Например, вы можете установить значение 5, чтобы указать, что драйвер должен блокироваться на пять секунд, ожидая доступности задачи:

'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],

Внимание Установка block_for в 0 приведет к тому, что рабочие процессы очереди будут блокироваться неопределенно, пока не появится задача. Это также предотвратит обработку сигналов, таких как SIGTERM, пока не будет обработана следующая задача.

Прочие предпосылки для драйверов

Для указанных драйверов очереди требуются следующие зависимости. Эти зависимости можно установить с помощью менеджера пакетов Composer:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 или phpredis расширение PHP

Создание задач

Генерация классов задач

По умолчанию все задачи, которые могут быть помещены в очередь для вашего приложения, сохраняются в каталоге app/Jobs. Если каталог app/Jobs не существует, он будет создан при выполнении команды Artisan make:job:

php artisan make:job ProcessPodcast

Созданный класс будет реализовывать интерфейс Illuminate\Contracts\Queue\ShouldQueue, указывая Laravel, что задачу следует поместить в очередь для асинхронного выполнения.

Примечание Заготовки задач можно настроить с помощью публикации заготовок.

Структура класса

Классы задач очень просты, обычно содержат только метод handle, который вызывается при обработке задачи очереди. Давайте рассмотрим пример класса задачи. В этом примере мы будем притворяться, что у нас есть сервис публикации подкастов и нам нужно обработать загруженные файлы подкаста перед их публикацией:

<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Создать новый экземпляр задачи.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Выполнить задачу.
*/
public function handle(AudioProcessor $processor): void
{
// Обработать загруженный подкаст...
}
}

В этом примере обратите внимание, что мы смогли передать Eloquent модель непосредственно в конструктор очередной задачи. Из-за трейта SerializesModels, который используется в задаче, Eloquent модели и их загруженные отношения будут грациозно сериализованы и разсериализованы при обработке задачи.

Если ваша задача в очереди принимает Eloquent модель в своем конструкторе, в очередь будет сериализован только идентификатор модели. Когда задача будет обработана, система очередей автоматически извлечет полный экземпляр модели и ее загруженные отношения из базы данных. Этот подход к сериализации модели позволяет отправлять на драйвер очереди гораздо более небольшие нагрузки.

Внедрение зависимости в метод handle

Метод handle вызывается при обработке задачи в очереди. Обратите внимание, что мы можем указать зависимости для метода handle задачи. Сервис-контейнер Laravel автоматически внедряет эти зависимости.

Если вы хотите полностью контролировать внедрение зависимостей в метод handle, вы можете использовать метод bindMethod контейнера. Метод bindMethod принимает обратный вызов, который получает задачу и контейнер. В пределах обратного вызова вы свободны вызывать метод handle так, как вам угодно. Обычно этот метод следует вызывать из метода boot вашего поставщика служб App\Providers\AppServiceProvider:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});

Внимание Двоичные данные, такие как сырые содержимое изображений, должны передаваться через функцию base64_encode перед передачей в очередную задачу. В противном случае задача может некорректно сериализоваться в JSON при помещении в очередь.

Очередные отношения

Поскольку все загруженные отношения Eloquent модели также сериализуются, когда задача помещается в очередь, сериализованная строка задачи иногда может стать довольно большой. Более того, при десериализации задачи и повторном извлечении отношений модели из базы данных они будут извлечены в полном объеме. Любые предыдущие ограничения отношений, которые были применены до того, как модель была сериализована во время постановки в очередь, не будут применены при десериализации задачи. Поэтому, если вы хотите работать с подмножеством данного отношения, вам следует повторно наложить это отношение в вашей поставленной в очередь задаче.

Или, чтобы предотвратить сериализацию отношений, вы можете вызвать метод withoutRelations на модели при установке значения свойства. Этот метод вернет экземпляр модели без загруженных отношений:

/**
* Создать новый экземпляр задачи.
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}

Если вы используете автоматическую расстановку свойств конструктора PHP и хотите указать, что Eloquent модель не должна сериализоваться со своими отношениями, вы можете использовать атрибут WithoutRelations:

use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Создать новый экземпляр задачи.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast
) {
}

Если задача получает коллекцию или массив Eloquent моделей вместо одной модели, то модели внутри этой коллекции не будут восстанавливать свои отношения при десериализации и выполнении задачи. Это сделано для предотвращения избыточного использования ресурсов в задачах, которые работают с большим количеством моделей.

Уникальные задачи

Внимание Уникальные задачи требуют кэш-драйвера, поддерживающего блокировки. В настоящее время драйверы кэша memcached, redis, dynamodb, database, file и array поддерживают атомарные блокировки. Кроме того, ограничения на уникальные задачи не применяются к задачам внутри пакетов.

Иногда вам может потребоваться обеспечить, чтобы на очереди находился только один экземпляр конкретной задачи в любой момент времени. Вы можете сделать это, реализовав интерфейс ShouldBeUnique в вашем классе задачи. Этот интерфейс не требует от вас определения дополнительных методов в вашем классе:

<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}

В приведенном выше примере задача UpdateSearchIndex является уникальной. Таким образом, задача не будет отправлена, если другой экземпляр этой задачи уже находится в очереди и еще не завершил обработку.

В некоторых случаях вам может потребоваться определить конкретный «ключ», который делает задачу уникальной, или вы можете хотеть указать тайм-аут, после которого задача больше не остается уникальной. Для этого вы можете определить свойства или методы uniqueId и uniqueFor в вашем классе задачи:

<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* Экземпляр продукта.
*
* @var \App\Product
*/
public $product;
/**
* Количество секунд, по истечении которых уникальная блокировка задачи будет освобождена.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Получить уникальный идентификатор задачи.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}

В приведенном выше примере задача UpdateSearchIndex является уникальной по идентификатору продукта. Таким образом, любые новые отправки задачи с тем же идентификатором продукта будут проигнорированы, пока существующая задача не завершит обработку. Кроме того, если существующая задача не будет обработана в течение одного часа, уникальная блокировка будет снята, и еще одна задача с тем же уникальным ключом сможет быть отправлена в очередь.

Внимание Если ваше приложение отправляет задачи с нескольких веб-серверов или контейнеров, вы должны убедиться, что все ваши серверы взаимодействуют с одним и тем же центральным сервером кэша, чтобы Laravel мог точно определить, уникальна ли задача.

Сохранение уникальности задач до начала обработки

По умолчанию уникальные задачи "разблокируются" после завершения обработки задачи или после неудачных попыток перезапуска. Однако могут возникнуть ситуации, когда вы хотите, чтобы ваша задача разблокировалась непосредственно перед ее обработкой. Для этого ваша задача должна реализовать контракт ShouldBeUniqueUntilProcessing вместо контракта ShouldBeUnique:

<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}

Уникальные блокировки задач

За кулисами, когда отправляется уникальная задача ShouldBeUnique, Laravel пытается получить блокировку с ключом uniqueId. Если блокировка не может быть получена, задача не отправляется. Эта блокировка освобождается, когда задача завершает обработку или после неудачных попыток перезапуска. По умолчанию Laravel будет использовать кэш-драйвер по умолчанию для получения этой блокировки. Однако, если вы хотите использовать другой драйвер для получения блокировки, вы можете определить метод uniqueVia, который возвращает драйвер кэша, который должен быть использован:

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Получить драйвер кеша для уникальной блокировки задачи.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}

Примечание Если вам нужно ограничить параллельную обработку задачи, просто используйте промежуточное ПО WithoutOverlapping.

Шифрованные задачи

Laravel позволяет обеспечить конфиденциальность и целостность данных задачи с помощью шифрования. Для начала просто добавьте интерфейс ShouldBeEncrypted в класс задачи. Как только этот интерфейс добавлен в класс, Laravel автоматически шифрует вашу задачу перед помещением ее в очередь:

<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}

Промежуточное ПО для задач

Middleware для задач позволяют вам обернуть кастомную логику вокруг выполнения задач в очереди, уменьшая шаблонный код в самих задачах. Например, рассмотрим следующий метод handle, который использует функции ограничения Redis в Laravel, чтобы позволить обрабатывать только одну задачу каждые пять секунд:

use Illuminate\Support\Facades\Redis;
/**
* Выполнить задачу.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Обработать задачу...
}, function () {
// Невозможно получить блокировку...
return $this->release(5);
});
}

Хотя этот код является допустимым, реализация метода handle становится громкой, поскольку он перегружен логикой ограничения скорости Redis. Кроме того, эту логику ограничения скорости необходимо дублировать для любых других задач, которые мы хотим ограничивать скоростью.

Вместо ограничения скорости в методе handle мы могли бы определить middleware для задачи, которое управляет ограничением скорости. Laravel не имеет места по умолчанию для middleware для задач, поэтому вы можете разместить middleware для задач где угодно в своем приложении. В этом примере мы разместим middleware в директории app/Jobs/Middleware:

<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Обработать поставленную в очередь задачу.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Блокировка получена...
$next($job);
}, function () use ($job) {
// Невозможно получить блокировку...
$job->release(5);
});
}
}

Как видите, также как middleware для маршрутов, middleware для задач получают задачу, которая обрабатывается, и обратный вызов, который должен быть вызван для продолжения обработки задачи.

После создания middleware для задач их можно прикрепить к задаче, вернув их из метода middleware задачи. Этот метод не существует в задачах, созданных командой make:job Artisan, поэтому вам нужно вручную добавить его в свой класс задачи:

use App\Jobs\Middleware\RateLimited;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}

Примечание Промежуточное ПО для задач также может быть присвоено обработчикам событий, отправляемым по очереди, почтовым сообщениям и уведомлениям.

Ограничение по частоте

Несмотря на то, что мы только что продемонстрировали, как написать свою middleware для ограничения скорости задач, Laravel фактически включает middleware для ограничения скорости, которую вы можете использовать для ограничения скорости задач. Как и ограничители скорости для маршрутов, ограничители скорости задач определяются с использованием метода for фасада RateLimiter.

Например, вы можете разрешить пользователям создавать резервные копии своих данных раз в час, не накладывая такого ограничения на премиум-клиентов. Для этого вы можете определить ограничитель скорости в методе boot вашего AppServiceProvider:

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Загрузить любые службы приложения.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}

В приведенном выше примере мы определили ежечасный предел скорости; однако вы можете легко определить предел скорости на основе минут с использованием метода perMinute. Кроме того, вы можете передать любое значение в метод by для ограничения скорости; однако это значение чаще всего используется для сегментации ограничений скорости по клиентам:

return Limit::perMinute(50)->by($job->user->id);

После определения ограничителя скорости вы можете присоединить ограничитель скорости к вашей задаче с использованием middleware Illuminate\Queue\Middleware\RateLimited. В каждый раз, когда задача превышает предел скорости, этот middleware выпустит задачу обратно в очередь с соответствующей задержкой, основанной на длительности ограничения скорости.

use Illuminate\Queue\Middleware\RateLimited;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}

Выпускание задачи с ограничением скорости обратно в очередь все равно увеличит общее количество попыток выполнения задачи. Вы можете подстроить свои свойства tries и maxExceptions в соответствии с вашим классом задачи. Или вы можете использовать метод retryUntil, чтобы определить, через какое время задачу больше не нужно пытаться выполнить.

Если вы не хотите, чтобы задача повторно пыталась выполниться, когда на нее наложено ограничение скорости, вы можете использовать метод dontRelease:

/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}

Примечание Если вы используете Redis, вы можете использовать промежуточное ПО Illuminate\Queue\Middleware\RateLimitedWithRedis, которое настроено специально для Redis и более эффективно, чем базовое промежуточное ПО для ограничения скорости.

Предотвращение перекрытия задач

Laravel включает middleware Illuminate\Queue\Middleware\WithoutOverlapping, которое позволяет вам предотвращать перекрытие задач на основе произвольного ключа. Это может быть полезно, когда задача в очереди изменяет ресурс, который должен изменяться только одним заданием за раз.

Например, предположим, у вас есть задача в очереди, которая обновляет кредитный рейтинг пользователя, и вы хотите предотвратить перекрытие заданий по обновлению кредитного рейтинга для того же идентификатора пользователя. Для этого вы можете вернуть middleware WithoutOverlapping из метода middleware вашей задачи:

use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}

Любые перекрывающиеся задачи того же типа будут возвращены обратно в очередь. Вы также можете указать количество секунд, которые должны пройти, прежде чем выпущенная задача будет попытана снова:

/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

Если вы хотите немедленно удалить все перекрывающиеся задачи, чтобы они не повторялись, вы можете использовать метод dontRelease:

/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

Middleware WithoutOverlapping работает на основе функции атомарных блокировок Laravel. Иногда ваша задача может неожиданно завершиться с ошибкой или превысить время выполнения так, что блокировка не освобождается. Поэтому вы можете явно определить время истечения срока блокировки с использованием метода expireAfter. Например, приведенный ниже пример укажет Laravel освободить блокировку WithoutOverlapping через три минуты после начала обработки задачи:

/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

Внимание Для использования middleware WithoutOverlapping требуется кэш-драйвер, поддерживающий блокировки. В настоящее время драйверы кэша memcached, redis, dynamodb, database, file и array поддерживают атомарные блокировки.

Общие ключи блокировки между классами задач

По умолчанию middleware WithoutOverlapping будет предотвращать перекрытие задач того же класса. Таким образом, хотя два разных класса задач могут использовать один и тот же ключ блокировки, они не будут предотвращены от перекрытия. Однако вы можете указать Laravel применять ключ для классов задач с использованием метода shared:

use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}

Обработка исключений

Laravel включает middleware Illuminate\Queue\Middleware\ThrottlesExceptions, которое позволяет вам ограничивать исключения. После того как задача вызывает заданное количество исключений, все последующие попытки выполнить задачу откладываются до истечения указанного интервала времени. Это middleware особенно полезно для задач, взаимодействующих с сторонними службами, которые нестабильны.

Например, представьте себе задачу в очереди, которая взаимодействует с API стороннего сервиса, начинающего бросать исключения. Для ограничения исключений вы можете вернуть middleware ThrottlesExceptions из метода middleware вашей задачи. Обычно это middleware следует связывать с задачей, реализующей попытки на основе времени:

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5)];
}
/**
* Определить время, когда задача должна завершиться.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(5);
}

Первый аргумент конструктора, принимаемый middleware, - это количество исключений, которые задача может вызвать, прежде чем ее ограничителят, в то время как второй аргумент конструктора - это количество минут, которое должно пройти, прежде чем задача будет попытана снова после того, как она была ограничена. В приведенном выше примере, если задача вызывает 10 исключений в течение 5 минут, мы подождем 5 минут, прежде чем попытаться выполнить задачу снова.

Когда задача вызывает исключение, но порог исключений еще не достигнут, задача, как правило, будет немедленно повторно пытаться выполниться. Однако вы можете указать количество минут, которое задача должна быть отложена, вызвав метод backoff при добавлении middleware к задаче:

use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

Внутренне это middleware использует систему кэша Laravel для реализации ограничения скорости, и имя класса задачи используется в качестве «ключа» кэша. Вы можете переопределить этот ключ, вызвав метод by при добавлении middleware к вашей задаче. Это может быть полезно, если у вас есть несколько задач, взаимодействующих с одним и тем же сторонним сервисом, и вы хотите, чтобы они использовали общий «ведро» для ограничения скорости:

use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->by('key')];
}

Примечание Если вы используете Redis, вы можете использовать промежуточное ПО Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis, которое настроено специально для Redis и более эффективно, чем базовое промежуточное ПО для ограничения исключений.

Запуск задач

После того как вы написали класс своей задачи, вы можете отправить ее с использованием метода dispatch самой задачи. Аргументы, переданные методу dispatch, будут переданы конструктору задачи:

<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}

Если вы хотите условно отправить задачу, вы можете использовать методы dispatchIf и dispatchUnless:

ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

В новых приложениях Laravel драйвер sync является драйвером очереди по умолчанию. Этот драйвер выполняет задачи синхронно в текущем запросе, что часто удобно во время локальной разработки. Если вы хотите начать постановку задач в очередь для фоновой обработки, вы можете указать другой драйвер очереди в файле конфигурации вашего приложения config/queue.php.

Отложенная отправка

Если вы хотите указать, что задача не должна немедленно стать доступной для обработки рабочим процессом очереди, вы можете использовать метод delay при отправке задачи. Например, давайте укажем, что задача не должна быть доступна для обработки в течение 10 минут после ее отправки:

<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}

Внимание У службы очередей Amazon SQS есть максимальное время задержки 15 минут.

Отправка после отправки ответа браузеру

В качестве альтернативы метод dispatchAfterResponse откладывает отправку задачи до тех пор, пока ответ HTTP не будет отправлен браузеру пользователя, если ваш веб-сервер использует FastCGI. Это позволит пользователю начать использовать приложение, даже если задача в очереди все еще выполняется. Это обычно следует использовать только для задач, занимающих примерно секунду, таких как отправка электронной почты. Поскольку они обрабатываются в текущем HTTP-запросе, задачи, отправленные таким образом, не требуют наличия рабочего процесса очереди для их обработки:

use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();

Вы также можете dispatch замыкание и добавить метод afterResponse к помощнику dispatch, чтобы выполнить замыкание после отправки HTTP-ответа браузеру:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('[email protected]')->send(new WelcomeMessage);
})->afterResponse();

Синхронная отправка

Если вы хотите отправить задачу немедленно (синхронно), вы можете использовать метод dispatchSync. При использовании этого метода задача не будет помещена в очередь и будет выполнена немедленно в текущем процессе:

<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создать подкаст...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}

Задачи и транзакции базы данных

Хотя вполне допустимо отправлять задачи в пределах транзакций базы данных, вам следует особенно внимательно следить за тем, чтобы ваша задача действительно могла успешно выполниться. При отправке задачи в пределах транзакции существует возможность того, что задача будет обработана рабочим процессом до того, как родительская транзакция будет зафиксирована. Когда это происходит, любые обновления, внесенные вами в модели или записи базы данных во время транзакции (транзакций), могут еще не отразиться в базе данных. Кроме того, любые модели или записи базы данных, созданные в пределах транзакции (транзакций), могут не существовать в базе данных.

К счастью, Laravel предоставляет несколько методов обхода этой проблемы. Во-первых, вы можете установить параметр подключения after_commit в конфигурационном массиве подключения вашей очереди:

'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],

Когда опция after_commit установлена в true, вы можете отправлять задачи в пределах транзакций базы данных; однако Laravel будет ждать, пока открытые родительские транзакции базы данных не будут зафиксированы, прежде чем фактически отправить задачу. Конечно же, если в настоящее время нет открытых транзакций базы данных, задача будет отправлена немедленно.

Если транзакция откатывается из-за исключения, возникшего во время транзакции, задачи, отправленные во время этой транзакции, будут отброшены.

Примечание Установка параметра after_commit в true также вызовет отправку любых обработчиков событий, отправляемых по очереди, почтовых сообщений, уведомлений и событий трансляции после того, как все транзакции с базой данных будут зафиксированы.

Указание поведения отправки коммита встроенным образом

Если вы не устанавливаете параметр конфигурации подключения after_commit в true, вы все равно можете указать, что конкретную задачу следует отправить после того, как все открытые транзакции базы данных будут зафиксированы. Для этого вы можете добавить метод afterCommit к вашей операции отправки:

use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();

Точно так же, если параметр конфигурации подключения after_commit установлен в true, вы можете указать, что конкретную задачу следует отправить немедленно, не дожидаясь, пока зафиксируются какие-либо открытые транзакции базы данных:

ProcessPodcast::dispatch($podcast)->beforeCommit();

Цепочка задач

Цепочка задач позволяет указать список задач, поставленных в очередь, которые должны выполняться последовательно после успешного выполнения основной задачи. Если одна из задач в последовательности завершается с ошибкой, остальные задачи не выполняются. Для выполнения цепочки задач в очереди вы можете использовать метод chain, предоставляемый фасадом Bus. Командный автобус Laravel - это компонент более низкого уровня, на котором построена постановка задач в очередь:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();

Помимо цепочки экземпляров классов задач, вы также можете цеплять замыкания:

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();

Внимание Удаление задач с использованием метода $this->delete() внутри задачи не помешает обработке связанных задач. Цепочка прекратит выполнение только в случае сбоя в одной из задач цепочки.

Цепное подключение и очередь

Если вы хотите указать подключение и очередь, которые должны использоваться для связанных задач, вы можете использовать методы onConnection и onQueue. Эти методы указывают подключение и имя очереди, которые следует использовать, если задаче в очереди явно не назначено другое подключение/очередь:

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

Сбои цепочки

При цеплении задач вы можете использовать метод catch, чтобы указать замыкание, которое должно быть вызвано, если задача в цепочке завершается ошибкой. Переданное обратное вызов будет получать экземпляр Throwable, вызвавший сбой задачи:

use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// Одна из задач в цепочке завершилась с ошибкой...
})->dispatch();

Внимание Поскольку функции обратного вызова цепочки сериализуются и выполняются позже Laravel-очередью, в этих функциях не следует использовать переменную $this.

Настройка очереди и подключения

Отправка в конкретную очередь

Помещая задачи в различные очереди, вы можете "категоризировать" ваши задачи в очереди и даже определять приоритет, назначая различное количество рабочих процессов для разных очередей. Имейте в виду, что это не помещает задачи в различные "подключения" очередей, как определено в файле конфигурации очереди, а только в конкретные очереди в пределах одного подключения. Для указания очереди используйте метод onQueue при отправке задачи:

<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создать подкаст...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}

Кроме того, вы можете указать очередь задачи, вызвав метод onQueue в конструкторе задачи:

<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Создать новый экземпляр задачи.
*/
public function __construct()
{
$this->onQueue('processing');
}
}

Отправка в конкретное соединение

Если ваше приложение взаимодействует с несколькими подключениями к очереди, вы можете указать, в какое подключение отправить задачу, используя метод onConnection:

<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создать подкаст...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}

Вы можете объединить методы onConnection и onQueue вместе, чтобы указать подключение и очередь для задачи:

ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');

В качестве альтернативы вы можете указать подключение задачи, вызвав метод onConnection в конструкторе задачи:

<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Создать новый экземпляр задачи.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}

Указание максимального числа попыток / тайм-аута

Максимальное количество попыток

Если одна из ваших задач в очереди сталкивается с ошибкой, вероятно, вы не хотите, чтобы она бесконечно пыталась ее повторить. Поэтому Laravel предоставляет различные способы указания, сколько раз или как долго может выполняться задача.

Один из подходов к указанию максимального количества попыток выполнения задачи — использование параметра --tries в командной строке Artisan. Это применяется ко всем задачам, обрабатываемым воркером, если только задача, которую обрабатывают, не указывает количество попыток выполнения:

php artisan queue:work --tries=3

Если задача превышает максимальное количество попыток, она будет считаться "неудавшейся". Дополнительную информацию о работе с неудавшимися задачами смотрите в документации по сбоям задач. Если для команды queue:work указан параметр --tries=0, задача будет повторяться бесконечно.

Можно выбрать более детальный подход, определив максимальное количество попыток выполнения задачи в самом классе задачи. Если максимальное количество попыток указано в задаче, оно будет иметь приоритет над значением --tries, указанным в командной строке:

<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество попыток выполнения задачи.
*
* @var int
*/
public $tries = 5;
}

Попытки на основе времени

В качестве альтернативы указанию, сколько раз задача может быть попытана до ее сбоя, вы можете указать время, после которого задачу больше не следует пытаться выполнить. Это позволяет задаче пытаться выполниться любое количество раз в течение заданного времени. Чтобы указать время, после которого задачу больше не следует выполнять, добавьте метод retryUntil в ваш класс задачи. Этот метод должен вернуть экземпляр DateTime:

use DateTime;
/**
* Определите время, когда задача должна завершиться.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}

Примечание Вы также можете определить свойство tries или метод retryUntil в ваших обработчиках очереди событий.

Максимальное количество исключений

Иногда вы можете захотеть указать, что задачу можно попробовать много раз, но она должна завершиться неудачей, если повторные попытки вызваны определенным количеством необработанных исключений (в отличие от освобождения методом release напрямую). Для этого вы можете определить свойство maxExceptions в вашем классе задачи:

<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество попыток выполнения задачи.
*
* @var int
*/
public $tries = 25;
/**
* Максимальное количество необработанных исключений, разрешенных перед сбоем.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Выполнить задачу.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Блокировка получена, обработка подкаста...
}, function () {
// Невозможно получить блокировку...
return $this->release(10);
});
}
}

В этом примере задача будет освобождена на десять секунд, если приложению не удается получить блокировку Redis, и будет продолжать повторяться до 25 раз. Однако задача завершится неудачей, если три необработанных исключения будут сгенерированы задачей.

Тайм-аут

Часто вы примерно знаете, сколько времени могут занять ваши задачи в очереди. По этой причине Laravel позволяет указать значение "timeout". По умолчанию значение тайм-аута составляет 60 секунд. Если задача обрабатывается дольше, чем указанное значение тайм-аута, рабочий, обрабатывающий задачу, завершится с ошибкой. Обычно рабочий будет автоматически перезапущен менеджером процессов, настроенным на вашем сервере.

Максимальное количество секунд, которые могут выполняться задачи, можно указать с помощью ключа --timeout в командной строке Artisan:

php artisan queue:work --timeout=30

Если задача превышает максимальное количество попыток из-за постоянных сбоев по тайм-ауту, она будет отмечена как неудавшаяся.

Также вы можете указать максимальное количество секунд, в течение которого задача должна выполняться, в самом классе задачи. Если тайм-аут указан в задаче, он будет иметь приоритет перед тайм-аутом, указанным в командной строке:

<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество секунд, в течение которых задача может выполняться, прежде чем завершится по тайм-ауту.
*
* @var int
*/
public $timeout = 120;
}

Иногда блокирующие процессы ввода-вывода, такие как сокеты или исходящие HTTP-соединения, могут не учитывать указанный вами тайм-аут. Поэтому, используя эти функции, всегда старайтесь указывать тайм-аут с использованием их API. Например, при использовании Guzzle всегда укажите значение тайм-аута для соединения и запроса.

Внимание PHP-расширение pcntl должно быть установлено, чтобы указывать таймаут задачи. Кроме того, значение "timeout" задачи всегда должно быть меньше значения "retry after". В противном случае задача может повторно выполняться до того, как она фактически завершит выполнение или истечет время ожидания.

Отказ при тайм-ауте

Если вы хотите указать, что задача должна быть отмечена как неудавшаяся при тайм-ауте, вы можете определить свойство $failOnTimeout в классе задачи:

/**
* Указать, должна ли задача быть помечена как неудачная при тайм-ауте.
*
* @var bool
*/
public $failOnTimeout = true;

Обработка ошибок

Если исключение возникает во время выполнения задачи, задача автоматически освобождается обратно в очередь, чтобы ее можно было попробовать снова. Задача будет продолжать выпускаться, пока она не будет попытана максимальное количество раз, разрешенное вашим приложением. Максимальное количество попыток определяется ключом --tries, используемым в команде Artisan queue:work. В качестве альтернативы максимальное количество попыток можно определить в самом классе задачи. Дополнительную информацию о запуске рабочего процесса очереди можно найти ниже.

Ручное освобождение задачи

Иногда вам может потребоваться вручную вернуть задачу обратно в очередь, чтобы ее можно было попробовать позже. Вы можете сделать это, вызвав метод release:

/**
* Выполнить задачу.
*/
public function handle(): void
{
// ...
$this->release();
}

По умолчанию метод release будет выпускать задачу обратно в очередь для немедленной обработки. Однако вы можете указать очереди не делать задачу доступной для обработки, пока не пройдет определенное количество секунд, передав целое число или экземпляр даты в метод release:

$this->release(10);
$this->release(now()->addSeconds(10));

Ручной сбой задачи

Иногда вам может потребоваться вручную отметить задачу как "неудавшуюся". Для этого вы можете вызвать метод fail:

/**
* Выполнить задачу.
*/
public function handle(): void
{
// ...
$this->fail();
}

Если вы хотите пометить вашу задачу как неудавшуюся из-за исключения, которое вы поймали, вы можете передать исключение в метод fail. Или, для удобства, вы можете передать строку с сообщением об ошибке, которая будет преобразована в исключение за вас:

$this->fail($exception);
$this->fail('Something went wrong.');

Примечание Дополнительную информацию о неудавшихся задачах можно найти в документации по обработке сбоев задач.

Пакетная обработка задач

Возможность пакетной обработки задач в Laravel позволяет легко выполнять пакет задач, а затем выполнять какие-либо действия после завершения выполнения пакета задач. Прежде чем начать, вам следует создать миграцию базы данных для создания таблицы для хранения метаинформации о ваших пакетах задач, такой как процент завершения. Эту миграцию можно создать с помощью команды Artisan queue:batches-table:

php artisan queue:batches-table
php artisan migrate

Определение задач, подлежащих пакетной обработке

Для определения пакетной задачи вы должны создать задачу, подходящую для помещения в очередь как обычно; однако вы должны добавить трейт Illuminate\Bus\Batchable в класс задачи. Этот трейт предоставляет доступ к методу batch, который можно использовать для получения текущего пакета, в котором выполняется задача:

<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Выполнить задачу.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Определить, отменен ли пакет...
return;
}
// Импортировать часть файла CSV...
}
}

Отправка пакетов

Для отправки пакета задач вы должны использовать метод batch фасада Bus. Конечно, пакетная обработка в основном полезна в сочетании с обратными вызовами завершения. Поэтому вы можете использовать методы then, catch и finally, чтобы определить обратные вызовы завершения для пакета. Каждый из этих обратных вызовов будет получать экземпляр Illuminate\Bus\Batch, когда они будут вызваны. В этом примере мы будем представлять, что мы отправляем пакет задач, каждая из которых обрабатывает определенное количество строк из CSV-файла:

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->then(function (Batch $batch) {
// Все задачи успешно завершены...
})->catch(function (Batch $batch, Throwable $e) {
// Обнаружена первая ошибка в задаче пакета...
})->finally(function (Batch $batch) {
// Пакет завершил выполнение...
})->dispatch();
return $batch->id;

ID пакета, к которому можно получить доступ через свойство $batch->id, может быть использован для запроса шины команд Laravel для получения информации о пакете после его отправки.

Внимание Поскольку функции обратного вызова партиций сериализуются и выполняются позже Laravel-очередью, в этих функциях не следует использовать переменную $this.

Именование пакетов

Некоторые инструменты, такие как Laravel Horizon и Laravel Telescope, могут предоставлять более удобную отладочную информацию для пакетов, если пакеты имеют имена. Чтобы назначить произвольное имя пакету, вы можете вызвать метод name при определении пакета:

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задачи успешно завершены...
})->name('Import CSV')->dispatch();

Подключение и очередь пакета

Если вы хотите указать соединение и очередь, которые должны использоваться для задач пакета, вы можете использовать методы onConnection и onQueue. Все задачи пакета должны выполняться в одном и том же соединении и очереди:

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задачи успешно завершены...
})->onConnection('redis')->onQueue('imports')->dispatch();

Цепочки и пакеты

Вы можете определить набор цепочек задач внутри пакета, поместив цепочки задач в массив. Например, мы можем выполнить две цепочки задач параллельно и выполнить обратный вызов, когда обе цепочки задач завершат обработку:

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();

Наоборот, вы можете выполнять пакеты задач в цепочке, определяя пакеты внутри цепочки. Например, вы можете сначала выполнить пакет задач для выпуска нескольких подкастов, а затем пакет задач для отправки уведомлений о выпуске:

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();

Добавление задач в пакеты

Иногда может быть полезно добавлять дополнительные задачи в пакет изнутри пакетной задачи. Этот шаблон может быть полезен, когда вам нужно упаковать тысячи задач, которые могут занять слишком много времени для отправки во время веб-запроса. Поэтому вместо этого вы можете отправить начальный пакет "загрузчик" задач, которые увлекут пакет еще больше задач:

$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// Все задачи успешно завершены...
})->name('Import Contacts')->dispatch();

В этом примере мы будем использовать задачу LoadImportBatch для увеличения пакета дополнительными задачами. Для достижения этого мы можем использовать метод add на экземпляре пакета, к которому можно получить доступ через метод batch задачи:

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Выполнить задачу.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}

Внимание Вы можете добавлять задачи только в пакет изнутри задачи, принадлежащей тому же пакету.

Инспекция пакетов

Экземпляр Illuminate\Bus\Batch, предоставляемый обратным вызовам завершения пакета, имеет различные свойства и методы, которые помогут вам во взаимодействии с исследованием задач в данном пакете:

// UUID пакета...
$batch->id;
// Имя пакета (при наличии)...
$batch->name;
// Количество задач, назначенных пакету...
$batch->totalJobs;
// Количество задач, которые еще не были обработаны очередью...
$batch->pendingJobs;
// Количество задач, завершившихся с ошибкой...
$batch->failedJobs;
// Количество задач, которые были обработаны до сих пор...
$batch->processedJobs();
// Процент завершения выполнения пакета (0-100)...
$batch->progress();
// Указывает, завершилось ли выполнение пакета...
$batch->finished();
// Отменить выполнение пакета...
$batch->cancel();
// Указывает, отменен ли пакет...
$batch->cancelled();

Возврат пакетов из маршрутов

Все экземпляры Illuminate\Bus\Batch могут быть сериализованы в JSON, что означает, что вы можете возвращать их напрямую из одного из маршрутов вашего приложения, чтобы получить JSON-полезную нагрузку, содержащую информацию о пакете, включая прогресс его выполнения. Это удобно для отображения информации о ходе выполнения пакета в пользовательском интерфейсе вашего приложения.

Чтобы получить пакет по его идентификатору, вы можете использовать метод findBatch фасада Bus:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});

Отмена пакетов

Иногда вам может потребоваться отменить выполнение определенного пакета. Это можно сделать, вызвав метод cancel на экземпляре Illuminate\Bus\Batch:

/**
* Выполнить задачу.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}

Как вы могли заметить в предыдущих примерах, пакетные задачи должны обычно определять, отменено ли выполнение их соответствующего пакета, прежде чем продолжить выполнение. Тем не менее, для удобства вы можете назначить промежуточное ПО SkipIfBatchCancelled задаче. Как следует из его названия, это промежуточное ПО настроит Laravel так, чтобы оно не обрабатывало задачу, если ее соответствующий пакет был отменен:

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Получить промежуточное ПО, через которое должна пройти задача.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}

Сбои пакетов

Когда пакетная задача не удается, вызывается обратный вызов catch (если он был назначен). Этот обратный вызов вызывается только для первой задачи, которая не удалась в пакете.

Разрешение сбоев

Когда задача внутри пакета завершается с ошибкой, Laravel автоматически помечает пакет как «отмененный». Если вы хотите, вы можете отключить это поведение, чтобы сбой задачи не автоматически отмечал пакет как отмененный. Это можно сделать, вызвав метод allowFailures при отправке пакета:

$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задачи успешно завершены...
})->allowFailures()->dispatch();

Повтор неудачных пакетных задач

Для удобства Laravel предоставляет команду Artisan queue:retry-batch, которая позволяет легко повторить все неудавшиеся задачи для заданного пакета. Команда queue:retry-batch принимает идентификатор UUID пакета, задачи которого необходимо повторить:

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

Обрезка пакетов

Без усечения таблицы job_batches она может быстро заполняться записями. Для предотвращения этого вы должны планировать выполнение команды Artisan queue:prune-batches ежедневно:

$schedule->command('queue:prune-batches')->daily();

По умолчанию будут усекаться все завершенные пакеты, которые старше 24 часов. Вы можете использовать параметр hours при вызове команды, чтобы определить, как долго сохранять данные пакета. Например, следующая команда удалит все пакеты, завершенные более 48 часов назад:

$schedule->command('queue:prune-batches --hours=48')->daily();

Иногда ваша таблица jobs_batches может накапливать записи пакетов для пакетов, которые никогда не завершаются успешно, например, для пакетов, где задача завершается с ошибкой, и эта задача никогда не завершается успешно после повторной попытки. Вы можете настроить команду queue:prune-batches для обрезки этих незавершенных записей пакетов с использованием параметра unfinished:

$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();

Точно так же ваша таблица jobs_batches может накапливать записи пакетов для отмененных пакетов. Вы можете настроить команду queue:prune-batches для обрезки этих записей отмененных пакетов с использованием параметра cancelled:

$schedule->command('queue:prune-batches --hours=48 --cancelled=72')->daily();

Закрытие замыканий

Вместо отправки класса задачи в очередь вы также можете отправить замыкание. Это отлично подходит для быстрых, простых задач, которые должны выполняться вне текущего цикла запроса. При отправке замыканий в очередь содержимое замыкания криптографически подписывается, чтобы оно не могло быть изменено в пути:

$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});

Используя метод catch, вы можете предоставить замыкание, которое должно быть выполнено, если очередное замыкание не удается успешно завершиться после исчерпания всех настроенных попыток повтора в вашей очереди:

use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// Эта задача завершилась с ошибкой...
});

Внимание Поскольку функции обратного вызова "catch" сериализуются и выполняются позже Laravel-очередью, в этих функциях не следует использовать переменную $this.

Запуск команды queue:work

Команда queue:work

Laravel включает команду Artisan, которая запустит рабочего процесса очереди и будет обрабатывать новые задачи по мере их поступления в очередь. Вы можете запустить рабочего процесса, используя команду Artisan queue:work. Обратите внимание, что после запуска команды queue:work она будет продолжать выполняться, пока ее не остановят вручную или вы не закроете терминал:

php artisan queue:work

Примечание Для постоянного выполнения процесса queue:work в фоновом режиме рекомендуется использовать монитор процессов, такой как Supervisor, чтобы гарантировать, что рабочий процесс очереди не прекратится.

Вы можете включить флаг -v, при вызове команды queue:work, если хотите, чтобы идентификаторы обработанных задач были включены в вывод команды:

php artisan queue:work -v

Помните, что рабочие процессы очереди - это долгоживущие процессы, и они хранят состояние загруженного приложения в памяти. В результате они не будут замечать изменений в вашем кодовой базе после их запуска. Поэтому во время процесса развертывания не забудьте перезапустить ваши рабочие процессы очереди. Кроме того, помните, что любое статическое состояние, созданное или измененное вашим приложением, не будет автоматически сброшено между задачами.

В качестве альтернативы вы можете использовать команду queue:listen. При использовании команды queue:listen вам не нужно вручную перезапускать рабочего процесса, когда вы хотите перезагрузить свой обновленный код или сбросить состояние приложения; однако эта команда значительно менее эффективна, чем команда queue:work:

php artisan queue:listen

Запуск нескольких рабочих очередей

Чтобы назначить несколько рабочих процессов для очереди и обрабатывать задачи параллельно, вы просто должны запустить несколько процессов queue:work. Это можно сделать как локально с использованием нескольких вкладок в вашем терминале, так и в производстве с использованием настроек конфигурации вашего менеджера процессов. При использовании Supervisor вы можете использовать значение конфигурации numprocs.

Указание подключения и очереди

Вы также можете указать, какое подключение к очереди должен использовать рабочий процесс. Имя подключения, переданное команде work, должно соответствовать одному из подключений, определенных в вашем файле конфигурации config/queue.php:

php artisan queue:work redis

По умолчанию команда queue:work обрабатывает только задачи для очереди по умолчанию на заданном подключении. Однако вы можете настроить свой рабочий процесс очень подробно, обрабатывая только определенные очереди для заданного подключения. Например, если все ваши электронные письма обрабатываются в очереди emails на вашем подключении к очереди redis, вы можете выполнить следующую команду, чтобы запустить рабочего процесса, который обрабатывает только эту очередь:

php artisan queue:work redis --queue=emails

Обработка определенного количества задач

Опция --once может использоваться для указания рабочему процессу обработать только одну задачу из очереди:

php artisan queue:work --once

Опция --max-jobs может быть использована для указания работнику обработать заданное количество задач и затем выйти. Эта опция может быть полезна в сочетании с Supervisor, чтобы ваши работники автоматически перезапускались после обработки заданного числа задач, освобождая любую занимаемую ими память:

php artisan queue:work --max-jobs=1000

Обработка всех задач в очереди и выход

Опция --stop-when-empty может быть использована для указания работнику обработать все задачи и затем завершить выполнение. Эта опция может быть полезна при обработке очередей Laravel внутри Docker-контейнера, если вы хотите выключить контейнер после того, как очередь опустеет:

php artisan queue:work --stop-when-empty

Обработка задач в течение указанного количества секунд

Опция --max-time может быть использована для указания работнику обработать задачи в течение заданного количества секунд и затем выйти. Эта опция может быть полезна в сочетании с Supervisor, чтобы ваши работники автоматически перезапускались после обработки задач в течение заданного времени, освобождая любую занимаемую ими память:

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

Длительность сна рабочего

Когда в очереди есть задачи, работник будет продолжать обрабатывать их без задержек между задачами. Однако опция sleep определяет, сколько секунд работник будет "спать", если доступных задач нет. Конечно же, во время сна работник не будет обрабатывать новые задачи:

php artisan queue:work --sleep=3

Соображения о ресурсах

Демон-работники очереди не "перезагружают" фреймворк перед обработкой каждой задачи. Поэтому вы должны освободить любые ресурсы после завершения каждой задачи. Например, если вы выполняете манипуляции с изображением с использованием библиотеки GD, освободите память с помощью imagedestroy, когда закончите обработку изображения.

Приоритеты очереди

Иногда вам может потребоваться установить приоритет обработки очередей. Например, в конфигурационном файле config/queue.php вы можете установить значение queue по умолчанию для вашего соединения redis в low. Тем не менее, иногда вы можете захотеть добавить задачу в очередь с приоритетом high, например так:

dispatch((new Job)->onQueue('high'));

Чтобы запустить работника, который проверяет, что все задачи из очереди high обработаны, прежде чем перейти к каким-либо задачам в очереди low, передайте список имен очередей, разделенных запятыми, в команде work:

php artisan queue:work --queue=high,low

Рабочие очереди и развертывание

Поскольку работники очереди - это долгоживущие процессы, они не замечают изменений в вашем коде без перезапуска. Простейший способ развертывания приложения с использованием работников очереди - перезапуск работников во время процесса развертывания. Вы можете благополучно перезапустить всех работников, выполнив команду queue:restart:

php artisan queue:restart

Эта команда указывает всем работникам очереди грациозно завершить выполнение после завершения обработки текущей задачи, чтобы не потерять существующие задачи. Поскольку работники очереди завершат выполнение при выполнении команды queue:restart, вы должны запускать менеджер процессов, такой как Supervisor, чтобы автоматически перезапускать работников очереди.

Примечание Очередь использует кэш для хранения сигналов о перезапуске, поэтому перед использованием этой функции убедитесь, что драйвер кэша правильно настроен для вашего приложения.

Срок действия и тайм-ауты задач

Истечение срока действия задачи

В конфигурационном файле config/queue.php каждое соединение с очередью определяет опцию retry_after. Эта опция указывает, сколько секунд соединение с очередью должно ждать перед повторной попыткой обработать задачу, которая обрабатывается. Например, если значение retry_after установлено в 90, задача будет возвращена в очередь, если она обрабатывается 90 секунд, не была возвращена или удалена. Обычно вы должны установить значение retry_after равным максимальному времени, которое ваши задачи должны разумно занимать для завершения обработки.

Внимание Единственное подключение очереди, не содержащее значение retry_after, - это Amazon SQS. SQS будет повторно пытаться выполнить задачу на основе таймаута видимости по умолчанию, который управляется в консоли AWS.

Тайм-ауты рабочего

Команда Artisan queue:work предоставляет опцию --timeout. По умолчанию значение --timeout составляет 60 секунд. Если задача обрабатывается дольше, чем указанное значение тайм-аута, работник, обрабатывающий задачу, завершит выполнение с ошибкой. Обычно работник будет автоматически перезапущен менеджером процессов, настроенным на вашем сервере:

php artisan queue:work --timeout=60

Конфигурационная опция retry_after и опция командной строки --timeout различны, но взаимодействуют так, чтобы гарантировать, что задачи не будут потеряны и что задачи будут успешно обработаны только один раз.

Внимание Значение --timeout всегда должно быть хотя бы на несколько секунд короче значения конфигурации retry_after. Это обеспечит завершение работы рабочего, обрабатывающего замороженную задачу, прежде чем задача повторно выполняется. Если ваша опция --timeout дольше, чем значение конфигурации retry_after, ваши задачи могут быть обработаны дважды.

Конфигурация Supervisor

В производственной среде вам нужен способ поддерживать работу ваших процессов queue:work. Процесс queue:work может прекратить выполнение по разным причинам, таким как превышение времени выполнения работы или выполнение команды queue:restart.

По этой причине вам необходимо настроить мониторинг процессов, который сможет обнаруживать выход из строя ваших процессов queue:work и автоматически их перезапускать. Кроме того, мониторы процессов позволяют указать, сколько процессов queue:work вы хотели бы запускать параллельно. Supervisor - это монитор процессов, часто используемый в среде Linux, и мы рассмотрим, как его настроить в следующей документации.

Установка Supervisor

Supervisor - это монитор процессов для операционной системы Linux и автоматически перезапустит ваши процессы queue:work, если они завершатся с ошибкой. Чтобы установить Supervisor в Ubuntu, вы можете воспользоваться следующей командой:

sudo apt-get install supervisor

Примечание Если конфигурация и управление Supervisor кажется вам сложным, рассмотрите возможность использования Laravel Forge, который автоматически устанавливает и настраивает Supervisor для ваших производственных проектов Laravel.

Настройка Supervisor

Конфигурационные файлы Supervisor обычно хранятся в каталоге /etc/supervisor/conf.d. В этом каталоге вы можете создать любое количество файлов конфигурации, которые указывают Supervisor, как следует мониторить ваши процессы. Например, создадим файл laravel-worker.conf, который запускает и мониторит процессы queue:work:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

В этом примере директива numprocs указывает Supervisor запустить восемь процессов queue:work и мониторить их все, автоматически перезапуская их в случае сбоя. Вы должны изменить директиву command в конфигурации в соответствии с вашим соединением очереди и параметрами работника.

Внимание Убедитесь, что значение stopwaitsecs больше числа секунд, затраченных на выполнение вашей самой долгой задачи. В противном случае Supervisor может завершить задачу до ее завершения.

Запуск Supervisor

После создания файла конфигурации вы можете обновить конфигурацию Supervisor и запустить процессы с помощью следующих команд:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"

Для получения дополнительной информации о Supervisor обратитесь к документации Supervisor.

Работа с неудавшимися задачами

Иногда ваши отложенные задачи могут завершиться ошибкой. Не волнуйтесь, вещи не всегда проходят гладко! Laravel предоставляет удобный способ указать максимальное количество попыток выполнения задачи. После того как асинхронная задача превысит это количество попыток, она будет добавлена в таблицу базы данных failed_jobs. Синхронно отправляемые задачи, завершившиеся с ошибкой, не сохраняются в этой таблице, и их исключения сразу обрабатываются приложением.

Миграция для создания таблицы failed_jobs обычно уже присутствует в новых приложениях Laravel. Однако, если ваше приложение не содержит миграции для этой таблицы, вы можете использовать команду queue:failed-table для создания миграции:

php artisan queue:failed-table
php artisan migrate

При запуске процесса работника очереди вы можете указать максимальное количество попыток выполнения задачи с помощью параметра --tries в команде queue:work. Если вы не указываете значение для опции --tries, задачи будут пытаться выполниться только один раз или столько раз, сколько указано в свойстве $tries класса задачи:

php artisan queue:work redis --tries=3

С помощью опции --backoff вы можете указать, сколько секунд Laravel должен ждать перед повторной попыткой выполнить задачу, которая вызвала исключение. По умолчанию задача сразу возвращается обратно в очередь для повторной попытки:

php artisan queue:work redis --tries=3 --backoff=3

Если вы хотите настроить, сколько секунд Laravel должен ждать перед повторной попыткой выполнить задачу, вызвавшую исключение, на индивидуальной основе, вы можете сделать это, определив свойство backoff в классе вашей задачи:

/**
* Количество секунд ожидания перед повторной попыткой выполнения задачи.
*
* @var int
*/
public $backoff = 3;

Если вам требуется более сложная логика определения времени отсрочки задачи, вы можете определить метод backoff в классе вашей задачи:

/**
* Рассчитать количество секунд ожидания перед повторной попыткой выполнения задачи.
*/
public function backoff(): int
{
return 3;
}

Вы можете легко настроить "экспоненциальные" отсрочки, возвращая массив значений отсрочки из метода backoff. В этом примере задержка повтора будет 1 секунда для первой попытки, 5 секунд для второй попытки, 10 секунд для третьей попытки и 10 секунд для каждой последующей попытки, если остались еще попытки:

/**
* Рассчитать количество секунд ожидания перед повторной попыткой выполнения задачи.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}

Очистка после сбоя задач

Когда определенная задача завершится с ошибкой, вы можете хотеть отправить уведомление вашим пользователям или отменить любые действия, которые были частично выполнены задачей. Для этого вы можете определить метод failed в классе вашей задачи. Экземпляр Throwable, вызвавший сбой задачи, будет передан в метод failed:

<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/**
* Создать новый экземпляр задачи.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Выполнить задачу.
*/
public function handle(AudioProcessor $processor): void
{
// Обработать загруженный подкаст...
}
/**
* Обработать сбой задачи.
*/
public function failed(Throwable $exception): void
{
// Отправить уведомление пользователю о сбое и т. д...
}
}

Внимание Перед вызовом метода failed создается новый экземпляр задачи, поэтому любые модификации свойств класса, которые могли произойти в методе handle, будут утеряны.

Повтор неудавшихся задач

Для просмотра всех неудачных задач, вставленных в таблицу базы данных failed_jobs, вы можете использовать команду Artisan queue:failed:

php artisan queue:failed

Команда queue:failed перечислит идентификатор задачи, соединение, очередь, время сбоя и другую информацию о задаче. Идентификатор задачи можно использовать для повторной попытки выполнения неудавшейся задачи. Например, чтобы повторно попробовать выполнить задачу с идентификатором ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece, выполните следующую команду:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

При необходимости вы можете передать несколько идентификаторов в команду:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

Также вы можете повторно выполнить все неудавшиеся задачи для определенной очереди:

php artisan queue:retry --queue=name

Чтобы повторно выполнить все ваши неудавшиеся задачи, выполните команду queue:retry и передайте all в качестве идентификатора:

php artisan queue:retry all

Если вы хотите удалить неудавшуюся задачу, используйте команду queue:forget:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

Примечание При использовании Horizon вы должны использовать команду horizon:forget, чтобы удалить неудавшуюся задачу, а не queue:forget.

Для удаления всех неудавшихся задач из таблицы failed_jobs выполните команду queue:flush:

php artisan queue:flush

Игнорирование отсутствующих моделей

При внедрении объекта Eloquent в задачу, модель автоматически сериализуется перед помещением в очередь и заново извлекается из базы данных при обработке задачи. Однако, если модель была удалена, пока задача ожидала обработки работником, задача может завершиться с ошибкой ModelNotFoundException.

Для удобства вы можете автоматически удалять задачи с отсутствующими моделями, установив свойство deleteWhenMissingModels вашей задачи в значение true. Когда это свойство установлено в true, Laravel тихо отклонит задачу без генерации исключения:

/**
* Удалить задачу, если ее модели больше не существуют.
*
* @var bool
*/
public $deleteWhenMissingModels = true;

Обрезка неудавшихся задач

Вы можете уменьшить количество записей в таблице failed_jobs вашего приложения, вызвав команду Artisan queue:prune-failed:

php artisan queue:prune-failed

По умолчанию будут уменьшены все записи о неудавшихся задачах, которые старше 24 часов. Если вы предоставите опцию --hours команде, будут сохранены только записи о неудавшихся задачах, добавленные в течение последних N часов. Например, следующая команда удалит все записи о неудавшихся задачах, добавленные более 48 часов назад:

php artisan queue:prune-failed --hours=48

Хранение неудавшихся задач в DynamoDB

Laravel также поддерживает сохранение записей о неудавшихся задачах в DynamoDB вместо таблицы реляционной базы данных. Однако вам необходимо создать таблицу DynamoDB для хранения всех записей о неудавшихся задачах. Обычно эта таблица должна называться failed_jobs, но вы должны назвать таблицу в соответствии со значением конфигурационной опции queue.failed.table в конфигурационном файле queue вашего приложения.

Таблица failed_jobs должна иметь строковый первичный ключ раздела с именем application и строковый первичный ключ сортировки с именем uuid. Часть application ключа будет содержать имя вашего приложения, как определено значением конфигурационной опции name в конфигурационном файле app вашего приложения. Поскольку имя приложения является частью ключа таблицы DynamoDB, вы можете использовать ту же таблицу для хранения записей о неудавшихся задачах для нескольких приложений Laravel.

Кроме того, убедитесь, что у вас установлен AWS SDK, чтобы ваше приложение Laravel могло взаимодействовать с Amazon DynamoDB:

composer require aws/aws-sdk-php

Затем установите значение опции конфигурации queue.failed.driver в dynamodb. Кроме того, вы должны определить опции конфигурации key, secret и region в массиве конфигурации неудавшихся задач. Эти опции будут использоваться для аутентификации в AWS. При использовании драйвера dynamodb, опция конфигурации queue.failed.database не требуется:

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],

Отключение хранения неудавшихся задач

Вы можете настроить Laravel отбрасывать неудавшиеся задачи без их сохранения, установив значение опции конфигурации queue.failed.driver в null. Обычно это можно сделать с помощью переменной окружения QUEUE_FAILED_DRIVER:

QUEUE_FAILED_DRIVER=null

События неудавшихся задач

Если вы хотите зарегистрировать слушателя событий, который будет вызван при сбое выполнения задачи, вы можете использовать метод failing фасада Queue. Например, вы можете прикрепить замыкание к этому событию из метода boot AppServiceProvider, который включен в Laravel:

<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Зарегистрировать любые службы приложения.
*/
public function register(): void
{
// ...
}
/**
* Загрузить любые службы приложения.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}

Очистка задач из очередей

Примечание При использовании Horizon вы должны использовать команду horizon:clear для очистки задач из очереди, а не queue:clear.

Если вы хотите удалить все задачи из очереди по умолчанию и из соединения по умолчанию, вы можете использовать команду Artisan queue:clear:

php artisan queue:clear

Вы также можете предоставить аргумент connection и опцию queue для удаления задач из конкретного соединения и очереди:

php artisan queue:clear redis --queue=emails

Внимание Очистка задач из очередей доступна только для драйверов очереди SQS, Redis и базы данных. Кроме того, процесс удаления сообщения SQS занимает до 60 секунд, поэтому задачи, отправленные в очередь SQS в течение 60 секунд после очистки очереди, также могут быть удалены.

Мониторинг ваших очередей

Если ваша очередь получает внезапный наплыв задач, она может быть перегружена, что приведет к долгому времени ожидания завершения задач. Если вы хотите, Laravel может оповестить вас, когда количество задач в вашей очереди превысит определенный порог.

Для начала вам следует запланировать выполнение команды queue:monitor каждую минуту. Команда принимает имена очередей, которые вы хотите отслеживать, а также ваш желаемый порог количества задач:

php artisan queue:monitor redis:default,redis:deployments --max=100

Планирование только этой команды недостаточно для того, чтобы вызвать уведомление о перегруженном состоянии очереди. Когда команда обнаружит очередь, в которой количество задач превышает ваш порог, будет отправлено событие Illuminate\Queue\Events\QueueBusy. Вы можете прослушивать это событие в EventServiceProvider вашего приложения, чтобы отправить уведомление вам или вашей команде разработчиков:

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Зарегистрировать любые другие события для вашего приложения.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', '[email protected]')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}

Тестирование

При тестировании кода, который отправляет задачи в очередь, вы можете захотеть указать Laravel не выполнять саму задачу, поскольку код задачи можно тестировать напрямую и независимо от кода, который ее отправляет. Конечно же, чтобы протестировать саму задачу, вы можете создать экземпляр задачи и вызвать метод handle напрямую в вашем тесте.

Вы можете использовать метод fake фасада Queue, чтобы предотвратить фактическое помещение задач в очередь. После вызова метода fake фасада Queue, вы можете утверждать, что приложение пыталось поместить задачи в очередь:

<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Выполнить доставку заказа...
// Утвердить, что задачи не были помещены в очередь...
Queue::assertNothingPushed();
// Утвердить, что задача была помещена в указанную очередь...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Утвердить, что задача была помещена дважды...
Queue::assertPushed(ShipOrder::class, 2);
// Утвердить, что задача не была помещена...
Queue::assertNotPushed(AnotherJob::class);
// Утвердить, что в очередь было помещено замыкание...
Queue::assertClosurePushed();
}
}

Вы можете передать замыкание методам assertPushed или assertNotPushed, чтобы утверждать, что была добавлена задача, которая проходит определенное «тестирование истинности». Если была добавлена хотя бы одна задача, которая проходит заданное тестирование, утверждение будет успешным:

Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});

Симуляция подмножества задач

Если вам нужно фиксировать только определенные задачи, разрешив выполнение других ваших задач, вы можете передать имена классов задач, которые должны быть фиксированы, методу fake:

public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Выполнить доставку заказа...
// Утвердить, что задача была помещена дважды...
Queue::assertPushed(ShipOrder::class, 2);
}

Вы можете фиксировать все задачи, кроме определенного набора задач, с использованием метода except:

Queue::fake()->except([
ShipOrder::class,
]);

Тестирование цепочек задач

Для тестирования цепочек задач вам потребуется использовать возможности фасада Bus для фиксации. Метод assertChained фасада Bus может использоваться для проверки, что была отправлена цепочка задач. Метод assertChained принимает массив связанных задач в качестве своего первого аргумента:

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);

Как видно из приведенного выше примера, массив связанных задач может быть массивом имен классов задач. Однако вы также можете предоставить массив фактических экземпляров задач. При этом Laravel гарантирует, что экземпляры задач будут того же класса и будут иметь те же значения свойств, что и связанные задачи, отправленные вашим приложением:

Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);

Вы можете использовать метод assertDispatchedWithoutChain для проверки, что задача была добавлена без цепочки задач:

Bus::assertDispatchedWithoutChain(ShipOrder::class);

Тестирование пакетов задач

Метод assertBatched фасада Bus может использоваться для проверки, что была отправлена партия задач. Замыкание, переданное методу assertBatched, получает экземпляр Illuminate\Bus\PendingBatch, который можно использовать для проверки задач в партии:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});

Вы можете использовать метод assertBatchCount для проверки того, что заданное количество партий было отправлено:

Bus::assertBatchCount(3);

Вы можете использовать assertNothingBatched для проверки того, что партии не были отправлены:

Bus::assertNothingBatched();

Тестирование взаимодействия задачи / пакета

Иногда вам может потребоваться протестировать взаимодействие отдельной задачи с ее базовой партией. Например, вам может понадобиться проверить, отменила ли задача дальнейшую обработку ее партии. Для этого вам нужно назначить фиктивную партию задаче с помощью метода withFakeBatch. Метод withFakeBatch возвращает кортеж, содержащий экземпляр задачи и фиктивную партию:

[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

События задачи

Используя методы before и after фасада Queue, вы можете указать обратные вызовы, которые будут выполнены перед или после обработки задачи в очереди. Эти обратные вызовы предоставляют отличную возможность для выполнения дополнительного логирования или увеличения статистики для панели мониторинга. Обычно эти методы следует вызывать из метода boot поставщика служб. Например, мы можем использовать поставщик служб AppServiceProvider, который включен в Laravel:

<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Зарегистрировать любые службы приложения.
*/
public function register(): void
{
// ...
}
/**
* Загрузить любые службы приложения.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

Используя метод looping фасада Queue, вы можете указать обратные вызовы, которые выполняются перед тем, как рабочий процесс попытается извлечь задачу из очереди. Например, вы можете зарегистрировать замыкание для отката транзакций, которые остались открытыми после неудачного выполнения предыдущей задачи:

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});