Переход к хранилищу Ceph нового поколения в OVHcloud с LXD

Вступление

Меня зовут Филип Дорош. Я работаю в OVHcloud с 2017 года инженером DevOps. Сегодня я хочу рассказать вам историю о том, как мы развернули Ceph следующего поколения в OVHcloud. Но сначала несколько слов о Ceph: Ceph — это программно определяемое решение для хранения данных, которое поддерживает дополнительные тома публичного облака OVHcloud, а также наш продукт Cloud Disk Array. Но я не буду утомлять вас маркетинговыми вещами — давайте начнем!



Похоже, это интересная задача ...

Полтора года назад мы начали очень знакомый спринт. Помимо обычных вещей, с которыми нам приходится иметь дело, была одна задача, которая выглядела немного интереснее. Заголовок гласил: « Оцените, можем ли мы запускать новые версии Ceph на нашем текущем программном обеспечении ». Нам нужны были более новые версии Ceph и Bluestore для создания решения Ceph следующего поколения со всеми флеш-хранилищами.

Наше программное решение (которое мы называем устаревшим решением) основано на Docker. Звучит действительно круто, но мы запускаем Docker немного иначе, чем по назначению. Наши контейнеры очень сохраняют состояние. Мы запускаем полноценную систему инициализации внутри контейнера в качестве точки входа в докер. Затем эта система инициализации запускает все необходимое программное обеспечение внутри контейнера, включая Puppet, который мы используем для управления «вещами». Похоже, мы используем контейнеры Docker так же, как контейнеры LXC, не так ли?…

Наша унаследованная инфраструктура Ceph (аллегория)

Вскоре выяснилось, что невозможно запускать более новые выпуски Ceph в нашем внутреннем решении, потому что новые версии Ceph используют systemd, а в нашем текущем решении мы вообще не запускаем systemd — ни внутри контейнеров, ни на хозяева, которые их принимают.

Началась охота за решениями. Одна из возможностей заключалась в том, чтобы упаковать Ceph самостоятельно и избавиться от systemd, но это большой объем работы с небольшой добавленной стоимостью. Сообщество Ceph предоставляет проверенные пакеты, которые необходимо использовать, так что этот вариант не рассматривался.

Второй вариант — запустить Ceph с супервизором внутри контейнера Docker. Хотя это звучит как план, даже в документации supervisord четко указано, что supervisord «не предназначен для запуска вместо init как« process id 1 ».». Так что это тоже явно не вариант.

Нам нужен был systemd!

На этом этапе стало ясно, что нам нужно решение, позволяющее запускать systemd как внутри контейнера, так и на хосте. Похоже, настало идеальное время для перехода на совершенно новое решение — решение, которое было разработано для запуска полной ОС внутри контейнера. Поскольку наш Docker использовал бэкэнд LXC, это было естественным выбором для оценки LXC. В нем были все необходимые функции, но с LXC нам пришлось бы самостоятельно кодировать всю автоматизацию, связанную с контейнерами. Но можно ли избежать всей этой дополнительной работы? Оказывается, может…

Поскольку я использовал LXD в своем предыдущем проекте, я знал, что он способен управлять образами, сетями, блочными устройствами и всеми хорошими функциями, которые необходимы для настройки полнофункционального кластера Ceph.

Поэтому я переустановил свои серверы разработчиков с выпуском Ubuntu Server LTS и установил на них LXD.



LXD имеет все необходимое для создания полнофункциональных кластеров Ceph:

  • он поддерживает «толстые» контейнеры с отслеживанием состояния,
  • он поддерживает systemd внутри контейнера,
  • он поддерживает образы контейнеров, поэтому мы можем подготовить индивидуальные изображения и без проблем использовать их,
  • передача целых блочных устройств в контейнеры,
  • передача обычных каталогов в контейнеры,
  • поддержка простого запуска, остановки, перезапуска контейнера,
  • REST API, который будет рассмотрен в следующих частях статьи,
  • поддержка нескольких сетевых интерфейсов в контейнерах с использованием macvlan.

