Неожиданный поиск бизнес-аналитики
Бизнес-аналитика (BI) — это способность собирать существенные данные из информационной системы для подачи в хранилище данных (DWH) или озеро данных. Обычно они предоставляют копию данных, которые будут использоваться для приложений бизнес-аналитики. Для подпитки DWH могут применяться разные стратегии. Одна из таких стратегий - Change Data Capture (CDC), которая представляет собой способность фиксировать изменяющиеся состояния из базы данных и преобразовывать их в события, которые можно использовать для других целей. Большинство баз данных предназначены для целей OLTP и хорошо для этого разработаны. Тем не менее, разные варианты использования потребуют одних и тех же данных с разными шаблонами доступа. Эти варианты использования (большие данные, ETL и потоковая обработка и др.) В основном подпадают под Баннер OLAP . Их смешение поставит под угрозу OLTP и производственную среду, поэтому нам нужно включить OLAP ненавязчивым образом.
OVH, как облачный провайдер, управляет многочисленными базами данных как для своих клиентов, так и для собственных нужд. Управление жизненным циклом базы данных всегда включает в себя как поддержание инфраструктуры в актуальном состоянии, так и синхронизацию с циклом разработки, чтобы согласовать программное обеспечение с его зависимостью от базы данных. Например, приложению может потребоваться MySQL 5.0, который затем может быть объявлен как EOL (End Of Life). В этом случае приложение необходимо изменить для поддержки (скажем) MySQL 5.5. Мы не изобретаем велосипед здесь — этим процессом уже несколько десятилетий управляют операторы и команды разработчиков.
Это усложняется, если у вас нет контроля над приложением. Например, представьте, что третья сторона предоставляет вам приложение для обеспечения зашифрованных транзакций. У вас нет абсолютно никакого контроля над этим приложением или связанной с ним базой данных. Тем не менее, вам по-прежнему нужны данные из базы данных.
В этом сообщении в блоге описан аналогичный пример, с которым мы столкнулись при создании озера данных OVH с помощью собственной разработки CDC. Эта история происходит в начале 2015 года, хотя я все же думаю, что ею стоит поделиться.
Обычно перед тем, как приступить к разработке, рекомендуется определить состояние технологии, поскольку это сэкономит время и укрепит сообщества. Еще в начале 2015 года, когда впервые появился ландшафт CDC ( Debezium , аналогичное решение с открытым исходным кодом, появилось только в конце того же года), единственное существующее решение — Databus — появилось от LinkedIn. Архитектура Databus была довольно сложной, и проект не был очень активным. Кроме того, это не решало наших требований к безопасности, и мы пришли из сильной культуры эксплуатации, поэтому запуск JVM на сервере базы данных был явно непозволительным для наших рабочих групп.
Хотя не существовало программного обеспечения CDC, соответствующего нашим требованиям, в конце концов мы нашли библиотеку репликации binlog, которую можно было бы интегрировать с некоторыми из них в Go. Binlog — это имя MySQL для базы данных WAL.
Наши требования были довольно простыми:
Вот глобальный дизайн проекта Menura:
Менура — это род лирохвостов : птицы, способные воспроизводить любой звук. Большинство BI связанных компонентов являются «птицы», так как они часть B usiness I ntelligence R & D проекта!
Поскольку Menura была развернута на серверах баз данных, она могла отображать доступные базы данных и таблицы в плоскости управления BI , чтобы пользователь мог запросить синхронизацию с заданной таблицей. Протокол управления имел несколько простых задач:
В то время gRPC только зарождался, но мы увидели в этом проекте, с его прочной основой, возможность примирить Protobuf, экономичность и языковое разнообразие. Кроме того, возможность настройки RPC с двунаправленной потоковой передачей была интересна с точки зрения реализации соединений клиент-сервер с RPC-серверами, поэтому мы сделали ее основой протокола управления.
gRPC использует буфер протокола как IDL для сериализации структурированных данных. Каждый StreamRequest состоит из заголовка для управления мультитенантностью. Это означает, что если бы наши клиенты решили назвать свои источники одним и тем же именем, мы могли бы выделить управляющие сообщения по арендатору, а не только по источнику. Поэтому мы находим RequestType, как определено в Protobuf v3:
Этот RequestType позволил нам получить исходные плагины со специализированными структурами, которые они ожидают. Обратите внимание, что мы отделили клиентов БД от репликации БД (binlog, WAL…). Основная причина в том, что они не имеют одной и той же области видимости, поэтому библиотеки не совпадают. Поэтому имело смысл держать их отдельно.
Другая причина заключается в том, что репликация действует как подчиненное устройство для базы данных, что означает, что процесс репликации не занимает значительного места, в то время как клиент, сбрасывающий базу данных, может подразумевать блокировку строк или таблиц, учитывая базу данных и ее обрабатывающий механизм. Это могло привести к тому, что у нас будет два разных ведомых устройства, или репликация будет подключена к ведущему устройству, а клиент — к ведомому.
Эти опасения подтолкнули нас к модульной конструкции агента Menura:
Фильтрация ближе всего к базе данных, безусловно, была лучшим выбором, особенно для наших клиентов, поскольку они могли добавлять или проверять фильтры сами. Для этого мы определили политику фильтрации, которая имитирует политики IPTables с принятием или удалением значений по умолчанию. Затем исходный фильтр описал, как таблицы и столбцы будут вести себя в зависимости от политики:
Падение политик упадет что - нибудь по умолчанию, за исключением таблиц явно перечисленных в фильтре, в то время как принимать политик удалять таблицы , перечисленные в качестве пустой в фильтре, за исключением таблиц , которые имеют ignored_columns ключ, чтобы фильтр только столбцы с их именами.
В некоторых случаях вы можете подтвердить, что обрабатываете аналитическое задание на тех же данных, из которых состоит настоящая база данных. Например, для расчета выручки за определенный период требуются достоверные данные от даты до даты. Проверка состояния репликации между базой данных и озером данных была сложной задачей. На самом деле проверки целостности не реализуются с той же логикой в базах данных или хранилищах, поэтому нам нужен был способ абстрагировать их от собственной реализации. Мы подумали об использовании структуры данных Merkle Tree , чтобы мы могли поддерживать дерево целостности с блоками или строками. Если ключ / значение отличается от базы данных, то это будет отражать глобальный или промежуточный хэш целостности, и нам нужно будет сканировать только листовой блок, который имеет несогласованный хэш между обеими системами.
Как мы уже говорили во введении, CDC настроен на преобразование изменений базы данных в обрабатываемые события. Цель здесь — эффективно и действенно удовлетворять любые бизнес-потребности, требующие данных. Вот два примера того, что мы сделали с данными, которые теперь были доступны в виде событий…
Пока мы строили озеро данных из реплицированных таблиц, и поскольку этот проект был в основном для целей бизнес-аналитики, мы рассмотрели возможность добавления некоторых аналитических данных в реальном времени на основе той же логики, которую мы используем с пакетными заданиями и заданиями Apache Pig . С 2015 года самой продвинутой структурой обработки потоков является Apache Flink , которую мы использовали для обработки соединений в реальном времени между различными базами данных и таблицами.
Алексис проделал потрясающую работу, описав процесс соединения, который мы внедрили в Apache Flink, так что в дополнение к репликации баз данных мы также создали новую агрегированную таблицу. В самом деле, мы могли бы написать целую запись в блоге только по этой теме ...
Мы выбрали Apache Flink по нескольким причинам:
Теперь у нас есть таблица реального времени, загруженная в Apache HBase , и нам нужно было добавить к ней возможность запросов. Хотя HBase был хорош с точки зрения хранения, он не предоставлял никаких возможностей поиска, а его схема доступа не была идеальной для сканирования по критерию поиска.
Здесь Гийом сотворил чудо! Повторно используя Lily , индексатор HBase, предоставляющий концепцию SEP (обработчик побочных эффектов), ему удалось повторно внедрить схему агрегированной таблицы в Lily, чтобы построить отображение типов данных, необходимое для чтения значений байтовых массивов HBase, перед их индексированием в Solr. Теперь у нас была панель мониторинга в реальном времени, представляющая собой агрегированную объединенную таблицу в реальном времени, обработанную на основе сбора данных об изменении в реальном времени. Бум!
Именно тогда мы начали привлекать реальных клиентов к нашему новому решению.
Если по-прежнему существует необходимость продемонстрировать, что тестирование в среде моделирования — это не то же самое, что тестирование в производственной среде, эта следующая часть, вероятно, разрешит спор…
После настройки конвейера данных мы обнаружили несколько ошибок в производственной среде. Вот два из них:
Согласно определению MySQL, структура события состоит из заголовка и поля данных.
В репликации на основе строк (RBR), в отличие от репликации на основе операторов (SBR), событие каждой строки реплицируется с соответствующими данными. Операторы DML разделены на две части:
Первое событие, TABLE_MAP_EVENTописывает метаданные содержимого второго события. Эти метаданные содержат:
Второе событие WRITE_ROWS_EVENT(для вставок) содержит данные. Чтобы декодировать его, вам нужно, чтобы предыдущее TABLE_MAP_EVENTсобытие знало, как использовать это событие, сопоставляя соответствующее MYSQL_TYPE_* и считывая количество байтов, ожидаемых для каждого типа.
Иногда некоторые события не обрабатывались должным образом, поскольку несогласованность между метаданными и данными приводила к VARCHARдекодированию DATETIMEзначений как значений и т. Д.
После некоторой отладки выяснилось, что команда DBA добавила триггеры в некоторые таблицы MySQL. Когда несколько дней спустя реплики были перестроены, они унаследовали эти функции и начали регистрировать события, производимые этими триггерами.
Дело в том, что в MySQL триггеры являются внутренними. В binlog каждое событие, исходящее от мастера, отправляется следующим образом:
TableMapEvent_a
a, bи cпредставляет события для разных таблиц schema.tables.
Поскольку триггеры не поступают от ведущего устройства, когда ведомое устройство получает a TableMapEventдля определенной таблицы, оно запускает другой TableMapEvent для специализированной таблицы (_event). То же самое относится и к WriteEvent.
OVH, как облачный провайдер, управляет многочисленными базами данных как для своих клиентов, так и для собственных нужд. Управление жизненным циклом базы данных всегда включает в себя как поддержание инфраструктуры в актуальном состоянии, так и синхронизацию с циклом разработки, чтобы согласовать программное обеспечение с его зависимостью от базы данных. Например, приложению может потребоваться MySQL 5.0, который затем может быть объявлен как EOL (End Of Life). В этом случае приложение необходимо изменить для поддержки (скажем) MySQL 5.5. Мы не изобретаем велосипед здесь — этим процессом уже несколько десятилетий управляют операторы и команды разработчиков.
Это усложняется, если у вас нет контроля над приложением. Например, представьте, что третья сторона предоставляет вам приложение для обеспечения зашифрованных транзакций. У вас нет абсолютно никакого контроля над этим приложением или связанной с ним базой данных. Тем не менее, вам по-прежнему нужны данные из базы данных.
В этом сообщении в блоге описан аналогичный пример, с которым мы столкнулись при создании озера данных OVH с помощью собственной разработки CDC. Эта история происходит в начале 2015 года, хотя я все же думаю, что ею стоит поделиться.
Разработка ненавязчивого процесса сбора измененных данных
Обычно перед тем, как приступить к разработке, рекомендуется определить состояние технологии, поскольку это сэкономит время и укрепит сообщества. Еще в начале 2015 года, когда впервые появился ландшафт CDC ( Debezium , аналогичное решение с открытым исходным кодом, появилось только в конце того же года), единственное существующее решение — Databus — появилось от LinkedIn. Архитектура Databus была довольно сложной, и проект не был очень активным. Кроме того, это не решало наших требований к безопасности, и мы пришли из сильной культуры эксплуатации, поэтому запуск JVM на сервере базы данных был явно непозволительным для наших рабочих групп.
Хотя не существовало программного обеспечения CDC, соответствующего нашим требованиям, в конце концов мы нашли библиотеку репликации binlog, которую можно было бы интегрировать с некоторыми из них в Go. Binlog — это имя MySQL для базы данных WAL.
Наши требования были довольно простыми:
- Избегайте решений на основе JVM (JVM и контейнеры в то время не работали должным образом, и было бы трудно получить поддержку от Ops)
- Агент CDC, необходимый для подключения к шлюзу CDC для высокозащищенных сред (а не шлюзу для агентов)
- Шлюз CDC может контролировать флот своих агентов
- Агент CDC может фильтровать и сериализовать события, чтобы протолкнуть их с помощью контроля обратного давления.
- Агент CDC может сбросить БД, чтобы получить первый снимок, поскольку бинлоги не всегда доступны с самого начала.
Вот глобальный дизайн проекта Menura:
Менура — это род лирохвостов : птицы, способные воспроизводить любой звук. Большинство BI связанных компонентов являются «птицы», так как они часть B usiness I ntelligence R & D проекта!
Автоматизация плоскости управления BI
Поскольку Menura была развернута на серверах баз данных, она могла отображать доступные базы данных и таблицы в плоскости управления BI , чтобы пользователь мог запросить синхронизацию с заданной таблицей. Протокол управления имел несколько простых задач:
- Добавить и настроить источник базы данных
- Управление удаленной конфигурацией
- Картографический агент / картография таблиц
- Дамп таблиц базы данных
- Управление CDC (запуск / остановка синхронизации, фиксация смещений бинлогов…)
В то время gRPC только зарождался, но мы увидели в этом проекте, с его прочной основой, возможность примирить Protobuf, экономичность и языковое разнообразие. Кроме того, возможность настройки RPC с двунаправленной потоковой передачей была интересна с точки зрения реализации соединений клиент-сервер с RPC-серверами, поэтому мы сделали ее основой протокола управления.
gRPC использует буфер протокола как IDL для сериализации структурированных данных. Каждый StreamRequest состоит из заголовка для управления мультитенантностью. Это означает, что если бы наши клиенты решили назвать свои источники одним и тем же именем, мы могли бы выделить управляющие сообщения по арендатору, а не только по источнику. Поэтому мы находим RequestType, как определено в Protobuf v3:
enum RequestType {
Control_Config = 0;
Control_Hearbeat = 1;
Control_MySQLClient = 10;
Control_MySQLBinlog = 11;
Control_Syslog = 12;
Control_PgSQLClient = 13;
Control_PgSQLWal = 14;
Control_PgSQLLogDec = 15;
Control_Kafka = 16;
Control_MSSQLClient = 17;
}
Этот RequestType позволил нам получить исходные плагины со специализированными структурами, которые они ожидают. Обратите внимание, что мы отделили клиентов БД от репликации БД (binlog, WAL…). Основная причина в том, что они не имеют одной и той же области видимости, поэтому библиотеки не совпадают. Поэтому имело смысл держать их отдельно.
Другая причина заключается в том, что репликация действует как подчиненное устройство для базы данных, что означает, что процесс репликации не занимает значительного места, в то время как клиент, сбрасывающий базу данных, может подразумевать блокировку строк или таблиц, учитывая базу данных и ее обрабатывающий механизм. Это могло привести к тому, что у нас будет два разных ведомых устройства, или репликация будет подключена к ведущему устройству, а клиент — к ведомому.
Эти опасения подтолкнули нас к модульной конструкции агента Menura:
Фильтрация данных
Важной функцией была возможность фильтровать события или столбцы. На то было две причины:- Мы столкнулись с базами данных с таким количеством таблиц или столбцов, что нам потребовалось немного снизить уровень шума.
- Нам не обязательно нужно было получать определенные данные из базы данных
Фильтрация ближе всего к базе данных, безусловно, была лучшим выбором, особенно для наших клиентов, поскольку они могли добавлять или проверять фильтры сами. Для этого мы определили политику фильтрации, которая имитирует политики IPTables с принятием или удалением значений по умолчанию. Затем исходный фильтр описал, как таблицы и столбцы будут вести себя в зависимости от политики:
filter_policy: accept/drop
filter:
table_a:
table_b:
ignored_columns:
— sensibleColumn
Падение политик упадет что - нибудь по умолчанию, за исключением таблиц явно перечисленных в фильтре, в то время как принимать политик удалять таблицы , перечисленные в качестве пустой в фильтре, за исключением таблиц , которые имеют ignored_columns ключ, чтобы фильтр только столбцы с их именами.
Валидации в гетерогенных системах
В некоторых случаях вы можете подтвердить, что обрабатываете аналитическое задание на тех же данных, из которых состоит настоящая база данных. Например, для расчета выручки за определенный период требуются достоверные данные от даты до даты. Проверка состояния репликации между базой данных и озером данных была сложной задачей. На самом деле проверки целостности не реализуются с той же логикой в базах данных или хранилищах, поэтому нам нужен был способ абстрагировать их от собственной реализации. Мы подумали об использовании структуры данных Merkle Tree , чтобы мы могли поддерживать дерево целостности с блоками или строками. Если ключ / значение отличается от базы данных, то это будет отражать глобальный или промежуточный хэш целостности, и нам нужно будет сканировать только листовой блок, который имеет несогласованный хэш между обеими системами.
Соберем все вместе
Как мы уже говорили во введении, CDC настроен на преобразование изменений базы данных в обрабатываемые события. Цель здесь — эффективно и действенно удовлетворять любые бизнес-потребности, требующие данных. Вот два примера того, что мы сделали с данными, которые теперь были доступны в виде событий…
Соединения между базами данных в реальном времени
Пока мы строили озеро данных из реплицированных таблиц, и поскольку этот проект был в основном для целей бизнес-аналитики, мы рассмотрели возможность добавления некоторых аналитических данных в реальном времени на основе той же логики, которую мы используем с пакетными заданиями и заданиями Apache Pig . С 2015 года самой продвинутой структурой обработки потоков является Apache Flink , которую мы использовали для обработки соединений в реальном времени между различными базами данных и таблицами.
Алексис проделал потрясающую работу, описав процесс соединения, который мы внедрили в Apache Flink, так что в дополнение к репликации баз данных мы также создали новую агрегированную таблицу. В самом деле, мы могли бы написать целую запись в блоге только по этой теме ...
Мы выбрали Apache Flink по нескольким причинам:
- Его документация была восхитительной
- Его основной движок был великолепен и намного превосходил Apache Spark (проекта Tungsten там даже не было).
- Это был европейский проект , поэтому мы были близки к редактору и его сообществу.
Индексирование в реальном времени
Теперь у нас есть таблица реального времени, загруженная в Apache HBase , и нам нужно было добавить к ней возможность запросов. Хотя HBase был хорош с точки зрения хранения, он не предоставлял никаких возможностей поиска, а его схема доступа не была идеальной для сканирования по критерию поиска.
Здесь Гийом сотворил чудо! Повторно используя Lily , индексатор HBase, предоставляющий концепцию SEP (обработчик побочных эффектов), ему удалось повторно внедрить схему агрегированной таблицы в Lily, чтобы построить отображение типов данных, необходимое для чтения значений байтовых массивов HBase, перед их индексированием в Solr. Теперь у нас была панель мониторинга в реальном времени, представляющая собой агрегированную объединенную таблицу в реальном времени, обработанную на основе сбора данных об изменении в реальном времени. Бум!
Именно тогда мы начали привлекать реальных клиентов к нашему новому решению.
Жить!
Если по-прежнему существует необходимость продемонстрировать, что тестирование в среде моделирования — это не то же самое, что тестирование в производственной среде, эта следующая часть, вероятно, разрешит спор…
После настройки конвейера данных мы обнаружили несколько ошибок в производственной среде. Вот два из них:
Чередующиеся события
Согласно определению MySQL, структура события состоит из заголовка и поля данных.
В репликации на основе строк (RBR), в отличие от репликации на основе операторов (SBR), событие каждой строки реплицируется с соответствующими данными. Операторы DML разделены на две части:
- А TABLE_MAP_EVENT
- A ROWS_EVENT(может быть WRITE, UPDATEили DELETE)
Первое событие, TABLE_MAP_EVENTописывает метаданные содержимого второго события. Эти метаданные содержат:
- Включенные поля
- Растровые изображения с нулевыми значениями
- Схема предстоящих данных
- Метаданные для предоставленной схемы
Второе событие WRITE_ROWS_EVENT(для вставок) содержит данные. Чтобы декодировать его, вам нужно, чтобы предыдущее TABLE_MAP_EVENTсобытие знало, как использовать это событие, сопоставляя соответствующее MYSQL_TYPE_* и считывая количество байтов, ожидаемых для каждого типа.
Иногда некоторые события не обрабатывались должным образом, поскольку несогласованность между метаданными и данными приводила к VARCHARдекодированию DATETIMEзначений как значений и т. Д.
После некоторой отладки выяснилось, что команда DBA добавила триггеры в некоторые таблицы MySQL. Когда несколько дней спустя реплики были перестроены, они унаследовали эти функции и начали регистрировать события, производимые этими триггерами.
Дело в том, что в MySQL триггеры являются внутренними. В binlog каждое событие, исходящее от мастера, отправляется следующим образом:
TableMapEvent_a
TableMapEvent_a
WriteEvent_a
TableMapEvent_b
WriteEvent_b
TableMapEvent_c
WriteEvent_c
a, bи cпредставляет события для разных таблиц schema.tables.
Поскольку триггеры не поступают от ведущего устройства, когда ведомое устройство получает a TableMapEventдля определенной таблицы, оно запускает другой TableMapEvent для специализированной таблицы (