В одном из исследовательских проектов нам с коллегами пришла идея совместить Akka Stream, Akka event sourcing (typed persistence) и Akka cluster sharding для реализации stateful stream processing. На мой взгляд, получилось достаточно интересное и лаконично решение, которым я бы и хотел с вами поделиться.
Disclaimer
Все мысли, описанные в данной статье, представляют собой толкование автором его восприятия механизмов и фреймворков потоковой обработки данных и не претендуют на истину. Все приведенные ниже примеры выдуманы исключительно для данного материала на основе пережитого личного опыта. Код примеров написан на языке Scala. Автор приветствует исправления, корректировки и дополнения изложенной информации.
Потоковая обработка данных
Потоковая обработка данных – это архитектурный паттерн, при котором система обрабатывает данные, которые поступают (как правило, из бесконечных источников), и формирует результаты в реальном времени. “В реальном времени” – достаточно условное определение, поскольку какие-то задержки у нас будут всегда. Данная фраза обычно ставится в противовес пакетной обработке, в которой обычно обработка данных и формирование результатов запускается периодически, по расписанию.
Потоковая обработка достаточно востребована в отраслях, где есть необходимость в наиболее быстрой реакции на поток событий. В качестве примеров можно привести:
борьбу с мошенничеством: фрод, атаки на инфраструктуру, детектирование аномального поведения;
маркетинг в режиме реального времени (real-time offering): кредиты, скоринги, реклама;
интернет вещей (IoT): обработка и реакция на данные с датчиков в различных областях (транспорт, промышленность).
При решении достаточно большого ряда бизнес-задач возникает необходимость в промежуточной агрегации данных, их аккумуляции, определении границ данных, и все это на бесконечном потоке. Подобные задачи всегда создавали проблемы при реализации потоковой обработки.
Еще одним примером усложненной задачи может служить фильтрация или обогащение одного бесконечного потока данных на основании другого бесконечного потока данных. Как правило, в таких задачах есть основной достаточно большой поток данных и второй медленно изменяющийся (slowly changing dimension), на основании которого как раз и происходит обогащение. Попробую привести пример.
Мониторинг сроков доставки груза в логистической компании. Есть основной огромный поток проводок грузов по всем складам компании: погрузка в рейс, выгрузка на склад, прием от клиента, выдача клиенту. И есть дополнительный поток данных, в котором независимо приходит клиентский срок доставки груза. Для мониторинга качества исполнения доставки нам необходимо завести state по каждому грузу, в котором будет храниться клиентский срок. Пропуская все проводки по грузам через данный state мы сможем рассчитывать прогнозируемое время доставки. Эту задачу можно немного усложнить, если, например, мы не можем гарантировать то, какое событие придет первым – клиентский срок или проводки по перемещению груза. В таком случае, проводки по грузам, для которых еще неизвестен клиентский срок, необходимо будет сохранять в state до получения информации по сроку доставки груза.
Для решения подобного класса задач в реальном времени как раз прекрасно подходит механизм stateful stream processing.
Stateful stream processing в современных фреймворках
На данный момент в экосистеме Java существует достаточно большое количество фреймворков, позволяющих построить масштабируемую и надежную потоковую обработку. На мой взгляд, самые популярные из них – это Apache Flink и Apache Spark.
Spark
Stateful stream processing в spark structure streaming осуществляется при помощи операторов mapGroupsWithState и flatMapGroupsWithState. Эти операторы можно вызвать на KeyValueGroupedDataset, для этого исходный dataset нужно сгруппировать по ключу (метод groupByKey). На вход операторы принимают функцию со следующей сигнатурой:
(Key, Data, GroupState[S]]) => Output
Где Key –это ключ, по которому мы группировали наш dataset. Data – это коллекция сгруппированных по ключу элементов из нашего потока данных. GroupState[S] – интерфейс Spark, предоставляющий нам доступ (get, update, remove и т.д.) к нашему состоянию. Output – тип результата, который сформируется в dataset, возвращаемый после *GroupsWithState оператора.
Другим словами, разработчику достаточно определить свою функцию, которая будет осуществлять stateful stream processing. Spark берет на себя всю работу по сохранению, поиску и загрузке состояния. Стоит отметить, что state будет привязан к ключу, по которому вы группировали исходный dataset, и в общем случае количество хранимых state будет равно количеству ключей, полученных на потоке данных. Однако, ничего не мешает вам удалять state по наступлению какого-либо события или по истечении определенного периода времени.
Более подробно про это можно почитать в официальной документации, а здесь – посмотреть на хороший пример.
Flink
Работа с потоковым состоянием во Flink в чем-то похожа на Spark. Точно также, чтобы начать работу со state, вам необходимо сгруппировать исходный DataStream при помощи метода keyBy. Далее необходимо определить свой оператор, используя rich function. Внутри данного оператора, при помощи RuntimeContext вы можете создать любой из flink state примитивов, который позволит взаимодействовать со state при обработке элементов DataStream. В число этих примитивов входят:
ValueState<T> - самый простой вариант для чтения обновления конкретного значения;
ListState<T> – коллекция хранимых элементов;
MapState<UK, UV> – абстракция над state в виде ассоциативного массива (ключ - значение);
ReducingState<T> и AggregatingState<IN, OUT>, которые позволяют осуществлять агрегацию состояния при добавлении элементов.
Более подробно про потоковую обработку через состояния во Flink можно почитать в официальной документации: Flink Stateful Stream Processing и Flink Working with State.
Оба описанных выше механизма “из коробки” позволяют буквально в несколько десятков строк реализовать потоковую обработку с использованием состояния. Более того, фреймворки Flink и Spark изначально позиционируют себя как легко-масштабируемые. Если вы при помощи них собрали приложение для потоковой обработки, то его без каких либо переработок можно запустить как на одной машине, так на десятках (а может и сотнях) серверов.
Akka stream stateful processing
В Akka stream, к сожалению, мне не удалось найти механизмов, которые позволят с такой же легкость строить stateful stream processing автоматическим сохранением, поиском и загрузкой состояния. Довольно распространенным подходом является использование асинхронных этапов обработки (mapAsync), в которых вручную осуществляется поиск, загрузка и сохранение состояния в хранилищах данных.
Прежде, чем переходить к описанию реализации, давайте немного поговорим об используемых фреймворках экосистемы Akka. Все эти фреймворки “под капотом” используют actor model, при этом для пользователя (разработчика) сами акторы могут быть очень хорошо скрыты за высокоуровневыми абстракциями.
Akka Stream
Akka Stream – это и есть основной фреймворк для потоковой обработки данных, самый простой пример можно реализовать буквально в несколько строк:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val actorSystem: ActorSystem = ActorSystem("my-system")
val wordsSource = Source.single("Hello akka stream!")
val printSink = Sink.foreach[String](println)
wordsSource.mapConcat(w => w.split(" ")).toMat(printSink)(Keep.right).run()
AcrtorSystem – это ключевой системный компонент Akka, вокруг которого и строится вся акторная модель и который позволяет создавать, взаимодействовать с группой акторов. Source.single – создает потоковый источник данных из одного элемента. В данном случае это просто строка. Sink.foreach создает приемник данных, а при помощи mapConcat мы добавляем этап обработки (Flow), который разбивает строку на слова и таким образом наш поток из одного элемента String превращается в поток из трех элементов String. Источник, обработчик и приемник соединяются вместе в RunnableGraph, который можно запустить при помощи метода run.
Пример выше написан при помощи так называемого fluent DSL. Его выразительности может не хватить для построения более сложных графов обработки данных. В этом случае на помощь приходит graph DSL, в котором мы вручную соединяем различные входы и выходы компонентов. Ниже – пример построения графа обработки при помощи graph DSL с той же самой логикой
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val wordsSource = builder.add(Source.single("Hello akka stream!"))
val printSink = builder.add(Sink.foreach[String](println))
val process = builder.add(Flow[String].mapConcat(_.split(" ")))
wordsSource ~> process
process ~> printSink
ClosedShape
}).run()
Более подробно про graph dsl можно почитать тут: Working with Graphs.
Если посмотреть внутрь объектов Source и Sink, то можно обнаружить большое количество методов, которые позволяют создавать различные источники и приемники данных: Source.fromIterator, Source.fromJavaStream, Source.cycle, Source.tick, Source.repeat, Source.future, Soutce.actorRef; Sink.headOption, Sink.lastOption, Sink.seq, Sink.collection, Sink.asPublisher, Sink.ignore, Sink.foreachAsync, Sink.actorRef, а также многие другие.
Но основным преимуществом akka stream, на мой взгляд, является Alpakka project, который содержит достаточно большое количество совместимых с akka stream компонент для интеграции с различными источниками данных.
Также стоит отметить, что Akka Stream (в отличие от того же Spark) позволяет строить циклические графы. Это может быть очень полезно, чтобы строить цепочки обработки с обратной связью.
Akka event sourcing (typed persistence)
Чтобы разбираться с akka event sourcing, для начала нужно поговорить об akka persistence. Akka persistence – это механизм, позволяющий акторам сохранять свое состояние с целью его восстановления после перезапуска. Ключевыми элементами механизма являются:
Command – команда. Набор запросов, которые принимает actor; в частном случае – это намерение изменить хранимое в actor состояние;
Event – событие. Является производным от команды и описывает свершившееся изменение; события являются объектами хранения первого порядка, которые не могут быть изменены, а только добавлены;
Snapshot – состояние actor в некоторый определенный момент времени; является производным результатом от применения некоторой логики к набору Event. Является частью механизма оптимизации, позволяющего восстанавливать состояние не обрабатывая все ранее сохраненные Event.
Для простейшей реализации persistent actor разработчику необходимо:
описать логику обработки Command и преобразования их в Event (как правило, обработка – это предварительная валидация);
описать логику обработки Event – то, как они будут влиять на изменения состояния актора;
описать логику/периодичность сохранения Snapshot.
Весь механизм хранения, загрузки и правильного восстановления состояния Akka берет на себя. Стоит обратить внимание, что побочным эффектом от хранения Event вместо классического хранения состояния является тот факт, что мы можем легко исправить невалидное состоянии из-за ошибок в коде просто путем исправления программы и удаления всех Snapshot. В классическом подходе, когда мы по команде изменяем состояние в БД, нам приходится делать миграции данных и как-то догадываться о том, какое у нас должно было быть состояние.
Akka event sourcing – это абстракция, построенная поверх Akka persistence, позволяющая избавиться от части boilerplate (шаблонного, повторяемого, инфраструктурного) кода, добавляющая строгую типизацию в реализацию persistent actor и немного упрощающая интеграцию с Cluster Sharding.
Также стоит отметить, что данный механизм пропагандирует Event Sourcing подход. Мой опыт говорит, что большое количество людей не понимает до конца данную концепцию (а некоторые ее радикально отвергают). В интернете есть некоторое количество статей на эту тему, Akka предлагает нам ознакомиться с Introducing Event Sourcing на MSDN и Events As First-Class Citizens от Randy Shoup.
Со своей стороны хочу порекомендовать бесплатную группу курсов от Lightbend на тему Reactive Architecture (не путать с реактивным программированием, это разные вещи). В 6 курсе достаточно неплохо раскрывается идея Event Sourcing, но чтобы более глубоко погрузиться в концепцию, стоит пройти все курсы с начала. Я получил большое удовольствие от прохождения курсов, а также лично наблюдал, как разработчики, не разделяющие принципы event driven архитектуры, проникались этими идеями в процессе изучения курсов.
Akka persistence (и Akka event sourcing, соответственно) содержат целый ряд официальных и community плагинов, которые позволяют хранить события и слепки состояний в различных источниках данных. Для того, чтобы подключить какой-либо из persistence plugin, как правило, достаточно всего лишь добавить соответствующую библиотеку в список зависимостей проекта и прописать несколько настроек в конфиге (id плагина, url к источнику данных и название схемы, в которой будут храниться данные).
Akka cluster sharding
Cluster sharding – это механизм Akka, призванный упростить взаимодействие между различными физическими нодами в распределенной системе и организовать масштабирование системы. Также данный механизм позволяет организовать совместное размещение (co-location) нескольких обработчиков для одного и того же события в рамках одного физического шарда. Это необходимо для того, чтобы избежать лишней пересылки данных между разными shard node.
В нашем примере мы попытаемся организовать размещение actor, отвечающих за хранение state конкретной сущности на том же shard node, на которой происходит чтение и обработка kafka партиции с данными по этой же сущности.
Другим словами, основной поток данных с событиями по сущностям записывается в топик кафки, при этом партиционирование данных при записи осуществляется по id сущности. Таким образом, события по одной и той же сущности гарантированно будут попадать в одну и ту же партицию топика kafka. После запуска нескольких экземпляров нашего приложения каждый из них подпишется на определенные партиции топика (стандартное поведение для kafka consumer group) и, соответственно, все события по одной и той же сущности будут всегда попадать на одну и туже shard node. Обработка такого события будет происходить на этой же машине. В процессе обработки нам потребуется обращаться к actor, отвечающему за хранение состояния сущности из события. При помощи cluster sharding мы постараемся заставить Akka размещать данный actor на той же shard node, где происходит обработка.
Akka stateful streaming
Пример демо-проекта, созданного для данной статьи, можно посмотреть в github репозитории akka-stateful-streaming.
Мы построим простую real-time offering систему для бюро кредитных историй. Согласно википедии, бюро кредитных историй — организация, занимающаяся оказанием услуг по формированию, обработке и хранению кредитных историй, а также по предоставлению кредитных отчетов. Это организация, в которую банки и различные кредитные организации должны на регулярной основе передавать всю информацию о кредитах клиентов банка. Одной из услуг, которую могло бы предоставлять бюро кредитных историй, это информирование банка об изменениях платежеспособности их клиентов, например, с целью рефинансирования кредита или предложения нового кредита.
В данном примере у нас будет два потока событий. Первый и основной поток – это событие с изменением кредитных показателей:
case class CreditAccountIndicator(
accountId: Long,
bankId: String,
payment: BigDecimal, // размер платежа
debtAmount: BigDecimal, // размер задолженности
overdueAmount: BigDecimal // размер просроченной задолжности
)
Второй поток событий – это настройки порогов по показателям от банка. Очевидно, что это slowly changing dimension – данные настройки будут меняются редко, по сравнению с основным потоком данных.
case class BankAlertSettings(
bankId: String,
paymentThreshold: Option[BigDecimal], //настройка порога по платежу
debtAmountThreshold: Option[BigDecimal], //настройка порога по задолженности
overdueAmountThreshold: Option[BigDecimal] //настройка порога по просрочке
)
Согласно данным ЦБ РФ в России всего 763 кредитных организаций. И для нас не составит большой проблемы запустить 763 actor, каждый из которых будет обслуживать и хранить состояние настройки конкретного банка.
В результате обработки основного потока данных мы хотели бы получить уведомления о превышении пороговых показателей клиентом банка.
case class AlertEvent(accountId: Long, bankId: String, message: String)
Давайте начнем с описания actor, который будет хранить наши настройки. Первым делом нам нужно определить команду, при помощи которой мы будем обновлять состояние.
final case class UpdateState[R](data: R, replyTo: ActorRef[StateUpdated[R]]) extends Command[R]
case class StateUpdated[R](data: Option[R]) extends BankAlertSerializable
Здесь в качестве data будет само значение настройки, а replyTo – это адрес, на который нужно будет отправлять ответ об успешном обновлении state. StateUpdated – это ответ на команду обновления, в данном примере его содержимое не несет никакой пользы. Для нас будет важен сам факт того, что обновление состояния прошло успешно.
Далее определим команду для запроса настроек у actor.
final case class GetState[R](replyTo: ActorRef[StateAnswer[R]]) extends Command[R]
case class StateAnswer[R](data: Option[R]) extends BankAlertSerializable
Единственный параметр команды replyTo – это адрес, на котором отправитель ожидает ответа StateAnswer.
Состояние нашего actor будет описываться простым классом с единственным опциональным полем. Изначально state инициализируется значением None.
final case class State[R](value: Option[R]) {
def update(data: R): State[R] = copy(value = Some(data))
}
Опишем логику обработки наших команд:
def handleCommand(state: State[R], command: Command[R]): Effect[Event[R], State[R]] = {
command match {
case UpdateState(data, replyTo) =>
val event = Event(data)
Effect.persist(event).thenReply(replyTo)(s => StateUpdated(s.value))
case GetState(replyTo) =>
val answer = StateAnswer(state.value)
log.info(s"Answer state ${answer.data}")
Effect.reply(replyTo)(answer)
}
}
Если к нам пришла команда обновления состояния UpdateState, то мы просто сформируем Event и вернем директиву, которая сначала сохранит Event и затем отправит пересчитанный state (настройку) по адресу из команды. Тут надо учитывать, что после сохранения Event автоматически запускается обработчик событий, который в нашем случае обновляет state.
Если к нам пришла команда на запрос state, то мы просто возвращаем в ответ текущее значение.
Ну и наконец простейший обработчик событий:
def handleEvent(state: State[R], event: Event[R]): State[R] = {
state.update(event.data)
}
На каждое событие мы просто перезаписываем state. Очень важно, чтобы логика данного обработчика была без побочных эффектов, потому что эта логика обработки будет вызываться для сохраненных событий каждый раз после перезапуска системы. В данном примере для простоты опущен механизм создания и восстановления состояния из snapshot, но я уверен, что пытливый читатель с легкостью с ним разберется по официальной документации.
В трейте StreamSources описаны две достаточно стандартные функции создания Source и Sink для топиков kafka с чтением offset на источнике и commit offset на приемнике. То есть каждое прочитанное сообщение из топика для нас обогащено offset в виде akka.kafka.ConsumerMessage.Committable, которое мы вынуждены будем передавать по всем этапам обработки вместе с данными. Данный consumer offset для исходного топика будет сохранен в kafka (committed) вместе с отправкой сообщения в результирующий топик.
Отдельное внимание стоит обратить на функцию createShardedKafkaSource. Помимо того, что она создает вполне обычный sourceWithOffsetContext, она также связывает наш state actor с consumer группой при помощи akka-stream-kafka-cluster-sharding.
KafkaClusterSharding.messageExtractor – механизм, который на основании id нашей сущности и количества партиций в топике определяет shardId, на котором должен работать наш state actor. ExternalShardAllocationStrategy занимается ребалансировкой сущностей по shard node. А kafkaSharding.rebalanceListener реагирует на изменение подписок на партиции топика для текущей shard node (при запуске и остановке новых экземплярова приложения) и рассылает изменения всем членам кластера.
Честно говоря, меньше всего уверен в том, что правильно понимаю особенности работы данного механизма. С радостью выслушаю ваши комментарии и замечания.
Для чтения и обновления state в классе StateInterractor объявлены два вспомогательных метода.
def stateRequestAsync(
event: CreditAccountIndicator
): Future[DataWithState] = {
val ref = sharding.entityRefFor(typeKey, event.bankId)
ref.ask[StateAnswer[BankAlertSettings]](GetState(_)).map(ans => DataWithState(ans.data, event))
}
def stateUpdateAsync(
state: BankAlertSettings
): Future[StateUpdated[BankAlertSettings]] = {
val key = state.bankId
val ref = sharding.entityRefFor(typeKey, key)
ref.ask[StateUpdated[BankAlertSettings]](actor => UpdateState(state, actor))
}
извлекаем из каждого сообщения bankId
при помощи ClusterSharding создаем ActorRef для нашей sharded entity
формируем команду и отправляем запрос при помощи ask pattern.
Каждая функция нам возвращает Future с ответом от state actor. Насколько я понимаю, в момент отправки запроса механизм ClusterSharding либо определит реальный адрес нашего actor, либо инициирует его создание.
Стоит отметить, что первое обращение к нашему state actor может занять некоторое время. Поскольку наш actor основан на Akka persistence, то при старте он должен вычитать и обработать все ранее сохраненные Event из хранилища. Все последующие обращения должны происходить достаточно быстро, поскольку данные уже будут храниться в памяти.
В случае необходимости, можно попробовать оптимизировать эти первые обращения предварительной командой чтения state по всем ранее прочитанным bankId. Список ранее встреченных bankId можно при обработке отправлять в отдельный compacted topic (топик который хранит сообщения только с уникальными ключами). При старте приложения, до запуска всех обработчиков, можно вычитывать такой топик с самого начала и отправлять GetState по каждому bankId.
Все, что нам осталось – это создать два Akka stream. Первый для чтения настроек банков.
val settingsSource = stateCommandSource("bank-settings") // 1
val offsetSink = Committer.sink(committerSettings)
val settingsComplete = settingsSource
.map(r => r.value.parseJson.convertTo[BankAlertSettings]) // 2
.mapAsync(1)(stateInterractor.stateUpdateAsync) // 3
.asSource
.map { case (result, committable) =>
log.info(s"State updated to ${result.data}") // 4
committable
}
.toMat(offsetSink)(Keep.right) // 5
.run()
Создаем settingsSource, который будет вычитывать все сообщения из топика с настройками банков bank-settings;
Преобразуем каждое json сообщение из топика в case class BankAlertSettings;
Асинхронно отправляем запрос на изменение state;
Логируем ответ на запрос сохранения state;
Отправляем offset исходного сообщения в offsetSink, который просто закоммитит offset в исходном топике bank-settings.
Второй стрим будет обрабатывать события изменения показателей кредитной истории.
val indicatorsSource = createShardedKafkaSource(sharding, entity, "credit-indicators") // 1
val indicatorsComplete = indicatorsSource
.map(r => r.value.parseJson.convertTo[CreditAccountIndicator]) // 2
.mapAsync(1)(stateInterractor.stateRequestAsync) // 3
.map(calcAlerts) // 4
.asSource
.map(toEnvelope) // 5
.toMat(dataSink)(Keep.right) // 6
.run()
Создаем indicatorsSource, который будет вычитывать все сообщения из топика с показателями credit-indicators. Поскольку мы знаем, что это будет основной нагруженный топик, то инициализируем шардирование нашей sharding entity именно относительно consumer данного топика;
Преобразуем каждое json сообщение из топика в case class CreditAccountIndicator;
Отправляем асинхронный запрос к state actor и получаем набор пар: показатели и текущее состояние настроек;
На основе полученных пар рассчитываем уведомления по показателям, которые упали ниже порога;
Преобразуем каждое сообщение в ProducerMessage.Envelope;
Отправляем их в dataSink.
Немного прокомментирую функцию расчета уведомлений о выходе значений за пороговые.
def calcAlerts(dataWithState: DataWithState): Either[Unit, AlertEvent]
Либо она возвращает уведомление AlertEvent в случае превышения порога, либо Unit, если порог не был превышен. Такой возвращаемый результат нам нужен, для того, чтобы в обоих вариантах закоммитить offset в исходном топике credit-indicators. Если нам из функции вернулся Unit, то мы его просто преобразуем в Envelope событие коммита offset:
ProducerMessage.passThrough(committable)
Если же нам вернулся AlertEvent, то мы его преобразуем в Envelope с json сообщением для топика client-alerts и CommitableOffset для исходного топика credit-indicators.
def alertToEnvelope(committable: Committable)(alert: AlertEvent): Envelope[String, String, Committable] = {
val message = alert.toJson.compactPrint
val record = new ProducerRecord[String, String](alertTopic, message, alert.bankId)
ProducerMessage.single(record, committable)
}
В репозитории проекта есть docker-compose для запуска kafka и MongoDB (используется для хранения events & snapshots). Также есть два файла с настройками для запуска разных экземпляров приложения: app1.conf и app2.conf. Для того, чтобы добавить ещё экземпляры, достаточно прописать новый незанятый адрес/порт в akka.remote.artery.canonical.* и добавить новую node в akka.cluster.seed-nodes.
Хочу также отметить, что текущее решение имеет одно явное ограничение – весь state должен умещаться в памяти приложения. В нашем примере state едва ли занимает 1 килобайт, а количество таких экземпляров будет измеряться тысячами, что в сумме едва ли превысит отметку в несколько мегабайт.
Уважаемые читатели, прошу вас поделиться в комментариях информацией о том, какими фреймворками пользовались вы для решения stateful stream processing задач?