Делаем БОЛЬШУЮ автоматизацию с помощью Celery

Вступление

TL; DR: вы можете пропустить вступление и сразу перейти к «Celery — Distributed Task Queue».



Здравствуйте! Меня зовут Бартош Рабьега, я работаю в команде R & D / DevOps в OVHcloud. В рамках нашей повседневной работы мы разрабатываем и поддерживаем проект Ceph-as-a-Service, чтобы обеспечить надежное распределенное хранилище высокой доступности для различных приложений. Мы имеем дело с более чем 60 ПБ данных в 10 регионах, поэтому, как вы можете себе представить, у нас впереди довольно много работы с точки зрения замены вышедшего из строя оборудования, обработки естественного роста, предоставления новых регионов и центров обработки данных, оценки нового оборудования, оптимизация конфигураций программного и аппаратного обеспечения, поиск новых решений для хранения данных и многое другое!



Из-за большого объема нашей работы нам нужно разгрузить как можно больше повторяющихся задач. И мы делаем это с помощью автоматизации.

Автоматизация вашей работы

В некоторой степени каждый ручной процесс можно описать как набор действий и условий. Если бы нам каким-то образом удалось заставить что-то автоматически выполнять действия и проверять условия, мы могли бы автоматизировать процесс, что привело бы к автоматизированному рабочему процессу. Взгляните на пример ниже, в котором показаны некоторые общие шаги по ручной замене оборудования в нашем проекте.



Хм… Что может помочь нам сделать это автоматически? Разве компьютер не кажется идеальным?

Сельдерей — распределенная очередь задач

Celery — это хорошо известное и широко используемое программное обеспечение, которое позволяет нам обрабатывать задачи асинхронно. Описание проекта на его главной странице ( www.celeryproject.org/ ) может показаться немного загадочным, но мы можем сузить его базовую функциональность до примерно следующего:



Такой механизм идеально подходит для таких задач, как асинхронная отправка электронных писем (например, «запустил и забыл»), но его также можно использовать для различных целей. Так с какими еще задачами он справился? В принципе, любые задачи можно реализовать на Python (основном языке Celery)! Я не буду вдаваться в подробности, поскольку они доступны в документации по Celery. Важно то, что, поскольку мы можем реализовать любую задачу, которую захотим, мы можем использовать ее для создания строительных блоков для нашей автоматизации.

Есть еще одна важная вещь… Celery изначально поддерживает объединение таких задач в рабочие процессы (примитивы Celery: цепочки, группы, аккорды и т. Д.). Итак, давайте рассмотрим несколько примеров…

Мы будем использовать следующие определения задач — одиночная задача, печать аргументов и kwargs:

@celery_app.task
def noop(*args, **kwargs):
    # Task accepts any arguments and does nothing
    print(args, kwargs)
    return True


Теперь мы можем выполнить задачу асинхронно, используя следующий код:

task = noop.s(777)
task.apply_async()


Элементарные задачи можно параметризовать и объединить в сложный рабочий процесс с использованием методов сельдерея, то есть «цепочки», «группы» и «хорды». См. Примеры ниже. В каждом из них левая сторона показывает визуальное представление рабочего процесса, а правая — фрагмент кода, который его генерирует. Зеленая рамка — это отправная точка, после которой выполнение рабочего процесса продвигается по вертикали.

Цепочка — набор задач, обрабатываемых последовательно



workflow = (
    chain([noop.s(i) for i in range(3)])
)


Группа — набор параллельно обрабатываемых задач.



workflow = (
    group([noop.s(i) for i in range(5)])
)


Аккорд — группа задач, связанных со следующей задачей



workflow = chord(
        [noop.s(i) for i in range(5)],
        noop.s(i)
)

# Equivalent:
workflow = chain([
        group([noop.s(i) for i in range(5)]),
        noop.s(i)
])


