Представляющий директор - инструмент для построения рабочих процессов Celery

Нам как разработчикам часто приходится выполнять задачи в фоновом режиме. К счастью, для этого уже существуют некоторые инструменты. Например, в экосистеме Python самая известная библиотека — это Celery. Если вы уже использовали это, вы знаете, насколько это здорово! Но вы также, вероятно, обнаружите, насколько сложно может быть отслеживание состояния сложного рабочего процесса.



Celery Director — это инструмент, который мы создали в OVHcloud для решения этой проблемы. Код теперь имеет открытый исходный код и доступен на Github.



После нашего выступления во время FOSDEM 2020, этот пост направлен на представление инструмента. Мы подробно рассмотрим, что такое Celery, почему мы создали Director и как его использовать.

Что такое «сельдерей»?

Вот официальное описание сельдерея:

Celery — это асинхронная очередь задач / очередь заданий, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование.

Важные слова здесь — «очередь задач». Это механизм, используемый для распределения работы по пулу машин или потоков.



Очередь в середине приведенной выше диаграммы хранит сообщения, отправленные производителями (например, API). С другой стороны, потребители постоянно читают очередь, чтобы отображать новые сообщения и выполнять задачи.

В Celery сообщение, отправленное производителем, является подписью функции Python: send_email(«john.doe»)например.

Очередь (именуемая посредником в Celery) хранит эту подпись до тех пор, пока рабочий не прочитает ее и не выполнит функцию в пределах данного параметра.

Но зачем выполнять функцию Python где-то еще? Основная причина — быстрое возвращение ответа в случаях, когда функции долго выполняются. В самом деле, это не вариант заставлять пользователей ждать ответа несколько секунд или минут.

Точно так же, как мы можем представить производителей без достаточного количества ресурсов, с задачей, связанной с процессором, более надежный работник мог бы справиться с ее выполнением.

Как использовать «сельдерей»

Итак, Celery — это библиотека, используемая для выполнения кода Python где-то еще, но как она это делает? На самом деле это действительно просто! Чтобы проиллюстрировать это, мы будем использовать некоторые из доступных методов для отправки задач брокеру, а затем запустим воркер для их использования.

Вот код для создания задачи Celery:

# tasks.py
from celery import Celery

app = Celery("tasks", broker="redis://127.0.0.1:6379/0")

@app.task
def add(x, y):
    return x + y


Как видите, задача Celery — это просто функция Python, преобразованная для отправки в брокер. Обратите внимание, что мы передали соединение redis приложению Celery (названному app), чтобы сообщить брокеру, где хранить сообщения.

Это означает, что теперь можно отправить задачу в брокере:

>>> from tasks import add
>>> add.delay(2, 3)


Вот и все! Мы использовали этот .delay()метод, поэтому наш производитель не выполнял код Python, а вместо этого отправлял подпись задачи брокеру.

Теперь пора употребить его вместе с сельдереем:

$ celery worker -A tasks --loglevel=INFO
[...]
[2020-02-14 17:13:38,947: INFO/MainProcess] Received task: tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000]
[2020-02-14 17:13:38,954: INFO/ForkPoolWorker-2] Task tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000] succeeded in 0.0024250600254163146s: 5


Можно даже комбинировать задачи Celery с некоторыми примитивами (полный список здесь ):

  • Цепочка: будет выполнять задачи одну за другой.
  • Группа: будет выполнять задачи параллельно, направляя их нескольким рабочим.

Например, следующий код сделает два добавления параллельно, а затем суммирует результаты:

from celery import chain, group

# Create the canvas
canvas = chain(
    group(
        add.si(1, 2),
        add.si(3, 4)
    ),
    sum_numbers.s()
)

# Execute it
canvas.delay()


Вы, наверное, заметили, что мы не использовали здесь метод .delay () . Вместо этого мы создали холст , используемый для объединения набора задач.

