Очереди и задачи в Masonite
Masonite поставляется с мощной системой очередей. Эта функция полезна для выполнения тех задач, которые требуют времени. Например, отправка электронных писем, обработка видео, создание счетов, обновление записей и всё остальное, что вам не нужно, чтобы пользователи ждали.
Во-первых, задачи создаются с логикой, необходимой для запуска задачи. Затем задачи "помещаются" в очередь, где они будут выполняться позже с использованием "работников очереди" (workers). Вы можете указать столько worker`ов, сколько может запустить ваш сервер.
Помимо выполнения задачи, некоторые драйверы позволяют отслеживать любые задачи, которые не выполняются. Данные задачи будут сохранены в базе данных, где их можно будет отслеживать и при необходимости повторно запускать.
Конфигурация
Вы можете легко изменить поведение вашей системы очередей, изменив конфигурацию очереди:
Доступные драйверы очередей: async
, database
и amqp
.
Полная конфигурация очереди будет выглядеть так:
Очередь по умолчанию
Ключ default
используется для указания того, какой драйвер очереди в конфигурации использовать
по умолчанию. Это должно быть то же значение, что и один из других ключей в словаре конфигурации.
Драйвер базы данных
Чтобы использовать драйвер базы данных, вы должны сначала создать таблицу задач:
Команда создаст файл миграции в вашем каталоге migrations.Если вы хотите сохранить провалившиеся задания, вы также должны создать миграцию:
Затем выполнить миграции:
Драйвер БД используется для обработки задач и невыполненных задач через соединение с базой данных.
Вариант | Описание |
---|---|
connection |
Указывает соединение, используемое для поиска задач и таблицы невыполненных задач. |
table |
Задает имя таблицы для хранения задач. |
failed_jobs |
Указывает таблицу, в которой будут храниться проваленные задачи. Установите значение None , чтобы не сохранять. |
attempts |
Задает количество попыток выполнения задачи по умолчанию, прежде чем она будет считаться неудачной. |
poll |
Указывает время ожидания в секундах перед вызовом базы данных для поиска новых задач. |
tz |
Часовой пояс, в котором база данных должна сохранять и находить временные метки. |
Драйвер AMQP
Драйвер AMQP используется для соединений по протоколу AMQP, например RabbitMQ.
Доступные варианты включают в себя:
Вариант | Описание |
---|---|
username |
Имя пользователя вашего AMQP-соединения. |
password |
Пароль вашего AMQP-соединения. |
port |
Порт вашего AMQP-соединения. |
vhost |
Имя вашего виртуального хоста. Можно узнать через панель AMQP. |
host |
IP-адрес или имя хоста вашего соединения. |
channel |
Канал для отправки задач. |
queue |
Имя очереди по умолчанию для отправки задач. |
Асинхронный драйвер
Асинхронный драйвер просто запускает задачи в памяти, используя процессы или потоки. Это самый простой драйвер, так как он не требует специального программного обеспечения или настройки.
Доступные варианты включают в себя:
Вариант | Описание |
---|---|
blocking |
Логическое значение, указывающее, должны ли задачи выполняться синхронно. |
callback |
Имя метода задачи, которое должно выполняться. |
mode |
Должна ли очередь порождать процессы или потоки. Возможные варианты: threading или multiprocess |
workers |
Количество процессов или потоков, которые должны создаваться для выполнения задач. |
Создание задач
Чтобы обработать что-то в очереди, вам нужно будет создать задачу. Эта задача будет рассматриваться как сущность, которую можно сериализовать и запустить позже.
Вы можете запустить команду job для создания задачи:
Теперь у вас есть класс задачи, для которого вы можете написать логику:
Любая логика должна быть внутри метода handle
:
from masonite.queues import Queueable
class CreateInvoice(Queueable):
def __init__(self, order_id):
self.order_id = order_id
def handle(self):
# Создание документов счета-фактуры
pass
Очередь задач
Вы можете добавить задачи в очередь для обработки, просто передав их в очередь:
from masonite.queues import Queue
from app.jobs.CreateInvoice import CreateInvoice
class InvoiceController:
def generate(self, queue: Queue):
# Задачи без параметров
queue.push(CreateInvoice())
# Задачи с параметрами
# Создайте счет-фактуру на основе платежа
queue.push(CreateInvoice(Order.find(1).id))
Вы также можете указать любое количество параметров в методе push
:
queue.push(
CreateInvoice(Order.find(1).id),
driver="async", # Драйвер очереди
queue="invoices", # Имя очереди, в которую нужно поместить задачу
)
Обработчики очереди (workers)
Чтобы запустить worker, который является терминальным процессом, а не выполняет задачи,
вы можете использовать команду queue:work
:
Вы также можете изменить параметры:
Вариант | Описание |
---|---|
--driver database |
Указывает, какой драйвер использовать для этого worker. |
--queue invoices |
Указывает, какую очередь использовать для этого worker. |
--connection mysql |
Указывает соединение, используемое для worker. |
--poll 5 |
Указывает время ожидания в секундах для получения новых задач. По умолчанию 1 секунда. |
--attempts 5 |
Указывает количество попыток повторить задачу, прежде чем оно будет считаться неудачным. По умолчанию 3 раза. |
Команда с измененными параметрами будет выглядеть так:
Неудачные задачи
Если ваши конфигурации настроены правильно, при сбое задач они попадут в таблицу неудачных задач. Здесь вы можете отслеживать, почему ваши задачи не выполняются и выбирать, перезапустить или удалить их.
Если вы решите перезапустить свои задачи, они будут помещены обратно в конец очереди и повторно запущены с обычным процессом постановки в очередь.
Чтобы повторно запустить задачи, которые не удалось выполнить, используйте команду:
Вы также можете указать несколько параметров:
Вариант | Описание |
---|---|
--driver database |
Указывает, какой драйвер следует использовать для поиска невыполненных задач. |
--queue invoices |
Указывает, в какую очередь возвращать невыполненные задачи. |
--connection mysql |
Указывает подключение, используемое для получения невыполненных задач. |