Как стать автором
Обновить

Reactive Spring Cloud Gateway: конвейер WebSocket в Restful и обратно

Время на прочтение17 мин
Количество просмотров6.2K

В предыдущей статье Reactive Spring ABAC Security: безопасность уровня Enterprise впервые и крайне кратко представлен описанный в данной статье подход. Пришло время детально раскрыть нюансы и глубину потенциала нового подхода, когда основным протоколом для взаимодействия браузера и мобильных приложений с сервисами Spring Cloud выступает стабильный и быстрый WebSocket.

Аннотация

Spring Cloud Gateway официально служит следующим целям: слоем безопасности, маршрутизатором, балансировщиком, провайдером протоколов сжатия. Gateway ничего не знает о передаваемых данных и тем более о бизнес-логике – это инфраструктурный сервис.

Связь Gateway с WebSocket обуславливает здравый смысл. Если перенести логику WebSocket в Gateway, то при высоких нагрузках потребуется ненамного увеличить количество подов с одной стороны (из-за ограничения количества открытых соединений), а с другой если через WebSocket работают не только браузеры, но и мобильные приложения – то такой подход становится естественным.

В статье описываются явные преимущества данного подхода, детально перечисляются нюансы реализации и определяются общие перспективы дальнейшего развития. Но основная идея сводится к уменьшению стоимости разработки за счёт переиспользования базовых технологий для всего необходимого функционала.

Что позволит в итоге освободившиеся ресурсы перекинуть на улучшения качества отрисовки интерфейсов, добавления современных спец. эффектов, увеличения динамичности интерфейса на действия пользователя и получаемых данных в реальном режиме времени.

Введение

В эпоху рассвета мобильных приложений на Android, в то время, когда в 2009 году Samsung объявил об отказе от Symbian и переходе на Android (iOS ещё не было) – WebSocket в том же году был ещё в виде черновика, т.е. исторически мобильные приложения не застали эту технологию и развитие ушло своим чередом.

Кроме того, существовала проблема при переключении телефона с одной соты на другую – также менялся и IP-адрес телефона, что прерывало все установленные соединения. Данный фактор был не в пользу WebSocket, который в исходной версии не умел переподключаться при обрыве связи. Сейчас, качество связи стала крайне стабильной, а IP-адреса внутри сети мобильного оператора привязаны к телефону. Можно сказать появились тепличные условия для восхождения WebSocket и почему бы и нет.

В представленном проекте https://github.com/SevenParadigms/spring-cloud-websocket-gateway, интеграция WebSocket не влияет на работу и исходный код существующих сервисов, т.к. конвейер сообщений WebSocket в Restful прозрачно конвертирует запросы и ответы.

Преимущества использования конвейера WebSocket в Restful следующие:

  1. Классический микросервисный подход Spring Cloud к организации обработки потока данных на стороне бэкенда как от браузера так и от мобильных приложений по WebSocket, когда сервисам не интересно откуда приходят http-запросы;

  2. Единое понятие сессии: отключение всех соединений WebSocket пользователя однозначно сообщит о закрытии сессии как в браузере, так и в мобильном приложении (такое знание для многих бизнес-процессов критично);

  3. Открытые соединения WebSocket точно определяют количество пользователей онлайн, в то время как в случае http-запросов, онлайн считается по количеству запросов за последние полчаса;

  4. Более не требуется тратить значительные ресурсы на организацию Push-уведомлений – полученные через WebSocket уведомлений ничем не отличаются визуально, при условии когда приложение запущено (что в подавляющем большинстве случаев приемлемо);

  5. При высоких нагрузках WebSocket заметно быстрее HTTP и значительно меньше потребляет трафика, при этом, поддерживает полнодуплексный асинхронный обмен сообщениями в обе стороны;

  6. Из коробки Origin-based cross-domain policy (политика безопасности на основе происхождения);

  7. Прямым преимуществом использования WebSocket перед http-запросами в возможности обратной связи в реальном времени, что позволяет строить современные высококонкурентные реактивные интерфейсы в мобильных приложениях.

Если принять во внимание факт увеличения расхода батареи из-за постоянно открытого соединения WebSocket, то согласно GSM Arena с 2010 года плотность аккумуляторов к 2020 году выросла в 3 раза и при том же форм-факторе ёмкость батареи достигла 5000 мАч – это действительно много.

