Пишем чат на Vert.x 3

На Хабре не так уж много статей, посвященных Vert.x, раз, два и обчёлся. Поэтому решил внести свой вклад и опубликовать небольшой урок, в котором рассказано, как написать простой чат с помощью Vert.x 3.



Содержание


  1. Что такое Vert.x?
  2. О чате
  3. Структура проекта
  4. Сервер
  5. Клиент
  6. Тестирование
  7. Сборка и запуск исполняемого модуля
  8. Полный исходный код
  9. Полезные ресурсы

Что такое Vert.x?


Vert.x это событийно-ориентированный фреймворк работающий на JVM. На данный момент последняя версия этого фреймворка 3.2. Vert.x 3 предоставляет следующие возможности:

  • Мультиязычность. Компоненты приложения могут быть разработаны на Java, JavaScript, Scala, Python, Ruby, Groovy, а также Clojure;
  • Параллелизм. Довольно-таки простая модель параллелизма, освобождающая от хлопот многопоточного программирования;
  • Асинхронность. Простая модель асинхронного взаимодействия без блокировки;
  • Распределенная шина событий. Включающая как клиентскую, так и серверную стороны. Играет непосредственно главную роль в нашем чате;
  • Java 8. Vert.x 3 требует версии Java не ниже 8.

Более подробно о чате


Приложение запускается на сервере, после развертывания публикуется адрес анонимного чата, к которому можно присоединиться, через любой браузер. По этому адресу приложение транслирует в реальном времени сообщения от всех пользователей.



Поехали!


Разработку будем вести в IntelliJ IDEA 15, достаточно Community-версии.

Структура проекта


Создаем maven-проект. К сожалению готового архетипа, для vert.x 3 нет (хотя для 2 существует), поэтому генерим обычный maven-проект. Его конечная структура будет иметь следующий вид:

структура
src
+---main
|   +---java
|   |   |   Server.java
|   |   |   VerticleLoader.java
|   |   |
|   |   \---webroot
|   |           date-format.js
|   |           index.html
|   |           vertx-eventbus.js
|   |
|   \---resources
\---test
    \---java
            ChatTest.java


В pom.xml задаем следующие зависимости. Где vertx-core библиотека поддержки Verticles (более подробно, что это такое, немного дальше), vertx-web – позволяет использовать обработчик событий (и не только) и vertx-unit – для модульного тестирования.

pom.xml
<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-unit</artifactId>
        <version>3.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>


Сервер


Особенностью данного фреймворка является то, что все компоненты должны быть представлены в виде Verticle.

Verticle – это некоторый аналог сервлета, является атомарной единицей развёртывания. Сами разработчики описывают Verticle, как нечто похожее на актера в модели акторов. Собственно, эта конструкция позволяет организовать высокую степень параллелизма и асинхронности, чем и славится Vert.x. В реализации нашего сервера, мы наследуем абстрактный класс AbstractVerticle.

Переопределяемый нами метод start() является точкой входа в программу. Сначала выполняется развертывание приложения – функция deploy(), затем вешается обработчик – метод handle().

Server.java
public class Server extends AbstractVerticle {
    private Logger log = LoggerFactory.getLogger(Server.class);
    private SockJSHandler handler = null;
    private AtomicInteger online = new AtomicInteger(0);

    //точка входа.
    @Override
    public void start() throws Exception {

        if (!deploy()) {
            log.error("Failed to deploy the server.");
            return;
        }

        handle();
    }

    //...
}


Для развертывания приложения необходимо получить свободный порт, в случае, если не удалось его получить, в hostPort будет отрицательное значение. Далее создаем роутер, указываем для него адрес получателя и вешаем обработчик. И наконец, запускаем HTTP-Server на доступном порту.

Server.java
//развертывание приложения.
private boolean deploy() {
    int hostPort = getFreePort();

    if (hostPort < 0)
        return false;

    Router router = Router.router(vertx);

    //обработчик событий.
    handler = SockJSHandler.create(vertx);

    router.route("/eventbus/*").handler(handler);
    router.route().handler(StaticHandler.create());

    //запуск веб-сервера.
    vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);

    try {
        String addr = InetAddress.getLocalHost().getHostAddress();
        log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);
    } catch (UnknownHostException e) {
        log.error("Failed to get the local address: [" + e.toString() + "]");
        return false;
    }

    return true;
}


