Не так давно вышла статья, в которой автор описывал свой framework для написания приложений с использованием Ruby, Sinatra и websoсket. Но в том решении не был затронут вопрос горизонтального масштабирования. Так при подключении к одному из узлов, пользователи могут получать уведомления/данные только о событиях/изменениях, вызванных пользователями этого же узла, а при изменениях, внесенных через другой, они не узнают. Для решения данной задачи необходимо организовать общую шину данных. Рассматривать данную задачу буду в контексте обмена сообщениями клиент-клиент.
Требования, которые будем предъявлять к шине следующие:
Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.
Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.
Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.
Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.
Наша система будет иметь следующую схему.

Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.
Для этого:
В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо.
Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена.
Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.
Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.
ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.
Метод send_client отправляет данные уже самому клиенту.
Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS::User либо если пользователь не найден в локальном кэше создает экземпляр класса WS::RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.
Класс WS::RemoteUser в отличии от WS::User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.
Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.
Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.
Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.
Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup
Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.
Шина данных
Требования, которые будем предъявлять к шине следующие:
- простота работа;
- передача в «реальном времени»;
- производительность.
Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.
Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.
Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.
Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.
Решение
Наша система будет иметь следующую схему.

Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.
Для этого:
- узел генерирует уникальное имя;
- подписывается по нему на сообщения в Redis;
- все клиенты подключенные к этому узлу записывают пару ключ-значение в виде идентификатора клиента и идентификатора узла, к которому он подключен;
- при отправке сообщения другому клиенту, узнаем имя узла и передаем сообщения в его очередь для обработки.
А теперь реализуем
В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо.
module App class << self def configuration yield(config) if block_given? config.sessions = Metriks.counter('total_sessions') config.active = Metriks.counter('active_sessions') end def config @config ||= OpenStruct.new( redis: nil, root: nil ) end def id @instance_id ||= SecureRandom.hex end def logger @logger ||= Logger.new $stderr end def register config.redis.multi do config.redis.set "node_#{App.id}", true config.redis.expire "node_#{App.id}", 60*10 end if config.redis EM.next_tick do config.sub = PubSub.connect config.sub.subscribe App.id do |type, channel, message| case type when 'message' begin json = Oj.load(message, mode: :compat) WS::Base.remote_messsage json rescue => ex App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}" end else App.logger.debug "(#{type}) #{channel}:: #{message}" end end @pingpong = EM.add_periodic_timer(30) do App.config.redis.expire "node_#{App.id}", 60 end end rescue config.redis = nil end end end
Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена.
module WS class Base NEXT_RACK = [404, {}, []].freeze def self.call(*args) instance.call(*args) end def self.instance @instance ||= self.new end def self.remote_messsage(json) user = User.get json['from'] instance.send :process, user, json if user rescue => ex user.error( { error: ex.to_s } ) end def initialize @ws_cache = {} end def call(env) return NEXT_RACK unless Faye::WebSocket.websocket?(env) ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5) user = User.register(ws) ws.onmessage = lambda do |event| json = Oj.load(event.data, mode: :compat) process(user, json ) end ws.onclose = lambda do |event| App.logger.info [:close, event.code, event.reason] user.unregister user = nil end ws.rack_response rescue WS::User::NotUnique => ex ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } }) ws.close ws.rack_response end private def process(user, json) action = json['action'].to_s data = json['data'] return App.logger.info([:message, 'Empty action']) if action.empty? return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}" user.send "on_#{action}", data rescue => ex user.error({ error: ex.to_s }) puts ex.to_s puts ex.backtrace end end end
Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.
Клиенты
module WS class User include UserBehavior attr_reader :id class Error < StandardError; end class RoomFull < Error; end class NotFound < Error attr_reader :id def initialize(id); @id = id end def to_s; "User '@#{id}' not found" end end class NotUnique < Error; end class << self def cache @ws_cache ||= {} end def get(id) fail NotFound.new(id) if id.to_s.empty? @ws_cache.fetch(id) rescue KeyError WS::RemoteUser.new(id) end def register(ws) self.new(ws) end def unregister(ws) url = URI.parse(ws.url) id = url.path.split('/').last get(id).unregister end end def initialize(ws) @ws = ws register @pingpong = EM.add_periodic_timer(5) do @ws.ping('') do App.config.redis.expire @id, 15 if App.config.redis end end end def unregister on_close if respond_to? :on_close App.config.active.decrement App.config.redis.del @id if App.config.redis User.cache.delete(@id) @pingpong.cancel @pingpong = nil @ws = nil @id = nil end def send_client(from, action, data) return unless @ws data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat) @ws.send(data) end private def register url = URI.parse(@ws.url) @id = url.path.split('/').last if App.config.redis App.config.redis.multi do App.config.redis.set @id, App.id App.config.redis.expire @id, 15 end App.config.sessions.increment App.config.active.increment end User.cache[@id] = self App.logger.info [:open, @ws.url, @ws.version, @ws.protocol] on_register if respond_to? :on_close self end end class RemoteUser include UserBehavior attr_reader :id attr_reader :node def initialize(id) @id = id.to_s fail WS::User::NotFound.new(id) if @id.empty? @node = App.config.redis.get(@id).to_s fail WS::User::NotFound.new(id) if @node.empty? end def send_client(from, action, data) return if node.to_s.empty? App.logger.info ['REMOTE', self.id, from.id, action] data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat) App.config.redis.publish node, data end end end
Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.
ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.
Метод send_client отправляет данные уже самому клиенту.
Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS::User либо если пользователь не найден в локальном кэше создает экземпляр класса WS::RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.
Класс WS::RemoteUser в отличии от WS::User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.
Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.
module UserBehavior module ClassMethods def register_action(action, params = {}) return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action block = lambda do |*args | if block_given? data, from = yield(self, *args) send_client from || self, action, data else send_client self, action, args.first end end define_method action, &block define_method "on_#{action}" do |data| self.send action, data end if params[:passthrough] end end def self.included(base) base.instance_exec do extend ClassMethods register_action :message do |user, from, text| [{ to: user.id, text: text }, from] end register_action :error, passthrough: true end end def on_message(data) App.logger.info ['MESSAGE', id, data.to_s] to_user_id = data['to'] to_user = WS::User.get(to_user_id) to_user.message self, data['text'] rescue WS::User::NotFound => ex error({ error: ex.to_s }) end end
Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.
Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.
Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup
PS
Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.