Привет, Хабр!
Данная задача в разных вариациях мне давалась на нескольких собеседованиях несколько лет назад. Хоть мой дизайн и проходил, мне стало интересно реализовать это в коде с нуля. Сыроватый и сильно урезанный по функционалу MVP готов, ссылка на github будет под катом. Пока что мной запланировано 3 статьи - эта, по бэкенду и по фронту. Будет много кода на scala, много котов (cats effect), стримов (fs2), пара lock-free техник, scala js, и постараюсь сделать так, чтобы мозг от всего этого не взорвался.
Все, кому интересно - добро пожаловать под кат.
Постановка задачи
Для начала, определимся с вводными, а именно - что мы проектируем.
Первое довольно очевидное вводное - распределенное горизонтально масштабируемое web-приложение с api, по которому может достучаться браузер (то есть UDP не берем)
Хотим at least once гарантию доставки сообщений - сообщения не теряются, но могут дублироваться
Доставка сообщений за разумное время - то есть, если клиент онлайн, сообщение ему должно приходить по возможности быстрее
Сообщения хранятся. Клиент может получить историю сообщений с момента своего последнего полученного сообщения
Чаты на произвольное количество участников, каждый из которых должен иметь возможность получить сообщение. Произвольное количество чатов.
Что ж, теперь разберем требования по пунктам.
Пункт 1 дает нам на выбор несколько вариантов архитектур:
stateless-приложение. AppServer-ы не хранят состояние, все состояние хранится в БД и опционально в распределенном кэше. AppServer-ы ничего друг про друга не знают. Балансер простой, запрос может прилететь на любой appServer. Добавление/удаление инстансов простое, происходит быстро и с минимальными последствиями. Накладные расходы на пересылку сообщений между appServer-ами, на взаимодействие с БД. Это рекомендуемая по умолчанию архитектура, и именно ее я выбрал в качестве целевой. Не подходит для большого трафика и минимального latency - видео- аудио-звонки, онлайн игры.
stateful-приложение, в котором клиенты, относящиеся к одному чату, обслуживаются одним appServer-ом. В данном случае обеспечивается минимальный latency, отсутствует внутренний трафик на пересылку сообщений. Подходит для видео- аудио-звонков, онлайн-игр. Большое кол-во клиентских соединений, так как клиент в общем случае подключается к нескольким чатам. Балансер сложный, на основе consistent hashing по id чата. При добавлении-удалении инстансов нужно обеспечить миграцию клиентов, возможна ситуация раздвоения чатов (проблема, похожая на split brain).
stateful-приложение на Akka. Комбинация первых двух подходов. Один чат обслуживается одним актором, минимальные затраты на синхронизацию с БД. Есть внутренний трафик, в общем случае 2 пересылки сообщения. Проблемы типа split brain решаются кластером самостоятельно (вроде там разновидность gossip, точно не помню). Нужен ack при обмене сообщениями, т.к. у Akka гарантии at most once (сообщения могут теряться). Второй рабочий вариант, и я планировал немного отдохнуть и реализовать его тоже.
Пункт 2 в пояснении не нуждается.
Пункт 3 требований подразумевает асинхронную доставку. В случае с web-клиентом это практически безальтернативно webSocket. Есть вариант с long polling и с server side events, но наиболее удобен webSocket, так что возьмем его.
Пункт 4 нам намекает на то, что нужно использовать БД web-масштабов, да еще и такую, которая умеет сортировать.
Пункт 5 скорее про организацию бизнес-логики, но требование "произвольное количество чатов" нам отсекает kafk-у, у нее вроде как были с этим проблемы.
Еще одно требование-пожелание, которое явно не вытекает из озвученных, но дает очень весомый плюс. Это - асинхронный ввод/вывод. При синхронном вводе/выводе мы ограничены количеством тредов, которые на одной не топовой машине имеют порядок тысяч, часто 2048 или 4096. Да, это пресловутая C10K-problem. При асинхронном вводе/выводе с использованием механизмов epoll/kqueue/iocp мы на одной машине ограничены количеством сокетов, которые имеют порядок сотни тысяч. Требование асинхронного ввода/вывода распространяется также и на коннекты к БД, и на саму БД в частности, и ставит под сомнение postgres или mysql. Также это требование снова отсекает kafk-у, у нее клиент по-моему только синхронный (если есть нормальный асинхронный клиент кафки, который не является оберткой над синхронным, просьба поделиться).
Бизнес-логика
Теперь немного поговорим про бизнес-логику. Она довольно простая в принципе: клиент подключается webSocket-ом к инстансу приложения и посылает сообщение о том, что присоединился к чату. Инстанс идет в БД и записывает информацию о том, что сообщения данному клиенту с clientId можно доставлять по host:port:clientId, где host:port - это адрес самого инстанса, на котором он слушает входящие от других инстансов. (UPD:) Только после этого клиент делает запрос истории и передает timestamp последнего полученного сообщения, или 0, если их еще не было. Отправитель делает запрос к инстансу, к которому он подключен, тот идет в БД, читает список кортежей host:port:clientId, группирует их по host:port и отправляет сообщение на все нужные инстансы. Далее собирает со всех список не ответивших клиентов, обновляет записи об этом в БД и отдает ответ отправителю, что все хорошо. Для внутренней коммуникации по host:port используем grpc.
Забегая немного вперед в выборе БД, накидалась вот такая картинка:

