Pull to refresh

Comments 12

Разрешите два вопроса:
1. Используется ли Flink CEP в вашем проекте для определения паттернов в потоке событий?
2. Каким именно образом используется Redis в качестве хранилища стейтов в Flink? Дописывали поддержку в качестве state backend или через AsyncIO?
1) Flink CEP — никогда не использовали. Посмотрел, и даже не знаю где у нас применить.
2) Мы не стали делать свою реализацию state backend, так как для этого нужно научить её всему API что есть в Flink (savepoint, checkpoint, восстановление из них и многое прочее). Мы просто во всех наших RichFlatMapFunction используем Cache на основе JedisCluster. Это позволяет избавится от ненужных keyBy перед flatMap и не парится о всех контрольных точках.
Если мы упали, то весь кеш гарантировано на месте.
У Flink-а есть ряд проблем. Я не пользовался им кажется с версии 1.1 и возможно что-то уже изменилось, но тем не менее:

1. Flink очень нестабильный. Например он может работать неделю обрабатывая тысячи сообщений в секунду, потом неожиданно повиснуть так что приходится перезапускать все ноды. Самое плохое что эта нестабильность только увеличивается при выходе новых минорных версий.
1. Чуствителен к сетевым проблемам. Например если пропала связанность между нодами, то кластер может развалится и потом уже не собраться автоматически. Приходится лезь в логи, разбираться что сломалось, перезагружать ноды и flow.
1. Очень прожорлив до CPU. Переписав flow на пару приложений на Java удалось снизить использование CPU процентов на 50 если не больше. Думаю эта особенность может стать bottleneck-ом при частых пиковых нагрузках, либо потребуется больше серверов.
1. Flink не умеет динамически масштабироваться при изменений кластера. Например если одна нода упала, то он перезапускал flow на оставшихся (если мог конечно), а когда нода восстанавливалась, то она оставалась простаивать. Приходилось отслеживать это и вручную перезапускать flow.
1. При запуске flow Flink не умеет подстраивать его под размер кластера (возможно сейчас уже может т.к. эта фича была в планах). Например если нужно запускать вычисление на всех 4-х нодах кластера, то в flow нужно явно указать это. Появляется/удаляется нода — нужно менять код во flow.
1. Отвратительный API для деплоя flow. Если вы хотите задеплоить новую flow, то вам самим нужно сделать savepoint, самим сделать cancel flow (повезёт если он сработает), залить JAR c новым flow и надеятся что изменения которые в нем сделаны подхватстся из сделанного ранее savepoint-а. Мне пришлось написать ужасный скрипт для Gradle который всё это делал.
1. Для сохранения checkpoint-ов и savepoint-ов (если вы подняли кластер Apache Flink) используется HDFS протокол. Заставить работать его требует титанических усилий т.к. части адаптеров (например S3) нет в Flink и он берёт их из инсталяции Hadoop которая должна быть где-то рядом и совместима с Flink. S3 адаптер который используется в этом Hadoop старый и содержит ошибки которые при возникновении выносят весь кластер Apache Flink. Настроить его тоже не просто. Ещё сложнее сделать это если вы живете не в AWS т.к. S3 интерфейс есть только у Riak CS или Minio (с которым адаптер из Hadoop не совместим из-за некоторых особенностей реализации Minio). Всё это выглядит как полный пи**ец т.к. по факту вам всего лишь нужно положить файлик куда-то и скачать его. Я бы сделал это сам, но Flink не дает сделать это в обход HDFS.
1. Очень похоже что есть мемори лики при сохранений checkpoint т.к. его размер стабильно рос и с нескольких сотен килобайт за пару недель он дорастал до сотен мегабайт. Мой flow был не очень большой и я излазил его вдоль и поперёк так что скорее всего ошибка где-то в Apache Flink.

Думаю этого достаточно, хотя это не все проблемы с которыми пришлось столкнуться при использовании Apache Flink.

В целом у меня сложилось следующее впечатление от Apache Flink: идея хорошая и правильная, но реализация — говно. Так же возникло ощущение что с каждым новым релизом ситуация ухудщается.
Ну про checkpoint's я согласен. Мне этот механизм не очень нравится. И дело не в том, что он где-то течёт. Это всё недоказано, а может быть и пофикшено. Дело в том, что восстанавливаться из них сложно, когда меняешь граф.

Чуствителен к сетевым проблемам. Если у Вас распределённый движок для вычислений, происходит общение между нодами и тут бац, сети нет — ну я даже не знаю, что Вы ещё хотите? Все таймауты можно настраивать в akka.

Про то что неудобно деплоить. Тут я не соглашусь. Мы используем в качестве среды запуска YARN. Мы написали для Cloudera Manager своё расширение и следим за flink из него. Очень удобно. Залили новый JAR, новый конфиг, нажали restart в CM и поехали считать дальше с новым графом.

Сейчас мы используем версию 1.4.0, но и в начале когда был 1.1.4 было вполне стабильно.
А сколько записей в день приходит?
Около 1.5 миллиардов сообщений
Да. Это около 1ТБ данных.
Мы собираем на новой платформе аналитики практически все возможные события.
Показы всех секций интерфейса, всех элементов, любые взаимодействия с клиентом, очень много событий от плеера, масса событий от бекенда.
С учётом того, что на новую платформу мы перевели только двух клиентов, данных будет ещё больше.
Интересная статья, возникли несколько вопросов:
1) Почему воронки считаются не в CH, а на уровне Flink? Медленно?
(Кажется, что у аналитиков может возникнуть желание посмотреть разные воронки в разной ретроспективе.)
Смотрели ли / пробовали ли функции такого рода?
2) Какой тип persistence у redis? Скорости достаточно? Используете ли redis-cluster?
1) Вполне резонный вопрос. sequenceMatch мы пробовали. Но нужно было чтобы от одного шага воронки, до другого было определённое колличество сообщений.
Я потом увидел доклад от Yandex (https://www.youtube.com/watch?v=YpurT78U2qA), где ребята решают это на основе массивов. В ближайшее время буду изучать этот вариант. Возможно, что надобность в предрасчитанных воронках отпадёт, если аналитикам понравится считать их прямо в базе.

2) Честно говоря, я не знаю как именно в redis у нас настроены снепшоты. Скорости вполне хватает. Мы используем redis-cluster, 4 шарда, каждый реплицированный.

Сколько ядер/памяти в вашем кластере ClickHouse?

4 тачки, по 2 в разных датацентрах. каждая по 56 ядер и 256ГБ оперативы.
Диски в raid10, для ускорения чтения.
Sign up to leave a comment.