А если ещё учесть время на полную зарядку современных аккумуляторов от сети в 15 минут, то при таких темпах развития данный факт можно смело отнести к незначительным.

Конечно, Gateway продолжит маршрутизацию и балансировку http-запросов, т.к. останутся: авторизация пользователя, запросы выдающие большие объёмы данных, выгрузка файлов в браузер, а также интеграция внешних систем.

Рассматриваемый в статье проект позволяет снизить стоимость разработки и владения за счёт использования классического подхода http-запросов при работе с браузером и мобильными приложениями по протоколу WebSocket.

Представленный исходный код несмотря на небольшой размер и простоту реализации – эффективно выполняет все возложенные на него функции.

Поддержка WebSocket в Android и iOS

В 2010 году в iOS 4.2 впервые появилась встроенная поддержка протокола WebSocket.

Наиболее распространённая практика использования WebSocket в среде iOS – это библиотека SocketRocket от Facebook 2012 года, а в среде Androidбиблиотека Java-WebSocket от opensource разработчика Nathan Rajlich, создавший проект в 2010 году (скорее всего для нужд одного из клиентов фондовой биржи NASDAQ).

В 2013 году один из разработчиков популярного проекта Socket.io закомитил клиент WebSocket под Android, которая также активно дорабатывается и в настоящее время. А спустя 4 года, основные разработчики Socket.io выпустили клиент под iOS также с полной поддержкой WebSocket.

В 2014 году вышла в релиз спецификация JSR-356, поддержка которой привела к включению в Java Development Kit (JDK) пакета javax.websocket. В дальнейшем, непосредственно в Kotlin Standard Library была внедрена обвёртка вокруг DOM API с базовым классом WebSocket – который также можно использовать в Android приложениях.

Помимо нативных библиотек от команды Socket.io, существует более десятка сторонних с полной имплементацией протокола WebSocket по спецификации RFC 6455 и большинство из них активно поддерживаются довольно крупными сообществами.

WebSocket профессионально присутствует в мобильных приложениях более 10 лет, т.к. на глобальном уровне требование к доставке данных в реальном режиме времени постоянно расширяется с каждым новым типом данных из года в год.

Основные аспекты реализации

Браузер или мобильное приложение отправляет и принимает запросы по WebSocket в обвёртке:

data class MessageWrapper(
    val type: HttpMethod = HttpMethod.GET,
    val baseUrl: String? = null,
    val uri: String = StringUtils.EMPTY,
    val body: JsonNode? = null
)

Атрибуты образуют http-запрос к сервисам:

  • type – тип GET, HEAD, POST, PUT, PATCH, DELETE, TRACE (по умолчанию GET);

  • baseUrl – имя сервиса, например: http://account-service;

  • uri – метод контроллера и query-запрос, например: findAllPaged?sort=id:desc;

  • body – json запроса при типе запроса POST, PUT или PATCH.

При формировании ответа body заменяется ответом с сервиса: message.copy(body = it), а baseUrl и uri служат идентификаторами запроса в обработчике WebSocket в браузере или в мобильном приложении.

Функция преобразования запросов WebSocket в Restful и обратно:

fun handling(message: MessageWrapper, username: String) {
        kafkaPublisher.publishWebSocketKafka(
            WebSocketEvent(username, message.baseUrl, message.uri, message.body.toString())
        ).subscribe()
        if (message.baseUrl != null) {
            val webClient = Beans.of(WebClient.Builder::class.java).baseUrl(message.baseUrl).build()
            val response = when (message.type) {
                HttpMethod.GET -> webClient.get().uri(message.uri).retrieve()
                HttpMethod.POST -> webClient.post().uri(message.uri).body(BodyInserters.fromValue(message.body!!))
                    .retrieve()
                HttpMethod.PUT -> webClient.put().uri(message.uri).body(BodyInserters.fromValue(message.body!!))
                    .retrieve()
                HttpMethod.DELETE -> webClient.delete().uri(message.uri).retrieve()
                HttpMethod.PATCH -> webClient.patch().uri(message.uri).body(BodyInserters.fromValue(message.body!!))
                    .retrieve()
                HttpMethod.HEAD -> webClient.head().uri(message.uri).retrieve()
                HttpMethod.OPTIONS -> webClient.options().uri(message.uri).retrieve()
                HttpMethod.TRACE -> webClient.method(HttpMethod.TRACE).uri(message.uri).retrieve()
            }
            response
                .onStatus({ status -> status.isError })
                { clientResponse ->
                    clientResponse.bodyToMono(ByteArrayResource::class.java)
                        .map { responseAnswer: ByteArrayResource ->
                            WebClientResponseException(
                                clientResponse.rawStatusCode(),
                                clientResponse.statusCode().name,
                                clientResponse.headers().asHttpHeaders(),
                                responseAnswer.byteArray,
                                Charsets.UTF_8
                            )
                        }
                }
                .bodyToMono(JsonNode::class.java).subscribe {
                    info { "Request[${message.baseUrl}${message.uri}] by user[$username] accepted" }
                    debug { it.toString() }
                    val sessionChain = clients.getIfPresent(username)
                    sessionChain?.sendMessage(message.copy(body = it))
                }
        }
    }

Детальный обзор реализации

Рассмотрим публикацию точки входа для реактивного WebSocket для конвертации запросов из WebSocket в HTTP к сервисам и обратно:

@Component
@WebSocketEntryPoint("/wsf")
class WebSocketFactory(val kafkaPublisher: EventDrivenPublisher) : WebSocketHandler {
    override fun handle(session: WebSocketSession) = session.handshakeInfo.principal
    .cast(UsernamePasswordAuthenticationToken::class.java)
    .flatMap { authToken: UsernamePasswordAuthenticationToken ->
        val output = session.send(Flux.create {
            authToken.credentials
            clients.put(authToken.name, WebSocketSessionChain(
                session = session, tokenHash = authToken.credentials as Long, chain = it))
        })
        val input = session.receive()
            .map { obj: WebSocketMessage -> obj.payloadAsText.parseJson(MessageWrapper::class.java) }
            .doOnNext { handling(it, authToken.name) }.then()

        Mono.zip(input, output).then().doFinally { signal: SignalType ->
            val sessionChain = clients.getIfPresent(authToken.name)!!
            sessionChain.session.close()
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
            .subscribe {
                clients.invalidate(authToken.name)
                info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
            }
        }
        kafkaPublisher.publishConnect(
            UserConnectEvent(authToken.name, authToken.authorities.map { it.authority })
        )
    }
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>3.1.0</version>
</dependency>

Кэш clients на базе библиотеки Caffeine, имеет ограничения по времени бездействия и по размеру количества открытых соединений WebSocket, а при завершении сессии по таймауту посылает в Kafka соответствующее событие:

val clients = Caffeine.newBuilder()
    .maximumSize(Beans.getProperty(Constants.GATEWAY_CACHE_SIZE, Long::class.java, 10000))
    .expireAfterAccess(
        Beans.getProperty(Constants.GATEWAY_CACHE_ACCESS, Long::class.java, 1800000),
        TimeUnit.MILLISECONDS
    )
    .removalListener { key: String?, value: WebSocketSessionChain?, cause: RemovalCause ->
        if (cause.wasEvicted() && ObjectUtils.isNotEmpty(key)) {
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(key, value!!.tokenHash, true)
            ).subscribe {
                value.session.close()
                info { "WebSocket disconnected by timeout with user[$key]" }
            }
        }
    }.build<String, WebSocketSessionChain>()

События Kafka отправляются через сервис EventDriverPublisher, где представлены методы по каждому типу события – это связано с необходимостью типизации модели для сериализации, т.к. для сжатия используется библиотека Apache Avro. Все модели Kafka предварительно генерируются плагином avro-maven-plugin.

Использование протокола сжатия Avro позволяет строить именно типизированные SQL-запросы по историческим данным Kafka посредством процессора Lenses SQL:

Описание модели WebSocketEvent в нотификации Avro:

{
  "namespace": "io.github.sevenparadigms.gateway.kafka.model",
  "type": "record",
  "name": "WebSocketEvent",
  "fields": [
    {
      "name": "username",
      "type": {
        "type": "string",
        "default": "null"
      }
    },
    {
      "name": "baseUrl",
      "type": "string"
    },
    {
      "name": "uri",
      "type": "string"
    },
    {
      "name": "body",
      "type": "string"
    }
  ]
}

