Как стать автором
Обновить
97.53
Bercut
Создаем решения для цифровизации бизнеса

OOM: direct memory при работе с сетью TCP/IP через NIO в Java

Уровень сложностиСложный
Время на прочтение8 мин
Количество просмотров3.6K

Вступление

Привет, меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut. Работаю в компании без малого 20 лет, из них 18 пишу на Java.

Сегодня я расскажу об опыте увеличения производительности сетевого стэка и проблемах, с которыми можно столкнуться при использовании NIO в Java.

Эта статья основана на реальной практике борьбы с "OutOfMemory: direct memory" в шине данных гибридной интеграционной платформы.

Группа Platform Core, которой я руковожу, занимается разработкой и развитием гибридной интеграционной платформы, поддержкой систем и сервисов, написанных на платформе.

Платформа включает в себя:

  • Шину данных ESB.

  • Приложения API Gateway, SLES (сервер исполнения бизнес-процессов), SA Container (сервер с сервисами на Java), Notification Broker.

  • Платформенные сервисы: Scheduler, Service Profile Management и прочие.

  • Поддержку интеграции со Spring.

Итак, начнём с предпосылок, которые подвигли заняться анализом данной проблемы.

Особенности работы шины данных в Bercut

Наша гибридная интеграционная платформа имеет свою транспортную шину (RTSIB). Это ESB (enterprise service bus) в рамках архитектуры SOA (service-oriented architecture) со своими стеками HTTP и проприетарного асинхронного протокола RTSIB.

По своей сути это mesh-сеть между разными узлами и приложениями платформы.

Каждое RTSIB соединение обслуживается двумя потоками - читающим и пишущим, при этом пишущий поток вступает в игру только в том случае, если появились данные для отправки, а сокет занят отправкой другого пакета. В этом случае текущий добавляется в очередь отправки. Если же сокет свободен и доступен для записи, то запись в сокет происходит прямо с потока бизнес-логики.

HTTP соединения (на текущий момент мы поддерживаем версию 1.x) в виду синхронной архитектуры не требуют большого количества потоков обслуживания, потоки для них достаются из пула - примерно один поток на 25 соединений.

Такой подход имеет как плюсы и минусы, сегодня обсуждать мы их не будем, а просто возьмём за исходные данные, что на каждом приложении нашей платформы существует довольно большое количество потоков, которые работают с сокетами.

Особенности работы с сетью TCP/IP в неблокирующем режиме в Java

Кто уже интересовался тем, как работает запись в сокет на Java или просто любит смотреть исходные коды JDK, вероятно знает основные особенности работы с сокетом, но для понимания проблемы предлагаю ещё раз их проговорить.

Из исходных кодов Open JDK 13 (основная версия, используемой у нас Java) видно, что если записываемый ByteBuffer является DirectByteBuffer, то запись происходит сразу (writeFromNativeBuffer), а если он расположен в Heap, то сначала достаётся временный DirectByteBuffer, производится копирование и запись из временного DirectByteBuffer.

Код записи в сокет из Open JDK 13 ( IOUtil.java ):

static int write(FileDescriptor fd, ByteBuffer src, long position,
                 boolean directIO, int alignment, NativeDispatcher nd)
    throws IOException
{
    if (src instanceof DirectBuffer) {
        return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
    }
 
    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb;
    if (directIO) {
        Util.checkRemainingBufferSizeAligned(rem, alignment);
        bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
    } else {
        bb = Util.getTemporaryDirectBuffer(rem);
    }
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);
 
        int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

Дополнительно осложняет ситуацию то, что внутри реализации JDK имеется КЭШ DirectByteBuffer с привязкой к потоку (ThreadLocal):

private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
    @Override
    protected BufferCache initialValue() {
        return new BufferCache();
    }
    @Override
    protected void threadTerminated(BufferCache cache) { // will never be null
        while (!cache.isEmpty()) {
            ByteBuffer bb = cache.removeFirst();
            free(bb);
        }
    }
};
 
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }
 
    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}

