Synchronous Request-Response using REST and Apache Kafka

Translation
Original author: Kirill Voronin

On one of Innotech’s projects, we were asked to turn an asynchronous exchange into a synchronous one. In practice, this meant combining REST and Apache Kafka within a single request.

In more detail, we have two services that communicate with each other. Let’s call them A and B. Service A receives a request from a consumer for data that is stored in service B. Service A therefore sends a request for the data to B via REST and waits for the response to arrive through Kafka. The user keeps waiting until this response is received.

It looks like a common problem, so we expected to find a ready-made solution on Google or Stack Overflow. The only thing we found was a solution to a similar problem in an existing library that connects two servers through Kafka. So we solved the problem ourselves using Java, the Spring Framework, and a little ingenuity.

Problem Statement

Before beginning implementation, we need to state the problem and understand what we have and what we want to achieve.

We have a Client service with a single end-point. This end-point takes a plain string, sends it to the second service, and waits for the response from Kafka.

And we have the second service, Server. It receives the string from the Client service via REST, converts it to upper case, and returns the result through Kafka.

The Client service waits for the response from the Server, and the user waits for the result until that response is delivered. Waiting may take a long time. To keep latency bounded, we need a timeout after which the wait is interrupted, e.g. 10 seconds. The timeout should be optional, though: we should also be able to wait indefinitely for a response. Just not for 7.5 million years, otherwise the answer could be disappointing.

Here is an example of how it works. The consumer sends a string to the Client service, e.g. “abc123”, and is kept waiting until the response from the Server service is delivered. The response returned to the consumer should be “ABC123”. If the wait exceeds the timeout value, an HTTP 504 (Gateway Timeout) error is returned instead of the response.
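The contract described above can be illustrated with a small simulation that uses only the JDK. This is a sketch under stated assumptions, not the project's code: the ContractSketch class, its Reply type, and the handle() method are hypothetical names, the "Server" is just a background task that sleeps and upper-cases the text, and a timeout of 0 is taken to mean "wait indefinitely".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical simulation of the Client end-point's contract; not the project's code. */
final class ContractSketch {

    /** Minimal HTTP-like reply: a status code and an optional body. */
    static final class Reply {
        final int status;
        final String body;

        Reply(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    /**
     * Simulates one request: the "Server" upper-cases the text after serverDelayMs,
     * while the "Client" waits at most timeoutMs (0 means wait indefinitely).
     */
    static Reply handle(String text, long serverDelayMs, long timeoutMs) {
        CompletableFuture<String> answer = CompletableFuture.supplyAsync(() -> {
            sleepQuietly(serverDelayMs);
            return text.toUpperCase();
        });
        try {
            String body = (timeoutMs == 0)
                    ? answer.get()
                    : answer.get(timeoutMs, TimeUnit.MILLISECONDS);
            return new Reply(200, body);
        } catch (TimeoutException e) {
            return new Reply(504, null); // Gateway Timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new Reply(500, null);
        } catch (ExecutionException e) {
            return new Reply(500, null);
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Reply fast = handle("abc123", 50, 2_000);   // the server answers in time
        System.out.println(fast.status + " " + fast.body);
        Reply slow = handle("abc123", 2_000, 50);   // the server is too slow
        System.out.println(slow.status + " " + slow.body);
    }
}
```

The first call finishes well inside the timeout and carries the upper-cased text; the second gives up after 50 ms and reports 504.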

Client Implementation

I will only describe the main points. If you need the whole code, there is a link to the repository below.

SenderReceiver Implementation

The core of the logic is the SenderReceiver class, which keeps a thread waiting until data arrives from outside.

This class has two main methods: receive() blocks until data is received, and send() accepts the data and wakes up receive(). To tell the two states apart, namely 1) still waiting (the receive() method) or 2) data delivered (the send() method), we create a boolean flag called transfer and initialize it to true.

private boolean transfer = true;

Now let’s implement the receive() method:

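public synchronized void receive() {
    // transfer stays true until send() delivers the data
    while (transfer) {
        // a timeout of 0 disables the deadline check
        if (timeout != 0 && start.before(new Date(System.currentTimeMillis() - timeout))) {
            timeoutException = new TimeoutException();
            Thread.currentThread().interrupt();
            return;
        }
        try {
            wait(timeout); // wait(0) blocks until notified
        } catch (InterruptedException e) {
            // restore the flag and stop; looping again would make wait() throw immediately
            Thread.currentThread().interrupt();
            return;
        }
    }
    Thread.currentThread().interrupt();
}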

Since the transfer flag is initially true, the thread goes into standby until it is woken up by the send() method or until the number of milliseconds set in timeout elapses.

If the timeout value is different from 0, a time limit has been set for the thread. To enforce it, we compare the current time, shifted back by the timeout, with the start time of the wait. If the limit is exceeded, we save a TimeoutException in a variable that is initially null (we’ll need it later) and let the thread finish. If the transfer flag becomes false before the timeout expires, we simply exit the loop and the thread finishes.

A timeout of 0 is a special case: the thread waits indefinitely to be woken up. Once it is woken up and transfer has changed to false, the thread finishes.
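The same wait-until-delivered-or-timed-out idea can also be written with an explicit deadline, so that each wake-up waits only for the time that is still left. The sketch below is an illustration with hypothetical names (DeadlineMailbox, put(), take()), not the SenderReceiver class from the project; it throws TimeoutException directly from the waiting method instead of storing it, and keeps the convention that a timeout of 0 means "wait indefinitely".

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical one-shot mailbox: a sketch of a guarded wait with a deadline. */
final class DeadlineMailbox<T> {
    private T value;
    private boolean delivered;

    /** Stores the value and wakes every thread blocked in take(). */
    synchronized void put(T newValue) {
        value = newValue;
        delivered = true;
        notifyAll();
    }

    /**
     * Blocks until a value is delivered or timeoutMs elapses.
     * A timeout of 0 means "wait indefinitely".
     */
    synchronized T take(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        // The loop re-checks the condition after every wake-up, including spurious ones.
        while (!delivered) {
            if (timeoutMs == 0) {
                wait();
                continue;
            }
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                throw new TimeoutException("no data within " + timeoutMs + " ms");
            }
            wait(remainingMs); // never 0 here, so this cannot block forever
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        DeadlineMailbox<String> box = new DeadlineMailbox<>();
        new Thread(() -> box.put("ABC123")).start();
        System.out.println(box.take(2_000));
    }
}
```

Recomputing the remaining time on each iteration keeps the total wait close to the requested timeout even if the thread is woken up several times before the data arrives.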

Now, let’s implement the send() method:

public synchronized void send(final T data) {
    transfer = false;
    this.data = data;
    notifyAll();
}

You can see that the send() method is quite simple. It receives data from outside, changes our transfer flag to false, saves the data, and wakes up all the threads that are hanging in wait().

There is one more important auxiliary method, getData(). It returns the stored data, or throws the saved TimeoutException if that variable is not null.

public T getData() throws TimeoutException {
    if (Objects.nonNull(timeoutException))
        throw timeoutException;
    return data;
}

SenderReceiverMap Implementation

Now that we have a mechanism for waiting for a response and waking up once it is delivered, we need a class that stores a set of such waits. It links requests from the Client service with responses from the Server service. This way, many users can call our end-point simultaneously, and each of them gets the data they requested without any mix-ups. Let’s call this class SenderReceiverMap.

Obviously, the responses from the Server can be delivered in any order, depending on the processing time of each request. For example, one request may take 5 seconds to process and another 3 seconds. To match a waiting thread with the user’s request, we need to label requests uniquely, so we introduce a request id. To look requests up quickly, we use a Map. Since we are working with threads, it has to be a thread-safe collection. This gives us the following:

private final ConcurrentMap<T, SenderReceiver<V>> senderReceiverConcurrentMap;

As you can probably guess, T is an id type. It can be anything, such as an Integer, String, or UUID. I prefer UUID.

To add a new wait, we implement a method that takes a pre-generated request id (which we also pass to the Server, more on this later) and a timeout.

public Thread add(T id, Long timeout) {
    SenderReceiver<V> responseWait = new SenderReceiver<V>(timeout);
    senderReceiverConcurrentMap.put(id, responseWait);
    Runnable task = responseWait::receive;
    return new Thread(task);
}

V is the type of the message data. In our case, it is String. You can see that we create a SenderReceiver and add it to the collection under the relevant request id. Then we create a new Thread that runs receive() and return it right away; once started, it blocks until data is delivered through the send() method of the SenderReceiver class.

We also need methods that return the required SenderReceiver from senderReceiverConcurrentMap, check whether an id is present in it, and remove a request from it. The implementation is shown below; there is nothing to comment on here.

public SenderReceiver<V> get(T id) {
    return senderReceiverConcurrentMap.get(id);
}

public boolean containsKey(T id) {
    return senderReceiverConcurrentMap.containsKey(id);
}

public SenderReceiver<V> remove(T id) {
    return senderReceiverConcurrentMap.remove(id);
}
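To illustrate how a map keyed by request id keeps concurrent requests apart, here is a minimal sketch that swaps in the JDK's CompletableFuture as the waiting primitive instead of the article's SenderReceiver. The PendingRequests class and its register(), complete(), and cancel() methods are hypothetical names chosen for this example.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry of pending requests keyed by request id. */
final class PendingRequests<V> {
    private final ConcurrentMap<UUID, CompletableFuture<V>> pending = new ConcurrentHashMap<>();

    /** Registers a wait for the given id and returns the future to block on. */
    CompletableFuture<V> register(UUID id) {
        CompletableFuture<V> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    /** Completes the matching wait; returns false if nobody is waiting for this id. */
    boolean complete(UUID id, V data) {
        CompletableFuture<V> future = pending.remove(id);
        return future != null && future.complete(data);
    }

    /** Drops a wait, for example after a timeout. */
    void cancel(UUID id) {
        pending.remove(id);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests<String> requests = new PendingRequests<>();
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();
        CompletableFuture<String> firstAnswer = requests.register(first);
        CompletableFuture<String> secondAnswer = requests.register(second);

        // Responses arrive in the opposite order, yet each waiter gets its own data.
        requests.complete(second, "SECOND");
        requests.complete(first, "FIRST");

        System.out.println(firstAnswer.join());
        System.out.println(secondAnswer.join());
    }
}
```

Because complete() removes the entry before completing it, a late or duplicate response for the same id is ignored rather than delivered twice.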

Implementing a Request for Data from the Server Service

Now we will implement a request to the Server service via REST, and add its id to the collection together with the wait for the result.

public String get(String text) throws TimeoutException {
    UUID requestId = UUID.randomUUID();
    while (senderReceiverMap.containsKey(requestId)) {
        requestId = UUID.randomUUID();
    }

    // Register the wait before sending, so that a fast reply cannot arrive unnoticed
    Thread thread = senderReceiverMap.add(requestId, timeout);
    try {
        String responseFromServer = this.sendText(requestId, text);
        System.out.println("REST response from server: " + responseFromServer);

        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Returns the Kafka payload or throws the saved TimeoutException
        return senderReceiverMap.get(requestId).getData();
    } finally {
        // Always drop the processed request from the collection
        senderReceiverMap.remove(requestId);
    }
}

As you can see, this method receives a string from the user. We create a request id, add the request together with its id to the collection of requests, and then send the id along with the received string to the Server. Registering the wait first matters: if the request were sent before the wait was added, a very fast Kafka response could arrive while nobody was waiting for it yet. After that we start the resulting Thread and join it. At this point, the calling thread blocks until someone calls the send() method of the SenderReceiver object found by this particular request id.

Once someone calls the send() method and passes data to it, or the waiting thread finishes because of the timeout, the method continues. It calls senderReceiverMap.get(requestId).getData(), which either throws a TimeoutException or returns the data passed to send(). All that remains is to remove the already-processed request from the collection and return the data we received.

KafkaListener Implementation

From the above it is clear that we only have to call the send() method of the particular SenderReceiver wait object, which is easily found by its id. Since we synchronize requests with Kafka, we call this method when we receive data from Kafka.

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void listenGroupFoo(ConsumerRecord<String, KafkaMessage<String>> record) {
    UUID rqId = this.getRqId(record.headers());
    // A single lookup: the wait may be removed (e.g. on timeout) at any moment
    SenderReceiver<String> stringSenderReceiver = senderReceiverMap.get(rqId);
    if (stringSenderReceiver != null) {
        stringSenderReceiver.send(record.value().getData());
    }
}

There is nothing complicated here either: from headers we obtain a request id required to obtain the relevant SenderReceiver from Map, which we will use to call the send() method.
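The article does not show how the request id is read back from the header. Since the Server writes it as the bytes of the UUID's string form, one possible way to decode it is sketched below. RequestIdCodec and its methods are hypothetical helpers that work on the raw header bytes and use only the JDK, with UTF-8 assumed as the encoding on both sides.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical helper that converts a request id to and from header bytes. */
final class RequestIdCodec {

    /** Encodes the id the same way on every platform by fixing the charset. */
    static byte[] encode(UUID id) {
        return id.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the header value; empty if the header is missing or malformed. */
    static Optional<UUID> decode(byte[] headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(UUID.fromString(new String(headerValue, StandardCharsets.UTF_8)));
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // not a valid UUID string
        }
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] header = encode(id);
        System.out.println(decode(header).map(id::equals).orElse(false));
    }
}
```

Returning an Optional lets the listener skip records without a usable id instead of failing on them.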

As you can see, there is no need to use Kafka for synchronization; we can use any other thread as long as we have a request id and the data we want to return. For example, you could use a different message broker, or a totally different REST request. This is a flexible solution, because all you need to do is to call send().

Server Service Implementation

This is really very simple. Just don’t forget to change the port on which the Server service will run to ensure it will run simultaneously with the Client service. To do this, set the following parameter in the application.properties:

server.port=8888

End-Point Implementation

The Server service must have an end-point that receives data from the Client service and sends the result to Kafka linked with the request id.

@PostMapping("/test")
public String test(@RequestBody RequestDto request) {

    Runnable runnable =
            () -> {
                System.out.println("Start requestId: " + request.getRequestId() + "   text: " + request.getData());

                try {
                    int sleepMs = ThreadLocalRandom.current().nextInt(0, 10000 + 1);
                    System.out.println("RequestId: " + request.getRequestId() + " sleep: " + sleepMs + "ms");
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                kafkaMessageSender.send(request.getRequestId(), new KafkaMessage<>(request.getData().toUpperCase()));

                System.out.println("End requestId: " + request.getRequestId());
            };
    Thread thread = new Thread(runnable);
    thread.start();

    return "Ok!";
}

This end-point takes a structure with two fields: requestId, the request id, and data, the data to be processed (a string in our case). The end-point creates a separate thread and, without waiting for it to finish, immediately returns an “Ok!” string via REST. In that thread, we emulate how hard the Server works: we randomly pick a number of milliseconds for the thread to sleep, and after it wakes up it sends the data (an upper-case string) to Kafka.
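The "acknowledge now, publish later" behaviour can be tried out without Spring or Kafka. In the sketch below, which is only an illustration, a BlockingQueue named outbox stands in for the Kafka topic and a small thread pool replaces the raw Thread; AsyncEndpointSketch, accept(), and outbox are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical end-point that acknowledges at once and publishes the result later. */
final class AsyncEndpointSketch {
    /** Stand-in for the Kafka topic: processed results are queued here. */
    final BlockingQueue<String> outbox = new LinkedBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    /** Schedules the work and returns immediately, like the REST handler. */
    String accept(String requestId, String data, long workMs) {
        workers.execute(() -> {
            try {
                Thread.sleep(workMs); // emulate slow processing
                outbox.put(requestId + ":" + data.toUpperCase());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return "Ok!";
    }

    /** Lets already submitted tasks finish and releases the worker threads. */
    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncEndpointSketch endpoint = new AsyncEndpointSketch();
        System.out.println(endpoint.accept("42", "abc123", 200)); // returns before the work is done
        System.out.println(endpoint.outbox.poll(5, TimeUnit.SECONDS));
        endpoint.shutdown();
    }
}
```

A bounded pool caps the number of concurrent workers, whereas one new thread per request grows with the load.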

Implementation of Sending a Message to Kafka

Sending to Kafka looks like this:

public void send(final UUID requestId, final KafkaMessage<String> message) {
    ProducerRecord<String, KafkaMessage<String>> record = new ProducerRecord<>(topic, message);
    // Fix the charset (java.nio.charset.StandardCharsets) so both services encode the id identically
    record.headers().add(new RecordHeader(RQ_ID, requestId.toString().getBytes(StandardCharsets.UTF_8)));
    ListenableFuture<SendResult<String, KafkaMessage<String>>> future = kafkaTemplate.send(record);
    // Nothing to do on success; print the error on failure
    future.addCallback((success) -> { }, System.out::println );
    kafkaTemplate.flush();
}

In the headers, we add a new RQ_ID header that stores the request id, and then we simply call send() with the record that carries the processed data.

This is where the Server service ends.

Conclusions

In fact, you don’t have to use REST+Kafka in this solution. As you can see, the solution is universal, and this implementation can easily be changed to fit any interaction, whether it is REST+REST, or Kafka+Kafka, or even pigeon mail.

You can find a case study here.
