Миллиарды сообщений в минуту по протоколу TCP/IP
Основные выводы
Согласованность данных имеет решающее значение при обмене данными между программными компонентами на разных машинах, чтобы обеспечить сохранность информации.
Обмен данными с низкой задержкой требует иного подхода, чем обычные форматы.
Библиотека Chronicle Wire с открытым исходным кодом обеспечивает высокоэффективные средства сериализации и десериализации данных для передачи в и из Chronicle Queue.
Недавние дополнения к библиотеке расширяют возможности ее использования с каналами связи TCP/IP, обеспечивая чрезвычайно высокую пропускную способность.
Использование Wire через TCP/IP открывает возможность облачно-ориентированного (cloud native) развертывания приложений на основе Chronicle.
Одной из наиболее важных проблем при создании распределенных приложений является вопрос представления данных. Мы должны убедиться, что данные, отправленные компонентом «удаленному» компоненту (т. е. компоненту, который является частью другого процесса), будут получены правильно с теми же значениями.
Это может показаться простым, но помните, что взаимодействующие компоненты могут быть написаны на совершенно разных языках. Ситуация усложняется еще больше, если учесть, что разные аппаратные/системные архитектуры, скорее всего, имеют различные способы представления «одинаковых» значений. Простого копирования байтов из одного компонента в другой недостаточно. Даже в Java, где мы можем считать себя «защищенными» от такого рода ситуаций, нет требования, чтобы две разные реализации JVM или разные версии от одного производителя использовали одно и то же внутреннее представление для объектов.
Наиболее распространенным решением этой проблемы является определение «канонического» представления данных, которое будет понятно между процессами — даже между языками программирования — и перевод данных в этот формат перед отправкой, а затем обратно в собственный формат получателя после получения.
Существует несколько таких «форматов передачи», начиная от текстовых стандартов, таких как YAML, JSON или XML, до бинарных вариантов, таких как Protobuf, которые включают метаданные или являются полностью неструктурированными.
В компании Chronicle Software разработан ряд библиотек для поддержки создания приложений, оптимизированных для обмена сообщениями с низкой задержкой, в основном в сфере финансовых услуг. Компания предоставляет разработку решений на заказ и консультации клиентам со всего мира, большинство из которых работает в финансовой сфере.
Одна из этих библиотек, Chronicle Wire , обеспечивает высокопроизводительные преобразования состояния объектов Java между их внутренним представлением в JVM и форматом, который позволяет сохранять это состояние или передавать его другому процессу Java.
Chronicle Wire вырос из проекта Chronicle Queue, который обеспечивает микросекундные задержки при обмене сообщениями между JVM на одной машине или стабильные задержки в десятки микросекунд между машинами при масштабировании трафика до миллионов сообщений в секунду. В настоящее время Wire является ключевой частью большинства программных компонентов, разработанных в Chronicle, для различных сценариев использования от сериализации и десериализации состояния объектов для связи между компонентами до эффективной модели для управления конфигурацией этих компонентов.
Поскольку программные архитектуры все чаще используют основанный на событиях распределенный подход, Chronicle стремился расширить пространство, в котором можно использовать Chronicle Wire, для поддержки TCP/IP-соединений между компонентами. В этой статье представлен базовый обзор возможностей, которые будут доступны, и несколько простых примеров того, как их можно использовать.
Уже видны некоторые поразительные показатели производительности для этого базового подхода - например, в тесте производительности, описанном в статье Питера Лоури Java is Very Fast, If You Don't Create Many Objects, который построен на сети TCP/IP с обратной связью на одной машине, мы смогли передать более 4 миллиардов событий в минуту.
Был проведен сравнительный анализ с аналогичными технологиями, используемыми для обмена данными, а именно с Jackson и BSON. В тесте, обрабатывающем 100 байтовых сообщений, время обработки каждого сообщения в 99,99 процентилях составило около 10,5 микросекунд при использовании Chronicle Wire по сравнению с 1400 микросекундами при использовании Jaskcon/BSON. Это существенная разница.
Здесь представлено введение в ключевые концепции, используемые для реализации этого подхода. Эти функции мы разрабатываем так, чтобы они были гибкими и эффективными, и в будущих статьях мы покажем некоторые более сложные сценарии использования.
О Chronicle Wire
Chronicle Wire является прослойкой между приложением и потоком байтов, выступая в качестве источника или приемника данных. Wire сериализует (marshal) состояние объекта Java и сохраняет полученный формат в потоке байтов, либо считывает последовательность байтов из потока байтов и десериализует (unmarshal) их в объект Java, основываясь только на информации, содержащейся в сообщении.
Давайте рассмотрим простой пример. Мы будем моделировать сохранение объекта Java, сериализуя его состояние в Wire и считывая его обратно в отдельный объект. Мы будем использовать следующий класс с именем Person.
public class Person extends SelfDescribingMarshallable {
private String name;
@NanoTime
private long timestampNS;
@Base85
private long userName;
…
}
Полный код класса можно найти в репозитории Chronicle Wire на Github.
Родительский тип SelfDescribingMarshallable
содержит необходимые функции для взаимодействия с Wire — он примерно эквивалентен интерфейсу тегирования java.io.Serializable
, используемому в сериализации Java, хотя он намного мощнее и не содержит недостатков безопасности.
Как следует из названия, объект SelfDescribingMarshallable
не требует дополнительных средств для поддержки сериализации и десериализации — таких как схема для XML или генератор кода для Protobuf или SBE. Кроме того, интерфейс обеспечивает реализацию «основных» методов Java-объектов данных equals(), hashcode()
и toString()
.
Аннотация @NanoTime
используется Chronicle Wire для наиболее эффективного кодирования значения свойства в качестве метки времени, а @Base85
используется для кодирования коротких строк с с экономией места. Обе аннотации также обеспечивают преобразование их компактных внутренних представлений в дружественные строковые представления для соответствующих значений.
Давайте создадим экземпляр Chronicle Wire, который будет выполнять сериализацию и десериализацию в/из YAML, используя область памяти, выделенную в Java куче.
Wire yWire = Wire.newYamlWireOnHeap();
Чтобы создать и инициализировать экземпляр класса Person, мы должны написать код:
Person p1 = new Person()
.name("George Ball")
.timestampNS(CLOCK.currentTimeNanos())
.userName(Base85.INSTANCE.parse("georgeb"));
System.out.println("p1: " + p1);
Мы используем перегруженные методы и стиль потока вместо методов get…()
и set…()
для доступа к свойствам и их изменения. Вывод кода показывает инициализированное состояние объекта Person, используя метод toString()
родительского типа SelfDescribingMarshallable
:
p1: !Person {
name: George Ball,
timestampNS: 2022-11-11T10:11:26.1922124,
userName: georgeb
}
Теперь мы сериализуем объект в Wire. Поскольку Wire был создан для использования текста/YAML, его содержимое может быть легко отображено:
Wire yWire = Wire.newYamlWireOnHeap();
p1.writeMarshallable(yWire);
System.out.println(yWire);
Как мы видим свойства сериализованы соответствующим образом:
name: George Ball
timestampNS: 2022-11-11T10:11:54.7071341
userName: georgeb
Теперь мы можем создать пустой экземпляр класса Person, заполнить его, считывая данные из Wire, и распечатать его:
Person p2 = new Person();
p2.readMarshallable(yWire);
System.out.println("p2: " + p2);
Вывод показывает, что новый объект имеет правильное состояние:
p2: !Person {
name: George Ball,
timestampNS: 2022-11-11T10:13:29.388,
userName: georgeb
}
Код этих примеров можно найти в Chronicle Wire Github repo.
Методы чтения и записи
Обычно мы представляем себе объекты, которые сериализуются и десериализуются с помощью Wire для хранения каких-то данных, относящихся к нашему приложению. При использовании Chronicle Queue в качестве транспорта сообщений эти объекты формируют полезную нагрузку сообщений, и мы называем их объектами передачи данных (DTO).
Однако можно взглянуть на эту функциональность с другой стороны. Сериализованная форма объекта Person содержит свойства объекта в формате YAML:
name: George Ball
timestampNS: 2022-11-11T10:11:54.7071341
userName: georgeb
Если обобщить это, то мы получим средство кодирования и отправки, использующее Wire для запроса вызова метода с заданным аргументом. Из-за однонаправленной природы нашего транспорта сообщений эти методы должны быть void, т.е. они не могут возвращать значение. Чтобы проиллюстрировать это, рассмотрим интерфейс, который содержит определения операций, выполняемых над объектами Person
. Реализация(и) метода(ов) на данный момент опустим:
public interface PersonOps {
void addPerson(Person p);
}
Для простоты здесь указан только один метод. Он предназначен для того, чтобы взять один аргумент типа Person
и добавить его в некоторую коллекцию. Исходя из предыдущего примера, мы можем ожидать, что экземпляр этого типа будет закодирован в Wire следующим образом:
addPerson: {
name: George Ball,
timestampNS: 2022-11-11T10:11:54.7071341,
userName: georgeb
}
и декодируется в форму, которую можно считать вызовом метода:
personOps.addPerson(
Marshallable.fromString(Person.class, "" +
"name: Alice Smithl\n" +
"timestampNS: 2022-11-11T10:11:54.7071341\n" +
"userName: alices\n"));
Chronicle Wire предлагает возможность кодировать и декодировать вызовы методов подобным образом. Отправитель использует тип с именем MethodWriter, а получатель - тип с именем MethodReader
.
В качестве примера, для типа PersonOps
, показанного выше, мы можем создать метода записи:
final PersonOps personOps = yWire.methodWriter(PersonOps.class);
Результатом вызова этого метода является экземпляр типа интерфейса, имеющий заглушку реализации метода addPerson(), который кодирует запрос к Wire. Мы можем вызвать этот метод следующим образом:
personOps.addPerson(p1);
personOps.addPerson(new Person()
.name("Bob Singh")
.timestampNS(CLOCK.currentTimeNanos())
.userName(Base85.INSTANCE.parse("bobs")));
и если мы посмотрим на Wire, мы увидим запрос вызова, закодированный как сообщение:
addPerson: {
name: Alice Smith,
timestampNS: 2022-11-11T10:11:54.7071341,
userName: alices
}
...
addPerson: {
name: George Ball,
timestampNS: 2022-11-11T10:28:47.466,
userName: georgeb
}
...
addPerson: {
name: Bob Singh,
timestampNS: 2022-11-11T10:28:48.3001121,
userName: bobs
}
...
На принимающей стороне мы можем создать объект MethodReader
, предоставляющий реализацию метода, который будет вызван при декодировании:
MethodReader reader = yWire.methodReader(
(PersonOps) p -> System.out.println("added " + p));
Когда сообщение будет прочитано и декодировано, будет вызван метод:
for (int i = 0; i < 3; i++)
reader.readOne();
После вызова метода мы покажем результат вызова с помощью System.out.println()
:
added !Person {
name: Alice Smith,
timestampNS: 2022-11-11T10:11:54.7071341,
userName: alices
}
added !Person {
name: George Ball,
timestampNS: 2022-11-11T10:28:47.466,
userName: georgeb
}
added !Person {
name: Bob Jones,
timestampNS: 2022-11-11T10:28:48.3001121,
userName: bobj
}
Это потенциально очень мощный инструмент, поскольку он дает нам очень гибкий и эффективный способ кодирования событий или сообщений и их связывания с обработчиками. Доступна вся гибкость кодирования Wire - текстовые форматы или высокоэффективные двоичные форматы, а также множество различных типов базовых транспортных средств, с которыми работает Wire.
Теперь мы рассмотрим, как добавление поддержки сетевого взаимодействия на основе TCP/IP в качестве транспорта для Wire может расширить возможности еще больше.
Концепции
Новые возможности основаны на трех абстракциях:
Канал
Канал Chronicle (Chronicle Channel) — это абстракция двунаправленного соединения «точка-точка» между двумя компонентами. Тип канала, указанный при создании канала, определяет используемый транспорт. Первоначальная реализация поддерживает TCP/IP, используя асинхронные сокеты или внутренние каналы, которые соединяют две конечные точки в рамках одного процесса. Предполагается, что она будет поддерживать дополнительные, более высокоуровневые транспорты, такие как GRPC, REST или Websockets.
Канал переносит События, упакованные в виде сообщений Chronicle Wire, между этими двумя компонентами. Типы каналов могут быть определены для различных видов транспорта, хотя первоначальная реализация поддерживает TCP/IP или «локальные» (внутрипроцессные) каналы.
Контекст
Контекст (Context) — это контейнер управления каналами, отвечающий за их конфигурацию и жизненный цикл.
Обработчик
Обработчик (Handler) — это компонент, который привязывается к каналу и определяет, как обрабатываются входящие события и передаются исходящие (результирующие) события. Это позволяет реализовать различные формы управления сессиями. Имеется множество предопределенных обработчиков, также можно определить дополнительные обработчики.
Обработчик ассоциируется с каналом во время установления соединения, обычно «инициатором» соединения (т. е. клиентом).
Работа с каналами
Давайте рассмотрим несколько примеров этих функций в действии.
Пример 1: Hello, World
В соответствии со стандартной практикой, первый пример представляет собой простое эхо-сообщение «Hello». Пронумерованные комментарии указывают на интересные места в коде и соответствуют приведенному ниже списку:
public class Channel1ReadWrite {
private static final String URL = System.getProperty("url", "tcp://:3334"); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("Channel1"); // ===> (2)
ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(Channel1.class, "Channel set up on port: " + channel.channelCfg().port());
Says says = channel.methodWriter(Says.class); // ===> (3)
says.say("Well hello there");
StringBuilder eventType = new StringBuilder(); // ===> (4)
String text = channel.readOne(eventType, String.class);
Jvm.startup().on(Channel1.class, ">>>> " + eventType + ": " + text);
}
}
}
Критически важной для настройки канала является строка URL. В настоящее время в качестве транспорта доступен только TCP/IP, но со временем могут и будут поддерживаться и другие. Семантика этой строки в понимании настройки Chronicle Channel представлена в следующей таблице.
Формат URL | Значение |
internal:// | Внутренний канал для обработки |
tcp://:{порт} | Каналы принимают входящие запросы, используйте порт 0 для эфемерного порта |
tcp://{имя хоста}:{порт} | Клиентская сторона канала |
Мы используем
try-with-resources
, чтобы гарантировать, что все необходимые компоненты, которые мы создаем, будут закрыты надлежащим образом, когда мы закончим. Сначала мы создаем Context, который будет управлять жизненным циклом и конфигурацией каналов.
Context предоставляет фабрику, из которой могут быть созданы новые каналы. При запросе нового канала мы указываем, какой обработчик будет использоваться для обработки входящих событий. В этом примере мы используем EchoHandler
, что, как следует из названия, просто разворачивает событие и отправляет его обратно отправителю.
Вся необходимая работа по настройке серверного сокета для этого соединения выполняется фабрикой. Возвращенный канал доступен для использования.
Помните, что TCP/IP — это полнодуплексный протокол, поэтому канал, который мы имеем, является двунаправленным. Поэтому мы можем послать событие через канал, используя
MethodWriter
, сгенерированный из следующего типа:
public interface Says extends Syncable {
void say(String say);
}
…
Says says = channel.methodWriter(Says.class);
says.say("Well hello there");
…
Затем мы можем использовать Chronicle Wire для чтения эхо-события из канала и отображения его деталей.
Затем мы можем использовать Chronicle Wire для чтения эхо-события из канала и отображения его деталей.
После выполнения этого простого примера мы можем увидеть вывод:
[main] INFO run.chronicle.wire.channel.demo1.Channel1 - Channel set up on port: 3334
[main] INFO run.chronicle.wire.channel.demo1.Channel1 - >>>> say: Well hello there
Пример 2: Отдельные клиент и сервер
Первый пример немного искусственный, так как он объединяет функциональность клиентской и серверной частей в один процесс. Хотя это может быть идеально для целей тестирования или отладки, в реальности мы хотим разделить обе стороны на отдельные процессы. Давайте посмотрим на сервер после этого разделения:
public class ChannelService {
static final int PORT = Integer.getInteger("port", 4441);
public static void main(String[] args) throws IOException {
System.setProperty("port", "" + PORT); // set if not set.
ChronicleGatewayMain.main(args);
}
}
Обратите внимание, что теперь он очень короткий, благодаря использованию служебного класса ChronicleGatewayMain
, который инкапсулирует функциональность настройки серверной части (channel acceptor), удалению шаблонного кода и максимально возможного использования настроек по умолчанию.
Код для клиентской стороны показан ниже, а пронумерованные комментарии снова иллюстрируют интересные моменты:
public class ChannelClient {
private static final String URL = System.getProperty("url", "tcp://localhost:" + ChannelService.PORT); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("ChannelClient"); // ===> (2)
ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(ChannelClient.class, "Channel set up on port: " + channel.channelCfg().port());
Says says = channel.methodWriter(Says.class); // ===> (3)
says.say("Well hello there");
StringBuilder eventType = new StringBuilder();
String text = channel.readOne(eventType, String.class);
Jvm.startup().on(ChannelClient.class, ">>>> " + eventType + ": " + text);
}
}
}
Строка URL содержит имя хоста и номер порта, что сообщает логике создания канала, что мы инициируем настройку канала со стороны клиента, предоставляя полный адрес акцептора для сервиса.
Контекст настроен как инициатор/клиент из-за формата строки URL. При создании канала из контекста инициатора/клиента мы указываем, какой обработчик будет использоваться на принимающей стороне. Это является частью спецификации запрашиваемого канала, которая отправляется сервису во время настройки канала.
Необходимо, чтобы сервис имел необходимый код для обработчика — из соображений безопасности код не пересылается по сети ни на каком этапе — в противном случае настройка канала будет неудачной.
Как только канал установлен, код такой же, как и в первом примере.
При запуске как клиентского, так и серверного приложений результат будет таким же, как и в первом примере:
[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - Channel set up on port: 4441
[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - >>>> say: Well hello there
Пример 3: Простое взаимодействие запрос/ответ
Ранее мы рассмотрели, как использовать MethodReader и MethodWriter Wire для реализации способа передачи запросов на вызов методов вне текущего процесса. Теперь мы можем расширить этот пример, чтобы продемонстрировать возможность использования Wire через канал TCP/IP реализовать базовое взаимодействие запроса/ответа с сервисом способом, аналогичным удаленному вызову процедур.
Сам сервис прост и предоставляет всего один метод — цель данного примера состоит в том, чтобы продемонстрировать шаги, необходимые для создания сервиса и доступа к нему.
Этот пример состоит из четырех частей:
Сервис, которая реализует бизнес-логику с точки зрения типов сообщений для ввода и вывода.
Обработчик канала, который подключает сервис к базовой инфраструктуре канала.
Драйвер сервиса, который действует как точка входа на стороне сервера, создавая и настраивая как сервис, так и обработчик канала.
Клиент, отдельное приложение, которое создает и отправляет запрос и получает ответ.
Обслуживание
Сервис определяется с помощью интерфейса, который содержит сигнатуры методов, представляющие поддерживаемые запросы. Мы определяем интерфейс сервиса следующим образом:
public interface PersonOps {
void addPerson ( Person p );
}
Тип Person
определен ранее.
Обмен сообщениями в Chronicle однонаправленный, поэтому методы API сервиса недействительны. Поэтому нам необходимо определить второй интерфейс, который определяет сообщение, используемое для ответа:
public interface ResponseSender {
void respond(ReqStatus status);
}
Тип ReqStatus
указывает на успех или неудачу метода и определяется следующим образом:
public enum ReqStatus {
OK,
ERROR
}
Эти два интерфейса соединены вместе, чтобы сформировать «обработчик» входящих запросов:
public class PersonOpsProcessor implements PersonOpsHandler {
private transient ResponseSender responder; // ===> (1)
public PersonOpsProcessor responder(ResponseSender responseSender) { // ===> (2)
this.responder = responseSender;
return this;
}
@Override
public void addPerson(Person p) { // ===> (3)
responder.respond(ReqStatus.OK);
}
}
Это поле будет храниться ссылка на выходные данные для этого сервиса, в которую отправляются ответные сообщения.
В этом примере ResponseSender вводится с помощью метода setter, это также можно сделать с помощью конструктора.
Это реализация метода в интерфейсе PersonOps, который для простоты отправляет ответ об успешном статусе.
Обработчик канала
Вспомните из обсуждения концепций, что обработчик канала (Channel Handler) отвечает за обработку сообщений/событий, которые передаются по связанному с ним каналу (Channel).
Для этого примера нам нужно определить класс, который будет отправлять входящие сообщения на Channel соответствующему методу обработчика в сервисе и подключать выходные данные службы к каналу:
public class PersonSvcHandler extends AbstractHandler<PersonSvcHandler> { // ===> (1)
private final PersonOpsHandler personOpsHandler; // ===> (2)
public PersonSvcHandler(PersonOpsHandler personOpsHandler) { // ===> (3)
this.personOpsHandler = personOpsHandler;
}
public void run(ChronicleContext context, ChronicleChannel channel) { // ===> (4)
channel.eventHandlerAsRunnable(
personOpsHandler.responder(channel.methodWriter(ResponseSender.class))
).run();
}
@Override
public ChronicleChannel asInternalChannel(ChronicleContext context, // ===> (5)
ChronicleChannelCfg channelCfg) {
throw new UnsupportedOperationException("Internal Channel not supported");
}
}
В базовом классе реализована общая функциональность платформы. Наш класс будет предоставлять необходимую информацию для нашего сервиса.
Ссылка на реализацию методов обработчика.
Реализация
PersonOpsHandler
внедряется в обработчик с помощью конструктора.Когда инициируется новое подключение к каналу, запускается экземпляр нашего обработчика, при этом необходимые объекты
MethodReader
иMethodWriter
инициализируются корректно. Это инкапсулируется в методеrun()
и происходит для каждого установленного соединения канала.В этом примере класса мы явно запретили создание экземпляра обработчика для запуска с внутренним каналом.
Класс драйвера сервиса
После выполнения этих шагов класс драйвера для сервиса становится простым и более или менее идентичен предыдущему примеру, использующему класс ChronicleGatewayMain
для создания конфигурации канала:
public class PersonSvcMain {
static final int PORT = Integer.getInteger("port", 7771);
public static void main(String... args) throws IOException {
System.setProperty("port", "" + PORT);
ChronicleGatewayMain.main(args);
}
}
Клиент
Мы можем реализовать простой клиент для нашего сервиса Person, создав канал и отправив запросы к нашему сервису.
public class PersonClient {
private static final String URL = System.getProperty("url", "tcp://localhost:" + PersonSvcMain.PORT); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL)) {
ChronicleChannel channel = context.newChannelSupplier(new PersonSvcHandler(new PersonOpsProcessor())) // ===> (2)
.get();
final PersonOps personOps = channel.methodWriter(PersonOps.class); // ===> (3)
Person thePerson = new Person()
.name("George")
.timestampNS(SystemTimeProvider.CLOCK.currentTimeNanos())
.userName(Base85.INSTANCE.parse("georgeb")));
;
personOps.addPerson(thePerson);
StringBuilder evtType = new StringBuilder();
ReqStatus response = channel.readOne(evtType, ReqStatus.class);
Jvm.startup().on(PersonClient.class, " >>> " + evtType + ": " + response);
}
}
}
URL-адрес по умолчанию настроен с номером порта, который был настроен на сервере.
Канал создается, и в него внедряется экземпляр нашего пользовательского обработчика.
После создания мы можем использовать метод MethodWriter канала для создания методов-заглушек, которые будут отправлять соответствующие сериализованные события в сервис.
Резюме
В Chronicle Wire были добавлены новые функции, позволяющие взаимодействовать с другими компонентами по TCP/IP сети. В этой статье описаны основные идеи того, как это будет работать в Wire, и приведены некоторые простые примеры.
Существует множество других сценариев использования этой быстрой и эффективной связи в распределенных службах. Дополнительные примеры доступны в проекте Chronicle Wire GitHub , наряду с примерами из этой статьи.