Представляющий директор - инструмент для построения рабочих процессов Celery
Нам как разработчикам часто приходится выполнять задачи в фоновом режиме. К счастью, для этого уже существуют некоторые инструменты. Например, в экосистеме Python самая известная библиотека — это Celery. Если вы уже использовали это, вы знаете, насколько это здорово! Но вы также, вероятно, обнаружите, насколько сложно может быть отслеживание состояния сложного рабочего процесса.
Celery Director — это инструмент, который мы создали в OVHcloud для решения этой проблемы. Код теперь имеет открытый исходный код и доступен на Github.
После нашего выступления во время FOSDEM 2020, этот пост направлен на представление инструмента. Мы подробно рассмотрим, что такое Celery, почему мы создали Director и как его использовать.
Вот официальное описание сельдерея:
Важные слова здесь — «очередь задач». Это механизм, используемый для распределения работы по пулу машин или потоков.
Очередь в середине приведенной выше диаграммы хранит сообщения, отправленные производителями (например, API). С другой стороны, потребители постоянно читают очередь, чтобы отображать новые сообщения и выполнять задачи.
В Celery сообщение, отправленное производителем, является подписью функции Python: send_email(«john.doe»)например.
Очередь (именуемая посредником в Celery) хранит эту подпись до тех пор, пока рабочий не прочитает ее и не выполнит функцию в пределах данного параметра.
Но зачем выполнять функцию Python где-то еще? Основная причина — быстрое возвращение ответа в случаях, когда функции долго выполняются. В самом деле, это не вариант заставлять пользователей ждать ответа несколько секунд или минут.
Точно так же, как мы можем представить производителей без достаточного количества ресурсов, с задачей, связанной с процессором, более надежный работник мог бы справиться с ее выполнением.
Итак, Celery — это библиотека, используемая для выполнения кода Python где-то еще, но как она это делает? На самом деле это действительно просто! Чтобы проиллюстрировать это, мы будем использовать некоторые из доступных методов для отправки задач брокеру, а затем запустим воркер для их использования.
Вот код для создания задачи Celery:
Как видите, задача Celery — это просто функция Python, преобразованная для отправки в брокер. Обратите внимание, что мы передали соединение redis приложению Celery (названному app), чтобы сообщить брокеру, где хранить сообщения.
Это означает, что теперь можно отправить задачу в брокере:
Вот и все! Мы использовали этот .delay()метод, поэтому наш производитель не выполнял код Python, а вместо этого отправлял подпись задачи брокеру.
Теперь пора употребить его вместе с сельдереем:
Можно даже комбинировать задачи Celery с некоторыми примитивами (полный список здесь ):
Например, следующий код сделает два добавления параллельно, а затем суммирует результаты:
Вы, наверное, заметили, что мы не использовали здесь метод .delay () . Вместо этого мы создали холст , используемый для объединения набора задач.
В этом введении в Celery мы только что рассмотрели его базовое использование. Если вы хотите узнать больше, я приглашаю вас прочитать документацию, где вы обнаружите все мощные функции, включая ограничения скорости , повторные попытки выполнения задач или даже периодические задачи .
Я часть команды, целью которой является развертывание и мониторинг внутренней инфраструктуры. В рамках этого нам нужно было запустить некоторые фоновые задачи, и, как разработчики Python, мы естественным образом выбрали Celery. Но из коробки Celery не поддерживал определенные требования для наших проектов:
Для этого существуют другие классные инструменты, например Flower, но они позволяют нам отслеживать только каждую задачу по отдельности, а не весь рабочий процесс и его составные задачи.
И поскольку нам действительно были нужны эти функции, мы решили создать Celery Director.
Установку можно произвести с помощью pip-команды:
Директор предоставляет простую команду для создания новой папки рабочего пространства:
Ниже для вас создана новая папка задач и пример рабочего процесса:
В tasks/*.pyфайлы будут содержать ваши задачи Сельдерей, в то время как workflows.ymlфайл будет объединить их:
В этом примере с именем ovh.SIMPLE_ETL будут выполняться три задачи, одна за другой. Вы можете найти больше примеров в документации.
После экспорта DIRECTOR_HOMEпеременной и инициализации базы данных с помощью director db upgradeвы можете выполнить следующий рабочий процесс:
Брокер получил задачи, поэтому теперь вы можете запустить Celery worker для их выполнения:
А затем отобразите результаты с помощью команды веб-сервера ( director webserver):
Это только начало, поскольку Director предоставляет другие функции, позволяющие, например, параметризовать рабочий процесс или периодически его выполнять. Более подробную информацию об этих функциях вы найдете в документации.
Наши команды регулярно используют Director для запуска наших рабочих процессов. Больше нет шаблонов и нет необходимости в продвинутых знаниях Celery… Новый коллега может легко создавать свои задачи на Python и комбинировать их в YAML, не используя примитивы Celery, описанные ранее.
Иногда нам нужно периодически выполнять рабочий процесс (например, для заполнения кеша), а иногда нам нужно вручную вызывать его из другой веб-службы (обратите внимание, что рабочий процесс также может быть выполнен через вызов API ). Теперь это возможно с помощью нашего единственного экземпляра Director.
Мы приглашаем вас попробовать Director на себе и оставить свой отзыв через Github, чтобы мы могли продолжать его улучшать. Исходный код можно найти на Github, а презентация FOSDEM 2020 доступна здесь.
Celery Director — это инструмент, который мы создали в OVHcloud для решения этой проблемы. Код теперь имеет открытый исходный код и доступен на Github.
После нашего выступления во время FOSDEM 2020, этот пост направлен на представление инструмента. Мы подробно рассмотрим, что такое Celery, почему мы создали Director и как его использовать.
Что такое «сельдерей»?
Вот официальное описание сельдерея:
Celery — это асинхронная очередь задач / очередь заданий, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование.
Важные слова здесь — «очередь задач». Это механизм, используемый для распределения работы по пулу машин или потоков.
Очередь в середине приведенной выше диаграммы хранит сообщения, отправленные производителями (например, API). С другой стороны, потребители постоянно читают очередь, чтобы отображать новые сообщения и выполнять задачи.
В Celery сообщение, отправленное производителем, является подписью функции Python: send_email(«john.doe»)например.
Очередь (именуемая посредником в Celery) хранит эту подпись до тех пор, пока рабочий не прочитает ее и не выполнит функцию в пределах данного параметра.
Но зачем выполнять функцию Python где-то еще? Основная причина — быстрое возвращение ответа в случаях, когда функции долго выполняются. В самом деле, это не вариант заставлять пользователей ждать ответа несколько секунд или минут.
Точно так же, как мы можем представить производителей без достаточного количества ресурсов, с задачей, связанной с процессором, более надежный работник мог бы справиться с ее выполнением.
Как использовать «сельдерей»
Итак, Celery — это библиотека, используемая для выполнения кода Python где-то еще, но как она это делает? На самом деле это действительно просто! Чтобы проиллюстрировать это, мы будем использовать некоторые из доступных методов для отправки задач брокеру, а затем запустим воркер для их использования.
Вот код для создания задачи Celery:
# tasks.py
from celery import Celery
app = Celery("tasks", broker="redis://127.0.0.1:6379/0")
@app.task
def add(x, y):
return x + y
Как видите, задача Celery — это просто функция Python, преобразованная для отправки в брокер. Обратите внимание, что мы передали соединение redis приложению Celery (названному app), чтобы сообщить брокеру, где хранить сообщения.
Это означает, что теперь можно отправить задачу в брокере:
>>> from tasks import add
>>> add.delay(2, 3)
Вот и все! Мы использовали этот .delay()метод, поэтому наш производитель не выполнял код Python, а вместо этого отправлял подпись задачи брокеру.
Теперь пора употребить его вместе с сельдереем:
$ celery worker -A tasks --loglevel=INFO
[...]
[2020-02-14 17:13:38,947: INFO/MainProcess] Received task: tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000]
[2020-02-14 17:13:38,954: INFO/ForkPoolWorker-2] Task tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000] succeeded in 0.0024250600254163146s: 5
Можно даже комбинировать задачи Celery с некоторыми примитивами (полный список здесь ):
- Цепочка: будет выполнять задачи одну за другой.
- Группа: будет выполнять задачи параллельно, направляя их нескольким рабочим.
Например, следующий код сделает два добавления параллельно, а затем суммирует результаты:
from celery import chain, group
# Create the canvas
canvas = chain(
group(
add.si(1, 2),
add.si(3, 4)
),
sum_numbers.s()
)
# Execute it
canvas.delay()
Вы, наверное, заметили, что мы не использовали здесь метод .delay () . Вместо этого мы создали холст , используемый для объединения набора задач.
.si()
Метод используется для создания неизменяемой подписи (т. Е. Такой , которая не получает данные от предыдущей задачи), в то время как .s()
полагается на данные, возвращенные двумя предыдущими задачами.В этом введении в Celery мы только что рассмотрели его базовое использование. Если вы хотите узнать больше, я приглашаю вас прочитать документацию, где вы обнаружите все мощные функции, включая ограничения скорости , повторные попытки выполнения задач или даже периодические задачи .
Как разработчик я хочу…
Я часть команды, целью которой является развертывание и мониторинг внутренней инфраструктуры. В рамках этого нам нужно было запустить некоторые фоновые задачи, и, как разработчики Python, мы естественным образом выбрали Celery. Но из коробки Celery не поддерживал определенные требования для наших проектов:
- Отслеживание развития задач и их зависимостей в WebUI.
- Выполнение рабочих процессов с помощью вызовов API или просто с помощью интерфейса командной строки.
- Объединение задач для создания рабочих процессов в формате YAML.
- Периодическое выполнение всего рабочего процесса.
Для этого существуют другие классные инструменты, например Flower, но они позволяют нам отслеживать только каждую задачу по отдельности, а не весь рабочий процесс и его составные задачи.
И поскольку нам действительно были нужны эти функции, мы решили создать Celery Director.
Как использовать Директор
Установку можно произвести с помощью pip-команды:
$ pip install celery-director
Директор предоставляет простую команду для создания новой папки рабочего пространства:
$ director init workflows
[*] Project created in /home/ncrocfer/workflows
[*] Do not forget to initialize the database
You can now export the DIRECTOR_HOME environment variable
Ниже для вас создана новая папка задач и пример рабочего процесса:
$ tree -a workflows/
├── .env
├── tasks
│ └── etl.py
└── workflows.yml
В tasks/*.pyфайлы будут содержать ваши задачи Сельдерей, в то время как workflows.ymlфайл будет объединить их:
$ cat workflows.yml
---
ovh.SIMPLE_ETL:
tasks:
- EXTRACT
- TRANSFORM
- LOAD
В этом примере с именем ovh.SIMPLE_ETL будут выполняться три задачи, одна за другой. Вы можете найти больше примеров в документации.
После экспорта DIRECTOR_HOMEпеременной и инициализации базы данных с помощью director db upgradeвы можете выполнить следующий рабочий процесс:
$ director workflow list
+----------------+----------+-----------+
| Workflows (1) | Periodic | Tasks |
+----------------+----------+-----------+
| ovh.SIMPLE_ETL | -- | EXTRACT |
| | | TRANSFORM |
| | | LOAD |
+----------------+----------+-----------+
$ director workflow run ovh.SIMPLE_ETL
Брокер получил задачи, поэтому теперь вы можете запустить Celery worker для их выполнения:
$ director celery worker --loglevel=INFO
А затем отобразите результаты с помощью команды веб-сервера ( director webserver):
Это только начало, поскольку Director предоставляет другие функции, позволяющие, например, параметризовать рабочий процесс или периодически его выполнять. Более подробную информацию об этих функциях вы найдете в документации.
Вывод
Наши команды регулярно используют Director для запуска наших рабочих процессов. Больше нет шаблонов и нет необходимости в продвинутых знаниях Celery… Новый коллега может легко создавать свои задачи на Python и комбинировать их в YAML, не используя примитивы Celery, описанные ранее.
Иногда нам нужно периодически выполнять рабочий процесс (например, для заполнения кеша), а иногда нам нужно вручную вызывать его из другой веб-службы (обратите внимание, что рабочий процесс также может быть выполнен через вызов API ). Теперь это возможно с помощью нашего единственного экземпляра Director.
Мы приглашаем вас попробовать Director на себе и оставить свой отзыв через Github, чтобы мы могли продолжать его улучшать. Исходный код можно найти на Github, а презентация FOSDEM 2020 доступна здесь.
0 комментариев