
Как часто вы слышите что-то о протоколе WebSocket? А как часто видели его в проектах? Продакшн? Андроид? Что? Его кто-то использует?
А теперь представьте: перед вами стоит задача внедрить WS в крупный продакшн проект с нуля. С чего начать? Как подступиться?
Меня зовут Нестеренко Михаил и я руковожу разработкой международного Android приложения BetBoom. В этой статье мы поговорим о WebSocket.
Начнем с того, чем все-таки является WebSocket? Самый частый ответ на собеседованиях - «просто сокет». При этом описать его особенности многие затрудняются.
Если он сокет, то есть ли у него что-то общее с TCP сокетом (соединением)? Если да, то почему он все-таки Web? Вопросов, как правило, больше, чем ответов на них.
Если не вдаваться в низкоуровневые подробности, ответы на эти вопросы можно уместить в одну небольшую табличку:
Параметр | WS | HTTP | TCP |
Открытие соединения | Один раз | При каждом запросе | Один раз |
Взаимодействие | Двунаправленное | Однонаправленное | Двунаправленное |
Куки | Да | Да | Нет |
Коды ответов | Да | Да | Нет |
Handshake | Сложный | Сложный | Простой |
Скорость | Высокая | Низкая | Высокая |
Уровень OSI* | Application | Application | Transport |
*Для полного понимания протоколов в целом, рекомендовал бы хотя бы поверхностно ознакомиться с OSI
Из таблички очень легко сделать вывод: WS - «прокаченная версия» TCP с помощью фишек HTTP, поэтому между ними крайне много общего.
WS взял сильные стороны от двух протоколов - именно поэтому он нередко применяется в современной разработке.
На самом деле, это просто другой, более высокий и более простой для программиста, уровень передачи данных.
А «Web» он по причине того, что его начали использовать в веб-браузерах: набор протоколов в них ограничен, и обычный TCP или UDP не могут полноценно функционировать. WebSocket - «обёртка» над классическим TCP.
Подключение к серверу состоит из нескольких простых шагов:
запрос на рукопожатие
подтверждение, что соединение установлено
обмен сообщениями в обе стороны
взаимное закрытие соединения.

При этом «рукопожатие» происходит по схеме HTTP.
Представим, что мы хотим получать коэффициенты для ставок на матч: как поддержать актуальность данных? Отправлять запрос каждую секунду? Очень вряд ли. Наверняка, вам захочется, чтобы сервер сам оповестил вас о том, что данные обновились.
Поэтому, если вы хотите сделать real-time общение между клиентом и сервером и синхронизировать их данные, то WebSocket - ваш оптимальный выбор.
Для выбора WS у нас было больше причин:
синхронизация данных приложения в реальном времени события;
скорость работы - он заметно производительнее обычного HTTP;
это основной протокол работы для нашего бэкенда и фронтенда соответственно.
Долгое время для приложения делали отдельный mobile http gateway, что создавало трудности в поставке новых фич и их актуальности
При этом, после минимального просмотра статистики, можно сделать вывод, что большинство приложений сейчас абсолютно не подготовлены к такому раскладу, поэтому без ресерча тут было не обойтись.
Представим ситуацию, что у нас уже есть продуктовое приложение, которое полностью работает по HTTP. Там уже есть все нужные запросы и данные для них, готовая авторизация, сериализация ответов и куча библиотек, которые годами разрабатывали для самого популярного протокола.
Что же делать? Правильно! Пытаться добавить новое, не сломав при этом старое: оставить все текущие сетевые запросы на HTTP и добавить возможность писать новые на WS.
Начать лучше всего с этапа, про который большинство часто забывает - проектирование.
Посмотрим на схему взаимодействия с сервером на этом самом Web сокете:

