Как стать автором
Обновить
0
FUNCORP
Разработка развлекательных сервисов

Чаты на вебсокетах, когда на бэкенде WAMP. Теперь про Android

Время на прочтение7 мин
Количество просмотров6.8K

Мой коллега уже писал про наш опыт разработки чатов на вебсокетах для iOS, поэтому часть про особенности бэкенда с точки зрения клиента у нас общая. А вот реализация на Android, конечно, отличается. И ещё мне не приходилось, как в первой статье, искать библиотеку для поддержки старых версий операционной системы, потому что на Android каких-то глобальных изменений в сетевой части не было, всё работало и так.

К реализации вернёмся чуть ниже, а начнём с ответов на вопросы про бэкенд, которые появились после первой статьи: почему WAMP, какой брокер используем и некоторые другие моменты.

На время передам слово нашему бэкенд-разработчику @antoha-gs, а если хочется сразу почитать про клиент-серверное общение и декодирование, то первый раздел можно пропустить.

Что там на бэкенде

Почему WAMP. Изначально искал открытый протокол, который мог бы работать поверх WebSocket с поддержкой функционала PubSub и RPC и с потенциалом масштабирования. Лучше всего подошёл WAMP — одни плюсы, разве что не нашёл реализации протокола на Java/Kotlin, которая бы меня устраивала.

Какой брокер мы используем. Продолжая предыдущий пункт, это, собственно, и послужило написанию собственной реализации протокола. Плюсы — экспертиза в своём коде и гибкость, то есть при надобности всегда можно отойти от стандарта в нужную сторону. Каких-то серьёзных минусов не выявил.

К небольшим проблемам реализации проекта чатов можно отнести то, что нужно было ресёрчить и ревёрсить то, как работает реализация на Sendbird — сервисе, который мы использовали. То есть какими возможностями мы там пользовались и какой дополнительный функционал был реализован поверх. Также к сложностям отнёс бы перенос данных из Sendbird в свою базу.

Ещё был такой момент в комментариях:

«Правильно, что не стали использовать Socket.IO, так как рано или поздно столкнулись бы с двумя проблемами: 1) Пропуск сообщений. 2) Дублирование сообщений. WAMP — к сожалению — также не решает эти вопросы. Поэтому для чатов лучше использовать что-то вроде MQTT».

Насколько я могу судить, протокол не решает таких проблем магическим образом, всё упирается в реализацию. Да, на уровне протокола может поддерживаться дополнительная информация/настройки для указания уровня обслуживания (at most/at least/exactly), но ответственность за её реализацию всё равно лежит на конкретной имплементации. В нашем случае, учитывая специфику, достаточно гарантировать надёжную запись в базу и доставку на клиенты at most once, что WAMP вполне позволяет реализовать. Также он легко расширяем.

MQTT — отличный протокол, никаких вопросов, но в данном сравнении у него меньше фич, чем у WAMP, которые могли бы пригодиться нам для сервиса чатов. В качестве альтернативы можно было бы рассмотреть XMPP (aka Jabber), потому что, в отличие от MQTT и WAMP, он предназначен для мессенджеров, но и там без «допилов» бы не обошлось. Ещё можно создать свой собственный протокол, что нередко делают в компаниях, но это, в том числе, дополнительные временные затраты.

Это были основные вопросы касательно бэкенда после предыдущей статьи, и, думаю, мы ещё вернёмся к нашей реализации в отдельном материале. А сейчас возвращаю слово Сергею.

Клиент-сервер