Важный момент: выполнение рабочего процесса всегда останавливается в случае сбоя задачи. В результате цепочка не будет продолжена, если какая-то задача не удастся в ее середине. Это дает нам довольно мощный фреймворк для реализации аккуратной автоматизации, и именно это мы используем для Ceph-as-a-Service в OVHcloud! Мы реализовали множество небольших, гибких, параметризуемых задач, которые мы объединяем для достижения общей цели. Вот несколько реальных примеров элементарных задач, используемых для автоматического удаления старого оборудования:

  • Изменить вес узла Ceph (используется для увеличения / уменьшения количества данных на узле. Запускает ребалансировку данных)
  • Установите время простоя службы (перебалансировка данных запускает зонды мониторинга, но это ожидается, поэтому установите время простоя для этой конкретной записи мониторинга)
  • Подождите, пока Ceph не станет здоровым (дождитесь завершения ребалансировки данных — повторяющаяся задача)
  • Удалите узел Ceph из кластера (узел пуст, поэтому его можно просто удалить)
  • Отправьте информацию техническим специалистам в DC (оборудование готово к замене)
  • Добавить новый узел Ceph в кластер (установить новый пустой узел)

Мы параметризируем эти задачи и связываем их вместе, используя цепочки, группы и аккорды сельдерея для создания желаемого рабочего процесса. Затем Celery сделает все остальное, асинхронно выполняя рабочий процесс.

Большие рабочие процессы и сельдерей

По мере роста нашей инфраструктуры растут и наши автоматизированные рабочие процессы, с увеличением количества задач на рабочий процесс, более высокой сложностью рабочих процессов… Что мы понимаем под большим рабочим процессом? Рабочий процесс, состоящий из 1000-10 000 задач. Чтобы визуализировать это, взгляните на следующие примеры:

Несколько аккордов, связанных вместе (всего 57 заданий)



workflow = chain([
    noop.s(0),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    noop.s()
])


Более сложная структура графа, построенная из цепочек и групп (всего 23 задачи)



# | is ‘chain’ operator in celery
workflow = (
    group(
        group(
            group([noop.s() for i in range(5)]),
            chain([noop.s() for i in range(5)])
        ) |
        noop.s() |
        group([noop.s() for i in range(5)]) |
        noop.s(),
        chain([noop.s() for i in range(5)])
    ) |
    noop.s()
)


Как вы, наверное, догадались, когда задействовано 1000 задач, визуализация становится довольно большой и беспорядочной! Celery — мощный инструмент, обладающий множеством функций, которые хорошо подходят для автоматизации, но он по-прежнему испытывает трудности, когда дело доходит до обработки больших, сложных и длительных рабочих процессов. Организовать выполнение 10 000 задач с различными зависимостями — нетривиальная вещь. Когда наша автоматизация стала слишком большой, мы столкнулись с несколькими проблемами:

  • Проблемы с памятью во время построения рабочего процесса (на стороне клиента)
  • Проблемы с сериализацией (клиент -> бэкэнд-передача сельдерея)
  • Недетерминированное, прерывистое выполнение рабочих процессов
  • Проблемы с памятью у рабочих Celery (серверная часть Celery)
  • Исчезающие задачи
  • И более…

Взгляните на GitHub:


Использование сельдерея для нашего конкретного случая стало трудным и ненадежным. Встроенная поддержка рабочих процессов в Celery не кажется правильным выбором для обработки 100/1000/10 000 задач. В нынешнем состоянии этого просто недостаточно. Итак, вот мы стоим перед прочной бетонной стеной… Либо мы как-то исправляем Celery, либо переписываем нашу автоматизацию, используя другую структуру.

Сельдерей — исправить… или исправить?

Переписать всю нашу автоматизацию можно, хотя и относительно болезненно. Поскольку я довольно ленив, возможно, попытка исправить Celery была не совсем плохой идеей? Так что я потратил некоторое время, чтобы покопаться в коде Celery, и мне удалось найти части, отвечающие за построение рабочих процессов и выполнение цепочек и аккордов. Мне все еще было немного сложно понять все различные пути кода, обрабатывающие широкий спектр вариантов использования, но я понял, что можно реализовать чистую, прямую оркестровку, которая будет обрабатывать все задачи и их комбинации в одном путь. Более того, я понял, что интегрировать его в нашу автоматизацию не потребует слишком больших усилий (давайте не будем забывать о главной цели!).