Процесс получения свободного порта представлен во фрагменте кода ниже. Сначала проверяется static-поле PROCESS_ARGS на наличие аргументов запуска приложения, одним из которых может быть порт развёртывания приложения, заданный пользователем. В случае, если порт не был задан, используется порт по умолчанию: 8080.

Server.java
//получение свободного порта для развертывания приложения.
private int getFreePort() {
    int hostPort = 8080;

    //если порт задан в качестве аргумента,
    // при запуске приложения.
    if (Starter.PROCESS_ARGS != null
            && Starter.PROCESS_ARGS.size() > 0) {
        try {
            hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
        } catch (NumberFormatException e) {
            log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
        }
    }

    //если некорректно указан порт.
    if (hostPort < 0 || hostPort > 65535)
        hostPort = 8080;

    return getFreePort(hostPort);
}


Если в качестве аргумента конструктора создания сокета, указан параметр со значением 0, то в таком случае будет выдан случайный свободный порт.

Когда порт уже занят (например, порт 8080 уже используется другим приложением, но при этом, он указан в качестве аргумента запуска текущего приложения), выбрасывается исключение BindException, в таком случае выполняется повторная попытка получения свободного порта.

Server.java
private int getFreePort(int hostPort) {
    try {
        ServerSocket socket = new ServerSocket(hostPort);
        int port = socket.getLocalPort();
        socket.close();

        return port;
    } catch (BindException e) {
        //срабатывает, когда указанный порт уже занят.
        if (hostPort != 0)
            return getFreePort(0);

        log.error("Failed to get the free port: [" + e.toString() + "]");
        return -1;
    } catch (IOException e) {
        log.error("Failed to get the free port: [" + e.toString() + "]");
        return -1;
    }
}


В случае успешного развертывания, начинается прослушивание шины событий по адресам: chat.to.server (входящие события) и chat.to.client (исходящие события).

После обработки очередного события на шине, необходимо чекнуть это событие.

Server.java
private void handle() {
    BridgeOptions opts = new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
            .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));

    //обработка приходящих событий.
    handler.bridge(opts, event -> {
        if (event.type() == PUBLISH)
            publishEvent(event);

        if (event.type() == REGISTER)
            registerEvent(event);

        if (event.type() == SOCKET_CLOSED)
            closeEvent(event);

        //обратите внимание, после обработки события
        // должен вызываться говорящий сам за себя метод.
        event.complete(true);
    });
}


Любые события, которые происходят на шине, могут быть представлены 7 следующими типами:
Тип Событие
SOCKET_CREATED возникает при создании сокета
SOCKET_CLOSED при закрытии сокета
SEND попытка отправки сообщения от клиента к серверу
PUBLISH публикация сообщения клиентом для сервера
RECEIVE уведомление от сервера, о доставленном сообщении
REGISTER попытка зарегистрировать обработчик
UNREGISTER попытка отменить зарегистрированный обработчик

В нашем приложении нам достаточно лишь обрабатывать события с типом PUBLISH, REGISTER и SOCKET_CLOSED.

Событие с типом PUBLISH срабатывает, когда кто-то из пользователей отправляет сообщение в чат.
REGISTER – срабатывает тогда, когда пользователь регистрирует обработчик. Почему не SOCKET_CREATED? Потому что, событие с типом SOCKET_CREATED предшествует – REGISTER, и, естественно, пока клиент не зарегистрирует обработчик, он не сможет получать события.
SOCKET_CLOSED – возникает, всегда когда пользователь покидает чат или когда возникает непредвиденная ситуация.

При публикации сообщения, срабатывает обработчик и вызывает метод publishEvent. Проверяется адрес назначения, в случае, если он корректен, сообщение извлекается, затем проверяется и публикуется на шине событий для всех клиентов (в т.ч. и отправителя).