Начну с того, что WAMP означает для клиента.

  • В целом протокол предусматривает почти всё. Это облегчает взаимодействие разработчиков клиентской части и бэка.

  • Кодирование всех типов событий в числах (PUBLISH — это 16, SUBSCRIBE — 32 и так далее). Это усложняет чтение логов разработчику и QA (сразу не догадаться, что значит прилетевшее сообщение [33,11,5862354]).

  • Механизм подписок на события (например, новые сообщения в чат или обновление количества участников) реализован через получение от бэкенда уникального id подписки. Его надо где-то хранить и ни в коем случае не терять во избежание утечек. Как это сделано (было бы сильно проще и подписываться и отписываться просто по id чата):client → подписываемся на новые сообщения в чате  [32,18,{},"co.fun.chat.testChatId"]backend → [33,18,5868752 (id подписки)]client → после выхода из чата отписываемся по id [34,20,5868752]

Для работы с сокетом использовали OkHttp (стильно, надёжно, современно, реализация ping-pong таймаутов из коробки) и RxJava, потому что сама концепция чата — практически идеальный пример того самого event-based programming, ради которого Rx, в общем, и задумывался.

Теперь рассмотрим пример коннекта к серверу, использующему WAMP-протокол через OkHttpClient: 

val request = Request.Builder()
    .url(ChatsConfig.SOCKETURL)
    .addHeader("Connection", "Upgrade")
    .addHeader("Sec-WebSocket-Protocol", "wamp.json")
    .addHeader("Authorization", authToken)
    .build()
val listener = ChatWebSocketListener()
webSocket = okHttpClient.newWebSocket(request, listener)

Пример реализации ChatWebSocketListener:

private inner class ChatWebSocketListener : WebSocketListener() {

override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
 connectionStatusSubject.onNext(ChatConnectionStatuses.NOTCONNECTED) 
//subject, оповещающий пользователей о состоянии коннекта (в UI нужен для отображения лоадеров, оффлайн-стейтов и так далее)
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
 webSocket.close(1000, null)
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
 onConnectionError("${t.message} ${response?.body}")
}

override fun onMessage(webSocket: WebSocket, text: String) {
 socketMessagesSubject.onNext(serverMessageFactory.processMessage(text)) //subject, через который идут все сообщения, которые в дальнейшем фильтруются для конкретных получателей (см. ниже)
}

override fun onOpen(webSocket: WebSocket, response: Response) {
 authorize()
 }
}

Здесь мы видим, что все сообщения от сокета приходят в виде обычного String, представляющего собой JSON, закодированный по правилам WAMP протокола и имеющий структуру:

[ResultCode: Int, RequestId: Long, ArgumentsMap: JsonObject ]

Например:

[50, 7, {"type":100, "chats":[список чатов]}]

Декодирование и отправка сообщений

Для декодинга сообщений в объекты мы использовали библиотеку Gson. Все модели ответа отписываются обычными data-классами вида:

@DontObfuscate
data class ChatListResponse(@SerializedName("chats") val chatList: List<Chat>)

А декодирование происходит с помощью следующего кода:

private fun chatListUpdateInternal(jsonChatsResponse: JSONObject):
ChatsListUpdatesEvent {
 return gson.fromJson(jsonChatsResponse.toString(), 
ChatsListUpdatesEvent::class.java)
}

Теперь рассмотрим базовый пример отправки сообщения по сокету. Для удобства мы сделали обёртку для всех базовых типов WAMP сообщений: 