К сожалению, внедрение новой оркестровки в проект Celery, вероятно, будет довольно сложным и, скорее всего, нарушит некоторую обратную совместимость. Поэтому я решил использовать другой подход — написать расширение или плагин, которые не требовали бы изменений в Celery. Что-то подключаемое и максимально неинвазивное. Так появился Celery Dyrygent…

Сельдерей Dyrygent

github.com/ovh/celery-dyrygent

Как представить рабочий процесс

Вы можете представить себе рабочий процесс как направленный ациклический граф (DAG), где каждая задача является отдельным узлом графа. Когда дело доходит до ациклических графов, относительно легко хранить и разрешать зависимости между узлами, что приводит к простой оркестровке. Celery Dyrygent был реализован на основе этих функций. Каждая задача в рабочем процессе имеет уникальный идентификатор (Celery уже назначает идентификаторы задач, когда задача отправляется на выполнение), и каждая из них заключена в узел рабочего процесса. Каждый узел рабочего процесса состоит из сигнатуры задачи (простой сигнатуры сельдерея) и списка идентификаторов задач, от которых он зависит. См. Пример ниже:



Как обработать рабочий процесс

Итак, мы знаем, как сохранить рабочий процесс простым и понятным способом. Теперь нам просто нужно его выполнить. Как насчет… сельдерея? Почему бы нет? Для этого Celery Dyrygent вводит задачу процессора рабочего процесса (обычная задача Celery ). Эта задача охватывает весь рабочий процесс и планирует выполнение примитивных задач в соответствии с их зависимостями. После того, как часть планирования завершена, задача повторяется («тикает» с некоторой задержкой).

На протяжении всего цикла обработки процессор рабочих процессов сохраняет внутреннее состояние всего рабочего процесса. В результате он обновляет состояние при каждом повторении. Вы можете увидеть пример оркестровки ниже:







В частности, процессор рабочего процесса останавливает свое выполнение в двух случаях:

  • После завершения всего рабочего процесса и успешного выполнения всех задач
  • Когда он не может продолжить работу из-за сбоя задачи

Как интегрировать

Итак, как нам это использовать? К счастью, мне довольно легко удалось найти способ использовать Celery Dyrygent. Прежде всего, вам необходимо внедрить определение задачи процессора рабочего процесса в ваше приложение CeleryP:

from celery_dyrygent.tasks import register_workflow_processor
app = Celery() #  your celery application instance
workflow_processor = register_workflow_processor(app)


Затем вам нужно преобразовать рабочий процесс, определенный для Celery, в рабочий процесс Celery Dyrygent:

from celery_dyrygent.workflows import Workflow

celery_workflow = chain([
    noop.s(0),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    noop.s()
])

workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)


Наконец, просто выполните рабочий процесс, как обычную задачу Celery:

workflow.apply_async()


Это оно! Вы всегда можете вернуться, если хотите, так как небольшие изменения очень легко отменить.

Попробуйте!

Celery Dyrygent можно использовать бесплатно, а его исходный код доступен на Github ( github.com/ovh/celery-dyrygent ). Не стесняйтесь использовать его, улучшать, запрашивать функции и сообщать о любых ошибках! У него есть несколько дополнительных функций, не описанных здесь, поэтому я рекомендую вам взглянуть на файл readme проекта. Для наших требований к автоматизации это уже надежное, проверенное на практике решение. Мы используем его с конца 2018 года, и он обработал тысячи рабочих процессов, состоящих из сотен тысяч задач. Вот некоторая статистика производства за период с июня 2019 года по февраль 2020 года:

  • Выполнено 936 248 элементарных задач
  • Обработано 11 170 рабочих процессов
  • 4098 задач в самом большом рабочем процессе
  • ~ 84 задачи на рабочий процесс, в среднем