Server.java
private boolean publishEvent(BridgeEvent event) {
    if (event.rawMessage() != null
            && event.rawMessage().getString("address").equals("chat.to.server")) {
        String message = event.rawMessage().getString("body");
        if (!verifyMessage(message))
            return false;

        String host = event.socket().remoteAddress().host();
        int port = event.socket().remoteAddress().port();

        Map<String, Object> publicNotice = createPublicNotice(host, port, message);
        vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
        return true;
    } else
        return false;
}


Генерация уведомления для публикации сообщения выглядит следующим образом:

Server.java
//создание уведомления о публикации сообщения.
private Map<String, Object> createPublicNotice(String host, int port, String message) {
    Date time = Calendar.getInstance().getTime();

    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "publish");
    notice.put("time", time.toString());
    notice.put("host", host);
    notice.put("port", port);
    notice.put("message", message);
    return notice;
}


Вход и выход пользователей в чат обрабатываются следующим способом:

Server.java
//тип события - регистрация обработчика.
private void registerEvent(BridgeEvent event) {
    if (event.rawMessage() != null
            && event.rawMessage().getString("address").equals("chat.to.client"))
        new Thread(() ->
        {
            Map<String, Object> registerNotice = createRegisterNotice();
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
        }).start();
}

//создание уведомления о регистрации пользователя.
private Map<String, Object> createRegisterNotice() {
    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "register");
    notice.put("online", online.incrementAndGet());
    return notice;
}

//тип события - закрытие сокета.
private void closeEvent(BridgeEvent event) {
    new Thread(() ->
    {
        Map<String, Object> closeNotice = createCloseNotice();
        vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
    }).start();
}

//создание уведомления о выходе пользвателя из чата.
private Map<String, Object> createCloseNotice() {
    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "close");
    notice.put("online", online.decrementAndGet());
    return notice;
}


Проверка публикуемого сообщения достаточно примитивная, но для примера и этого достаточно, т.е. вы её можете сами усложнить проверяя, например, на передачу скриптов в виде сообщения и прочих хаков.

Server.java
private boolean verifyMessage(String msg) {
    return msg.length() > 0
            && msg.length() <= 140;
}


Для обмена данными используется формат JSON, поэтому файл pom.xml необходимо обновить, добавив следующую зависимость:

pom.xml
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.3.1</version>
</dependency>


Также, в нашем чате будет отображаться счетчик числа онлайн-пользователей, т.к. наше приложение многопоточное, оно гарантировано должно быть thread-safety, поэтому наиболее простой способ объявить наш счётчик как AtomicInteger.

Клиент


Создаем index.html в разделе webroot, как это представленной на структуре в начале статьи. Для общения с сервером, а точнее, с шиной событий используется библиотека vertx-eventbus.js.

Для форматирования даты, будем использовать библиотеку date-format.js, довольно-таки простая и удобная. Помимо этого, в качестве html оформления будем использовать bootstrap версии 3.3.5, sockjs.js версии 0.3.4, необходимый для библиотеки vertx-eventbus.js и jquery версии 1.11.3.

Обработчик шины событий на стороне клиента выглядит следующим образом:

index.html
var online = 0; //счетчик онлайн-пользователей.
var eb = new EventBus("/eventbus/"); //шина событий.

eb.onopen = function() {
    //обработчик событий в чате.
    eb.registerHandler("chat.to.client", eventChatProcessing);
};

//обработчик событий в чате.
function eventChatProcessing(err, msg) {
    var event = jQuery.parseJSON(msg.body);

    if (event.type == 'publish') { //сообщение.
        var time = Date.parse(event.time);
        var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");

        //добавить сообщение.
        appendMsg(event.host, event.port, event.message, formattedTime);
    } else { //изменение числа пользователей.
        //type: register или close.
        online = event.online;
        $('#online').text(online);
    }
};


В случае, если тип события publish (т.е. публикация сообщения), то данные из события (event) формируются в кортеж и присоединяются к таблице сообщений. Иначе, когда тип события соответствует новому или ушедшему пользователю, просто обновляется счетчик онлайн пользователей. Функция добавления сообщения довольно-таки проста.