Реализация методов класса EventDrivenPublisher:

private val producerProps: Map<String, Any> = mapOf(
    BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
    KEY_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    VALUE_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
    VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
    AUTO_REGISTER_SCHEMAS to true
)

fun <T> publish(topic: String, event: T, key: String = UUID.randomUUID().toString()) =
    KafkaSender.create<String, T>(SenderOptions.create(producerProps)).createOutbound()
        .send(Mono.just(ProducerRecord(topic, key, event)))
        .then()
        .doOnSuccess { info { "Successfully sent to topic[$topic]: $event with id=$key" }  }

fun publishWebSocketKafka(event: WebSocketEvent) = publish(kafkaProperties.webSocketKafkaTopic, event)

fun publishConnect(event: UserConnectEvent) = publish(kafkaProperties.userConnectTopic, event)

fun publishDisconnect(event: UserDisconnectEvent): Mono<Void> {
    eventPublisher.publishEvent(RevokeTokenEvent(hash = event.hash, source = event.username))
    return publish(kafkaProperties.userDisconnectTopic, event)
}

Конфигурация Kafka в application.yml:

kafka:
  web-socket-topic: websocket-transport
  web-socket-kafka-topic: websocket-kafka
  user-connect-topic: user-connect-event
  user-disconnect-topic: user-disconnect-event
  broker: localhost:9092
  group-id: websocket-gateway
  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
  deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  schema-registry-url: http://localhost:8081

Рассмотрим кусочки кода точки входа для WebSocket:

val output = session.send(Flux.create {
    clients.put(authToken.name, WebSocketSessionChain(session, it))
})

здесь видно, что соединение WebSocket через объект it = FluxSink сохраняется в кэше clients для дальнейшей отправки сообщений пользователю в браузер или в мобильное приложение в структуре MessageWrapper по его идентификатору, который представлен как name;

Mono.zip(input, output).then().doFinally { signal: SignalType ->
    val sessionChain = clients.getIfPresent(authToken.name)!!
    sessionChain.session.close()
    kafkaPublisher.publishDisconnect(
        UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
    ).subscribe {
        clients.invalidate(authToken.name)
        info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
    }
}

а конструкция doFinally { } позволяет перехватить момент закрытия соединения WebSocket, корректно завершить сессию и отправить событие Kafka.

<dependency>
    <groupId>io.github.sevenparadigms</groupId>
    <artifactId>reactive-spring-abac-security</artifactId>
    <version>1.5.2</version>
</dependency>

В Gateway включена библиотека reactive-spring-abac-security и чтобы подключится к WebSocket необходимо передать в заголовке jwt-токен, который для производительности валидируется только по времени и по подписи публичным ключом из application.yml.

spring:
  security:
    jwt:
      public-key:
        MIIDeTCCAmGgAwIBAgIEFzIFujANBgkqhkiG9w0BAQsFADBtMQswCQYDVQQGEwJG
        UjEQMA4GA1UECBMHVW5rbm93bjEOMAwGA1UEBxMFUGFyaXMxFzAVBgNVBAoTDlNl
        dmVucGFyYWRpZ21zMQ8wDQYDVQQLEwZnaXRodWIxEjAQBgNVBAMTCUxhbyBUc2lu
        ZzAeFw0yMjA0MDMxODQyMDRaFw0zMjAzMzExODQyMDRaMG0xCzAJBgNVBAYTAkZS
        MRAwDgYDVQQIEwdVbmtub3duMQ4wDAYDVQQHEwVQYXJpczEXMBUGA1UEChMOU2V2
        ZW5wYXJhZGlnbXMxDzANBgNVBAsTBmdpdGh1YjESMBAGA1UEAxMJTGFvIFRzaW5n
        MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAjtdx8tYDDRFUpw3oJdFx
        Avcho5ytRQt1PZUymRoioO28RO9mXdrhJgKXA2MFlmjnzD/yRwR/PqZcneKz7rKx
        kN14HYQNxgKrUFNZwtAtePiTAcAPy4NqtVeE8pS5djQ+bIqlpnJUhYvtK1vDlMkS
        KUJr/N2/sRAQcH8fQiPG5vwI+MpHjWjqjjM+ycslPWqQp2QguaqxMd4IAjL8fZnP
        2LGyCZdZCRbtu3TknW+zmgVMF9hiEdtUX677cBfamnslpCUe4ACI5aziwua5GQZV
        DwfaFf6kOAtKcEa7CUy3axCs82KVa3lfPW/b8ALWDllbjYLZWVwNfvR5bKFFg2tk
        GQIDAQABoyEwHzAdBgNVHQ4EFgQU29M6xK0D1NAvRRE1MApZv4Qr0l8wDQYJKoZI
        hvcNAQELBQADggEBADCIzI/jC+3dXnhdca2ozwH6Oq46coT61tmLnCmlpTvE352w
        g/FhpujILUOIwaShKjgIiBA1u1DYrZM1m9BoZ6/YuXa9OYpCK0jamuug4Vnz0bIm
        fQIQPfCMJcouwc4pCm8jAzWSo8xfTJ/yhUnqt7/NQkGuSWsHVZN9O1leKVa2xTEU
        C5APTpX7Rj2+mU8c/fDzFA1m+LXYp2T3dbi3yVOTzSwRkE84sE18fdgRuvJfpmxL
        W3BuVKQ9/1bzpcTK1onKw7WNqrjCoO37G+d42IeDzXMdDjyI3POYYy8g/o//sp6O
        JhhMDEwt2aEAKEVlQxYzgMBn8HeUQrHSeX+ML8Q=
<dependency>
  <groupId>io.github.sevenparadigms</groupId>
  <artifactId>kubernetes-embedded-hazelcast</artifactId>
  <version>1.2.8</version>
</dependency>

В Gateway также включена библиотека kuberbetes-embedded-hazelcast, которая инициализируется в Spring как CacheManager и запускает сервер Hazelcast как Embedded вместе с сервисом. Уникальной особенностью данной библиотеки является возможность задания максимального размера кэша по его наименованию в application.yml, что отсутствует в исходном Hazelcast.

Также в библиотеке присутствует интерфейс AnySerializable, который позволяет оперировать любыми типами данных со скоростью одновременно вставки и чтения более 1000 раз в миллисекунду.

Необязательные параметры jwt-кэша прописываются в application.yml:

spring.cache:
  jwt.expireAfterWrite: 300000 # milliseconds, by default 5 minutes
  jwt.maximumSize: 10000 # by default

В кластере Hazelcast хранится хэш jwt-токена, рассчитанный алгоритмом Murmur. Кэшируется хэш на уровне подов одного сервиса на время жизни токена и при повторном обращении результат валидации возвращается из кэша. Токен отзывается по завершении работы сессии пользователя при котором инициируется событие Spring RevokeTokenEvent и в запись кэша токена вносится признак деактивации.

Внешняя интеграция в WebSocket

Для асинхронной отправки сообщений в WebSocket через событие Kafka, как один из способов, добавлен роутер:

@Configuration
class RoutesConfiguration(private val kafkaHandler: KafkaHandler) {
    @Bean
    fun route(): RouterFunction<ServerResponse> = router {
        ("/kafka").nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("", kafkaHandler::publish)
            }
        }
    }
}

который ожидает модель EventWrapper:

data class EventWrapper(
    val topic: String = StringUtils.EMPTY,
    val body: JsonNode = JsonUtils.objectNode()
)

и вызывает метод publish:

@Component
class KafkaHandler(private val kafkaPublisher: EventDrivenPublisher) {
    fun publish(request: ServerRequest) = request.bodyToMono(EventWrapper::class.java)
        .map { kafkaPublisher.publish(it.topic, it.body.jsonToObject(WebSocketEvent::class.java)) }
        .map { ServerResponse.ok().build() }
        .doOnError { error("Exception while trying to process event: " + it.message) }
}

после чего, в произвольный топик Kafka публикуется сообщение из body.

Если мы хотим отправить сообщение именно в WebSocket и определённому пользователю, тогда в topic указывается значение websocket-transport, а в body описывается модель MessageWrapper:

{
    "topic": "websocket-transport",
    "body": {
        "username": userId,
        "baseUrl": "http://account-service",
        "uri": "cash-out",
        "body": { "value": -100 }
    }
}