Автоматизация — это всегда хорошая идея!

Представляющий директор - инструмент для построения рабочих процессов Celery

Нам как разработчикам часто приходится выполнять задачи в фоновом режиме. К счастью, для этого уже существуют некоторые инструменты. Например, в экосистеме Python самая известная библиотека — это Celery. Если вы уже использовали это, вы знаете, насколько это здорово! Но вы также, вероятно, обнаружите, насколько сложно может быть отслеживание состояния сложного рабочего процесса.



Celery Director — это инструмент, который мы создали в OVHcloud для решения этой проблемы. Код теперь имеет открытый исходный код и доступен на Github.



После нашего выступления во время FOSDEM 2020, этот пост направлен на представление инструмента. Мы подробно рассмотрим, что такое Celery, почему мы создали Director и как его использовать.

Что такое «сельдерей»?

Вот официальное описание сельдерея:

Celery — это асинхронная очередь задач / очередь заданий, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование.

Важные слова здесь — «очередь задач». Это механизм, используемый для распределения работы по пулу машин или потоков.



Очередь в середине приведенной выше диаграммы хранит сообщения, отправленные производителями (например, API). С другой стороны, потребители постоянно читают очередь, чтобы отображать новые сообщения и выполнять задачи.

В Celery сообщение, отправленное производителем, является подписью функции Python: send_email(«john.doe»)например.

Очередь (именуемая посредником в Celery) хранит эту подпись до тех пор, пока рабочий не прочитает ее и не выполнит функцию в пределах данного параметра.

Но зачем выполнять функцию Python где-то еще? Основная причина — быстрое возвращение ответа в случаях, когда функции долго выполняются. В самом деле, это не вариант заставлять пользователей ждать ответа несколько секунд или минут.

Точно так же, как мы можем представить производителей без достаточного количества ресурсов, с задачей, связанной с процессором, более надежный работник мог бы справиться с ее выполнением.

Как использовать «сельдерей»

Итак, Celery — это библиотека, используемая для выполнения кода Python где-то еще, но как она это делает? На самом деле это действительно просто! Чтобы проиллюстрировать это, мы будем использовать некоторые из доступных методов для отправки задач брокеру, а затем запустим воркер для их использования.

Вот код для создания задачи Celery:

# tasks.py
from celery import Celery

app = Celery("tasks", broker="redis://127.0.0.1:6379/0")

@app.task
def add(x, y):
    return x + y


Как видите, задача Celery — это просто функция Python, преобразованная для отправки в брокер. Обратите внимание, что мы передали соединение redis приложению Celery (названному app), чтобы сообщить брокеру, где хранить сообщения.

Это означает, что теперь можно отправить задачу в брокере:

>>> from tasks import add
>>> add.delay(2, 3)


Вот и все! Мы использовали этот .delay()метод, поэтому наш производитель не выполнял код Python, а вместо этого отправлял подпись задачи брокеру.

Теперь пора употребить его вместе с сельдереем:

$ celery worker -A tasks --loglevel=INFO
[...]
[2020-02-14 17:13:38,947: INFO/MainProcess] Received task: tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000]
[2020-02-14 17:13:38,954: INFO/ForkPoolWorker-2] Task tasks.add[0e9b6ff2-7aec-46c3-b810-b62a32188000] succeeded in 0.0024250600254163146s: 5


Можно даже комбинировать задачи Celery с некоторыми примитивами (полный список здесь ):

  • Цепочка: будет выполнять задачи одну за другой.
  • Группа: будет выполнять задачи параллельно, направляя их нескольким рабочим.

Например, следующий код сделает два добавления параллельно, а затем суммирует результаты:

from celery import chain, group

# Create the canvas
canvas = chain(
    group(
        add.si(1, 2),
        add.si(3, 4)
    ),
    sum_numbers.s()
)

# Execute it
canvas.delay()


Вы, наверное, заметили, что мы не использовали здесь метод .delay () . Вместо этого мы создали холст , используемый для объединения набора задач.