index.html
//добавление нового сообщения.
function appendMsg(host, port, message, formattedTime) {
    var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime + '</td><td align="left">' + host + ' [' + port + ']' + '</td><td>' + message + '</td></tr>');

    var countMsg = $('#messages tr').length;
    if (countMsg == 0)
        $('#messages').append($msg);
    else
        $('#messages > tbody > tr:first').before($msg);
}


Во время отправки сообщения, оно сначала публикуется по адресу “chat.to.server”, где его обрабатывает сервер, в случае, если сообщение проходит верификацию, оно рассылается всем клиентам, в т.ч. и отправителю.

index.html
$(document).ready(function() {
    //событие отправления сообщения.
    $('#chatForm').submit(function(evt) {
        evt.preventDefault();
        var message = $('#message').val();
        if (message.length > 0) {
            //отправление сообщения на шину событий.
            eb.publish("chat.to.server", message);
            $('#message').val("").focus();
            countChar();
        }
    });
});


Ну и наконец последний метод, который обрабатывает количество введенных символов, по условию, пользователь не может ввести сообщение длиной более 140 символов.

index.html
//счетчик введенных символов.
function countChar() {
    var len = $('#message').val().length;
    if (len > 140) {
        var msg = $('#message').val().substring(0, 140);
        $('#message').val(msg);
    } else {
        $('#charNum').text(140 - len);
        var per = 100 / 140 * len;
        $('#charNumProgressBar').css('width', per + '%').attr('aria-valuenow', per);
    }
};


Полная версия index.html, включая разметку, размещена в конце статьи.

После того, как мы написали серверную и клиентскую части, наступила очередь запуска приложения. Для запуска и удобной отладки, я рекомендую написать собственный загрузчик Verticle, хотя и есть более простая альтернатива, которую я приведу немного позже.

Единственно, значение, которое инициализирует переменную dir должно быть актуально, т.е. в действительности должен существовать такой путь. А также, переменная verticleID должна инициализироваться именем запускаемого verticle-класса, весь остальной код не подлежит изменению.

VerticleLoader.java
public class VerticleLoader {
    private static Vertx vertx;

    public static Vertx getVertx() {
        return vertx;
    }

    public static void load() {
        load(null);
    }

    public static void load(Handler<AsyncResult<String>> completionHandler) {
        VertxOptions options = new VertxOptions().setClustered(false);
        //путь до verticle-класса.
        String dir = "chat/src/main/java/";

        try {
            File current = new File(".").getCanonicalFile();
            if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
                dir = dir.substring(current.getName().length() + 1);
            }
        } catch (IOException e) {
        }

        System.setProperty("vertx.cwd", dir);
        String verticleID = Server.class.getName();

        Consumer<Vertx> runner = vertx ->
        {
            try {
                if (completionHandler == null)
                    vertx.deployVerticle(verticleID);
                else
                    vertx.deployVerticle(verticleID, completionHandler);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        };

        if (options.isClustered()) {
            Vertx.clusteredVertx(options, res ->
            {
                if (res.succeeded()) {
                    vertx = res.result();
                    runner.accept(vertx);
                } else {
                    res.cause().printStackTrace();
                }
            });
        } else {
            vertx = Vertx.vertx(options);
            runner.accept(vertx);
        }
    }

    public static void main(String[] args) {
        load();
    }
}


Теперь, когда загрузчик готов, создадим конфигурацию запуска: Run – Edit Configuration… – Add New Configuration (Alt + Insert) – Application. Указываем Main Class как VerticleLoader, сохраняем конфигурацию и запускаем.

Изображение конфигурации


PROFIT!

Обещанная альтернатива.

Альтернативная конфигурация
Необходимо создать конфигурацию запуска, как это представлено на картинке. На самом деле, класс Starter является главным классом, он содержит метод main, который-то и является точкой входа приложения.




Тестирование


Давайте протестируем разработанное нами приложение. Делать мы это будем с использованием JUnit, поэтому необходимо снова открыть pom.xml и добавить следующую зависимость:

pom.xml
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>