Можно заметить:
клиент может отправить запрос на API
клиент получает ответы от API
сервер присылает клиенту данные самостоятельно с помощью «нотификаций».
Следовательно, в приложении будет 2 п��тока данных:
те, которые мы запросили и ждем от сервера,
те, которые мы не запрашивали, но они пришли.
Но приходят эти данные в единое место - SocketClient
Дальше, будем разделять их на 2 потока, которые я описал выше, и перенаправлять в соответствующие специализирующиеся компоненты:
DataCollector- обёртка, в которую клиентский код будет отправлять запрос и получать ответ.NotifyCollector- обёртка, в которой можно будет прослушивать все нотификации и соответствующе реагировать на них.
В нашей архитектуре эти сущности спокойно можно причислить к Data Source - источникам данных, которые находятся почти на самом низком уровне приложения, однако таковыми их мы не будем считать, т.к. они выполняют инфраструктурные и утилитарные функции в системе (они же все-таки просто обертки над клиентом). Поэтому отнесем их к «инфраструктурному» слою.
Что же мы будем считать источником данных (Data Source)?
Все просто: Data Source'ем мы будем считать конкретную реализацию конкретных методов, которые мы можем вызывать в соединении и получить желаемые данные.
В случае же нотификаций можно провернуть аналогичный ход, либо же остановиться более простом варианте: NotifyCollector в иерархии нотификаций будет являться Data Source.
А дальше уже можно организовать привычную чистую архитектуру и обращаться к этим Data Source'ам из репозиториев и так далее.