После нескольких часов ручной работы у меня был кластер Ceph, на котором запущен выпуск Mimic внутри контейнеров LXD. Я набрал ceph health и получил «HEALTH_OK». Приятно! Это сработало отлично.

Как нам это индустриализировать?

Чтобы индустриализировать его и подключить к нашей плоскости управления, нам нужен был модуль Puppet для LXD, чтобы Puppet мог управлять всеми элементами, связанными с контейнером, на хосте. Такого модуля, который предоставлял бы необходимую нам функциональность, не существовало, поэтому нам пришлось его кодировать самостоятельно.

Демон LXD предоставляет удобный REST API, который мы использовали для создания нашего модуля Puppet. Вы можете общаться с API локально через сокет unix и через сеть, если вы настроили его раскрытие. Для использования в модуле было действительно удобно использовать команду запроса lxc, которая работает, отправляя необработанные запросы в LXD через сокет unix. Модуль теперь имеет открытый исходный код на GitHub, поэтому вы можете скачать его и поиграть с ним. Он позволяет настраивать основные параметры LXD, а также создавать контейнеры, профили, пулы хранения и т. Д.

Модуль позволяет создавать, а также управлять состоянием ресурсов. Просто измените свои манифесты, запустите марионеточный агент, и он сделает все остальное.



Модуль LXD Puppet на момент написания этого документа предоставляет следующие определения:

  • lxd :: profile
  • lxd :: изображение
  • lxd :: хранилище
  • lxd :: container

Для получения полной справки посетите его страницу GitHub — github.com/ovh/lxd-puppet-module.

Ручная установка VS Автоматическая установка с Puppet

Я покажу вам простой пример того, как создать точно такую ​​же настройку вручную, а затем снова автоматически с помощью Puppet. Для целей этой статьи я создал новый экземпляр Public Cloud с Ubuntu 18.04, один дополнительный диск и уже настроенное мостовое устройство br0. Предположим, есть также DHCP-сервер, прослушивающий интерфейс br0.

Стоит отметить, что обычно вам не нужно создавать собственное изображение, вы можете просто использовать вышестоящие со встроенными командами. Но для целей этой статьи давайте создадим собственный образ, который будет в точности похож на исходный. Чтобы создать такой образ, вам просто нужно ввести несколько команд, чтобы перепаковать исходный образ в Unified Tarball.

root @ ubuntu: ~ # wget https://cloud-images.ubuntu.com/bionic/current/bionic-server-cloudimg-amd64-root.tar.xz 
root @ ubuntu: ~ # wget https: // cloud-images .ubuntu.com / bionic / current / bionic-server-cloudimg-amd64-lxd.tar.xz 
root @ ubuntu: ~ # mkdir -p ubuntu1804 / rootfs 
root @ ubuntu: ~ # tar -xf bionic-server-cloudimg-amd64 -lxd.tar.xz -C ubuntu1804 / 
root @ ubuntu: ~ # tar -xf bionic-server-cloudimg-amd64-root.tar.xz -C ubuntu1804 / rootfs / 
root @ ubuntu: ~ # cd ubuntu1804 / 
root @ ubuntu : ~ / ubuntu1804 # tar -czf ../ubuntu1804.tar.gz *


В конце вы получите образ ubuntu1804.tar.gz, который можно использовать с LXD. Для целей этой статьи я поместил это изображение в каталог, доступный через HTTP, например: example.net/lxd-image/

Ручная настройка

Первым делом давайте установим LXD.

root @ ubuntu: ~ # apt install lxd

Во время установки пакета вы увидите сообщение: «Чтобы пройти начальную конфигурацию LXD, запустите: lxd init», но мы просто сделаем все шаги вручную.

Следующим шагом будет добавление нового пула хранения.

