
Вступление
Привет, меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut. Работаю в компании без малого 20 лет, из них 18 пишу на Java.
Сегодня я расскажу об опыте увеличения производительности сетевого стэка и проблемах, с которыми можно столкнуться при использовании NIO в Java.
Эта статья основана на реальной практике борьбы с "OutOfMemory: direct memory" в шине данных гибридной интеграционной платформы.
Группа Platform Core, которой я руковожу, занимается разработкой и развитием гибридной интеграционной платформы, поддержкой систем и сервисов, написанных на платформе.
Платформа включает в себя:
Шину данных ESB.
Приложения API Gateway, SLES (сервер исполнения бизнес-процессов), SA Container (сервер с сервисами на Java), Notification Broker.
Платформенные сервисы: Scheduler, Service Profile Management и прочие.
Поддержку интеграции со Spring.
Итак, начнём с предпосылок, которые подвигли заняться анализом данной проблемы.
Особенности работы шины данных в Bercut
Наша гибридная интеграционная платформа имеет свою транспортную шину (RTSIB). Это ESB (enterprise service bus) в рамках архитектуры SOA (service-oriented architecture) со своими стеками HTTP и проприетарного асинхронного протокола RTSIB.
По своей сути это mesh-сеть между разными узлами и приложениями платформы.
Каждое RTSIB соединение обслуживается двумя потоками - читающим и пишущим, при этом пишущий поток вступает в игру только в том случае, если появились данные для отправки, а сокет занят отправкой другого пакета. В этом случае текущий добавляется в очередь отправки. Если же сокет свободен и доступен для записи, то запись в сокет происходит прямо с потока бизнес-логики.
HTTP соединения (на текущий момент мы поддерживаем версию 1.x) в виду синхронной архитектуры не требуют большого количества потоков обслуживания, потоки для них достаются из пула - примерно один поток на 25 соединений.
Такой подход имеет как плюсы и минусы, сегодня обсуждать мы их не будем, а просто возьмём за исходные данные, что на каждом приложении нашей платформы существует довольно большое количество потоков, которые работают с сокетами.
Особенности работы с сетью TCP/IP в неблокирующем режиме в Java
Кто уже интересовался тем, как работает запись в сокет на Java или просто любит смотреть исходные коды JDK, вероятно знает основные особенности работы с сокетом, но для понимания проблемы предлагаю ещё раз их проговорить.
Из исходных кодов Open JDK 13 (основная версия, используемой у нас Java) видно, что если записываемый ByteBuffer является DirectByteBuffer, то запись происходит сразу (writeFromNativeBuffer), а если он расположен в Heap, то сначала достаётся временный DirectByteBuffer, производится копирование и запись из временного DirectByteBuffer.
Код записи в сокет из Open JDK 13 ( IOUtil.java ):
static int write(FileDescriptor fd, ByteBuffer src, long position,
boolean directIO, int alignment, NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer) {
return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
}
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb;
if (directIO) {
Util.checkRemainingBufferSizeAligned(rem, alignment);
bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
} else {
bb = Util.getTemporaryDirectBuffer(rem);
}
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
Дополнительно осложняет ситуацию то, что внутри реализации JDK имеется КЭШ DirectByteBuffer с привязкой к потоку (ThreadLocal):
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
@Override
protected void threadTerminated(BufferCache cache) { // will never be null
while (!cache.isEmpty()) {
ByteBuffer bb = cache.removeFirst();
free(bb);
}
}
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
// If a buffer of this size is too large for the cache, there
// should not be a buffer in the cache that is at least as
// large. So we'll just create a new one. Also, we don't have
// to remove the buffer from the cache (as this method does
// below) given that we won't put the new buffer in the cache.
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
}
И после каждого использования временного DirectByteBuffer, он помещается в КЭШ. При этом, если в КЭШе нет DirectByteBuffer необходимого размера, он аллоцируется и после использования также помещается в КЭШ ( Util.offerFirstTemporaryDirectBuffer(bb) ).
Суть проблемы
В первых версиях платформы мы просто использовали HeapByteBuffer через простую и понятную static-функцию ByteBuffer.wrap(byte[] data) и бед вроде как не знали.
Всё работало, скорость была достаточная для текущих telecom-сервисов, работающих на платформе, но в один прекрасный день размер данных DWH (Data Warehouse), проходящих через нашу шину достиг критического объёма в мегабайтах и мы получили OOM Direct Memory.
Почему же так произошло? А вот почему: как обозначил выше, мы имеем mesh-сеть с множеством обслуживающих потоков и имеем данные большого размера, проходящие через эти потоки, которые складывают off-heap память в ThreadLocal КЭШи этих потоков. Достигнув предела насыщения использования off-heap памяти мы получаем OOM. Конечно, первым действием было увеличение параметра запуска JVM "-XX:MaxDirectMemorySize". Размер используемой direct памяти пришлось увеличить, потом увеличить ещё. Это стало тем самым звоночком, что с проблемой надо разбираться и как можно скорее.
Анализ возможных путей решения
После осознания проблемы, мы провели анализ возможных путей её решения и нашли следующие варианты:
Писать в цикле блоками, сдвигая вручную position и limit в записываемом ByteBuffer. Это должно помочь, так как в IOUtils временный DirectByteBuffer выделяется размером size = limit - position.
Перейти на использование ByteBuffer.allocateDirect().
Написать промежуточную абстракцию, содержащую нарезку из ByteBuffer одного размера, где ByteBuffer одного размера берутся из общего пула и после использования возвращаются обратно.
Погружаясь всё глубже в исследование проблемы стало понятно, что необходимо провести сравнительное тестирование разных размеров ByteBuffer и разных вариантов их использования для чтения и записи из сокета.
За несколько часов я написал тестовое приложение, которое эмулирует 4 вида работы с сокетом:
Аллоцирование HeapByteBuffer при каждой записи/чтении.
Аллоцирование DirectByteBuffer при каждой записи/чтении.
Переиспользование HeapByteBuffer при каждой записи/чтении.
Переиспользование DirectByteBuffer при каждой записи/чтении.
В тестовом приложении отсутствует маршаллинг (заполнение реальными данными), а присутствует только работа по записи и чтению из сокета с разными вариантами использования ByteBuffer.
Здесь приводить исходные коды не буду, но кто желает может ознакомится с ними на Github.
На выходе мы получили такую картину:

Из результатов видно, что до 1Mb самым медленным вариантом является аллокация DirectByteBuffer. Аллокация HeapByteBuffer через wrap и кэшированный HeapByteBuffer примерно равны с небольшим лидерством кэшированного. Из общей картины выбивается кэшированный DirectByteBuffer, что логично, так как пишется он напрямую, а время на аллокацию отсутствует.
Выбор и реализация
Для реализации выбрали 3 вариант решения проблемы: написать промежуточную абстракцию, содержащую нарезку из DirectByteBuffer одного размера, которые берутся и возвращаются в общий пул. За основной размер части пакета (размер DirectByteBuffer) было выбрано значение в 32Kb как минимальный по размеру пик при тестировании пропускной способности. Безусловно, так как у нас реализован и стэк HTTPs, фабрика может отдавать и пул с отличными от 32Kb размерами DirectByteBuffer, опираясь на PacketBufferSize и ApplicationBufferSize из настроек текущей сессии SSLEngine.
При написании слоя абстракции, названной CompositeBuffer, конечно же я реализовал и Input/Output streams, работающие напрямую с CompositeBuffer. Это было необходимо для нормальной работы слоя marshalling/unmarshalling.
В качестве хранилища уже аллоцированных DirectByteBuffer сделал простой стэк на CAS механизме:
public class CasStack<L extends LinkedObject<L>> {
public interface LinkedObject<L extends LinkedObject> {
public L getNext();
public void setNext(L next);
}
private final AtomicReference<L> head = new AtomicReference<>();
public void add(L lo) {
for (;;) {
lo.setNext(head.get());
if (head.compareAndSet(lo.getNext(), lo)) {
return;
}
}
}
public L poll() {
L lo;
for (;;) {
lo = head.get();
if (lo == null) {
return null;
}
if (head.compareAndSet(lo, lo.getNext())) {
lo.setNext(null);
return lo;
}
}
}
}
А примерно вот так выглядит часть основного класса CompositeBuffer в разрезе работы с чтением из сокета и записью в сокет (код был адаптирован для статьи):
DirectByteBuffer[] buffers;
int pos;
@Override
public int getBufIndex(int position) {
return position / pool.getPartCapacity();
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
ensureCapacity();
int cur = getBufIndex(pos), readed = 0, read;
for (;;) {
try {
if (hasRemaining()) {
if (buffers[cur].hasRemaining()) {
read = channel.read(containers[cur]);
if (read < 0) {
return readed > 0 ? readed : read;
}
pos += read;
readed += read;
if (containers[cur].hasRemaining()) {
return readed;
}
}
cur++;
if (cur == containers.length) {
expandCapacity();
}
} else {
return readed;
}
} catch (IOException) {
if (readed > 0) {
return readed;
}
throw ex;
}
}
}
@Override
public int write(WritableByteChannel channel) throws IOException {
int cur = getBufIndex(pos), writed = 0, write;
for (;;) {
try {
if (hasRemaining()) {
if (buffers[cur].hasRemaining()) {
write = channel.write(buffers[cur]);
if (write < 0) {
return writed > 0 ? writed : write;
}
pos += write;
writed += write;
if (buffers[cur].hasRemaining()) {
return writed;
}
}
cur++;
if (cur == containers.length) {
if (writed == 0 && hasRemaining()) {
release();
throw new CompositeBufferLifecycleError();
}
return writed;
}
} else {
return writed;
}
} catch (IOException ex) {
if (writed > 0) {
return writed;
}
throw ex;
}
}
}
Конечно же пришлось написать далеко не один класс, а ещё несколько уровней абстракции, таких, как DirectContainer и механизмы addRef/releaseRef, проверку на ошибки жизненного цикла всей библиотеки и многое другое.
Заключение
По завершению оптимизации и переходу на переиспользуемый DirectByteBuffer пропускная способность шины увеличилась примерно вдвое.
До данной доработки размер off-heap памяти мог достигать 1-3Gb и складывался из максимальных размеров сообщений, прошедших через каждое соединение.
Сейчас же потребление off-heap памяти пулами довольно скромное - на среднем сервисе оно составляет всего 10-20 Mb.
На более сложном компоненте с парой сотен входящих вызовов в секунду, которые порождают до 15 тысяч внутренних вызовов на каждый входящий вызов - размер off-heap пула занимает менее 100Mb.
Важно ещё и то, что теперь off-heap память, используемая приложениями для работы с сетью более контролируема и растёт не от корреляции с количеством соединений (потоков), а только от корреляции с количеством сообщений проходящих через шину в единицу времени и средним размером сообщения.
При этом надо понимать, что размер off-heap памяти не может быть меньше максимального размера сообщения, когда-либо проходившего через узел.
Конечно, полностью исключить OOM direct memory таким решением всё равно не получится, но теперь off-heap память можно прогнозировать.