sealed class WampMessage {
 class BaseMessage(val wampId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage() 

 class ErrorMessage(val procedureId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage()

 object WelcomeMessage : WampMessage()
 class AbortMessage(val jsonData: JSONArray) : WampMessage()
}

А также добавили фабрику для формирования этих сообщений:

fun getCallMessage(rpc: String,
         options: Map<String, Any> = emptyMap(),
         arguments: List<Any?> = emptyList(),
         argumentsDict: Map<String, Any?> = emptyMap()):
WampMessage.BaseMessage {
 //[CALL, Request|id, Options|dict, Procedure|uri, Arguments|list]
 val seq = nextSeq.getAndIncrement()
 return WampMessage.BaseMessage(WAMP.MessageIds.CALL,
               seq,
               JSONArray(listOfNotNull(WAMP.MessageIds.CALL,
               seq,
               options,
               rpc,
               arguments,
               argumentsDict)))
}

Пример отправки сообщений: 

val messages: Observable<WampMessage> = socketMessagesSubject

fun sendMessage(msgToSend: WampMessage.BaseMessage): 
Observable<WampMessage> {
 return messages.filter {
   it is WampMessage.BaseMessage && it.seq == msgToSend.seq
}
    .take(1)
    .doOnSubscribe {
     webSocket.send(msgToSend.jsonData.toString())
    }
}

Сопоставление отправленного сообщения и ответа на него в WAMP происходит с помощью уникального идентификатора seq, отправляемого клиентом, который потом кладётся в ответ.

В клиенте генерация идентификатора делается следующим образом:

companion object {
 private val nextSeq: AtomicLong = AtomicLong(1)
}
fun getNextSeq() = nextSeq.getAndIncrement()

Взаимодействие с WAMP Subscriptions 

Подписки в протоколе WAMP — концепт, по которому подписчик (клиент) подписывается на какие-либо события, приходящие от бэкенда. В нашей реализации мы использовали:

  • обновление списка чатов;

  • новые сообщения в конкретном чате;

  • изменение онлайн-статуса собеседника;

  • изменение в составе участников чата;

  • смена роли юзера (например, когда его назначают модератором);

  • и так далее.

Клиент сообщает серверу о желании получать события с помощью следующего сообщения:

[SUBSCRIBE: Int, RequestId: Long, Options: Map, Topic: String]

Где topic — это скоуп событий, которые нужны подписчику. 

Для формирования базового события подписки используется код:

fun getSubscribeMessage(topic: String, options: Map<String, Any> = emptyMap()): 
WampMessage.BaseMessage {
 val seq = nextSeq.getAndIncrement()
 return WampMessage.BaseMessage(WAMP.MessageIds.SUBSCRIBE,
               								  seq,
              								  JSONArray(listOfNotNull(WAMP.MessageIds.SUBSCRIBE,
                                seq,
                                options,
                                topic)))
}

Разумеется, при выходе с экрана (например, списка чатов), необходимо соответствующую подписку корректно отменять. И вот тут выявляется одно из свойств протокола WAMP: при отправке subscribe-сообщения бэкенд возвращает числовой id подписки, и выходит, что отписаться от конкретного топика нельзя — нужно запоминать и хранить этот id, чтобы использовать его при необходимости.

А так как хочется оградить пользователей API подписок от лишнего менеджмента айдишников, было сделано следующее:

private val subscriptionsMap = ArrayMap<String, Long>()

private fun getBaseSubscription(topic: String): Observable<WampMessage> {
 val msg = wampClientMessageFactory.getSubscribeMessage(topic)
 return send(msg).map {
   val subscriptionId = converter.getSubscriptionId((it.asBaseMessage()).jsonData)
   subscriptionsMap[topic] = subscriptionId
   subscriptionId
}
    .switchMap { subscriptionId ->
      chatClient.messages.filter {
       it.isMessageFromSubscription(subscriptionId)
     }
    }
}

Так клиент ничего не будет знать об id, и для отписки ему будет достаточно указать имя подписки, которую необходимо отменить:

fun unsubscribeFromTopic(topic: String) {
 if (!subscriptionsMap.contains(topic)) {
    return
 }
 val msg = 
wampClientMessageFactory.getUnsubscribeMessage(subscriptionsMap[topic])
 send(msg, true).exSubscribe()
 subscriptionsMap.remove(topic)
}

Это то, что я хотел рассказать про реализацию на Android, но если есть вопросы — постараюсь ответить на них в комментариях. Напомню, что про чаты на вебсокетах в iOS мы уже писали вот здесь, а также готовим отдельную часть про бэкенд.

Теги:
Хабы:
Всего голосов 50: ↑50 и ↓0+50
Комментарии3

Публикации

Информация

Сайт
funcorp.dev
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Кипр
Представитель
ulanana

Истории