В setUp мы создаем экземпляр Vertx и развертываем на него нашу Verticle. В отличии от традиционного JUnit методов, все текущие методы получают еще TestContext. Задача этого объекта соблюдать асинхронность наших тестов.

В методе tearDown() для объекта TestContext вызывается asyncAssertSuccess(), он терпит неудачу, если при завершении работы Verticle возникли проблемы.

ChatTest.java
@RunWith(VertxUnitRunner.class)
public class ChatTest {
    private Vertx vertx;
    private int port = 8080;
    private Logger log = LoggerFactory.getLogger(ChatTest.class);

    //@Ignore
    @Before
    public void setUp(TestContext context) throws IOException {
        VerticleLoader.load(context.asyncAssertSuccess());
        vertx = VerticleLoader.getVertx();
    }

    //@Ignore
    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }

    //...
}


В методе loadVerticleTest мы проверяем загрузку нашего приложения. Создаем клиента и пытаемся удостовериться, что приложение, развернутое по указанному нами адресу доступно. В случае успеха, мы получаем код состояния 200.

Затем, пытаемся получить содержимое страницы, заголовок которой должен содержать текст “Chat”.

Так как запрос и ответ являются асинхронными операциями, поэтому необходимо это как-то контролировать и получать уведомления, когда тест завершился для этого используется объект Async, вызывающий всегда метод complete() по завершению теста.

ChatTest.java
@Test
public void loadVerticleTest(TestContext context) {
    log.info("*** loadVerticleTest ***");

    Async async = context.async();
    vertx.createHttpClient().getNow(port, "localhost", "/", response ->
    {
        context.assertEquals(response.statusCode(), 200);
        context.assertEquals(response.headers().get("content-type"), "text/html");
        response.bodyHandler(body ->
        {
            context.assertTrue(body.toString().contains("<title>Chat</title>"));
            async.complete();
        });
    });
}


В методе eventBusTest создается клиент шины событий и вешается обработчик. В то время, пока клиент ждет какие-либо события на шине, публикуется сообщение. Обработчик реагирует на это и проверяет тело входящего события на эквивалентность, в случае успешной проверки тест завершается вызовом async.complete().

ChatTest.java
@Test
public void eventBusTest(TestContext context) {
    log.info("*** eventBusTest ***");

    Async async = context.async();
    EventBus eb = vertx.eventBus();
    eb.consumer("chat.to.server").handler(message ->
    {
        String getMsg = message.body().toString();
        context.assertEquals(getMsg, "hello");
        async.complete();
    });

    eb.publish("chat.to.server", "hello");
}


Запускаем тесты.

Посмотреть как...
Вкладка Maven Projects – Lifecycle – test – Run [test].


Сборка и запуск исполняемого модуля


Для этого необходимо в pom.xml добавить плагин maven-shade-plugin. Где Main-Verticle в нашем случае должен указывать на класс Server.

pom.xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <manifestEntries>
                            <Main-Class>io.vertx.core.Starter</Main-Class>
                            <Main-Verticle>Server</Main-Verticle>
                        </manifestEntries>
                    </transformer>
                </transformers>
                <artifactSet/>
                <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>
            </configuration>
        </execution>
    </executions>
</plugin>


Выполняем команду Run Maven Build, после чего в каталоге target появится chat-1.0-fat.jar. Для запуска приложения исполняемый модуль и папка webroot должны находиться в одном каталоге. Чтобы развернуть наше приложение на порту 12345 необходимо выполнить команду:
java -jar chat-1.0-fat.jar 12345


На этом всё. Успехов!

Полный исходный код


Server.java
import com.google.gson.Gson;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Starter;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeEvent;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static io.vertx.ext.web.handler.sockjs.BridgeEvent.Type.*;

public class Server extends AbstractVerticle {
    private Logger log = LoggerFactory.getLogger(Server.class);
    private SockJSHandler handler = null;
    private AtomicInteger online = new AtomicInteger(0);

    //точка входа.
    @Override
    public void start() throws Exception {

        if (!deploy()) {
            log.error("Failed to deploy the server.");
            return;
        }

        handle();
    }