root @ ubuntu: ~ # lxc storage create default dir source = / var / lib / lxd / storage-pools / 
default Создание пула хранения по умолчанию


Затем создайте собственный профиль, который будет иметь: переменную окружения http_proxy, установленную на ”, ограничение памяти 2 ГБ, крыши в пуле хранения по умолчанию и eth0, который будет частью моста br0.

root@ubuntu:~# lxc profile create customprofile
Profile customprofile created
root@ubuntu:~# lxc profile device add customprofile root disk path=/ pool=default
Device root added to customprofile
root@ubuntu:~# lxc profile device add customprofile eth0 nic nictype=bridged parent=br0
Device eth0 added to customprofile
root@ubuntu:~# lxc profile set customprofile limits.memory 2GB


Распечатываем весь профиль, чтобы проверить, все ли в порядке:

root@ubuntu:~# lxc profile show customprofile
config:
  environment.http_proxy: ""
  limits.memory: 2GB
description: ""
devices:
  eth0:
    nictype: bridged
    parent: br0
    type: nic
  root:
    path: /
    pool: default
    type: disk
name: customprofile
used_by: []


Затем давайте загрузим изображение LXD в формате Unified Tarball:

root@ubuntu:~# wget -O /tmp/ubuntu1804.tar.gz http://example.net/lxd-images/ubuntu1804.tar.gz


И импортируйте его:

root@ubuntu:~# lxc image import /tmp/ubuntu1804.tar.gz --alias ubuntu1804
Image imported with fingerprint: dc6f4c678e68cfd4d166afbaddf5287b65d2327659a6d51264ee05774c819e70


Когда все будет готово, давайте создадим наш первый контейнер:

root@ubuntu:~# lxc init ubuntu1804 container01 --profile customprofile
Creating container01


Теперь давайте добавим несколько каталогов хоста в контейнер:
обратите внимание, что вы должны установить правильного владельца каталога на хосте!

root@ubuntu:~# mkdir /srv/log01
root@ubuntu:~# lxc config device add container01 log disk source=/srv/log01 path=/var/log/


И в качестве последнего штриха добавьте в контейнер раздел хоста:

root@ubuntu:~# lxc config device add container01 bluestore unix-block source=/dev/sdb1 path=/dev/bluestore


/ dev / sdb1 будет доступен внутри контейнера. Мы используем его для передачи устройств для Ceph's Bluestore в контейнер.

Контейнер готов к запуску.

root@ubuntu:~# lxc start container01


Вуаля! Контейнер запущен и работает. Мы настраиваем наши контейнеры так же, как указано выше.

Хотя это было довольно просто настроить вручную. Для массового развертывания вам необходимо автоматизировать вещи. Итак, теперь давайте сделаем это, используя наш модуль LXD Puppet.

Автоматическая установка с Puppet

Чтобы использовать модуль, загрузите его на свой марионеточный сервер и поместите в путь к модулю.

Затем создайте новый класс или добавьте его к одному из существующих классов; все, что вам подходит.



Я подключил его к своему марионеточному серверу. Обратите внимание, что я использую мостовое устройство br0, которое было подготовлено ранее другими модулями, и что изображения LXD размещаются на веб-сервере example.net/lxd-images/ в виде унифицированных тарболов.

Полный пример модуля, использующего модуль LXD Puppet:

class mymodule { 
 
    class {':: lxd':} 
 
    lxd :: storage {'default': 
        driver => 'dir', 
        config => { 
            'source' => '/ var / lib / lxd / storage-pools / default ' 
        } 
    } 
 
    lxd :: profile {' exampleprofile ': sure 
        =>' present ', 
        config => { 
            ' environment.http_proxy '=>' ', 
            ' limits.memory '=>' 2GB ', 
        }, 
        devices => { 
            'root' => { 
                'path' => '/', 
                'pool' => 'default',
                'type' => 'disk', 
            }, 
            'eth0' => {
                'nictype' => 'bridged', 
                'parent' => 'br0', 
                'type' => 'nic', 
            } 
        } 
    } 
 
