Перейти к содержанию

Очереди и задачи в Masonite

Masonite поставляется с мощной системой очередей. Эта функция полезна для выполнения тех задач, которые требуют времени. Например, отправка электронных писем, обработка видео, создание счетов, обновление записей и всё остальное, что вам не нужно, чтобы пользователи ждали.

Во-первых, задачи создаются с логикой, необходимой для запуска задачи. Затем задачи "помещаются" в очередь, где они будут выполняться позже с использованием "работников очереди" (workers). Вы можете указать столько worker`ов, сколько может запустить ваш сервер.

Помимо выполнения задачи, некоторые драйверы позволяют отслеживать любые задачи, которые не выполняются. Данные задачи будут сохранены в базе данных, где их можно будет отслеживать и при необходимости повторно запускать.

Конфигурация

Вы можете легко изменить поведение вашей системы очередей, изменив конфигурацию очереди:

Доступные драйверы очередей: async, database и amqp.

Полная конфигурация очереди будет выглядеть так:

config/queue.py
from masonite.environment import env


DRIVERS = {
    "default": env("QUEUE_DRIVER", "async"),
    "database": {
        "connection": "sqlite",
        "table": "jobs",
        "failed_table": "failed_jobs",
        "attempts": 3,
        "poll": 5,
    },
    "redis": {
        "name": env("QUEUE_USERNAME", "guest"),
        "password": env("QUEUE_PASSWORD", "guest"),
        "port": env("QUEUE_PORT", "6379"),
        "host": env("QUEUE_HOST", "localhost"),
    },
    "amqp": {
        "username": env("QUEUE_USERNAME", "guest"),
        "password": env("QUEUE_PASSWORD", "guest"),
        "port": env("QUEUE_PORT", "5672"),
        "vhost": env("QUEUE_VHOST", ""),
        "host": env("QUEUE_HOST", "localhost"),
        "exchange": "",
        # See https://pika.readthedocs.io/en/stable/modules/parameters.html#pika.connection.URLParameters
        # for valid connection options values
        "connection_options": {},
        "channel": env("QUEUE_CHANNEL", "default"),
        "queue": "masonite4",
        "tz": "UTC",
    },
    "async": {
        "blocking": True,
        "callback": "handle",
        "mode": "threading",
        "workers": 1,
    },
}

Очередь по умолчанию

Ключ default используется для указания того, какой драйвер очереди в конфигурации использовать по умолчанию. Это должно быть то же значение, что и один из других ключей в словаре конфигурации.

Драйвер базы данных

Чтобы использовать драйвер базы данных, вы должны сначала создать таблицу задач:

python craft queue:table
Команда создаст файл миграции в вашем каталоге migrations.

Если вы хотите сохранить провалившиеся задания, вы также должны создать миграцию:

python craft queue:failed

Затем выполнить миграции:

python craft migrate

Драйвер БД используется для обработки задач и невыполненных задач через соединение с базой данных.

Вариант Описание
connection Указывает соединение, используемое для поиска задач и таблицы невыполненных задач.
table Задает имя таблицы для хранения задач.
failed_jobs Указывает таблицу, в которой будут храниться проваленные задачи. Установите значение None, чтобы не сохранять.
attempts Задает количество попыток выполнения задачи по умолчанию, прежде чем она будет считаться неудачной.
poll Указывает время ожидания в секундах перед вызовом базы данных для поиска новых задач.
tz Часовой пояс, в котором база данных должна сохранять и находить временные метки.

Драйвер AMQP

Драйвер AMQP используется для соединений по протоколу AMQP, например RabbitMQ.

Доступные варианты включают в себя:

Вариант Описание
username Имя пользователя вашего AMQP-соединения.
password Пароль вашего AMQP-соединения.
port Порт вашего AMQP-соединения.
vhost Имя вашего виртуального хоста. Можно узнать через панель AMQP.
host IP-адрес или имя хоста вашего соединения.
channel Канал для отправки задач.
queue Имя очереди по умолчанию для отправки задач.

Асинхронный драйвер

Асинхронный драйвер просто запускает задачи в памяти, используя процессы или потоки. Это самый простой драйвер, так как он не требует специального программного обеспечения или настройки.

Доступные варианты включают в себя:

Вариант Описание
blocking

Логическое значение, указывающее, должны ли задачи выполняться синхронно.
Полезно для целей отладки.

callback Имя метода задачи, которое должно выполняться.
mode Должна ли очередь порождать процессы или потоки. Возможные варианты: threading или multiprocess
workers Количество процессов или потоков, которые должны создаваться для выполнения задач.

Создание задач

Чтобы обработать что-то в очереди, вам нужно будет создать задачу. Эта задача будет рассматриваться как сущность, которую можно сериализовать и запустить позже.

Вы можете запустить команду job для создания задачи:

python craft job CreateInvoice

Теперь у вас есть класс задачи, для которого вы можете написать логику:

from masonite.queues import Queueable


class CreateInvoice(Queueable):
    def handle(self):
        pass

Любая логика должна быть внутри метода handle:

from masonite.queues import Queueable


class CreateInvoice(Queueable):

    def __init__(self, order_id):
        self.order_id = order_id

    def handle(self):
        # Создание документов счета-фактуры
        pass

Очередь задач

Вы можете добавить задачи в очередь для обработки, просто передав их в очередь:

from masonite.queues import Queue
from app.jobs.CreateInvoice import CreateInvoice


class InvoiceController:

  def generate(self, queue: Queue):
    # Задачи без параметров
    queue.push(CreateInvoice())

    # Задачи с параметрами
    # Создайте счет-фактуру на основе платежа
    queue.push(CreateInvoice(Order.find(1).id))

Вы также можете указать любое количество параметров в методе push:

queue.push(
  CreateInvoice(Order.find(1).id),
  driver="async", # Драйвер очереди
  queue="invoices", # Имя очереди, в которую нужно поместить задачу
)

Обработчики очереди (workers)

Чтобы запустить worker, который является терминальным процессом, а не выполняет задачи, вы можете использовать команду queue:work:

python craft queue:work
Эта команда запустит worker с использованием конфигураций очереди по умолчанию.

Вы также можете изменить параметры:

Вариант Описание
--driver database Указывает, какой драйвер использовать для этого worker.
--queue invoices Указывает, какую очередь использовать для этого worker.
--connection mysql Указывает соединение, используемое для worker.
--poll 5 Указывает время ожидания в секундах для получения новых задач. По умолчанию 1 секунда.
--attempts 5 Указывает количество попыток повторить задачу, прежде чем оно будет считаться неудачным. По умолчанию 3 раза.

Команда с измененными параметрами будет выглядеть так:

python craft queue:work --driver database --connection mysql --poll 5 --attempts 2

Неудачные задачи

Если ваши конфигурации настроены правильно, при сбое задач они попадут в таблицу неудачных задач. Здесь вы можете отслеживать, почему ваши задачи не выполняются и выбирать, перезапустить или удалить их.

Если вы решите перезапустить свои задачи, они будут помещены обратно в конец очереди и повторно запущены с обычным процессом постановки в очередь.

Чтобы повторно запустить задачи, которые не удалось выполнить, используйте команду:

python craft queue:retry

Вы также можете указать несколько параметров:

Вариант Описание
--driver database Указывает, какой драйвер следует использовать для поиска невыполненных задач.
--queue invoices Указывает, в какую очередь возвращать невыполненные задачи.
--connection mysql Указывает подключение, используемое для получения невыполненных задач.