Проект на GitHub находится в стадии ранней альфы. Эта статья дополняет README проекта.

Сетевые события можно записывать и вычитывать из стороннего хранилища по своему вкусу. Каждое событие выглядит так:

public interface NetworkEvent {
    UUID getConnectionId();
    int getSerial();
    EventType getType();
    byte[] getPayload();
}

Они рассказывают о некотором сетевом изменении (соединение открылось, пришли данные, соединение закрылось или оборвано) и поддаются обработке через Consumer<NetworkEvent>, причём обработчики можно выстраивать в цепочки и лабиринты для фильтрации, троттлинга, распараллеливания обработки и т.п.

Если два процесса умеют записывать такие события в хранилище вычитывать записанное с другой стороны - между ними налажено взаимодействие, при том, что прямых сетевых подключений друг к другу они не производят.

Пример использования этого очевиднейшего принципа - HTTP-прокси. Один процесс - приёмник прокси, "мама", то, что слушает на порту 3128 обычно. Он соединения принимает, но вместо того, чтобы как классический прокси тут же обратиться на запрошенные адреса и передать им полученные запросы - сохраняет запросы в виде NetworkEvent и кладёт в хранилище. А в это время совсем в другом месте передатчик прокси, "папа", уже вычитывает запросы из хранилища, открывает соединения на своём сетевом стеке, получает ответы и передаёт их через NetworkEvent и хранилище "маме".

Вся суть моего прототипа в том, что транспорт (чтение и запись хранилища) пишет пользователь под свои сиюминутные нужды. Я приведу пример транспорта на основе H2SQL практически целиком. Чтение:

public class H2Retrieval extends PollingRetrievalSupport {
    private final String connectionUrl;
    private final String tableName;
    private final Consumer<NetworkEvent> targetConsumer;
    private final AtomicLong lastProcessedId = new AtomicLong();

    public H2Retrieval(String databasePath, String tableName, Consumer<NetworkEvent> targetConsumer, long pollIntervalMs) {
        super(pollIntervalMs);
        this.connectionUrl = String.format(
                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",
                Paths.get(databasePath).toAbsolutePath()
        );
        this.tableName = tableName;
        this.targetConsumer = targetConsumer;
    }

    protected void poll() {
        String selectSQL = String.format("""
                SELECT id, connection_id, serial, event_type, payload 
                FROM %s 
                WHERE id > ? 
                ORDER BY id ASC
                """, tableName);

        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement pstmt = conn.prepareStatement(selectSQL)) {
            pstmt.setLong(1, lastProcessedId.get());
            long eventId = lastProcessedId.get();
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    eventId = rs.getLong("id");
                    NetworkEvent event = extractEvent(rs);

                    targetConsumer.accept(event);

                }
            } finally {
                long finalEventId = eventId;
                lastProcessedId.updateAndGet(val -> Math.max(val, finalEventId));
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to poll events", e);
        }
    }

    private NetworkEvent extractEvent(ResultSet rs) throws SQLException {
        UUID connectionId = (UUID) rs.getObject("connection_id");
        int serial = rs.getInt("serial");
        EventType type = EventType.valueOf(rs.getString("event_type"));
        byte[] payload = rs.getBytes("payload");
        return NetworkEvent.create(connectionId, serial, type, payload);
    }
}

И запись:

public class H2Storage implements Consumer<NetworkEvent> {
    private final String connectionUrl;
    private final String tableName;

    public H2Storage(String databasePath, String tableName) {
        this.connectionUrl = String.format(
                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",
                Paths.get(databasePath).toAbsolutePath()
        );
        this.tableName = tableName;
    }

    @Override
    public void accept(NetworkEvent event) {
        String insertSQL = String.format("""
            INSERT INTO %s (connection_id, serial, event_type, payload) 
            VALUES (?, ?, ?, ?)
            """, tableName);

        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {

            pstmt.setObject(1, event.getConnectionId());
            pstmt.setInt(2, event.getSerial());
            pstmt.setString(3, event.getType().name());
            pstmt.setBytes(4, event.getPayload());
            pstmt.executeUpdate();

        } catch (SQLException e) {
            throw new RuntimeException("Failed to store network event", e);
        }
    }
}

На этом, в общем-то, всё. Я надеюсь, что этот интерфейс не будет становиться сложнее. Он может стать даже проще, если Muxalma посредством своих цепочек преобразователей станет умнее - научится компенсировать потерю пакетов, шейпингу трафика, backpressure и т.п. Реализации этих интерфейсов достаточно, чтобы заработал HTTP-прокси на основе Netty через хранилище, избранное вами. Отслеживание судьбы HTTP-соединений и мультиплекс возьмёт на себя Муксалма.

И здесь вы можете сказать - господин хороший, не ломитесь ли вы в открытую дверь со своей поделкой из трёх с половиной классов? На это я отвечу, что хотел начать именно эту дискуссию, т.к. дверь такую сходу не нашёл, а ещё я писатель, а не читатель, и широко известное в узких кругах всеми признанное решение просто не найду.