.si()
Метод используется для создания неизменяемой подписи (т. Е. Такой , которая не получает данные от предыдущей задачи), в то время как 
.s()
полагается на данные, возвращенные двумя предыдущими задачами.

В этом введении в Celery мы только что рассмотрели его базовое использование. Если вы хотите узнать больше, я приглашаю вас прочитать документацию, где вы обнаружите все мощные функции, включая ограничения скорости , повторные попытки выполнения задач или даже периодические задачи .

Как разработчик я хочу…

Я часть команды, целью которой является развертывание и мониторинг внутренней инфраструктуры. В рамках этого нам нужно было запустить некоторые фоновые задачи, и, как разработчики Python, мы естественным образом выбрали Celery. Но из коробки Celery не поддерживал определенные требования для наших проектов:

  • Отслеживание развития задач и их зависимостей в WebUI.
  • Выполнение рабочих процессов с помощью вызовов API или просто с помощью интерфейса командной строки.
  • Объединение задач для создания рабочих процессов в формате YAML.
  • Периодическое выполнение всего рабочего процесса.

Для этого существуют другие классные инструменты, например Flower, но они позволяют нам отслеживать только каждую задачу по отдельности, а не весь рабочий процесс и его составные задачи.

И поскольку нам действительно были нужны эти функции, мы решили создать Celery Director.



Как использовать Директор

Установку можно произвести с помощью pip-команды:

$ pip install celery-director


Директор предоставляет простую команду для создания новой папки рабочего пространства:

$ director init workflows
[*] Project created in /home/ncrocfer/workflows
[*] Do not forget to initialize the database
You can now export the DIRECTOR_HOME environment variable


Ниже для вас создана новая папка задач и пример рабочего процесса:

$ tree -a workflows/
├── .env
├── tasks
│   └── etl.py
└── workflows.yml


В tasks/*.pyфайлы будут содержать ваши задачи Сельдерей, в то время как workflows.ymlфайл будет объединить их:

$ cat workflows.yml
---
ovh.SIMPLE_ETL:
  tasks:
    - EXTRACT
    - TRANSFORM
    - LOAD


В этом примере с именем ovh.SIMPLE_ETL будут выполняться три задачи, одна за другой. Вы можете найти больше примеров в документации.

После экспорта DIRECTOR_HOMEпеременной и инициализации базы данных с помощью director db upgradeвы можете выполнить следующий рабочий процесс:

$ director workflow list
+----------------+----------+-----------+
| Workflows (1)  | Periodic | Tasks     |
+----------------+----------+-----------+
| ovh.SIMPLE_ETL |    --    | EXTRACT   |
|                |          | TRANSFORM |
|                |          | LOAD      |
+----------------+----------+-----------+
$ director workflow run ovh.SIMPLE_ETL


Брокер получил задачи, поэтому теперь вы можете запустить Celery worker для их выполнения:

$ director celery worker --loglevel=INFO


А затем отобразите результаты с помощью команды веб-сервера ( director webserver):



Это только начало, поскольку Director предоставляет другие функции, позволяющие, например, параметризовать рабочий процесс или периодически его выполнять. Более подробную информацию об этих функциях вы найдете в документации.

Вывод

Наши команды регулярно используют Director для запуска наших рабочих процессов. Больше нет шаблонов и нет необходимости в продвинутых знаниях Celery… Новый коллега может легко создавать свои задачи на Python и комбинировать их в YAML, не используя примитивы Celery, описанные ранее.

Иногда нам нужно периодически выполнять рабочий процесс (например, для заполнения кеша), а иногда нам нужно вручную вызывать его из другой веб-службы (обратите внимание, что рабочий процесс также может быть выполнен через вызов API ). Теперь это возможно с помощью нашего единственного экземпляра Director.

Мы приглашаем вас попробовать Director на себе и оставить свой отзыв через Github, чтобы мы могли продолжать его улучшать. Исходный код можно найти на Github, а презентация FOSDEM 2020 доступна здесь.

0 комментариев

Оставить комментарий