При желании, сервис может напрямую в Kafka отправить сообщение пользователю в WebSocket не вызывая интеграционный метод /kafka в Gateway, а также подписаться на события канала websocket-kafka, куда помещаются все принятые из WebSocket сообщения. При этом, сообщения с пустым baseUrl не исполняются конвейером, но также помещаются в топик websocket-kafka.

Другими словами, сервисы могут спокойно перейти во взаимодействие с WebSocket через Kafka, асинхронно принимая и отправляя сообщения – это та же микросервисная архитектура с брокером сообщений в качестве транспорта. Данный подход особенно оправдывает себя при высоких нагрузках, а также позволяет максимально эффективно использовать "железо" распределяя нагрузку в подах сервиса через очередь непосредственно в освободившийся поток процессора.

Проектируя и разрабатывая крупные государственные системы с распределением в региональные ЦОД, всегда вносил в архитектуру одновременно 3 вида транспорта для каждого запроса: http, брокер сообщений и предварительную запись на диск кластером in-memory базы данных с итоговой проверкой доставки по каждому из маршрутов – всё это на случай ядерной войны и это не шутка.

Возникла необходимость самому реализовывать механизм гарантированной доставки через реализацию алгоритмов распределённого реплицируемого блочного устройства (DRBD), когда в условиях частых изменений данных на всех узлах и нестабильной связи в дальних регионах – уже через месяц штатные средства репликации и синхронизации Oracle переставали работать от слова совсем.

В Spring Cloud есть два способа обработки ошибок транспорта: Circuit Breaker и Retry, предлагающие большой спектр атрибутов управления поведением при возникновении ошибки. Их можно использовать вместе и естественным образом это делает библиотека:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>

где через аннотации над функциями выстраивается посредством AspectJ управляющая инфраструктура, которая также позволяет обработать ошибки транспорта Kafka, получив в Exception все детали Event. Главное при этом не забывать про идемпотентность.

Поэтому рекомендую совмещать доступность сервисов по http и kafka на случай выхода из строя кластера Kafka во время промышленной, природной или военной катастрофы.

И наконец рассмотрим класс KafkaConsumerConfiguration, где создана подписка на топик websocket-transport, которая перенаправляет все принятые события в WebSocket:

private val receiverOptions = ReceiverOptions.create<String, WebSocketEvent>(
    mapOf(
        BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
        GROUP_ID_CONFIG to kafkaProperties.groupId,
        KEY_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        VALUE_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        AUTO_OFFSET_RESET_CONFIG to "earliest",
        ENABLE_AUTO_COMMIT_CONFIG to true,
        SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
        VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
        SPECIFIC_AVRO_READER_CONFIG to true
    )
).commitInterval(Duration.ZERO)
    .commitBatchSize(0)
    .subscription(setOf(kafkaProperties.webSocketTopic))

@Bean
fun listenWebSocketEvent(webSocketFactory: WebSocketFactory) = 
  KafkaReceiver.create(receiverOptions)
    .receive()
    .concatMap { record ->
        Mono.fromRunnable<Void> {
            val it = record.value()
            debug { "Transfer kafka message to WebSocket: $it" }
            webSocketFactory.get(it.username!!)?.sendMessage(it.copyTo(MessageWrapper()))
        }
    }.subscribe()

где из локальной коллекции пода открытых соединений в webSocketFactory по имени пользователя получаем объект FluxSink<WebSocketMessage> для отправки сообщения.

В конфигурации потребителя Kafka не указан CLIENT_ID – что позволяет всем подам сервиса по GROUP_ID одновременно обработать сообщение, т.к. соединение WebSocket присутствует только в одном из подов.

В качестве итога обзора небольшого, но эффективного кода можно также показать то небольшое количество подключаемых зависимостей в файле сборки pom.xml:

<spring-cloud.version>2021.0.1</spring-cloud.version>
<abac-security.version>1.5.2</abac-security.version>
<reactor-kafka.version>1.3.11</reactor-kafka.version>
<avro.version>1.11.0</avro.version>
<kafka-avro.version>7.1.1</kafka-avro.version>
<hazelcast.version>5.1.1</hazelcast.version>
<embedded-hazelcast.version>1.2.8</embedded-hazelcast.version>

Резюме