    //развертывание приложения.
    private boolean deploy() {
        int hostPort = getFreePort();

        if (hostPort < 0)
            return false;

        Router router = Router.router(vertx);

        //обработчик событий.
        handler = SockJSHandler.create(vertx);

        router.route("/eventbus/*").handler(handler);
        router.route().handler(StaticHandler.create());

        //запуск веб-сервера.
        vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);

        try {
            String addr = InetAddress.getLocalHost().getHostAddress();
            log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);
        } catch (UnknownHostException e) {
            log.error("Failed to get the local address: [" + e.toString() + "]");
            return false;
        }

        return true;
    }

    //получение свободного порта для развертывания приложения.
    private int getFreePort() {
        int hostPort = 8080;

        //если порт задан в качестве аргумента,
        // при запуске приложения.
        if (Starter.PROCESS_ARGS != null
                && Starter.PROCESS_ARGS.size() > 0) {
            try {
                hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
            } catch (NumberFormatException e) {
                log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
            }
        }

        //если некорректно указан порт.
        if (hostPort < 0 || hostPort > 65535)
            hostPort = 8080;

        return getFreePort(hostPort);
    }

    //если в качестве порта указано значение 0,
    // то выдается случайный свободный порт.
    private int getFreePort(int hostPort) {
        try {
            ServerSocket socket = new ServerSocket(hostPort);
            int port = socket.getLocalPort();
            socket.close();

            return port;
        } catch (BindException e) {
            //срабатывает, когда указанный порт уже занят.
            if (hostPort != 0)
                return getFreePort(0);

            log.error("Failed to get the free port: [" + e.toString() + "]");
            return -1;
        } catch (IOException e) {
            log.error("Failed to get the free port: [" + e.toString() + "]");
            return -1;
        }
    }

    private void handle() {
        BridgeOptions opts = new BridgeOptions()
                .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
                .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));

        //обработка приходящих событий.
        handler.bridge(opts, event -> {
            if (event.type() == PUBLISH)
                publishEvent(event);

            if (event.type() == REGISTER)
                registerEvent(event);

            if (event.type() == SOCKET_CLOSED)
                closeEvent(event);

            //обратите внимание, после обработки события
            // должен вызываться говорящий сам за себя метод.
            event.complete(true);
        });
    }

    //тип события - публикация сообщения.
    private boolean publishEvent(BridgeEvent event) {
        if (event.rawMessage() != null
                && event.rawMessage().getString("address").equals("chat.to.server")) {
            String message = event.rawMessage().getString("body");
            if (!verifyMessage(message))
                return false;

            String host = event.socket().remoteAddress().host();
            int port = event.socket().remoteAddress().port();

            Map<String, Object> publicNotice = createPublicNotice(host, port, message);
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
            return true;
        } else
            return false;
    }

    //создание уведомления о публикации сообщения.
    private Map<String, Object> createPublicNotice(String host, int port, String message) {
        Date time = Calendar.getInstance().getTime();

        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "publish");
        notice.put("time", time.toString());
        notice.put("host", host);
        notice.put("port", port);
        notice.put("message", message);
        return notice;
    }

    //тип события - регистрация обработчика.
    private void registerEvent(BridgeEvent event) {
        if (event.rawMessage() != null
                && event.rawMessage().getString("address").equals("chat.to.client"))
            new Thread(() ->
            {
                Map<String, Object> registerNotice = createRegisterNotice();
                vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
            }).start();
    }

    //создание уведомления о регистрации пользователя.
    private Map<String, Object> createRegisterNotice() {
        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "register");
        notice.put("online", online.incrementAndGet());
        return notice;
    }

    //тип события - закрытие сокета.
    private void closeEvent(BridgeEvent event) {
        new Thread(() ->
        {
            Map<String, Object> closeNotice = createCloseNotice();
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
        }).start();
    }

    //создание уведомления о выходе пользвателя из чата.
    private Map<String, Object> createCloseNotice() {
        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "close");
        notice.put("online", online.decrementAndGet());
        return notice;
    }

    //довольно простая проверка сообщения,
    // конечно её можно усложнить,
    // но для пример и этого достаточно ;)
    private boolean verifyMessage(String msg) {
        return msg.length() > 0
                && msg.length() <= 140;
    }
}