И после каждого использования временного DirectByteBuffer, он помещается в КЭШ. При этом, если в КЭШе нет DirectByteBuffer необходимого размера, он аллоцируется и после использования также помещается в КЭШ ( Util.offerFirstTemporaryDirectBuffer(bb) ).

Суть проблемы

В первых версиях платформы мы просто использовали HeapByteBuffer через простую и понятную static-функцию ByteBuffer.wrap(byte[] data) и бед вроде как не знали.

Всё работало, скорость была достаточная для текущих telecom-сервисов, работающих на платформе, но в один прекрасный день размер данных DWH (Data Warehouse), проходящих через нашу шину достиг критического объёма в мегабайтах и мы получили OOM Direct Memory.

Почему же так произошло? А вот почему: как обозначил выше, мы имеем mesh-сеть с множеством обслуживающих потоков и имеем данные большого размера, проходящие через эти потоки, которые складывают off-heap память в ThreadLocal КЭШи этих потоков. Достигнув предела насыщения использования off-heap памяти  мы получаем OOM. Конечно, первым действием было увеличение параметра запуска JVM "-XX:MaxDirectMemorySize". Размер используемой direct памяти пришлось увеличить, потом увеличить ещё. Это стало тем самым звоночком, что с проблемой надо разбираться и как можно скорее.

Анализ возможных путей решения

После осознания проблемы, мы провели анализ возможных путей её решения и нашли следующие варианты:

  • Писать в цикле блоками, сдвигая вручную position и limit в записываемом ByteBuffer. Это должно помочь, так как в IOUtils временный DirectByteBuffer выделяется размером size = limit - position.

  • Перейти на использование ByteBuffer.allocateDirect().

  • Написать промежуточную абстракцию, содержащую нарезку из ByteBuffer одного размера, где ByteBuffer одного размера берутся из общего пула и после использования возвращаются обратно.

Погружаясь всё глубже в исследование проблемы стало понятно, что необходимо провести сравнительное тестирование разных размеров ByteBuffer и разных вариантов их использования для чтения и записи из сокета.

За несколько часов я написал тестовое приложение, которое эмулирует 4 вида работы с сокетом:

  • Аллоцирование HeapByteBuffer при каждой записи/чтении.

  • Аллоцирование DirectByteBuffer при каждой записи/чтении.

  • Переиспользование HeapByteBuffer при каждой записи/чтении.

  • Переиспользование DirectByteBuffer при каждой записи/чтении.

В тестовом приложении отсутствует маршаллинг (заполнение реальными данными), а присутствует только работа по записи и чтению из сокета с разными вариантами использования ByteBuffer.

Здесь приводить исходные коды не буду, но кто желает может ознакомится с ними на Github.

На выходе мы получили такую картину:

Из результатов видно, что до 1Mb самым медленным вариантом является аллокация DirectByteBuffer. Аллокация HeapByteBuffer через wrap и кэшированный HeapByteBuffer примерно равны с небольшим лидерством кэшированного. Из общей картины выбивается кэшированный DirectByteBuffer, что логично, так как пишется он напрямую, а время на аллокацию отсутствует.

Выбор и реализация

Для реализации выбрали 3 вариант решения проблемы: написать промежуточную абстракцию, содержащую нарезку из DirectByteBuffer одного размера, которые берутся и возвращаются в общий пул. За основной размер части пакета (размер DirectByteBuffer) было выбрано значение в 32Kb как минимальный по размеру пик при тестировании пропускной способности. Безусловно, так как у нас реализован и стэк HTTPs, фабрика может отдавать и пул с отличными от 32Kb размерами DirectByteBuffer, опираясь на PacketBufferSize и ApplicationBufferSize из настроек текущей сессии SSLEngine.

При написании слоя абстракции, названной CompositeBuffer, конечно же я реализовал и Input/Output streams, работающие напрямую с CompositeBuffer. Это было необходимо для нормальной работы слоя marshalling/unmarshalling.

В качестве хранилища уже аллоцированных DirectByteBuffer сделал простой стэк на CAS механизме:

public class CasStack<L extends LinkedObject<L>> {
 
    public interface LinkedObject<L extends LinkedObject> {
     