.si()
Метод используется для создания неизменяемой подписи (т. Е. Такой , которая не получает данные от предыдущей задачи), в то время как 
.s()
полагается на данные, возвращенные двумя предыдущими задачами.

В этом введении в Celery мы только что рассмотрели его базовое использование. Если вы хотите узнать больше, я приглашаю вас прочитать документацию, где вы обнаружите все мощные функции, включая ограничения скорости , повторные попытки выполнения задач или даже периодические задачи .

Как разработчик я хочу…

Я часть команды, целью которой является развертывание и мониторинг внутренней инфраструктуры. В рамках этого нам нужно было запустить некоторые фоновые задачи, и, как разработчики Python, мы естественным образом выбрали Celery. Но из коробки Celery не поддерживал определенные требования для наших проектов:

  • Отслеживание развития задач и их зависимостей в WebUI.
  • Выполнение рабочих процессов с помощью вызовов API или просто с помощью интерфейса командной строки.
  • Объединение задач для создания рабочих процессов в формате YAML.
  • Периодическое выполнение всего рабочего процесса.

Для этого существуют другие классные инструменты, например Flower, но они позволяют нам отслеживать только каждую задачу по отдельности, а не весь рабочий процесс и его составные задачи.

И поскольку нам действительно были нужны эти функции, мы решили создать Celery Director.



Как использовать Директор

Установку можно произвести с помощью pip-команды:

$ pip install celery-director


Директор предоставляет простую команду для создания новой папки рабочего пространства:

$ director init workflows
[*] Project created in /home/ncrocfer/workflows
[*] Do not forget to initialize the database
You can now export the DIRECTOR_HOME environment variable


Ниже для вас создана новая папка задач и пример рабочего процесса:

$ tree -a workflows/
├── .env
├── tasks
│   └── etl.py
└── workflows.yml


В tasks/*.pyфайлы будут содержать ваши задачи Сельдерей, в то время как workflows.ymlфайл будет объединить их:

$ cat workflows.yml
---
ovh.SIMPLE_ETL:
  tasks:
    - EXTRACT
    - TRANSFORM
    - LOAD


В этом примере с именем ovh.SIMPLE_ETL будут выполняться три задачи, одна за другой. Вы можете найти больше примеров в документации.

После экспорта DIRECTOR_HOMEпеременной и инициализации базы данных с помощью director db upgradeвы можете выполнить следующий рабочий процесс:

$ director workflow list
+----------------+----------+-----------+
| Workflows (1)  | Periodic | Tasks     |
+----------------+----------+-----------+
| ovh.SIMPLE_ETL |    --    | EXTRACT   |
|                |          | TRANSFORM |
|                |          | LOAD      |
+----------------+----------+-----------+
$ director workflow run ovh.SIMPLE_ETL


Брокер получил задачи, поэтому теперь вы можете запустить Celery worker для их выполнения:

$ director celery worker --loglevel=INFO


А затем отобразите результаты с помощью команды веб-сервера ( director webserver):



Это только начало, поскольку Director предоставляет другие функции, позволяющие, например, параметризовать рабочий процесс или периодически его выполнять. Более подробную информацию об этих функциях вы найдете в документации.

Вывод

Наши команды регулярно используют Director для запуска наших рабочих процессов. Больше нет шаблонов и нет необходимости в продвинутых знаниях Celery… Новый коллега может легко создавать свои задачи на Python и комбинировать их в YAML, не используя примитивы Celery, описанные ранее.

Иногда нам нужно периодически выполнять рабочий процесс (например, для заполнения кеша), а иногда нам нужно вручную вызывать его из другой веб-службы (обратите внимание, что рабочий процесс также может быть выполнен через вызов API ). Теперь это возможно с помощью нашего единственного экземпляра Director.

Мы приглашаем вас попробовать Director на себе и оставить свой отзыв через Github, чтобы мы могли продолжать его улучшать. Исходный код можно найти на Github, а презентация FOSDEM 2020 доступна здесь.