VerticleLoader.java
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.StringEscapeUtils;

import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;

public class VerticleLoader {
    private static Vertx vertx;

    public static Vertx getVertx() {
        return vertx;
    }

    public static void load() {
        load(null);
    }

    public static void load(Handler<AsyncResult<String>> completionHandler) {
        VertxOptions options = new VertxOptions().setClustered(false);
        //путь до verticle-класса.
        String dir = "chat/src/main/java/";

        try {
            File current = new File(".").getCanonicalFile();
            if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
                dir = dir.substring(current.getName().length() + 1);
            }
        } catch (IOException e) {
        }

        System.setProperty("vertx.cwd", dir);
        String verticleID = Server.class.getName();

        Consumer<Vertx> runner = vertx ->
        {
            try {
                if (completionHandler == null)
                    vertx.deployVerticle(verticleID);
                else
                    vertx.deployVerticle(verticleID, completionHandler);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        };

        if (options.isClustered()) {
            Vertx.clusteredVertx(options, res ->
            {
                if (res.succeeded()) {
                    vertx = res.result();
                    runner.accept(vertx);
                } else {
                    res.cause().printStackTrace();
                }
            });
        } else {
            vertx = Vertx.vertx(options);
            runner.accept(vertx);
        }
    }

    public static void main(String[] args) {
        load();
    }
}


ChatTest.java
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.IOException;

@RunWith(VertxUnitRunner.class)
public class ChatTest {
    private Vertx vertx;
    private int port = 8080;
    private Logger log = LoggerFactory.getLogger(ChatTest.class);

    //@Ignore
    @Before
    public void setUp(TestContext context) throws IOException {
        //развертывание нашей Verticle.
        VerticleLoader.load(context.asyncAssertSuccess());
        vertx = VerticleLoader.getVertx();
    }

    //@Ignore
    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }

    //@Ignore
    @Test
    public void loadVerticleTest(TestContext context) {
        log.info("*** loadVerticleTest ***");

        Async async = context.async();
        vertx.createHttpClient().getNow(port, "localhost", "/", response ->
        {
            //проверка доступности развернутого нами приложения.
            context.assertEquals(response.statusCode(), 200);
            context.assertEquals(response.headers().get("content-type"), "text/html");

            //проверка содержимого страницы.
            response.bodyHandler(body ->
            {
                context.assertTrue(body.toString().contains("<title>Chat</title>"));
                async.complete();
            });
        });
    }

    //@Ignore
    @Test
    public void eventBusTest(TestContext context) {
        log.info("*** eventBusTest ***");

        Async async = context.async();
        EventBus eb = vertx.eventBus();
        //ожидание события на шине.
        eb.consumer("chat.to.server").handler(message ->
        {
            String getMsg = message.body().toString();
            context.assertEquals(getMsg, "hello");
            async.complete();
        });

        //отправка сообщения на шину.
        eb.publish("chat.to.server", "hello");
    }
}


