company_banner

Чаты на вебсокетах, когда на бэкенде WAMP. Теперь про Android

    Мой коллега уже писал про наш опыт разработки чатов на вебсокетах для iOS, поэтому часть про особенности бэкенда с точки зрения клиента у нас общая. А вот реализация на Android, конечно, отличается. И ещё мне не приходилось, как в первой статье, искать библиотеку для поддержки старых версий операционной системы, потому что на Android каких-то глобальных изменений в сетевой части не было, всё работало и так.

    К реализации вернёмся чуть ниже, а начнём с ответов на вопросы про бэкенд, которые появились после первой статьи: почему WAMP, какой брокер используем и некоторые другие моменты.

    На время передам слово нашему бэкенд-разработчику @antoha-gs, а если хочется сразу почитать про клиент-серверное общение и декодирование, то первый раздел можно пропустить.

    Что там на бэкенде

    Почему WAMP. Изначально искал открытый протокол, который мог бы работать поверх WebSocket с поддержкой функционала PubSub и RPC и с потенциалом масштабирования. Лучше всего подошёл WAMP — одни плюсы, разве что не нашёл реализации протокола на Java/Kotlin, которая бы меня устраивала.

    Какой брокер мы используем. Продолжая предыдущий пункт, это, собственно, и послужило написанию собственной реализации протокола. Плюсы — экспертиза в своём коде и гибкость, то есть при надобности всегда можно отойти от стандарта в нужную сторону. Каких-то серьёзных минусов не выявил.

    К небольшим проблемам реализации проекта чатов можно отнести то, что нужно было ресёрчить и ревёрсить то, как работает реализация на Sendbird — сервисе, который мы использовали. То есть какими возможностями мы там пользовались и какой дополнительный функционал был реализован поверх. Также к сложностям отнёс бы перенос данных из Sendbird в свою базу.

    Ещё был такой момент в комментариях:

    «Правильно, что не стали использовать Socket.IO, так как рано или поздно столкнулись бы с двумя проблемами: 1) Пропуск сообщений. 2) Дублирование сообщений. WAMP — к сожалению — также не решает эти вопросы. Поэтому для чатов лучше использовать что-то вроде MQTT».

    Насколько я могу судить, протокол не решает таких проблем магическим образом, всё упирается в реализацию. Да, на уровне протокола может поддерживаться дополнительная информация/настройки для указания уровня обслуживания (at most/at least/exactly), но ответственность за её реализацию всё равно лежит на конкретной имплементации. В нашем случае, учитывая специфику, достаточно гарантировать надёжную запись в базу и доставку на клиенты at most once, что WAMP вполне позволяет реализовать. Также он легко расширяем.

    MQTT — отличный протокол, никаких вопросов, но в данном сравнении у него меньше фич, чем у WAMP, которые могли бы пригодиться нам для сервиса чатов. В качестве альтернативы можно было бы рассмотреть XMPP (aka Jabber), потому что, в отличие от MQTT и WAMP, он предназначен для мессенджеров, но и там без «допилов» бы не обошлось. Ещё можно создать свой собственный протокол, что нередко делают в компаниях, но это, в том числе, дополнительные временные затраты.

    Это были основные вопросы касательно бэкенда после предыдущей статьи, и, думаю, мы ещё вернёмся к нашей реализации в отдельном материале. А сейчас возвращаю слово Сергею.

    Клиент-сервер

    Начну с того, что WAMP означает для клиента.

    • В целом протокол предусматривает почти всё. Это облегчает взаимодействие разработчиков клиентской части и бэка.

    • Кодирование всех типов событий в числах (PUBLISH — это 16, SUBSCRIBE — 32 и так далее). Это усложняет чтение логов разработчику и QA (сразу не догадаться, что значит прилетевшее сообщение [33,11,5862354]).

    • Механизм подписок на события (например, новые сообщения в чат или обновление количества участников) реализован через получение от бэкенда уникального id подписки. Его надо где-то хранить и ни в коем случае не терять во избежание утечек. Как это сделано (было бы сильно проще и подписываться и отписываться просто по id чата):client → подписываемся на новые сообщения в чате  [32,18,{},"co.fun.chat.testChatId"]backend → [33,18,5868752 (id подписки)]client → после выхода из чата отписываемся по id [34,20,5868752]

    Для работы с сокетом использовали OkHttp (стильно, надёжно, современно, реализация ping-pong таймаутов из коробки) и RxJava, потому что сама концепция чата — практически идеальный пример того самого event-based programming, ради которого Rx, в общем, и задумывался.

    Теперь рассмотрим пример коннекта к серверу, использующему WAMP-протокол через OkHttpClient: 

    val request = Request.Builder()
        .url(ChatsConfig.SOCKETURL)
        .addHeader("Connection", "Upgrade")
        .addHeader("Sec-WebSocket-Protocol", "wamp.json")
        .addHeader("Authorization", authToken)
        .build()
    val listener = ChatWebSocketListener()
    webSocket = okHttpClient.newWebSocket(request, listener)

    Пример реализации ChatWebSocketListener:

    private inner class ChatWebSocketListener : WebSocketListener() {
    
    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
     connectionStatusSubject.onNext(ChatConnectionStatuses.NOTCONNECTED) 
    //subject, оповещающий пользователей о состоянии коннекта (в UI нужен для отображения лоадеров, оффлайн-стейтов и так далее)
    }
    
    override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
     webSocket.close(1000, null)
    }
    
    override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
     onConnectionError("${t.message} ${response?.body}")
    }
    
    override fun onMessage(webSocket: WebSocket, text: String) {
     socketMessagesSubject.onNext(serverMessageFactory.processMessage(text)) //subject, через который идут все сообщения, которые в дальнейшем фильтруются для конкретных получателей (см. ниже)
    }
    
    override fun onOpen(webSocket: WebSocket, response: Response) {
     authorize()
     }
    }

    Здесь мы видим, что все сообщения от сокета приходят в виде обычного String, представляющего собой JSON, закодированный по правилам WAMP протокола и имеющий структуру:

    [ResultCode: Int, RequestId: Long, ArgumentsMap: JsonObject ]

    Например:

    [50, 7, {"type":100, "chats":[список чатов]}]

    Декодирование и отправка сообщений

    Для декодинга сообщений в объекты мы использовали библиотеку Gson. Все модели ответа отписываются обычными data-классами вида:

    @DontObfuscate
    data class ChatListResponse(@SerializedName("chats") val chatList: List<Chat>)

    А декодирование происходит с помощью следующего кода:

    private fun chatListUpdateInternal(jsonChatsResponse: JSONObject):
    ChatsListUpdatesEvent {
     return gson.fromJson(jsonChatsResponse.toString(), 
    ChatsListUpdatesEvent::class.java)
    }

    Теперь рассмотрим базовый пример отправки сообщения по сокету. Для удобства мы сделали обёртку для всех базовых типов WAMP сообщений: 

    sealed class WampMessage {
     class BaseMessage(val wampId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage() 
    
     class ErrorMessage(val procedureId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage()
    
     object WelcomeMessage : WampMessage()
     class AbortMessage(val jsonData: JSONArray) : WampMessage()
    }

    А также добавили фабрику для формирования этих сообщений:

    fun getCallMessage(rpc: String,
             options: Map<String, Any> = emptyMap(),
             arguments: List<Any?> = emptyList(),
             argumentsDict: Map<String, Any?> = emptyMap()):
    WampMessage.BaseMessage {
     //[CALL, Request|id, Options|dict, Procedure|uri, Arguments|list]
     val seq = nextSeq.getAndIncrement()
     return WampMessage.BaseMessage(WAMP.MessageIds.CALL,
                   seq,
                   JSONArray(listOfNotNull(WAMP.MessageIds.CALL,
                   seq,
                   options,
                   rpc,
                   arguments,
                   argumentsDict)))
    }

    Пример отправки сообщений: 

    val messages: Observable<WampMessage> = socketMessagesSubject
    
    fun sendMessage(msgToSend: WampMessage.BaseMessage): 
    Observable<WampMessage> {
     return messages.filter {
       it is WampMessage.BaseMessage && it.seq == msgToSend.seq
    }
        .take(1)
        .doOnSubscribe {
         webSocket.send(msgToSend.jsonData.toString())
        }
    }

    Сопоставление отправленного сообщения и ответа на него в WAMP происходит с помощью уникального идентификатора seq, отправляемого клиентом, который потом кладётся в ответ.

    В клиенте генерация идентификатора делается следующим образом:

    companion object {
     private val nextSeq: AtomicLong = AtomicLong(1)
    }
    fun getNextSeq() = nextSeq.getAndIncrement()

    Взаимодействие с WAMP Subscriptions 

    Подписки в протоколе WAMP — концепт, по которому подписчик (клиент) подписывается на какие-либо события, приходящие от бэкенда. В нашей реализации мы использовали:

    • обновление списка чатов;

    • новые сообщения в конкретном чате;

    • изменение онлайн-статуса собеседника;

    • изменение в составе участников чата;

    • смена роли юзера (например, когда его назначают модератором);

    • и так далее.

    Клиент сообщает серверу о желании получать события с помощью следующего сообщения:

    [SUBSCRIBE: Int, RequestId: Long, Options: Map, Topic: String]

    Где topic — это скоуп событий, которые нужны подписчику. 

    Для формирования базового события подписки используется код:

    fun getSubscribeMessage(topic: String, options: Map<String, Any> = emptyMap()): 
    WampMessage.BaseMessage {
     val seq = nextSeq.getAndIncrement()
     return WampMessage.BaseMessage(WAMP.MessageIds.SUBSCRIBE,
                   								  seq,
                  								  JSONArray(listOfNotNull(WAMP.MessageIds.SUBSCRIBE,
                                    seq,
                                    options,
                                    topic)))
    }

    Разумеется, при выходе с экрана (например, списка чатов), необходимо соответствующую подписку корректно отменять. И вот тут выявляется одно из свойств протокола WAMP: при отправке subscribe-сообщения бэкенд возвращает числовой id подписки, и выходит, что отписаться от конкретного топика нельзя — нужно запоминать и хранить этот id, чтобы использовать его при необходимости.

    А так как хочется оградить пользователей API подписок от лишнего менеджмента айдишников, было сделано следующее:

    private val subscriptionsMap = ArrayMap<String, Long>()
    
    private fun getBaseSubscription(topic: String): Observable<WampMessage> {
     val msg = wampClientMessageFactory.getSubscribeMessage(topic)
     return send(msg).map {
       val subscriptionId = converter.getSubscriptionId((it.asBaseMessage()).jsonData)
       subscriptionsMap[topic] = subscriptionId
       subscriptionId
    }
        .switchMap { subscriptionId ->
          chatClient.messages.filter {
           it.isMessageFromSubscription(subscriptionId)
         }
        }
    }

    Так клиент ничего не будет знать об id, и для отписки ему будет достаточно указать имя подписки, которую необходимо отменить:

    fun unsubscribeFromTopic(topic: String) {
     if (!subscriptionsMap.contains(topic)) {
        return
     }
     val msg = 
    wampClientMessageFactory.getUnsubscribeMessage(subscriptionsMap[topic])
     send(msg, true).exSubscribe()
     subscriptionsMap.remove(topic)
    }

    Это то, что я хотел рассказать про реализацию на Android, но если есть вопросы — постараюсь ответить на них в комментариях. Напомню, что про чаты на вебсокетах в iOS мы уже писали вот здесь, а также готовим отдельную часть про бэкенд.

    FunCorp
    Разработка развлекательных сервисов

    Комментарии 3

      +1
      Правильно я понимаю, что соединение с вебсокетом идет из приложения (или из браузера)? В этом случае нужно его «держать»? Как это влияет на разряжание аккумулятора (дрейн)?
        +3
        Правильно я понимаю, что соединение с вебсокетом идет из приложения (или из браузера)?


        Да, всё так, коннект держит само приложение. В целом, на аккумулятор это не сильно влияет, потому что коннект к вебсокету не сильно отличается от http keep-alive, который и так постоянно держит соединение открытым для api-клиента.
          +1
          «Держать соединение» — это просто хранить в памяти минимальное TCP-окно, это ничего не дает на процессор, только на память причем очень мало. А вот «держать и активно использовать много соединений» — в некотором смысле дает что-то на процессор, потому что при получении пакета надо выполнять поиск по многим TCP-окнам. Но это вопрос к серверу, а не к клиенту.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое