Горизонтальное масштабирование websocket-ов на Ruby

    Не так давно вышла статья, в которой автор описывал свой framework для написания приложений с использованием Ruby, Sinatra и websoсket. Но в том решении не был затронут вопрос горизонтального масштабирования. Так при подключении к одному из узлов, пользователи могут получать уведомления/данные только о событиях/изменениях, вызванных пользователями этого же узла, а при изменениях, внесенных через другой, они не узнают. Для решения данной задачи необходимо организовать общую шину данных. Рассматривать данную задачу буду в контексте обмена сообщениями клиент-клиент.

    Шина данных


    Требования, которые будем предъявлять к шине следующие:
    • простота работа;
    • передача в «реальном времени»;
    • производительность.

    Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.
    Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.

    Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.

    Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.

    Решение


    Наша система будет иметь следующую схему.



    Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.
    Для этого:
    1. узел генерирует уникальное имя;
    2. подписывается по нему на сообщения в Redis;
    3. все клиенты подключенные к этому узлу записывают пару ключ-значение в виде идентификатора клиента и идентификатора узла, к которому он подключен;
    4. при отправке сообщения другому клиенту, узнаем имя узла и передаем сообщения в его очередь для обработки.

    А теперь реализуем


    В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо.

    module App
      class << self
        def configuration
          yield(config) if block_given?
          config.sessions = Metriks.counter('total_sessions')
          config.active = Metriks.counter('active_sessions')
        end    
        def config			
          @config ||= OpenStruct.new( redis: nil, root: nil )
        end	
        def id
          @instance_id ||= SecureRandom.hex
        end
        def logger
          @logger ||= Logger.new $stderr
        end
    
        def register
          config.redis.multi do
            config.redis.set "node_#{App.id}", true
            config.redis.expire "node_#{App.id}", 60*10
          end if config.redis
    
          EM.next_tick do        
            config.sub = PubSub.connect
            config.sub.subscribe App.id do |type, channel, message|
              case type
                when 'message'
                  begin
                    json = Oj.load(message, mode: :compat)
                    WS::Base.remote_messsage json
                  rescue => ex
                    App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
                  end
                else
                  App.logger.debug "(#{type}) #{channel}:: #{message}"
              end
            end
            @pingpong = EM.add_periodic_timer(30) do
              App.config.redis.expire "node_#{App.id}", 60
            end
          end
        rescue
          config.redis = nil
        end
      end
    end
    

    Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена.

    module WS
      class Base
        NEXT_RACK = [404, {}, []].freeze
        def self.call(*args)
          instance.call(*args)
        end
        def self.instance
          @instance ||= self.new
        end
        def self.remote_messsage(json)
          user = User.get json['from']
          instance.send :process, user, json if user
        rescue => ex
          user.error( { error: ex.to_s } )
        end
        def initialize
          @ws_cache = {}
        end
        def call(env)
          return NEXT_RACK unless Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
          user = User.register(ws)
          ws.onmessage = lambda do |event|
            json = Oj.load(event.data, mode: :compat)
            process(user, json )
          end
          ws.onclose = lambda do |event|
            App.logger.info [:close, event.code, event.reason]
            user.unregister
            user = nil
          end
          ws.rack_response
        rescue WS::User::NotUnique => ex
          ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
          ws.close
          ws.rack_response
        end
        private
    
        def process(user, json)
          action = json['action'].to_s
          data = json['data']
          return App.logger.info([:message, 'Empty action']) if action.empty?
          return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}"
          user.send "on_#{action}", data
        rescue => ex
          user.error({ error: ex.to_s })
          puts ex.to_s
          puts ex.backtrace
        end
      end
    end
    

    Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.
    Клиенты
    module WS
      class User
        include UserBehavior
        attr_reader :id
        class Error < StandardError; end
        class RoomFull < Error; end
        class NotFound < Error
          attr_reader :id
          def initialize(id); @id = id end
          def to_s; "User '@#{id}' not found" end
        end
        class NotUnique < Error; end
    
        class  << self
          def cache
            @ws_cache ||= {}
          end
    
          def get(id)
            fail NotFound.new(id) if id.to_s.empty?
            @ws_cache.fetch(id)
          rescue KeyError
            WS::RemoteUser.new(id)
          end
    
          def register(ws)
            self.new(ws)
          end
    
          def unregister(ws)
            url = URI.parse(ws.url)
            id = url.path.split('/').last
            get(id).unregister
          end
        end
    
        def initialize(ws)
          @ws = ws
          register
    
          @pingpong = EM.add_periodic_timer(5) do
            @ws.ping('') do
              App.config.redis.expire @id, 15 if App.config.redis
            end
          end
        end
    
        def unregister
          on_close if respond_to? :on_close
          App.config.active.decrement
          App.config.redis.del @id if App.config.redis
          User.cache.delete(@id)
          @pingpong.cancel
          @pingpong = nil
          @ws = nil
          @id = nil
        end
    
        def send_client(from, action, data)
          return unless @ws
          data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
          @ws.send(data)
        end
    
        private
        def register
          url = URI.parse(@ws.url)
          @id = url.path.split('/').last
          if App.config.redis
            App.config.redis.multi do
              App.config.redis.set @id, App.id
              App.config.redis.expire @id, 15
            end
            App.config.sessions.increment
            App.config.active.increment
          end
          User.cache[@id] = self
          App.logger.info [:open, @ws.url, @ws.version, @ws.protocol]
          on_register if respond_to? :on_close
          self
        end
      end
    
      class RemoteUser
        include UserBehavior
        attr_reader :id
        attr_reader :node
        def initialize(id)
          @id = id.to_s
          fail WS::User::NotFound.new(id) if @id.empty?
          @node = App.config.redis.get(@id).to_s
          fail WS::User::NotFound.new(id) if @node.empty?
        end
        def send_client(from, action, data)
          return if node.to_s.empty?
          App.logger.info ['REMOTE', self.id, from.id, action]
          data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
          App.config.redis.publish node, data
        end
      end
    end
    


    Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.
    ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.

    Метод send_client отправляет данные уже самому клиенту.

    Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS::User либо если пользователь не найден в локальном кэше создает экземпляр класса WS::RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.

    Класс WS::RemoteUser в отличии от WS::User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.

    Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.

    module UserBehavior
      module ClassMethods
        def register_action(action, params = {})
          return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action
    
          block = lambda do |*args |
            if block_given?
              data, from = yield(self, *args)
              send_client from || self, action, data
            else
              send_client self, action, args.first
            end
          end
    
          define_method action, &block
          define_method "on_#{action}" do |data|
            self.send action, data
          end if params[:passthrough]
    
        end
      end
    
      def self.included(base)
        base.instance_exec do
          extend ClassMethods
          register_action :message do |user, from, text|
            [{ to: user.id, text: text }, from]
          end
    
          register_action :error, passthrough: true
        end
      end
    
      def on_message(data)
        App.logger.info ['MESSAGE', id, data.to_s]
    
        to_user_id = data['to']
        to_user = WS::User.get(to_user_id)
        to_user.message self, data['text']
    
      rescue WS::User::NotFound  => ex
        error({ error: ex.to_s })
      end
    end
    

    Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.

    Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.

    Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup

    PS


    Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 11

      0
      Исключительно праздное любопытство — чем вам не подошел сам Faye с faye-redis?
      Вместо Oj я бы использовал сразу MultiJson, так будет поддержка JRuby.
        +1
        Больше академический интерес, изначально код писался как пример по организации взаимодействия на сокетах и взаимодействия между узлами.
        В данном случае faye здесь просто как реализация сокетов, не более. Можно было взять и Em::WebSocket, что не принципиально, но в данном случае просто показал некоторое решение, которое можно либо развить, либо как в случае с Faye взять готовое.

        А за MultiJson спасибо.

        Посмотрел как у них реализовано. Подход схожий, но делается упор на абстракцию в виде каналов. В итоге если нужно передавать клиент-клиент, то каждый должен подписываться на канал друг друга. Получится либо очень очень много подписок (изначально старался уйти от этого), либо придем к схеме которую описал в статье, либо что-то еще.
          0
          Клиент-клиент как раз нет, количество каналов будет равно количеству клиентов.
          Публикуете в канал /user/:id, на который подписывается пользователь с :id, в сообщении присылаете id пользователя-отправителя.

          Ну и огромный плюс faye — он не ограничен одним websocket.
            0
            Да, согласен, перечитал еще раз.
            У них получается что лишь одна очередь (подписка на уровне Redis), в которую все шлют уведомление о новых сообщениях, далее фильтруют и выгребают сами данные.
              0
              Там чуть сложнее, вот пример ключей, которые он хранит:
              1) "/clients"
              5) "/channels/time"
              7) "/clients/j6hb00g9ta8d8y8b9o48hmi4b2br0bh/channels"
              9) "/channels/user/registered"

              Т.е. общий пул соединений (клиентов), каналы, на которые подписан конкретный клиент, и очереди для каждого используемого канала.
        +1
        А вот только дописал модуль веб-сокетов для своего движка, нужно ещё документацию сделать.

        Так вот, я каждый запускаемый сервер заношу в пул, первый в списке является мастером, если падает — удаляется и следующий становится мастером.

        Каждый сервер при старте проверяет — если он не мастер — подключается к нему, слушает на сообщения в специальном внутреннем шифрованом формате. Таким образом используются только веб-сокеты, даже между серверами. Когда нужно отправить сообщение определённой группе пользователей — отправляется пользователям на текущем сервере, и на мастер, который рассылает сообщение на остальные сервера.

        Сам пул — MEMORY табличка с одним полем — публичным адресом сервера, например wss://example.com/WebSockets (альтернативно можно хранить в любом другом удобном месте, в том же Redis).

        В моем варианте не нужно никакой шины, система сама адаптируется к падениям текущего мастера (который в последствии будет поднят, но станет уже последним в пуле). При отказоустойчивой архитектуре БД получается весьма надежно. Сами пользователи вообще нигде не регистрируются, id пользователя хранится в одном из полей соединения, так как сообщение рассылается на все сервера мне всё равно где именно и сколько одновременных подключений у конкретного пользователя.
          0
          Получается что всё горизонтальное масштабирование упирается в сервис Redis, который вроде бы горизонтально не расширяется? В чем тогда смысл такого подхода, не проще ли для WebSocket держать отдельный сервер вместо Redis?
            0
            Что случится, когда сервер перестанет вывозить N одновременно подключенных клиентов? Нужно будет добавить еще один websocket-сервер.
            А взаимодействовать между собой они будут через Redis. Что-то мне подсказывает, что сервер WebSocket уткнется в свой потолок гораздо раньше, чем Redis.
              0
              > Что случится, когда сервер перестанет вывозить N одновременно подключенных клиентов?
              Тот же самый вопрос можно задать про Redis. Вы говорите про низкий потолок WebSocket, возможно просто нужно использовать софт для WebSocker сервера который бы давал примерно такой же потолок как redis.

              Впрочем определенный выигрыш я вижу тут есть, так как WebSocket и Redis будут потреблять разные ресурсы, поскольку сервер Redis не обязан поддерживать большое количество открытых TCP соединений. Однако трудно все же назвать это полностью горизонтальным масштабированием.
              0
              Судя по замерам отсюда: redis.io/topics/benchmarks Redis не самое узкое место, а если все же производительность упрется в него, то всегда можно перейти на другие решения, например RabbitMQ и построить кластер.
                0
                Если например узкое место будет network IO (представим что клиенты отправляют большие объемы данных друг-другу), то сетевой интерфейс редис сервера забьется и такое горизонтальное масштабирование работать не будет.

                Вообще я думаю что ваше решение вполне может подходящим к своей ситуации, но все же горизонтальность тут не полная. Наверное вы правы что при необходимости это можно дальше масштабирвать через RabbitMQ (с последним я не сталкивался).

            Only users with full accounts can post comments. Log in, please.