Первым делом, реализуем сам сокет клиент и основные методы работы с соединением:
подключение;
отключение;
отправка запроса;
получение ответов;
прослушивание нотификаций.
В процессе разработки будут использоваться следующие библиотеки:
Интерфейс нашего будущего соединения:
interface SocketClient<D> { val config: SocketConfig val isConnected: StateFlow<Boolean> val notify: Flow<SocketEvent.Notify> val data: Flow<SocketEvent.Response> suspend fun sendRaw(request: String) suspend fun <R : SocketEvent.Response> send(request: SocketRequest<D>): R suspend fun start(): Boolean suspend fun close(): Boolean suspend fun reload(): Boolean } interface SocketConfig { val name: String val host: String val cookies: Map<String, String> val requestTimeoutMillis: Long val connectTimeoutMillis: Long }
Сразу заметим, что мы не имеем привязки к формату запросов или ответов идущих в сокет, что в будущем (а точнее уже!) позволяет работать с другими хостами.
data class SocketRequest<D>( val type: KType, val authorized: Boolean, val data: D ) @Serializable sealed interface SocketEvent { @Serializable data object Unknown : SocketEvent data class Pending(val id: String) : SocketEvent data object Unauthorized : SocketEvent data object Refreshed : SocketEvent interface Notify : SocketEvent interface Response : SocketEvent { val id: String } }
А теперь приступим к реализации клиента:
internal class DefaultSocketClient<D>( override val config: SocketConfig, private val httpClient: HttpClient, private val interceptor: SocketInterceptor<D> ) : SocketClient<D> { private val coroutineExceptionHandler = CoroutineExceptionHandler { _, e -> handleError(e) } private val coroutineContext = Dispatchers.IO + coroutineExceptionHandler private val scope = CoroutineScope(coroutineContext) private lateinit var socket: DefaultClientWebSocketSession private val _notify = MutableSharedFlow<SocketEvent.Notify>( replay = 0, extraBufferCapacity = 16, onBufferOverflow = BufferOverflow.DROP_OLDEST ) private val _data = MutableSharedFlow<SocketEvent.Response>( replay = 0, extraBufferCapacity = 16, onBufferOverflow = BufferOverflow.DROP_OLDEST ) override val notify = _notify.asSharedFlow() override val data = _data.asSharedFlow() private val isOpen = MutableStateFlow(false) override val isConnected = isOpen.asStateFlow() private suspend fun getSocket(): DefaultClientWebSocketSession = withTimeout(config.connectTimeoutMillis) { while (isOpen.value.not()) { delay(100) } socket } private suspend fun initSocket() { socket = httpClient.webSocketSession { url("wss://${config.host}") config.cookies.forEach { (k, v) -> cookie(k, v) } timeout { requestTimeoutMillis = config.requestTimeoutMillis connectTimeoutMillis = config.connectTimeoutMillis } } isOpen.value = true scope.launch { subscribe( onNotify = _notify::emit, onRequest = _data::emit ) } } override suspend fun reload(): Boolean = /** some logic */ override suspend fun start(): Boolean = /** some logic */ override suspend fun close(): Boolean = /** some logic */ override suspend fun sendRaw(request: String) { getSocket().send(request) } override suspend fun <R : SocketEvent.Response> send( request: SocketRequest<D> ): R { val id = interceptor.intercept(request).id return data.first { it.id == id } as R } private suspend fun subscribe( onNotify: suspend (SocketEvent.Notify) -> Unit, onRequest: suspend (SocketEvent.Response) -> Unit ) = getSocket() .incoming .receiveAsFlow() .filterIsInstance<Frame.Text>() .intercept(interceptor, config) .catch { handleError(it) } .collect { event -> when (event) { is SocketEvent.Notify -> onNotify(event) is SocketEvent.Response -> onRequest(event) else -> Unit } } }
Здесь и далее некоторые моменты реализации будут упущены, дабы не распространять весь продуктовый код
Обратим внимание на метод subscribe - он играет ключевую роль в нашей реализации:
Получает весь поток данных в сокете.
Берет текстовые сообщения.
Перехватывает их с помощью нашего интерцептора (о котором стоит поговорить отдельно).
Сериализует данные и делает дополнительные действия, если таковые требуются (например, обновления токенов).
В случае возникновения исключений, обрабатывает их.
И дальше отправляет данные в один из потоков, которые мы уже описали раньше: данные и нотификации.
И вот что происходит в процессе обработки:
internal fun <D> Flow<Frame.Text>.intercept( interceptor: SocketInterceptor<D>, config: SocketConfig ) = transform { frame -> val json = frame.readText() if (config.logEnabled) "(${config.name}) Intercept data: $json" logWithTagInfo TAG val event = interceptor.intercept(json) when (event) { is SocketEvent.Unknown -> "Received unknown json: $json" logWithTag TAG is SocketEvent.Pending -> "Request with id = ${event.id} was pending" logWithTag TAG is SocketEvent.Unauthorized -> "Received response is unauthorized" logWithTag TAG is SocketEvent.Refreshed -> "Auth token was refreshed" logWithTag TAG is SocketEvent.Notify, is SocketEvent.Response -> emit(event) } }
А происходит просто трансляция эвентов, которые генерирует наш интерцептор. В базовом случае, если никакой дополнительной реакции на данные не потребовалось, то это просто перекидывание сериализованного ответа дальше по цепочке.
Благодаря обобщениям на всех уровнях мы добились возможности использовать такой клиент для практически любого сервера с разными форматами общения и схемой запроса.
И дополнительно объявим нашу обертку того самого SocketEvent.Response со специфичными для нашего API полями:
interface BBResponse<out T, out E> : SocketEvent.Response { val result: T? val error: E? /* other common fields */ }
Если сильно упрощать, то на этом моменте мы уже имеем неплохую базу для общения с сервером и проект вполне сможет функционировать так некоторое время.
Так, например, могут выглядеть запросы на получение данных пользователя:
@Serializable internal data class GetUserResponse( @SerialName("id") override val id: String, @SerialName("result") override val result: GetUserDTO? = null, @SerialName("error") override val error: ErrorDTO? = null, ) : BBResponse<GetUserDTO, ErrorDTO> suspend fun getUser(id: Int): User { val request = BBSocketRequestData(params = GetUserParamsDTO(id = id)) socketClient.sendRaw(socketJson.encodeToString(request)) socketClient.data .filterIsInstance<GetUserResponse>() .first { /** some logic */ } .toDomain() }
Но так как мы все-таки делаем продуктовые решение, добавим немного удобства использования этих методов и обобщим вызовы, чтобы это можно было спокойно делать в любом клиентом классе.
Добавим недостающие функции:
переподключение к сети;
обра��отка ошибок;
таймауты;
кэширование;
передача состояний запроса (бездействие, загрузка, успех и ошибка).
Посмотрим на нашу обертку для отправки запросов, которая содержит вышеперечисленное:
private inline fun <reified P, R, reified E : BBError> request( namespace: String, method: String, params: P, authorized: Boolean, crossinline load: suspend (client: BBSocketClient, request: SocketRequestInfo) -> Result<R, E>, ): StateFlow<Result<R, E>> = flow { emit(Result.Loading) val requestInfo = socketRequestInfoGenerator<P>( namespace = namespace, method = method, params = params, authorized = authorized ) val result = withTimeout(CoreConstants.Socket.REQUEST_TIMEOUT) { connectionController.openSocket() load(socketClient, requestInfo) } emit(result) }.catch { emit(Result.Error()) }.stateIn( scope = scope, started = SharingStarted.Eagerly, initialValue = Result.Idle )
То есть в этом классе (и методе) можно делать предварительную подготовку к запросу и все дополнительные действия по обработке или установке каких-либо ограничений.
И добавим публичную обертку для вызова без кэшей:
inline fun <reified P, reified R : Any?, reified E : BBError, reified DTO, reified EDTO> request( namespace: String, method: String, params: P, authorized: Boolean = true, crossinline successMapper: (DTO) -> R, crossinline errorMapper: (EDTO) -> E ): StateFlow<Result<R, E>> = this.request( namespace = namespace, method = method, params = params, authorized = authorized, load = { websocket, info -> websocket.load( info = info, successMapper = successMapper, errorMapper = errorMapper ) } )
При этом, под капотом загадочного метода load не скрывается ничего сложного:
создание специальной обертки с тех. данными для запроса;
отправка запроса;
ожидание результата;
маппинг в состояние в зависимости от ответа.
internal suspend inline fun <R, E, DTO, EDTO> BBSocketClient.load( info: SocketRequestInfo, crossinline successMapper: (DTO) -> R, crossinline errorMapper: (EDTO) -> E ): Result<R, E> = with(info) { send<BBResponse<DTO, EDTO>>( SocketRequest( type = type, authorized = authorized, data = data ) ).baseMap(successMapper, errorMapper) }
Сериализация
Можно заметить загадочный KType в тех. данных для отправки запроса. Вот тут и появляются первые проблемы нашей реализации.
data class SocketRequest<D>( val type: KType, val authorized: Boolean, val data: D )
Для удобного использования сериализации с помощью kotlinx-serialization Json объект при вызове метода encodeToString должен иметь доступ к конкретному классу, который он сериализует, т.е. либо чтобы класс был указан явно (программистом), либо получен inline с помощью ключевого слова reified:
inline fun <reified T> T.encodeToString() = json.encodeToString(this)
В обратном случае мы получим ошибку Cannot use 'T' as reified type parameter. Use a class instead., которая напрямую скажет нам об этом.
Для использования ключевого слова reified функция должна обязательно быть inline.
Однако, стандартные функции языка Kotlin не позволяют совместить использования интерфейса (SocketClient) и inline модификатора
Поэтому перед нами встал выбор
Иметь удобную сериализацию с
reifiedгенериками.Иметь интерфейс и тесты на сокет.
И выбрать все-таки пришлось вариант 2, поскольку в будущем это могло бы создать много неприятных проблем.
Поэтому при передаче нашего запроса в SocketClient мы добавляем запросу небольшой контекст (см. выше SocketRequest) c KType, который можно использовать при сериализации:
private fun encode(type: KType, request: BBSocketRequestData<*>) = socketJson.encodeToString(serializer(type), request)
Десериализация
Ещё интереснее становится процесс десериализации.
Основные проблемы:
В сокет одновременно может прийти несколько ответов на запрос в один момент.
При получении данных, мы не знаем в какой класс их десериализовать, т.к. все взаимодействие асинхронно (см. пример
getUser).
Поэтому в ответе на запрос должны присутствовать какие-то опознавательные знаки.
Например, поле type, которое будет содержать уникальное значение
Тогда мы сможем воспользоваться удобным инструментом kotlinx-serialization - discriminator
Так библиотека сможет идентифицировать класс по этому ключу и преобразовывать данные для нас.
Это было бы образцовое решение, но, к сожалению, для нас оно не актуально, т.к. такого однозначного ключа мы не имеем по ряду причин (в т.ч. исторических).
Поэтому для однозначной десериализации был написан наш собственный сериализатор, который бы сопоставлял наши ключи с тем, что приходит в ответе.
Однако останавливаться на нем мы пока что не будем, чтобы не затягивать статью.
В итоге, у нас получился неплохой, удобный и достаточно лаконичный инструмент для отправки запросов:
internal class DefaultUserDataSource( private val socket: DataCollector ) : UserDataSource { override fun getBalance(): StateFlow<Result<User, BBError>> = socket.request( namespace = NAMESPACE, method = GET_USER, params = GetUserParamsDTO(), successMapper = GetUserDTO::toDomainEntity, errorMapper = ErrorDTO::toDomainEntity ) }
Таким образом, мы можем расширять нашу коллекцию запросов сколько угодно и не повторять шаблонный код каждый раз.
Нотификации
С нотификациями дела обстоят намного легче:
Простая идентификация (в нашем случае).
Не нужно распределение по клиентам, которые запрашивают их.
Не нужно постоянно следить за сетью.
Получаем вот такую небольшую обертку над клиентом, к которой сможет обратиться любой другой класс и наблюдать за нотификациями:
internal class DefaultNotifySocketCollector( socket: BBSocketClient ) : NotifySocketCollector { override val balanceNotify = socket.notify.baseMap(BalanceNotifyDTO::toDomainEntity) override val levelUpNotify = socket.notify.baseMap(LevelUpDTO::toDomainEntity) private inline fun <reified DTO, R> Flow<SocketEvent.Notify>.baseMap(crossinline mapper: DTO.() -> R) = this .filterIsInstance<BBNotify<*>>() .map { it.result } .filterIsInstance<DTO>() .map(mapper) }
Нюансы
Теперь, мы имеем рабочий код для взаимодействия с сервером по WS, но кое-что мы все-таки забыли в ходе реализации.
Android Doze mode
Google и Android в последнее время все больше и больше задумываются над удобством использования ОС обычными пользователями.
Один из важнейших параметров - это автономность.
Для улучшения этого параметра уже достаточно давно ввели doze mode, который «оптимизирует» процессы в неактивном режиме.
Для нас он означает, что при бездействии устройства (с отключенным экраном) Android начинает убивать сетевые соединения (в т.ч. и наше).
Но у нас есть возможность отслеживать это:
internal class DefaultIdleObserver( private val context: Context ) : IdleObserver { actual override val isIdle = callbackFlow { val filter = IntentFilter(PowerManager.ACTION_DEVICE_IDLE_MODE_CHANGED) val idleBroadcastReceiver = object : BroadcastReceiver() { override fun onReceive(context: Context?, intent: Intent?) { val pm = context?.getSystemService(Context.POWER_SERVICE) as PowerManager launch { send(pm.isDeviceIdleMode) } // true - заходит в doze mode // false - выходит из doze mode } } launch { send(false) } context.registerReceiver(idleBroadcastReceiver, filter) awaitClose { context.unregisterReceiver(idleBroadcastReceiver) } }.distinctUntilChanged() }
Смена сети и подключение к VPN (сейчас очень актуально)
В случае, если пользователь меняет сеть (например, с мобильного интернета на WiFi) или подключается к VPN, то для сокета это означает конец его жизни.
Сеть обрывается - сокет закрывается.
Эти события мы тоже можем отследить с помощью небольшой утилиты:
internal class DefaultConnectionObserver( context: Context ) : ConnectionObserver { private val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager actual override val status: Flow<ConnectionObserver.Status> = callbackFlow { val callback = object : ConnectivityManager.NetworkCallback() { override fun onAvailable(network: Network) { super.onAvailable(network) launch { send(ConnectionObserver.Status.AVAILABLE) } } override fun onLosing(network: Network, maxMsToLive: Int) { super.onLosing(network, maxMsToLive) launch { send(ConnectionObserver.Status.LOSING) } } override fun onLost(network: Network) { super.onLost(network) launch { send(ConnectionObserver.Status.LOST) } } override fun onUnavailable() { super.onUnavailable() launch { send(ConnectionObserver.Status.UNAVAILABLE) } } } val request = NetworkRequest.Builder() .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) .addTransportType(NetworkCapabilities.TRANSPORT_VPN) .addTransportType(NetworkCapabilities.TRANSPORT_WIFI) .addTransportType(NetworkCapabilities.TRANSPORT_CELLULAR) .build() connectivityManager.registerNetworkCallback(request, callback) awaitClose { connectivityManager.unregisterNetworkCallback(callback) } }.distinctUntilChanged() }
Периодические разрывы сети от ОС
Несмотря на то, что мы отследили все основные кейсы, Android иногда может неприятно удивить просто разорвав наше соединение без определенной причины.
Для этого мы дополнительно будем следить за состоянием SocketClient - socket.isConnected и пытаться открыть его снова, если что-то пошло не так.
Таким образом, мы имеем еще один утилитарный класс, который следит за нашим подключением и периодически пытается восстановить его:
internal class DefaultSocketConnectionController( connectionObserver: ConnectionObserver, idleObserver: IdleObserver, private val socket: SocketClient<*>, private val scope: CoroutineScope ) : SocketConnectionController { private var restartingJob: Job? = null override val isIdle = idleObserver.isIdle override val status = connectionObserver.status.map(ConnectionObserver.Status::toDomainEntity) override fun startSocketAutoReconnect( onOpened: suspend () -> Unit, onClosed: suspend () -> Unit ) { if (restartingJob != null) return restartingJob = combine( isIdle, status, socket.isConnected.filter(Boolean::not), ::Triple ).map { (idle, status, _) -> when { idle || status == ConnectionStatus.LOST -> ConnectionIntent.DISCONNECT !idle || status == ConnectionStatus.AVAILABLE -> ConnectionIntent.CONNECT else -> ConnectionIntent.IDLE } }.onEach { state -> when (state) { ConnectionIntent.CONNECT -> openSocket(onOpened) ConnectionIntent.DISCONNECT -> closeSocket(onClosed) ConnectionIntent.IDLE -> Unit } }.launchIn(scope) } /** some code */ }
Несмотря на то, что мы уже разобрали почти все взаимодействие мобильного приложения с сервером по WebSocket, есть еще ряд вопрос, которые можно более подробно разобрать в будущих статьях:
Кэширование запросов.
JWT авторизации по
WebSocket.Как справиться с неоднозначной сериализацией.
Итак, мы почти с нуля спроектировали продуктовое решение для двустороннего обмена сообщениями по протоколу WebSocket, которое можно расширять и модифицировать для включения нового функционала.
При этом большая часть кода вполне может функционировать и на KMP, что оставляет зазор для его расширения на другие платформы.
Хотя это и далеко не самое стандартное решение как для мобильных приложений, так и в целом для клиентских, оно имеет ряд преимуществ, которые сложно игнорировать, и во многих сценариях они дают больше возможностей.
И именно таким был наш опыт знакомства с этим протоколом.
В эту небольшую статью вошли месяцы проектирования, разработки и последующих модификаций решения, чтобы обеспечить удобную работу с сетью и простое расширение запросов.
И Я был очень рад поделиться этим опытом с Вами!