Простая идея интеграции в действующий Spring Cloud Gateway конвейера WebSocket в Restful уже существующих микросервисов и обратно – открывают множество возможностей и перспектив. В частности, отказа от часто дорогой инфраструктуры вокруг Push-уведомлений. При этом, Push-уведомления имеют ряд недостатков, такие как произвольные задержки транспорта, внеочерёдность уведомлений, переполнение пула очереди.

Кроме того, существующий фронт в браузере и в мобильных приложениях необязательно сразу переписывать под использование конвейера WebSocket, ведь Gateway продолжает свою работу по маршрутизации и балансировке.

Сначала, на фронт можно добавить асинхронную фабрику обработки входящих сообщений для реактивной адаптации интерфейса, а потом достаточно подменить используемую сетевую библиотеку на свою с тем же API, чтобы на заднем фоне упаковать запросы в обвёртку и затем отправить через WebSocket.

Представленный подход позволяет в любой момент перевести транспорт особо чувствительного микросервиса с http на kafka, что помимо управляемого трафика также приведёт к эффективному использованию ресурсов и соответственно удешевлению владения.

На примере организации процесса бизнес-логирования действий системы или аудита, нагрузка на БД при этом порой превышает 50%, то здесь решение часто видят в агрегации бизнес-логов через Kafka, чтобы затем методом Batch разом сохранить.

Есть две причины на мой взгляд почему аудит не стоит делать через Kafka: данные аудита удобно использовать в бизнес-логике и возможно в одной бизнес-операции, а также теряется согласованность при потери данных до записи в журнал WAL, но при наличии в Kafka.

Наиболее эффективно задача аудита, с точки зрения простоты реализации и гарантии консистентности, решается через создание виртуального потокового репликатора PostgreSQL – который даже встроен в реактивный драйвер r2dbc-postgres:

Flux<T> replicationStream = replicationConnection
    .startReplication(replicationRequest).flatMapMany(it -> {
    return it.map(byteBuf -> {…})
        .doOnError(t -> it.close().subscribe());
});

где при сохранении данных в master-ноду лаг в PostgresSQL 14 составляет менее 100 мс в реальном времени, а если сервис виртуального репликатора запустить вместе с PostgreSQL, то лаг составит менее 5 мс – что весьма и востребовано. Конечно, аудит не все данные фиксирует в реальном времени, некритичные можно аккумулировать и сбрасывать через протокол Copy примерно каждые 500 мс.

Не стоит воспринимать гарантию доставки Kafka как панацею, довольно часто видел как в крупных информационных системах, кластера Kafka (+ZooKeeper) и ElasticSearch размещались целиком в одном ЦОД – при этом система используется во всех регионах. Это позволяет при диверсии или прямой атаке, техногенном или природном катаклизме лишиться системы целиком.

WebSocket на сегодняшний день самый популярный протокол реального времени, ставшим стандартом де-факто в мобильных приложениях, таких как финансовые инструменты, социальные сети, навигация по местоположению, заказ такси, доставка еды и конечно компьютерные игры.

Современные библиотеки WebSocket для Android и iOS реализуют широкий спектр методов и интерфейсов, позволяющий разработчикам даже с небольшим опытом создавать полноценные приложения реального времени с простым исходным кодом.

Так, в своё время, разработал ядро высоконагруженной криптовалютной биржи со схлопыванием более 100 тыс. контрактов по сделке за 1 секунду и конечно основным протоколом в мобильных приложениях биржи был WebSocket.

Архитектура WebSocket соответствует модели управляемых событий Event-Driven (EDA), а код оптимизирован для минимизации overhead по трафику и latency при передаче данных по сети. В наступившее время широкополосных каналов Интернет – WebSocket позволяет в полной мере реализовать реактивные интерфейсы, «оживляя» таким образом мобильное приложение в руках пользователя.

Буду рад, если данная статья помогла кому-то принять решение о выборе в качестве основного транспорта WebSocket как для браузера, так и для мобильных приложений. Ведь переход, как было рассмотрено, не влечёт никаких рисков, не отражается на работоспособности и исходном коде существующего функционала. При этом преимуществ такого подхода не перечесть.

Теги:
Хабы:
Рейтинг0
Комментарии9

Публикации

Истории

Работа

Ближайшие события