Как стать автором
Обновить
225.57
НЛМК ИТ
Группа НЛМК

Кнопка «F5» устала: real-time уведомления в микросервисной архитектуре

Уровень сложностиСредний
Время на прочтение11 мин
Количество просмотров3.8K

Представьте себе: у вас железнодорожная станция, сотни вагонов, десятки пользователей в системе, каждый раз кто-то нажимает кнопку "Обновить", чтобы узнать — разгрузили ли нужный вагон.

Вся логика обновления построена на "manual refresh". Да-да, пользователь сам жмёт кнопку, чтобы получить свежие данные. Система автоматической разгрузки или другой человек разгрузил что-то на другом конце станции, но вы об этом не узнаете, пока не перезагрузите страницу.

А ещё — избыток HTTP-запросов, polling, перегруженные серверы и полное отсутствие real-time взаимодействия.

Есть вариант! Масштабируемая и отказоустойчивая архитектура с использованием Redis Sentinel + Pub/Sub + WebSocket/SSE.

В статье расскажем, какие проблемы возникают с real-time в Kubernetes, почему стандартные WebSocket-подходы не работают при нескольких подах, как построить отказоустойчивую систему с Redis Sentinel, как сделать real-time UI, сохранив отказоустойчивость и масштабируемость, и как всё это запустить локально для отладки.

Проблема: Медленный и статичный интерфейс

  • Пользователи не видят изменения в реальном времени

  • Если кто-то разгрузил вагоны на другом компьютере, на UI это не отобразится без перезагрузки страницы

  • Частые HTTP-запросы (polling) перегружают сервер

Задача: Реализовать real-time отображение

✔ Сразу показывать обновление количества разгруженных вагонов всем пользователям
✔ Минимизировать нагрузку на сервер (избавиться от постоянного polling’а)
✔ Обеспечить масштабируемость и отказоустойчивость, даже если развернуто несколько подов в Kubernetes

Когда система состоит из нескольких микросервисов, важно организовать масштабируемую и отказоустойчивую доставку уведомлений.
Стандартный WebSocket или SSE (Server-Sent Events) в монолитах работает просто:
Клиенты подписываются → Сервер отправляет уведомления напрямую.

Но если сервис развернут в Kubernetes (K8s) с несколькими подами, возникают сложности:

  • Клиент подключается к одному поду и может не получить уведомление, отправленное другим подом

  • Балансировщик распределяет подключения случайно

  • Если под рестартуется – все подключения теряются

Как мы попробуем решить эту задачу?

Мы разделим архитектуру уведомлений на три ключевых компонента:

1) Бизнес-микросервисы (генераторы событий) – публикуют события в Redis Pub/Sub.
2) Redis Sentinel + Redis Pub/Sub (брокер сообщений) – обеспечивает маршрутизацию и отказоустойчивость.
3) Сервис уведомлений (Notifier Service) с WebSocket/SSE – подписывается на Redis и доставляет уведомления клиентам в реальном времени.

!Клиенту неважно, какой под обрабатывает его WebSocket/SSE – все они получают одно и то же уведомление.

Проблема: Недостатки обычного Redis в Kubernetes

Обычно используют Redis Pub/Sub, но у него есть минусы:
1) Один Redis без Sentinel — один instance (точка отказа, нет High Availability (Высокой Доступности))
2) Если Redis упадёт, то все поды потеряют соединение с уведомлениями

   Какое решение нам нужно?

✔ Redis Sentinel автоматически переключает мастера при сбоях, а реплики обеспечивают высокую доступность и отказоустойчивость

✔ Сервис уведомлений подключается к Redis через Sentinel, поэтому при сбоях система автоматически переподключается к новому мастеру без простоев 

✔ Сервис уведомлений в нескольких подах, который слушает Redis через Sentinel, получает уведомления через Pub/Sub и рассылает их пользователям через WebSocket и SSE 

 Что это нам даёт?

  • Если один Redis-узел падает, Sentinel автоматически назначает новый мастер, и сервис продолжает получать данные.

  • Если под сервиса уведомлений перезапускается, соединение SSE/WebSocket разрывается. Клиентский код обнаружит разрыв (onerror/onclose) и попробует автоматически переподключиться. Балансировщик Kubernetes направит новый запрос клиента на доступный под.

  • Нагрузка на сервер минимизирована — нет постоянных HTTP-запросов (polling).

Как это работает в коде?

dependencies
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
            <version>3.4.2</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.6.0.BETA2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
       </dependency>

 1. Публикация уведомлений в Redis Sentinel

@Service
@RequiredArgsConstructor
public class MessageNotificationService {

    private final ReactiveStringRedisTemplate redisTemplate;
    private final MessageNotificationConverter messageNotificationConverter;

   /**
     * Публикует уведомление в канал Redis Pub/Sub.
     * <p>
     * Уведомление отправляется во все подписанные сервисы через механизм  
     * Pub/Sub, работающий в Redis Sentinel.
     * </p>
     *
     * @param notificationMessage Сообщение {@link NotificationMessageReq},  
     *                            которое нужно отправить подписчикам. (Ваша ДТО, 
    которое должно быть приведено к единому формату с ответным DTO для удобства сериализации в JSON.)
     */
    public void publish(NotificationMessageReq notificationMessage) {
        redisTemplate.convertAndSend(
                "notifications_channel",  //  Pub/Sub канал в Redis, в который публикуем сообщение
                messageNotificationConverter.convert(notificationMessage) //  Преобразуем в JSON
        ).subscribe();
    }
}

Теперь уведомление реплицируется в кластер Redis Sentinel и доставляется на все поды в Kubernetes.

2. Чтение Redis Pub/Sub и пересылка через SSE/WebSockets

Конфигурация реактивных компонентов для работы с Redis и потоками уведомлений
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import reactor.core.publisher.Sinks;


/**
 * Конфигурация реактивных компонентов для работы с Redis и потоками уведомлений.
 */
@Configuration
@RequiredArgsConstructor
public class ReactiveListenerConfig {

    private final ReactiveStringRedisTemplate redisTemplate;

    /**
     * Создаёт и настраивает {@link ReactiveRedisMessageListenerContainer}, который слушает каналы Redis.
     * <p>
     * Используется для подписки на сообщения в Redis с возможностью реактивной обработки.
     * </p>
     *
     * @return Экземпляр {@link ReactiveRedisMessageListenerContainer} с подключением к Redis.
     */
    @Bean
    public ReactiveRedisMessageListenerContainer listenerContainer() {
        return new ReactiveRedisMessageListenerContainer(redisTemplate.getConnectionFactory());
    }

    /**
     * Создаёт многопоточный (`multicast`) Sink для отправки уведомлений в реактивном стиле.
     * <p>
     * Позволяет подписчикам получать уведомления в реальном времени.
     * </p>
     * <p>
     * Используется для потоковой отправки объектов {@link NotificationMessageResp } - ваша придуманная ДТО.
     * </p>
     *
     * @return Экземпляр {@link Sinks.Many} для обработки уведомлений.
     */
    @Bean
    public Sinks.Many<NotificationMessageResp> notificationSink() {
        return Sinks.many().multicast().directAllOrNothing();
    }
}

Конфигурация WebSocket
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * Конфигурация WebSocket для отправки уведомлений.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Настраивает WebSocket STOMP Endpoints.
     * Клиенты подключаются через `/ws`.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*"); // Подключение на `/ws`
    }

    /**
     * Настраивает брокер сообщений WebSocket.
     * Клиенты могут подписываться на `/topic/notifications`.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic"); // Канал для уведомлений
        registry.setApplicationDestinationPrefixes("/app");
    }
}

NotifierService подписывается на Redis Pub/Sub и отправляет уведомления WebSocket/SSE клиентам

@Service
@RequiredArgsConstructor
public class NotifierService {
   
    private final ReactiveRedisMessageListenerContainer listenerContainer;
    
    private final Sinks.Many<NotificationMessageResp> notificationSink;
  
    @PostConstruct
    public void subscribeToRedisNotifications() {
        listenerContainer.receive(new ChannelTopic("notifications_channel")) //слушаем тот же канал, в который слали сообщение
                .map(message -> messageNotificationConverter.convert(message.getMessage()))
                .doOnNext(notification -> {
                    log.info(" Получено уведомление из Redis: {}", notification);
  
                    // Отправка в SSE
                    notificationSink.tryEmitNext(notification); 

                    // Отправка в WebSocket
                    messagingTemplate.convertAndSend("/topic/notifications", notification);
                })
                .subscribe();
    }
  }

3. SSE + Heartbeat, чтобы соединение не разрывалось

Браузеры и балансировщики часто рвут SSE по "таймауту" – исправляем это "пингами"

 @Operation(summary = "Получить поток уведомлений",
               description = "Позволяет подписаться на SSE-уведомления из Redis-канала `notifications_channel`.",
               responses = @ApiResponse(
                       content = @Content(array = @ArraySchema(schema =
                       @Schema(implementation = NotificationMessageResp.class)),
                                          mediaType = MediaType.TEXT_EVENT_STREAM_VALUE)))
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<NotificationMessageResp> getSseNotifications() {
    return Flux.merge(
        notificationSink.asFlux(), 
      // Heartbeat ping
        Flux.interval(Duration.ofSeconds(2))
            .map(e -> NotificationMessageResp.builder()
                                             .event(EventType.PING.name())
                                             .data(RecordData.builder().timestamp(Instant.now()).build())
                                             .build()))
      .share(); //Все подписанные клиенты получат сообщения одновременно!
}
Инструкция: Как задать таймаут в HAProxy Ingress через Kubernetes Dashboard

Можно избавиться от этих строк:

Flux.interval(Duration.ofSeconds(2))
            .map(e -> NotificationMessageResp.builder()
                                             .event(EventType.PING.name())
                                             .data(RecordData.builder().timestamp(Instant.now()).build())
                                             .build())

Если в вашем кластере Kubernetes используется HAProxy Ingress Controller

Шаги: Настройка таймаута в HAProxy Ingress (через UI)

1) Открываем Kubernetes Dashboard и переходим в раздел Networking → Routes.
2) Находим нужный сервис (notifications-service, если используем SSE/WebSocket).
3) Переходим во вкладку YAML (Редактирование манифеста).
4) В metadata.annotations добавляем следующую строку:

  haproxy.router.openshift.io/timeout: "10m"
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  name: notifications-service
  annotations:
    haproxy.router.openshift.io/timeout: "10m"  #  Таймаут соединения (10 минут)

5) Сохраняем изменения и применяем конфигурацию.

Теперь HAProxy не будет разрывать соединения WebSocket/SSE при простое до 10 минут, а клиентский код автоматически переподключится в случае разрыва.

✔ Теперь SSE-соединение не разорвётся из-за тайм-аута!

Скриншёт

Производительность Redis Pub/Sub

Согласно документации Redis, система с 3 подами в Kubernetes может обрабатывать до 500 000 сообщений в секунду через Redis Pub/Sub, в зависимости от конфигурации серверов, сетевых задержек и размера сообщений.

Для нашей задачи такое решение более чем подходит.

Развертывание Redis Sentinel локально для экспериментов через Docker

 1) Создаём файлы конфигурации

 Создаём следующие файлы в рабочей директории:

docker-compose.yml
version: '3.8'

services:

    redis-master:
        image: redis:7.2
        container_name: redis-master
        restart: always
        ports:
            - "6379:6379"  #  Доступен на localhost:6379
        networks:
            - redis_network
        command: [ "redis-server", "/etc/redis/redis.conf" ]
        volumes:
            - ./redis-master.conf:/etc/redis/redis.conf

    redis-slave:
        image: redis:7.2
        container_name: redis-slave
        restart: always
        ports:
            - "6380:6379"  #  Доступен на localhost:6380
        networks:
            - redis_network
        command: [ "redis-server", "/etc/redis/redis.conf", "--replicaof", "redis-master", "6379" ]
        volumes:
            - ./redis-slave.conf:/etc/redis/redis.conf

    redis-sentinel:
        image: redis:7.2
        container_name: redis-sentinel
        restart: always
        ports:
            - "26379:26379"  #  Доступен на localhost:26379
        networks:
            - redis_network
        command: ["redis-server", "/etc/redis/sentinel.conf", "--sentinel"]
        volumes:
            - ./sentinel.conf:/etc/redis/sentinel.conf

networks:
    redis_network:
        driver: bridge

redis-master.conf
port 6379
bind 0.0.0.0
appendonly yes
protected-mode no

redis-slave.conf
port 6379
bind 0.0.0.0
appendonly yes
replicaof redis-master 6379
protected-mode no

sentinel.conf
port 26379
bind 0.0.0.0
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
sentinel parallel-syncs mymaster 1

Выполняем команду:

docker compose up -d
Как проверить работоспособность Redis Sentinel после развертывания?

После запуска Docker Compose с Redis Sentinel, можно выполнить следующие команды:

1) Проверка статуса Redis Sentinel

Убедимся, что Sentinel видит мастер-узел и следит за репликами:

docker exec -it redis-sentinel redis-cli -p 26379 info Sentinel

2) Проверка ролей мастер/реплика

 Проверим, кто сейчас мастер и сколько реплик подключено:

docker exec -it redis-master redis-cli info replication

Если всё работает, увидим строку:

role:master
connected_slaves:1

Аналогично можно проверить реплику:

docker exec -it redis-slave redis-cli info replication

Ответ должен содержать role:slave.

3) Тест Redis Pub/Sub (отправка уведомления)

 Подписка на канал notifications_channel:
Открываем первый терминал и запускаем слушатель Redis:

docker exec -it redis-master redis-cli SUBSCRIBE notifications_channel

Публикация сообщения в канал:
Во втором терминале отправляем тестовое событие:

docker exec -it redis-master redis-cli PUBLISH notifications_channel "Test message"

В первом терминале должно появиться:

1) "message"
2) "notifications_channel"
3) "Test message"

Это значит, что Pub/Sub работает корректно! 

Конфигурация spring-boot приложения для подключения к Redis Sentinel локально

application.yml
spring:
   data:
      redis:
         sentinel:
             master: ${SPRING_DATA_REDIS_SENTINEL_MASTER:mymaster}  # Имя master узла, указанное в sentinel.conf
             nodes: ${SPRING_DATA_REDIS_SENTINEL_NODES:localhost:26379} # Список Sentinel узлов
             password: ${SPRING_DATA_REDIS_SENTINEL_PASSWORD:mystrongpassword}  # Пароль для Sentinel (если он установлен)
         password: ${SPRING_DATA_REDIS_PASSWORD:mystrongpassword}  # Пароль для Redis (отдельно от Sentinel). Для локальных эксперементов не пригодится.
         timeout: ${SPRING_DATA_REDIS_TIMEOUT:60000}  # Таймаут в миллисекундах

Вывод: Почему Redis Sentinel + SSE/WebSocket — мощное решение?

✔ Уведомления отказоустойчивы благодаря Redis Sentinel — даже если одна нода Redis выходит из строя, система продолжает работать без перебоев.
✔ Поддержка SSE & WebSocket делает UI максимально реактивным — данные обновляются в реальном времени без необходимости ручного обновления страницы.
✔ Горизонтальное масштабирование в Kubernetes — сервис уведомлений легко масштабируется без дублирования событий.
✔ Redis Pub/Sub гарантирует, что все подписанные поды получают уведомления — нет проблем с балансировкой нагрузки.
✔ Стабильность соединений — обновление других микросервисов не влияет на сервис уведомлений, соединения SSE и WebSocket не разрываются при деплоях.
✔ Если Redis или сервис уведомлений вдруг перестанет работать, это затронет только механизм real-time обновления UI. Однако все остальные процессы, включая операции разгрузки вагонов, продолжат работать: пользователи смогут обновлять данные вручную.

Теперь система уведомлений в вашей архитектуре полностью отказоустойчива, масштабируема и готова к высоким нагрузкам! 

Дисклеймер: Все DTO и URL-адреса в данной статье являются выдуманными и приведены исключительно в демонстрационных целях. Все конфигурационные файлы основаны на стандартных настройках. Любые совпадения с реальными сервисами, системами или инфраструктурой случайны и не являются преднамеренными.

Теги:
Хабы:
+20
Комментарии18

Публикации

Информация

Сайт
nlmk.com
Дата регистрации
Дата основания
2013
Численность
свыше 10 000 человек
Местоположение
Россия