На одном из проектов Группы «Иннотех» появилась задача перевода асинхронных запросов в синхронные. По сути, нужно было подружить REST и Apache Kafka в одном запросе.
Если разложить задачу по полочкам, то у нас есть два сервиса, которые общаются между собой — назовём их А и Б. В сервис А приходит потребитель с запросом на получение данных, которые лежат в сервисе Б. Таким образом, сервис А отправляет по REST запрос данных в Б и ожидает ответ на запрос в Kafka. Пока этот ответ не получен, пользователь ждёт данные.
Казалось бы, популярная задача и ответ должен быть в Google или Stack Overflow. Но удалось найти только решение подобной задачи через готовую библиотеку, соединяющую два сервера с Kafka. Поэтому проблема была решена самостоятельно с помощью Java, фреймворка Spring и небольшой ИТ-смекалки.
Постановка задачи
Перед тем как начать реализацию, необходимо поставить задачу и понять, что у нас есть и чего хотим добиться.
У нас есть сервис Client, у которого будет один end-point. А end-point, в свою очередь, будет принимать обычную строку, отправлять её второму сервису и ждать от него ответа в Kafka.
И второй сервис — Server. Он будет принимать строчку по REST от сервиса Client, переводить её в UpperCase и возвращать в ответ по Kafka.
Сервис Client ожидает ответ от Server. Пока он не получит ответ, пользователь будет ждать результат. При этом может быть такая ситуация, что ожидание ответа затягивается. Чтобы не допустить слишком большой паузы, необходимо предусмотреть прерывание по таймауту, например, в 10 секунд. Но это условие не обязательно, мы должны иметь возможность ожидать ответа бесконечно долго. Но точно не 7,5 миллионов лет ровно, иначе ответ может разочаровать.
Таким образом, можно привести пример работы. Потребитель отправляет в сервис Client строку, например, «abc123», и пока не получит ответ от сервиса Server будет висеть в ожидании. Ответ от Server должен быть «ABC123», который будет возвращён потребителю. Если время ожидание ответа превысит таймаут, то вместо ответа будет возвращён HTTP-код с ошибкой 504 (Gateway Timeout).
Реализация Client
Буду описывать только основные моменты. Если требуется код целиком, то ниже прикреплена ссылка на репозиторий.
Реализация SenderReceiver
Ядро логики — это реализация класса SenderReceiver
, который отвечает за «сон» процесса, пока тот не получит данные извне.
Этот класс состоит из двух основных методов: receive()
— отвечает за засыпание, пока данные не будут получены, и send()
— получает данные и будит receive()
. Чтобы понимать, в каком состоянии поток сейчас находится, 1) спит и ждёт (метод receive()
), или же 2) получает и пробуждает (send()
), мы заведём флаг boolean
, который назовём transfer
и инициализируем значением true
.
private boolean transfer = true;
Теперь реализуем receive()
:
public synchronized void receive() {
while (transfer) {
if (timeout != 0 && start.before(new Date(System.currentTimeMillis() - timeout))) {
timeoutException = new TimeoutException();
Thread.currentThread().interrupt();
return;
}
try {
wait(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Thread.currentThread().interrupt();
}
Поскольку изначально флаг transfer = true
, то процесс переходит в режим ожидания, пока его не разбудит send()
, либо же не пройдёт количество миллисекунд, заданное в timeout
.
Если значение timeout
отличное от 0, то значит задан механизм прерывания потока по времени выполнения. Для этого мы должны проверять текущее время, смещённое на это значение, со временем старта потока. Если время превышено, то мы сохраняем TimeoutException()
в переменную, которая изначально равна null, это нам пригодится чуть попозже, и завершаем поток. В случае если флаг transfer
стал false
и поток не завершил ожидание по таймауту, мы просто выходим из цикла и завершаем поток.
Можно заметить, что timeout
со значением 0 уникален. Если он равен 0, то поток будет ожидать пробуждения бесконечно долго. После того как его пробудят и transfer
перейдёт в значение false
, поток завершится.
Далее реализуем метод send()
:
public synchronized void send(final T data) {
transfer = false;
this.data = data;
notifyAll();
}
Видно, что метод send()
очень прост. Он принимает данные извне, переводит наш флаг transfer
в false
, сохраняет данные и пробуждает все потоки, которые висят в wait()
.
На самом деле есть ещё один важный вспомогательный метод getData()
, который либо возвращает данные, которые хранятся в data
, либо выкидывает ошибку, если соответствующая переменная не null.
public T getData() throws TimeoutException {
if (Objects.nonNull(timeoutException))
throw timeoutException;
return data;
}
Реализация SenderReceiverMap
После того как мы реализовали механизм ожидания ответа и пробуждения при его получении, необходимо реализовать класс, который будет хранить в себе набор таких ожиданий. Это нужно для того, чтобы связать запросы из сервиса Client с ответами из сервиса Server. Так одновременно множество пользователей могут «дёрнуть» наш end-point, а нам необходимо не запутаться и отдать те данные, которые запросил конкретный потребитель. Назовём этот класс SenderReceiverMap
.
Очевидно, что ответы от Server могут прийти к нам в произвольном порядке — это зависит от времени обработки конкретного запроса. Например, время обработки одного запроса может быть 5 секунда, а второго — 3. Чтобы мы смогли искать связи потока и запроса от пользователя, нам необходимо как-то их уникально помечать. Для этого мы введём id
запроса. Чтобы мы смогли быстро искать, нам необходимо использовать Map. Так как мы работаем с потоками, то необходимо пользоваться безопасными коллекциями. Итого получится:
private final ConcurrentMap<T, SenderReceiver> senderReceiverConcurrentMap;
Как несложно догадаться T
— это тип id. Он может быть любым, например, Integer, String, UUID. Я предпочитаю UUID.
Для того чтобы добавить новое ожидание, необходимо реализовать метод — он будет принимать заранее сгенерированный id
запроса, который мы дополнительно передаём в Server (но об этом чуть позже) и timeout
.
public Thread add(T id, Long timeout) {
SenderReceiver<V> responseWait = new SenderReceiver<V>(timeout);
senderReceiverConcurrentMap.put(id, responseWait);
Runnable task = responseWait::receive;
return new Thread(task);
}
V
— тип данных сообщения. В нашем случае String. Видно, что мы создаём SenderReceiver
и добавляем его в коллекцию с соответствующимid
запроса. Далее создаём новый Thread()
и тут же возвращаем, чтобы мы смогли приостановить его, пока не получим данные в метод send()
в классе SenderReceiver
.
Также потребуются методы, которые смогут вернуть нужный нам SenderReceiver
из senderReceiverConcurrentMap
, проверить есть ли id
в senderReceiverConcurrentMap
, и метод удаления запроса из senderReceiverConcurrentMap
. Просто приведу реализацию, так как комментировать тут нечего.
public SenderReceiver<V> get(T id) {
return senderReceiverConcurrentMap.get(id);
}
public Boolean containsKey(T id) {
return senderReceiverConcurrentMap.containsKey(id);
}
public SenderReceiver remove(T id) {
return senderReceiverConcurrentMap.remove(id);
}
Реализация запроса данных у сервиса Server
Теперь необходимо реализовать запрос к сервису Server по REST, а также добавить в коллекцию его идентификатор вместе с ожиданием результата.
public String get(String text) throws TimeoutException {
UUID requestId = UUID.randomUUID();
while (senderReceiverMap.containsKey(requestId)) {
requestId = UUID.randomUUID();
}
String responseFromServer = this.sendText(requestId, text);
System.out.println("REST response from server: " + responseFromServer);
Thread thread = senderReceiverMap.add(requestId, timeout);
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
String responseKafka;
try {
responseKafka = senderReceiverMap.get(requestId).getData();
} catch (TimeoutException e) {
throw e;
} finally {
senderReceiverMap.remove(requestId);
}
return responseKafka;
}
Как видно, в этом методе мы принимаем строку от пользователя. Далее, создаём id
запроса и отправляем его вместе с полученной строкой. Добавляем запрос вместе с его id
в коллекцию запросов и запускаем полученный Thread
. На этом шаге процесс зависнет, пока кто-то не «дёрнет» метод send()
у объекта SenderReceiver
, который можно найти по конкретному id
запроса.
После того как кто-то вызовет метод send()
и передаст в него данные, либо сам поток завершит существование по timeout
, метод продолжит работу. Он вызовет senderReceiverMap.get(requestId).getData()
, который и вернёт либо TimeoutException
, либо данные, пришедшие в метод send()
. Осталось только удалить из коллекции уже обработанный запрос и вернуть данные, которые мы получили.
Реализация KafkaListener
Из вышеописанного понятно, что нам осталось только вызвать метод send()
у конкретного объекта ожидания SenderReceiver
, который можем запросто найти по его id
. Так как мы синхронизируем запросы с Kafka
, значит и вызывать метод должны, когда к нам приходят данные из него.
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void listenGroupFoo(ConsumerRecord<String, KafkaMessage<String>> record) {
UUID rqId = this.getRqId(record.headers());
if (senderReceiverMap.containsKey(rqId)) {
SenderReceiver<String> stringSenderReceiver = senderReceiverMap.get(rqId);
stringSenderReceiver.send(record.value().getData());
}
}
Тут тоже ничего сложного: из headers
получаем id
запроса, по нему мы достаём из Map нужный SenderReceiver
и просто на нём вызываем метод send()
.
Таким образом, понятно, что необязательно использовать Kafka для синхронизации, мы можем использовать любой другой поток, главное, чтобы были id
запроса и данные, которые хотим вернуть. Например, можно использовать другой брокер сообщений, либо вообще другой запрос REST. Решение гибкое, главное — вызвать send()
.
Реализация сервиса Server
Здесь всё предельно просто. Важно не забыть поменять port
, на котором будет запускаться сервис Server, чтобы обеспечить его одновременный запуск с сервисом Client. Для этого в application.properties
устанавливаем следующий параметр:
server.port=8888
Реализация end-point
В сервисе Server должен быть end-point, который принимает данные от сервиса Client и отправляет результат в Kafka, связанный с id
запроса.
@PostMapping("/test")
public String test(@RequestBody RequestDto request) {
Runnable runnable =
() -> {
System.out.println("Start requestId: " + request.getRequestId() + " text: " + request.getData());
try {
int sleepMs = ThreadLocalRandom.current().nextInt(0, 10000 + 1);
System.out.println("RequestId: " + request.getRequestId() + " sleep: " + sleepMs + "ms");
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaMessageSender.send(request.getRequestId(), new KafkaMessage<>(request.getData().toUpperCase()));
System.out.println("End requestId: " + request.getRequestId());
};
Thread thread = new Thread(runnable);
thread.start();
return "Ok!";
}
Этот end-point принимает структуру, в которой содержатся requestId
— id
запроса и data
— данные, которые необходимо обработать. В нашем случае это строка. Этот end-point создаёт отдельный поток, завершение которого он не дожидается, а сразу же возвращает по REST в ответ строчку "Ok!"
. В потоке мы эмулируем тяжёлую работу Server — для этого мы рандомно генерируем количество миллисекунд, на которые поток заснёт, а после того как проснётся, отправит данные (строку в верхнем регистре) в Kafka.
Реализация отправки сообщения в Kafka
Отправка в Kafka выглядит следующим образом:
public void send(final UUID requestId, final KafkaMessage<String> message) {
ProducerRecord<String, KafkaMessage<String>> record = new ProducerRecord<>(topic, message);
record.headers().add(new RecordHeader(RQ_ID, requestId.toString().getBytes()));
ListenableFuture<SendResult<String, KafkaMessage<String>>> future = kafkaTemplate.send(record);
future.addCallback((success) -> { }, System.out::println );
kafkaTemplate.flush();
}
Мы устанавливаем в headers
новый header RQ_ID
, куда записываем id
запроса, а затем просто вызываем send()
, в который отправляем обработанные данные.
На этом работа сервиса Server заканчивается.
Выводы
На самом деле в данном решении не обязательно использовать именно REST+Kafka. Как можно заметить, решение универсальное и данную реализацию легко изменить на любые взаимодействия — хоть REST+REST, хоть Kafka+Kafka, хоть на голубиную почту.
Рабочий пример с кодом можно найти тут.