index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Chat</title>
    <meta charset="windows-1251">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script src="//cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css">
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.3/jquery.min.js"></script>
    <script src="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
    <script src="date-format.js"></script>
    <script src="vertx-eventbus.js"></script>

    <style type="text/css">
        body {
            padding-top: 40px;
            padding-bottom: 40px;
            background-color: #f5f5f5;
        }

        .received{
            width: 160px;
            font-size: 10px;
        }

        input[type=text]:focus, textarea:focus{
            box-shadow: 0 0 5px #4cae4c;
            border: 1px solid #4cae4c;
        }

        .tab-content{
            padding:5px
        }
    </style>

    <script>
        var online = 0; //счетчик онлайн-пользователей.
        var eb = new EventBus("/eventbus/"); //шина событий.

        eb.onopen = function() {
            //обработчик событий в чате.
            eb.registerHandler("chat.to.client", eventChatProcessing);
        };

        //обработчик событий в чате.
        function eventChatProcessing(err, msg) {
            var event = jQuery.parseJSON(msg.body);

			if (event.type == 'publish') {//сообщение.
				var time = Date.parse(event.time);
				var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");

				//добавить сообщение.
				appendMsg(event.host, event.port, event.message, formattedTime);
			} else { //изменение числа пользователей.
			    //type: register или close.
			    online = event.online;
				$('#online').text(online);
			}
        };

        //добавление нового сообщения.
		function appendMsg(host, port, message, formattedTime){
			var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime
					+ '</td><td align="left">' + host + ' [' + port + ']'
					+ '</td><td>' + message
					+ '</td></tr>');

            var countMsg = $('#messages tr').length;
			if (countMsg == 0)
				$('#messages').append($msg);
			else
			    $('#messages > tbody > tr:first').before($msg);
		}

        $(document).ready(function() {
            //событие отправления сообщения.
            $('#chatForm').submit(function(evt) {
                evt.preventDefault();
                var message = $('#message').val();
                if (message.length > 0) {
                    //отправление сообщения на шину событий.
                    eb.publish("chat.to.server", message);
                    $('#message').val("").focus();
                    countChar();
                }
            });
        });

        //счетчик введенных символов.
        function countChar() {
            var len = $('#message').val().length;
            if (len > 140) {
                var msg = $('#message').val().substring(0, 140);
                $('#message').val(msg);
            } else {
                $('#charNum').text(140 - len);
                var per = 100 / 140 * len;
                $('#charNumProgressBar').css('width', per+'%').attr('aria-valuenow', per);
            }
        };
    </script>
</head>
<body>
    <div class="container chat-wrapper">
        <form id="chatForm">
            <h2 align="center" class="alert alert-success">CHAT ROOM</h2>
            <fieldset>
                <div class="input-group input-group-lg">
                    <span class="input-group-addon" id="onlineIco">
                        <span class="glyphicon glyphicon-eye-open"></span>
                    </span>
                    <span class="input-group-addon" id="online">
                        <span class="glyphicon glyphicon-option-horizontal"></span>
                    </span>
                    <input type="text" maxlength="141" autocomplete="off" class="form-control"
                        placeholder="What's new?" id="message" aria-describedby="sizing-addon1"
                        onkeyup="countChar()"/>
                    <span class="input-group-btn">
                        <button class="btn btn-success" type="submit">
                            <span class="glyphicon glyphicon-send"></span>
                        </button>
                    </span>
                </div>
            </fieldset>

            <h3 id="charNum">140</h3>

            <div class="progress">
                <div id="charNumProgressBar" class="progress-bar progress-bar-success active" role="progressbar"
                     aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
                    <span class="sr-only">100% Complete</span>
                </div>
            </div>

            <div class="panel panel-success">
                <div class="panel-heading"><h3>New messages</h3></div>
                    <table id="messages" class="table table-hover" width="100%">
                        <colgroup>
                            <col style="width:10%">
                            <col style="width:10%">
                            <col style="width:10%">
                        </colgroup>
                    </table>
            </div>
        </form>
    </div>
</body>
</html>



Полезные ресурсы


AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 7

    0
    Забавно конечно вы свободный порт определяете, после close его может кто-то успеть занять)
      0
      Стоило написать статью, как релизнулась версия 3.2.1.

      Счетчик на базе AtomicInteger будет работать корректно только в случае одного экземпляра вашего чата. Как только вы запустите несколько вертиклов чата, начнете получать некорректные значения.
        0
        Счетчик на базе AtomicInteger будет работать корректно только в случае одного экземпляра вашего чата. Как только вы запустите несколько вертиклов чата, начнете получать некорректные значения.

        Это еще почему, можно поподробней?
          0
          Потому что у каждого вертикла будет свой экземпляр этого атомика.
            0
            Совсем нет. По умолчанию кластеризация в vert.x отключена, поэтому несколько созданных инстансов чата ни как не могут повлиять друг на друга, отправляя события в EventBus.
              0
              Я вам как раз про кластеризацию.
        0
        Пару месяцев назад был на митапе, где выступал один из разработчиков вертекса. Мне он тогда очень понравился! Теперь вот благодаря вашей статье «потыкаю его палочкой».
        Спасибо!

        Only users with full accounts can post comments. Log in, please.