И тут вылезает целый ворох проблем синхронизации!
Пока одно сообщение отправляется адресатам, подключается новый адресат, и он может не успеть сообщение получить. Нам нужно предоставить гарантию того, что либо он сообщение получит, либо сможет прочитать его в логе, передав timestamp последнего имеющегося у него сообщения.
Проблема в самих timestamp-ах. Дело в том, что нормального времени у нас нет. Точнее, есть, но у каждой машины оно свое. Насколько это критично? Ну, я видел своими глазами сервер, который думал, что живет на 10 минут в будущем. Добавить еще такого же пришельца, но из прошлого, и мы получим разброс в 20 минут, который порвет нам любую синхронизацию. Приходит такой клиент со своим timestamp-ом из будущего, и часть сообщений ему не придет - те, которые были отправлены тем, кто застрял в прошлом. Есть, конечно, сервисы типа TrueTime, которые отдают интервал [low, high], в котором точно находится точное время, и который довольно мал, но для наших задач это избыточно. Есть векторные часы, но с ними сложно. Нам подойдет механизм эпох.
Механизм эпох работает следующим образом. У нас есть неубывающий номер эпохи типа Long, который увеличивается на 1 всегда, когда изменяется список адресатов - на добавление или удаление. Внутри одной эпохи может быть много сообщений, и они маркируются обычным timestamp-ом (можно взять ts из записи в бд). И мы гарантируем, что сообщение будет записано в лог с текущим номером эпохи. Таким образом мы гарантируем, что все сообщения будут доставлены. Попробуем доказать. Клиент может не получить сообщение только если оно добавится в лог с той же эпохой, но меньшим timestamp-ом, который уже есть у клиента. Если он его не получил, то его удалят из списка рассылки, увеличат номер эпохи и запишут это сообщение с увеличенным номером эпохи - таким образом, получаем противоречие. У не доставленного напрямую сообщения всегда номер эпохи будет больше, чем последний сохраненный номер у отключенного клиента, и это наш главный инвариант, который нам предстоит обеспечить.
С использованием механизма эпох возникает трудность в том, что запись в лог и изменение эпохи должны происходить взаимоисключающе. Конкурирующие изменения эпохи также должны быть синхронизированы. У нас вырисовывается перспектива использования механизма распределенных блокировок, а это дорого. Есть вариант с транзакциями в БД. Есть вариант с zookeeper-ом, в curator-е есть соответствующий рецепт, но беда в том, что zookeeper - это система класса replicated state machine, и все его состояние должно умещаться на одной машине, что нарушает условие "произвольное количество чатов". По-моему, у kafk-и были похожие проблемы в свое время. В общем, вариант с блокировками - не вариант.
Остается еще одна возможность - использование lock-free техник. Когда-то давно, больше 10 лет назад, я увлекался lock-free алгоритмами. Если у нас есть CAS (compare-and-swap, compare-and-set, в общем, атомарное сравнение с обменом), то мы можем написать любую синхронизацию. Из распространенных БД CAS нам предоставляют mongodb и hbase. Я выбрал HBase, так как она хранит сами данные сортированными, тогда как в mong-е для этого нужен индекс.
Наконец-то код
Итак, нам нужно написать 2 алгоритма - отправки сообщения и подключения/отключения клиента. Опишем их в терминах нескольких функций, оперирующих состоянием и записью/отправкой сообщений. Функции описаны в терминах эффекта cats.effect.IO.
Отправка сообщения:
trait MessagingLogic[A, M] { // A - address type, M - message type
type State = MessagingLogic.State[A] // state type
type MsgData = MessagingLogic.MsgData[M]
val algo = MessagingLogic.Algo.apply[M] _
val msg = MessagingLogic.Msg.apply[M] _
// Yes, I know about "interface segregation principle", but here I'd prefer to keep
// all functions in one list for understandability (we'll really need it!)
// sends message to all addresses, return failed addresses
protected def sendToAll(addresses: Set[A], epoch: Long, message: M): IO[Set[A]]
// state logic
protected def readState(recipient: String): IO[State]
protected def casState(recipient: String, expect: State, newState: State): IO[Boolean]
// log logic
protected def readMessages(recipient: String, fromEpoch: Long, fromTS: Long, untilEpoch: Long): FStream[IO, M]
protected def writeMessage(key: String, message: M): IO[Long] // returns timestamp
protected def logMessage(recipient: String, epoch: Long, timestamp: Long, key: String): IO[Unit]
// sends message with known last state
private def sendMsgs(
msgData: MsgData,
lastState: State,
sent: Set[A] = Set.empty,
fails: Set[A] = Set.empty): FStream[IO, Algo[M]] =
{
val sendTo = lastState.addresses -- sent -- fails
val MsgData(recipient, key, message, timestamp) = msgData
FStream.eval(logMessage(recipient, lastState.epoch, timestamp, key)) >>
FStream(algo(lastState.epoch, -1, s"start send to ${sendTo.size}, sent=${sent.size}, last failures=${fails.size}")) ++
FStream.eval(sendToAll(sendTo, lastState.epoch, message)).flatMap { failures =>
if (failures.isEmpty && fails.isEmpty) {
// no failures and no previous fails - just check epoch
// if epoch don't match - re-send
FStream.eval(readState(recipient)).flatMap { state =>
if (lastState.epoch == state.epoch)
FStream(algo(lastState.epoch, state.epoch, "OK"))
else
FStream(algo(lastState.epoch, state.epoch, s"retry, no failures")) ++
sendMsgs(msgData, state, sendTo)
}
} else {
val allFails = failures ++ fails
// failures or previous fails, changes addresses set => changes epoch
val newState = State(lastState.epoch + 1, lastState.addresses -- allFails)
FStream.eval(casState(recipient, lastState, newState)).flatMap { result =>
if (result)
FStream(algo(lastState.epoch, newState.epoch, s"failures=${failures.size}, epoch change OK")) ++
sendMsgs(msgData, newState, sendTo ++ sent -- failures)
else
FStream(algo(lastState.epoch, -1, s"failures=${failures.size}, sendTo=${sendTo.size}, sent=${sent.size}, lastFails=${fails.size} epoch change failed, retry")) ++
sendKey(msgData, sendTo ++ sent -- failures, allFails)
}
}
}
}
// send function, where we read current state and then perform logic with this
// last observed state
private def sendKey(msgData: MsgData, sent: Set[A] = Set.empty, fails: Set[A] = Set.empty): FStream[IO, Algo[M]] = {
val MsgData(recipient, key, message, timestamp) = msgData
FStream.eval(readState(recipient)).flatMap { state =>
FStream(algo(-1, state.epoch, s"start send")) ++
sendMsgs(msgData, state, sent, fails)
}
}
// main send function, where we just write message to 'messages' table and
// perform subsequent steps
def send(recipient: String, key: String, message: M): FStream[IO, Algo[M]] = {
FStream(algo(-1, -1, s"start write message key=$key")) ++
FStream.eval(writeMessage(key, message)).flatMap { timestamp =>
sendKey(MsgData(recipient, key, message, timestamp))
}
}
Код написан в терминах fs2.Stream, который тут переименован в FStream. Функция send просто возвращает стрим шагов, которые произвел алгоритм. Забегая вперед, это было не самое лучшее решение, но тогда оно мне казалось удачным - не нужно дополнительно вводить логирование, и все шаги можно получить и записать в тот же лог в одну строку, что в теории должно облегчать в будущем отладку. Итак, функция send просто записывает сообщение в таблицу messages, и передает управление дальше. SendKey читает текущее состояние и передает управление в sendMsgs. Там сообщение пишется в лог с текущей эпохой, затем пересылается всем адресатам, кому оно еще не было отправлено, и собираются ошибки. Если ошибок не было - перечитывается состояние, если эпоха не поменялась - успех, выходим. Если поменялось - рекурсивно вызываемся с новым состоянием. Если были ошибки - накидываем номер эпохи, формируем новый список адресатов и пробуем поменять состояние, если успех - вызываемся от нового состояния, если нет - значит, кто-то успел состояние поменять, возвращаемся на этап sendKey.
Функция подключения клиентов проще:
private def addClientCurrent(recipient: String, lastState: State, addresses: Set[A]): FStream[IO, Algo[M]] = {
FStream(algo(lastState.epoch, -1, s"set ${addresses.size} addresses")) ++
FStream.eval(casState(recipient, lastState, State(lastState.epoch + 1, lastState.addresses ++ addresses))).flatMap { result =>
if (result) {
FStream(algo(lastState.epoch, lastState.epoch + 1, s"cas OK"))
} else {
FStream(algo(lastState.epoch, -1, s"cas failed")) ++
addClient(recipient, addresses)
}
}
}
def addClient(recipient: String, addresses: Set[A]): FStream[IO, Algo[M]] = {
FStream(algo(-1, -1, s"read state")) ++
FStream.eval(readState(recipient)).flatMap { state =>
addClientCurrent(recipient, state, addresses)
}
}
Здесь все примерно так же, как в случае ошибок при отправке сообщения: читаем состояние, накидываем эпоху, докидываем адресов, пытаемся изменить состояние, если получилось - выходим, если нет - возвращаемся с чтения состояния.
И здесь есть 2 проблемы.
Состояние хранится сериализованным, список адресов вместе с эпохой. А это значит, что любое добавление адреса - это O(N), соответственно, добавление N адресов - это O(N^2), что плохо. Эта проблема решается в лоб делением списка адресов на chunk-и и связыванием их в односвязный список с записью головного чанка в состояние. Функция send, раз ей все равно все адреса нужны, разматывает этот список и производит compaction. Это решение с одной стороны не очень сложное, с другой - мне хотелось написать MVP побыстрее. Так что проблема решаема, но на данном этапе мне не хотелось ее решать. Тем более, есть проблема похуже, и это
Contention. Если приглядеться, то увидим, что функция добавления адресата - это типичный CAS-loop, в котором тот, кому не удалось произвести изменение, уходит на второй круг. Так вот, в случае высокого contention lock-free алгоритм проигрывает алгоритму с блокировками именно тем, что большое количество потоков будут раз за разом делать бесполезную работу, пытаясь успеть внести изменения, но изменения всегда вносит только один.
Если пошла жара...
Представим ситуацию - в наш мессенджер пришла популярная модель с целью выкладывать фотки своей пятой точки и рубить деньги на рекламе, и - о ужас! - открыла доступ к комментариям с целью типа создать видимость работы с аудиторией (нет). И нам начинают сыпаться как из рога изобилия запросы на добавление в чат и сообщения (комментарии), которым, по-хорошему, место в /dev/null, но бизнес есть бизнес, и у нас вырисовывается довольно сильный contention.
На помощь приходит буферизация. Идея - собрать побольше запросов на добавление и сообщений вместе, и обработать все скопом. Заодно и проблема O(N^2) при добавлении адресов решится - если мешаем эти запросы с сообщениями в кучу, то нам так и так все сохраненные адреса читать.
Такой буфер довольно несложно сделать. Заводим таблицу, в которую будем скидывать запросы на добавления и сообщения вместе, как было с сообщениями в предыдущем параграфе, в сортированном виде, и заведем еще один регистр с еще одной эпохой, и сделаем так: поток читает эпоху, пишет в таблицу запрос на добавление или сообщение, перечитывает эпоху, если изменилась - начинаем сначала, если нет - отлично, ждем несколько секунд, накидываем номер эпохи, пытаемся эпоху изменить, если неудача - просто выходим, дело сделано. Если текущий поток был тем "счастливчиком", который первый изменил эпоху - тогда он собирает все запросы предыдущей эпохи и закидывает их в алгоритм обработки (тот, что в следующем коде назван очень говорящим названием "out"):
trait BufferLogic[M, E] { // M - message type, E - event type (returns from out)
type BufferState = BufferLogic.State
type Msg = BufferLogic.Msg[M]
// delay
protected def timeout: Duration
// state stuff
protected def readBufferState(recipient: String): IO[BufferState]
protected def casBufferState(recipient: String, expect: BufferState, newState: BufferState): IO[Boolean]
// messages stuff
protected def addMessage(recipient: String, msg: Msg): IO[Unit]
protected def changeMessageEpoch(recipient: String, epoch: Long, timestamp: Long, key: String, value: M, toEpoch: Long): IO[Unit]
protected def readMessages(recipient: String, epoch: Long): FStream[IO, Msg]
// output - processing of whole messages batch
protected def out(recipient: String, msgs: Seq[Msg]): FStream[IO, E]
// adds 'msg' to db and checks if it's epoch matches with current epoch
private def consistentAdd(recipient: String, msg: Msg, lastState: BufferState, prevState: Option[BufferState] = None): IO[Long] = {
async[IO] {
prevState.fold(addMessage(recipient, msg.copy(epoch = lastState.epoch))) { ps =>
changeMessageEpoch(recipient, ps.epoch, msg.timestamp, msg.key, msg.msg, lastState.epoch)
}.await
val st = readBufferState(recipient).await
if (st.epoch == lastState.epoch) lastState.epoch
else consistentAdd(recipient, msg, st, Some(lastState)).await
}
}
def bufferSend(recipient: String, key: String, timestamp: Long, msg: M): FStream[IO, E] = {
val io = async[IO] {
val state = readBufferState(recipient).await
val lastEpoch = consistentAdd(recipient, Msg(state.epoch, key, timestamp, msg), state).await
// sleep for timeout to guarantee "timeout" period between epochs
IO.sleep(timeout).await
val casRes = casBufferState(recipient, State(lastEpoch), State(lastEpoch + 1)).await
// if we are "lucky", we are the first who moved epoch, so we should
// gather all epoch messages and move them to "out";
// otherwise do nothing
if (casRes) readMessages(recipient, lastEpoch).compile.fold(IndexedSeq.empty[Msg])(_ :+ _).await
else IndexedSeq.empty[Msg]
}
FStream.eval(io).flatMap { seq =>
out(recipient, seq)
}
}
def initBufferState(recipient: String): IO[Unit] = IO(())
}
object BufferLogic {
case class State(epoch: Long)
case class Msg[M](epoch: Long, key: String, timestamp: Long, msg: M)
}
Здесь преимущество в том, что у нас отсутствует contention - потоки, которые не смогли изменить состояние (а изменить состояние может только один) не уходят на второй круг, а просто выходят.
А недостаток - теряется гарантия at least once) Теперь наш алгоритм уязвим к сбоям вида "поменял эпоху и умер". К счастью, эта проблема решается добавлением к сообщению адреса колбэка, но на данном этапе я не хочу ее решать. Поэтому просто констатируем - запросы на добавление мы будем буферизовать, а для сообщений оставим на выбор - буферизовать, теряя гарантии, или нет. Отправка сообщений не дает большого contention-а, там на второй круг логика уходит только в случае отключения клиентов.
Теперь нам надо написать алгоритм обработки пачки перемешанных запросов на добавление и сообщений. Буферизованные запросы, приходящие пачками, остаются, тем не менее, конкурентными. К счастью, наш send-алгоритм от этого почти не поменяется (обещаю, этот код - последний, больше на сегодня не будет):
// TODO: refactor
private def sendMsgs(
msgData: MsgData,
newAddresses: Set[A],
lastState: State,
sent: Set[A] = Set.empty,
fails: Set[A] = Set.empty): FStream[IO, Algo[M]] =
{
val allAddrs = lastState.addresses ++ newAddresses -- fails
val sendTo = allAddrs -- sent
val MsgData(recipient, messages, timestamp) = msgData
// we log messages only when we add all 'newAddresses', because
// if we have 'newAddresses' to add, we are guaranteed that there
// will be new epoch, and we add messages to log only then
val log =
if (newAddresses.isEmpty)
FStream.eval(logMessages(recipient, lastState.epoch, timestamp, messages.keySet))
else FStream(())
log >>
FStream(algo(lastState.epoch, -1, s"start send to ${sendTo.size}, sent=${sent.size}, last failures=${fails.size}")) ++
FStream.eval(sendToAll(recipient, sendTo, lastState.epoch, messages.map { case (id, m) =>
id -> FullMessage(id, lastState.epoch, timestamp, m)
})).flatMap { failures =>
// it's important here that only if we have no new addresses,
// no current failures and no previous failures,
// we can go without epoch increment
if (newAddresses.isEmpty && failures.isEmpty && fails.isEmpty) {
// no failures and no previous fails - just check epoch
// if epoch don't match - re-send
FStream.eval(readState(recipient)).flatMap { state =>
if (lastState.epoch == state.epoch)
FStream(algo(lastState.epoch, state.epoch, "OK"))
else
FStream(algo(lastState.epoch, state.epoch, s"retry, no failures")) ++
sendMsgs(msgData, newAddresses, state, allAddrs)
}
} else {
// we have failures, or prev fails, or new addresses -
// in any case, there is address set change, so - epoch movement
val allFails = failures ++ fails
val newAllAddrs = allAddrs -- failures
// failures or previous fails, changes addresses set => changes epoch
val newState = State(lastState.epoch + 1, newAllAddrs)
FStream.eval(casState(recipient, lastState, newState)).flatMap { result =>
if (result)
FStream(algo(lastState.epoch, newState.epoch, s"failures=${failures.size}, epoch change OK")) ++
sendMsgs(msgData, Set.empty, newState, newAllAddrs)
else
FStream(algo(lastState.epoch, -1, s"failures=${failures.size}, sendTo=${sendTo.size}, sent=${sent.size}, lastFails=${fails.size} epoch change failed, retry")) ++
sendKeys(msgData, newAddresses, newAllAddrs, allFails)
}
}
}
}
def sendKeys(msgData: MsgData, newAddresses: Set[A], sent: Set[A] = Set.empty, fails: Set[A] = Set.empty): FStream[IO, Algo[M]] = {
FStream.eval(readState(msgData.recipient)).flatMap { state =>
FStream(algo(-1, state.epoch, s"start send")) ++
sendMsgs(msgData, newAddresses, state, sent, fails)
}
}
def send(recipient: String, messages: Map[String, M], newAddresses: Set[A]): FStream[IO, Algo[M]] = {
FStream(algo(-1, -1, s"start write ${messages.size} messages")) ++
FStream.eval(writeMessages(messages)).flatMap { timestamp =>
sendKeys(MsgData(recipient, messages, timestamp), newAddresses)
}
}
Тут просто разбираемся, кому, что и куда отправлять, куда, что и с какой эпохой сохранять, а общий алгоритм не поменялся.
Итог
Если честно, я думал, статья будет раза в 3 короче, но у меня пока нет идей, как ее ощутимо подсократить (и это я еще тесты не описывал). Зато была проделана основная работа - определен вариант архитектуры, выбраны БД, технология, и описана бизнес-логика.
В следующей статье напишем обвязку - те самые функции, которые сегодня остались абстрактными и в терминах которых работает логика. Напишем бинды к HBase, хендлеры webSocket, grpc, свяжем вместе буфер и логику отправки, и попробуем сохранить мозг целым, а нервы - расслабленными (хотя я свои примерно все там и оставил).
Спасибо за внимание!