        public L getNext();
         
        public void setNext(L next);
         
    }
 
    private final AtomicReference<L> head = new AtomicReference<>();
 
    public void add(L lo) {
        for (;;) {
            lo.setNext(head.get());
            if (head.compareAndSet(lo.getNext(), lo)) {
                return;
            }
        }
    }
 
    public L poll() {
        L lo;
        for (;;) {
            lo = head.get();
            if (lo == null) {
                return null;
            }
            if (head.compareAndSet(lo, lo.getNext())) {
                lo.setNext(null);
                return lo;
            }
        }
 
    }
 
}

А примерно вот так выглядит часть основного класса CompositeBuffer в разрезе работы с чтением из сокета и записью в сокет (код был адаптирован для статьи):

DirectByteBuffer[] buffers;
int pos;
 
@Override
public int getBufIndex(int position) {
    return position / pool.getPartCapacity();
}
 
@Override
public int read(ReadableByteChannel channel) throws IOException {
    ensureCapacity();
    int cur = getBufIndex(pos), readed = 0, read;
    for (;;) {
        try {
            if (hasRemaining()) {
                if (buffers[cur].hasRemaining()) {
                    read = channel.read(containers[cur]);
                    if (read < 0) {
                        return readed > 0 ? readed : read;
                    }
                    pos += read;
                    readed += read;
                    if (containers[cur].hasRemaining()) {
                        return readed;
                    }
                }
                cur++;
                if (cur == containers.length) {
                    expandCapacity();
                }
            } else {
                return readed;
            }
        } catch (IOException) {
            if (readed > 0) {
                return readed;
            }
            throw ex;
        }
    }
}
 
@Override
public int write(WritableByteChannel channel) throws IOException {
    int cur = getBufIndex(pos), writed = 0, write;
    for (;;) {
        try {
            if (hasRemaining()) {
                if (buffers[cur].hasRemaining()) {
                    write = channel.write(buffers[cur]);
                    if (write < 0) {
                        return writed > 0 ? writed : write;
                    }
                    pos += write;
                    writed += write;
                    if (buffers[cur].hasRemaining()) {
                        return writed;
                    }
                }
                cur++;
                if (cur == containers.length) {
                    if (writed == 0 && hasRemaining()) {
                        release();
                        throw new CompositeBufferLifecycleError();
                    }
                    return writed;
                }
            } else {
                return writed;
            }
        } catch (IOException ex) {
            if (writed > 0) {
                return writed;
            }
            throw ex;
        }
    }
}

Конечно же пришлось написать далеко не один класс, а ещё несколько уровней абстракции, таких, как DirectContainer и механизмы addRef/releaseRef, проверку на ошибки жизненного цикла всей библиотеки и многое другое.

Заключение

По завершению оптимизации и переходу на переиспользуемый DirectByteBuffer пропускная способность шины увеличилась примерно вдвое.

До данной доработки размер off-heap памяти мог достигать 1-3Gb и складывался из максимальных размеров сообщений, прошедших через каждое соединение.

Сейчас же потребление off-heap памяти пулами довольно скромное - на среднем сервисе оно составляет всего 10-20 Mb.

На более сложном компоненте с парой сотен входящих вызовов в секунду, которые порождают до 15 тысяч внутренних вызовов на каждый входящий вызов - размер off-heap пула занимает менее 100Mb.

Важно ещё и то, что теперь off-heap память, используемая приложениями для работы с сетью более контролируема и растёт не от корреляции с количеством соединений (потоков), а только от корреляции с количеством сообщений проходящих через шину в единицу времени и средним размером сообщения.

При этом надо понимать, что размер off-heap памяти не может быть меньше максимального размера сообщения, когда-либо проходившего через узел.

Конечно, полностью исключить OOM direct memory таким решением всё равно не получится, но теперь off-heap память можно прогнозировать.

Теги:
Хабы:
Всего голосов 16: ↑15 и ↓1+18
Комментарии8

Публикации

Информация

Сайт
bercut.com
Дата регистрации
Дата основания
Численность
201–500 человек
Местоположение
Россия
Представитель
Елена