    lxd :: image {'ubuntu1804': sure 
        => 'present', 
        repo_url => ' http://example.net/lxd-images/ ', 
        image_file =>' ubuntu1804.tar.gz ', 
        image_alias =>' ubuntu1804 ', 
    } 
 
    lxd :: container {' container01 ': 
        state =>' start ', 
        config => { 
            'user.somecustomconfig' => 'Моя потрясающая пользовательская переменная env',
        }, 
        profiles => ['exampleprofile'], 
        image => 'ubuntu1804',
        устройства => { 
            'log' => { 
                'path' => '/ var / log /', 
                'source' => '/ srv / log01', 
                'type' => 'disk', 
            }, 
            'bluestore' = > { 
                'путь' => '/ dev / bluestore', 
                'source' => '/ dev / sdb1', 
                'type' => 'unix-block', 
            } 
        } 
    } 
}


Теперь осталось только запустить марионеточный агент на машине. Он применит желаемое состояние:

root@ubuntu:~# puppet agent -t
Info: Using configured environment 'production'
Info: Retrieving pluginfacts
Info: Retrieving plugin
Info: Retrieving locales
Info: Loading facts
Info: Caching catalog for ubuntu.openstacklocal
Info: Applying configuration version '1588767214'
Notice: /Stage[main]/Lxd::Install/Package[lxd]/ensure: created
Notice: /Stage[main]/Lxd::Config/Lxd_config[global_images.auto_update_interval]/ensure: created
Notice: /Stage[main]/Mymodule/Lxd::Storage[default]/Lxd_storage[default]/ensure: created
Notice: /Stage[main]/Mymodule/Lxd::Profile[exampleprofile]/Lxd_profile[exampleprofile]/ensure: created
Notice: /Stage[main]/Mymodule/Lxd::Image[ubuntu1804]/Exec[lxd image present http://example.net/lxd-images//ubuntu1804.tar.gz]/returns: executed successfully
Notice: /Stage[main]/Mymodule/Lxd::Container[container01]/Lxd_container[container01]/ensure: created
Notice: Applied catalog in 37.56 seconds


В конце концов, у вас будет запущен новый контейнер:

root@ubuntu:~# lxc ls
+-------------+---------+--------------------+------+------------+-----------+
|    NAME     |  STATE  |        IPV4        | IPV6 |    TYPE    | SNAPSHOTS |
+-------------+---------+--------------------+------+------------+-----------+
| container01 | RUNNING | 192.168.0.5 (eth0) |      | PERSISTENT | 0         |
+-------------+---------+--------------------+------+------------+-----------+


Поскольку вы можете выставлять пользовательские переменные среды в контейнере, это открывает множество возможностей для настройки новых контейнеров.



Как это хорошо !?

Я призываю всех внести свой вклад в модуль или поставить ему звезду на GitHub, если вы сочтете его полезным.

Планы на будущее

После всестороннего тестирования мы уверены, что все работает, как задумано, и были уверены, что сможем приступить к разработке нового решения с хранилищем Ceph на всех флеш-хранилищах, без жестких дисков.

В будущем мы планируем перенести всю нашу устаревшую инфраструктуру на новое решение на основе LXD. Это будет гигантский проект для миграции: более 50 ПБ размещается на более чем 2000 выделенных серверах, но это уже история для другого раза.

Делаем БОЛЬШУЮ автоматизацию с помощью Celery

Вступление

TL; DR: вы можете пропустить вступление и сразу перейти к «Celery — Distributed Task Queue».



Здравствуйте! Меня зовут Бартош Рабьега, я работаю в команде R & D / DevOps в OVHcloud. В рамках нашей повседневной работы мы разрабатываем и поддерживаем проект Ceph-as-a-Service, чтобы обеспечить надежное распределенное хранилище высокой доступности для различных приложений. Мы имеем дело с более чем 60 ПБ данных в 10 регионах, поэтому, как вы можете себе представить, у нас впереди довольно много работы с точки зрения замены вышедшего из строя оборудования, обработки естественного роста, предоставления новых регионов и центров обработки данных, оценки нового оборудования, оптимизация конфигураций программного и аппаратного обеспечения, поиск новых решений для хранения данных и многое другое!



Из-за большого объема нашей работы нам нужно разгрузить как можно больше повторяющихся задач. И мы делаем это с помощью автоматизации.

Автоматизация вашей работы

В некоторой степени каждый ручной процесс можно описать как набор действий и условий. Если бы нам каким-то образом удалось заставить что-то автоматически выполнять действия и проверять условия, мы могли бы автоматизировать процесс, что привело бы к автоматизированному рабочему процессу. Взгляните на пример ниже, в котором показаны некоторые общие шаги по ручной замене оборудования в нашем проекте.



Хм… Что может помочь нам сделать это автоматически? Разве компьютер не кажется идеальным?

Сельдерей — распределенная очередь задач

Celery — это хорошо известное и широко используемое программное обеспечение, которое позволяет нам обрабатывать задачи асинхронно. Описание проекта на его главной странице ( www.celeryproject.org/ ) может показаться немного загадочным, но мы можем сузить его базовую функциональность до примерно следующего:



Такой механизм идеально подходит для таких задач, как асинхронная отправка электронных писем (например, «запустил и забыл»), но его также можно использовать для различных целей. Так с какими еще задачами он справился? В принципе, любые задачи можно реализовать на Python (основном языке Celery)! Я не буду вдаваться в подробности, поскольку они доступны в документации по Celery. Важно то, что, поскольку мы можем реализовать любую задачу, которую захотим, мы можем использовать ее для создания строительных блоков для нашей автоматизации.

Есть еще одна важная вещь… Celery изначально поддерживает объединение таких задач в рабочие процессы (примитивы Celery: цепочки, группы, аккорды и т. Д.). Итак, давайте рассмотрим несколько примеров…

Мы будем использовать следующие определения задач — одиночная задача, печать аргументов и kwargs:

@celery_app.task
def noop(*args, **kwargs):
    # Task accepts any arguments and does nothing
    print(args, kwargs)
    return True


Теперь мы можем выполнить задачу асинхронно, используя следующий код:

task = noop.s(777)
task.apply_async()


Элементарные задачи можно параметризовать и объединить в сложный рабочий процесс с использованием методов сельдерея, то есть «цепочки», «группы» и «хорды». См. Примеры ниже. В каждом из них левая сторона показывает визуальное представление рабочего процесса, а правая — фрагмент кода, который его генерирует. Зеленая рамка — это отправная точка, после которой выполнение рабочего процесса продвигается по вертикали.

Цепочка — набор задач, обрабатываемых последовательно



workflow = (
    chain([noop.s(i) for i in range(3)])
)


Группа — набор параллельно обрабатываемых задач.



workflow = (
    group([noop.s(i) for i in range(5)])
)


Аккорд — группа задач, связанных со следующей задачей



workflow = chord(
        [noop.s(i) for i in range(5)],
        noop.s(i)
)

# Equivalent:
workflow = chain([
        group([noop.s(i) for i in range(5)]),
        noop.s(i)
])


Важный момент: выполнение рабочего процесса всегда останавливается в случае сбоя задачи. В результате цепочка не будет продолжена, если какая-то задача не удастся в ее середине. Это дает нам довольно мощный фреймворк для реализации аккуратной автоматизации, и именно это мы используем для Ceph-as-a-Service в OVHcloud! Мы реализовали множество небольших, гибких, параметризуемых задач, которые мы объединяем для достижения общей цели. Вот несколько реальных примеров элементарных задач, используемых для автоматического удаления старого оборудования:

  • Изменить вес узла Ceph (используется для увеличения / уменьшения количества данных на узле. Запускает ребалансировку данных)
  • Установите время простоя службы (перебалансировка данных запускает зонды мониторинга, но это ожидается, поэтому установите время простоя для этой конкретной записи мониторинга)
  • Подождите, пока Ceph не станет здоровым (дождитесь завершения ребалансировки данных — повторяющаяся задача)
  • Удалите узел Ceph из кластера (узел пуст, поэтому его можно просто удалить)
  • Отправьте информацию техническим специалистам в DC (оборудование готово к замене)
  • Добавить новый узел Ceph в кластер (установить новый пустой узел)

Мы параметризируем эти задачи и связываем их вместе, используя цепочки, группы и аккорды сельдерея для создания желаемого рабочего процесса. Затем Celery сделает все остальное, асинхронно выполняя рабочий процесс.

Большие рабочие процессы и сельдерей

По мере роста нашей инфраструктуры растут и наши автоматизированные рабочие процессы, с увеличением количества задач на рабочий процесс, более высокой сложностью рабочих процессов… Что мы понимаем под большим рабочим процессом? Рабочий процесс, состоящий из 1000-10 000 задач. Чтобы визуализировать это, взгляните на следующие примеры:

Несколько аккордов, связанных вместе (всего 57 заданий)



workflow = chain([
    noop.s(0),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    noop.s()
])


Более сложная структура графа, построенная из цепочек и групп (всего 23 задачи)



# | is ‘chain’ operator in celery
workflow = (
    group(
        group(
            group([noop.s() for i in range(5)]),
            chain([noop.s() for i in range(5)])
        ) |
        noop.s() |
        group([noop.s() for i in range(5)]) |
        noop.s(),
        chain([noop.s() for i in range(5)])
    ) |
    noop.s()
)


Как вы, наверное, догадались, когда задействовано 1000 задач, визуализация становится довольно большой и беспорядочной! Celery — мощный инструмент, обладающий множеством функций, которые хорошо подходят для автоматизации, но он по-прежнему испытывает трудности, когда дело доходит до обработки больших, сложных и длительных рабочих процессов. Организовать выполнение 10 000 задач с различными зависимостями — нетривиальная вещь. Когда наша автоматизация стала слишком большой, мы столкнулись с несколькими проблемами:

  • Проблемы с памятью во время построения рабочего процесса (на стороне клиента)
  • Проблемы с сериализацией (клиент -> бэкэнд-передача сельдерея)
  • Недетерминированное, прерывистое выполнение рабочих процессов
  • Проблемы с памятью у рабочих Celery (серверная часть Celery)
  • Исчезающие задачи
  • И более…

Взгляните на GitHub:


Использование сельдерея для нашего конкретного случая стало трудным и ненадежным. Встроенная поддержка рабочих процессов в Celery не кажется правильным выбором для обработки 100/1000/10 000 задач. В нынешнем состоянии этого просто недостаточно. Итак, вот мы стоим перед прочной бетонной стеной… Либо мы как-то исправляем Celery, либо переписываем нашу автоматизацию, используя другую структуру.

Сельдерей — исправить… или исправить?

Переписать всю нашу автоматизацию можно, хотя и относительно болезненно. Поскольку я довольно ленив, возможно, попытка исправить Celery была не совсем плохой идеей? Так что я потратил некоторое время, чтобы покопаться в коде Celery, и мне удалось найти части, отвечающие за построение рабочих процессов и выполнение цепочек и аккордов. Мне все еще было немного сложно понять все различные пути кода, обрабатывающие широкий спектр вариантов использования, но я понял, что можно реализовать чистую, прямую оркестровку, которая будет обрабатывать все задачи и их комбинации в одном путь. Более того, я понял, что интегрировать его в нашу автоматизацию не потребует слишком больших усилий (давайте не будем забывать о главной цели!).

К сожалению, внедрение новой оркестровки в проект Celery, вероятно, будет довольно сложным и, скорее всего, нарушит некоторую обратную совместимость. Поэтому я решил использовать другой подход — написать расширение или плагин, которые не требовали бы изменений в Celery. Что-то подключаемое и максимально неинвазивное. Так появился Celery Dyrygent…

Сельдерей Dyrygent

github.com/ovh/celery-dyrygent

Как представить рабочий процесс

Вы можете представить себе рабочий процесс как направленный ациклический граф (DAG), где каждая задача является отдельным узлом графа. Когда дело доходит до ациклических графов, относительно легко хранить и разрешать зависимости между узлами, что приводит к простой оркестровке. Celery Dyrygent был реализован на основе этих функций. Каждая задача в рабочем процессе имеет уникальный идентификатор (Celery уже назначает идентификаторы задач, когда задача отправляется на выполнение), и каждая из них заключена в узел рабочего процесса. Каждый узел рабочего процесса состоит из сигнатуры задачи (простой сигнатуры сельдерея) и списка идентификаторов задач, от которых он зависит. См. Пример ниже:



Как обработать рабочий процесс

Итак, мы знаем, как сохранить рабочий процесс простым и понятным способом. Теперь нам просто нужно его выполнить. Как насчет… сельдерея? Почему бы нет? Для этого Celery Dyrygent вводит задачу процессора рабочего процесса (обычная задача Celery ). Эта задача охватывает весь рабочий процесс и планирует выполнение примитивных задач в соответствии с их зависимостями. После того, как часть планирования завершена, задача повторяется («тикает» с некоторой задержкой).

На протяжении всего цикла обработки процессор рабочих процессов сохраняет внутреннее состояние всего рабочего процесса. В результате он обновляет состояние при каждом повторении. Вы можете увидеть пример оркестровки ниже:







В частности, процессор рабочего процесса останавливает свое выполнение в двух случаях:

  • После завершения всего рабочего процесса и успешного выполнения всех задач
  • Когда он не может продолжить работу из-за сбоя задачи

Как интегрировать

Итак, как нам это использовать? К счастью, мне довольно легко удалось найти способ использовать Celery Dyrygent. Прежде всего, вам необходимо внедрить определение задачи процессора рабочего процесса в ваше приложение CeleryP:

from celery_dyrygent.tasks import register_workflow_processor
app = Celery() #  your celery application instance
workflow_processor = register_workflow_processor(app)


Затем вам нужно преобразовать рабочий процесс, определенный для Celery, в рабочий процесс Celery Dyrygent:

from celery_dyrygent.workflows import Workflow

celery_workflow = chain([
    noop.s(0),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    chord([noop.s(i) for i in range(10)], noop.s()),
    noop.s()
])

workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)


Наконец, просто выполните рабочий процесс, как обычную задачу Celery:

workflow.apply_async()


Это оно! Вы всегда можете вернуться, если хотите, так как небольшие изменения очень легко отменить.

Попробуйте!

Celery Dyrygent можно использовать бесплатно, а его исходный код доступен на Github ( github.com/ovh/celery-dyrygent ). Не стесняйтесь использовать его, улучшать, запрашивать функции и сообщать о любых ошибках! У него есть несколько дополнительных функций, не описанных здесь, поэтому я рекомендую вам взглянуть на файл readme проекта. Для наших требований к автоматизации это уже надежное, проверенное на практике решение. Мы используем его с конца 2018 года, и он обработал тысячи рабочих процессов, состоящих из сотен тысяч задач. Вот некоторая статистика производства за период с июня 2019 года по февраль 2020 года:

  • Выполнено 936 248 элементарных задач
  • Обработано 11 170 рабочих процессов
  • 4098 задач в самом большом рабочем процессе
  • ~ 84 задачи на рабочий процесс, в среднем

Автоматизация — это всегда хорошая идея!