Как стать автором
Обновить

Высокопроизводительный SUN/ONCRPC сервер на Java NIO

Время на прочтение6 мин
Количество просмотров6.9K
В статьe о dCache рассказано о том, как использовать его в качестве NFS сервера. Но функциональной совместимости с существующими клиентами недостаточно, чтобы системой можно было пользоваться. Производительность тоже должна быть на высоте. Рабочей лошадкой NFS протокола является ONCRPC протокол. В dCache мы используем собственную реализацию, основанную на grizzly nio framework.

Немного истории для молодых

ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.

Реализация


Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.



Grizzly-NIO

В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.

package org.glassfish.grizzly.filterchain;

import java.io.IOException;

public interface Filter {
    public void onAdded(FilterChain fc);
    public void onRemoved(FilterChain fc);
    public void onFilterChainChanged(FilterChain fc);
    public NextAction handleRead(FilterChainContext fcc) throws IOException;
    public NextAction handleWrite(FilterChainContext fcc) throws IOException;
    public NextAction handleConnect(FilterChainContext fcc) throws IOException;
    public NextAction handleAccept(FilterChainContext fcc) throws IOException;
    public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;
    public NextAction handleClose(FilterChainContext fcc) throws IOException;
    public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl);
}


Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.

    @Override
    public NextAction handleRead(FilterChainContext ctx) throws IOException {

        Buffer messageBuffer = ctx.getMessage();
        if (!isMessageArrived(messageBuffer)) {
            // пришла только часть сообщения
            // ждём остальную часть
            return ctx.getStopAction(messageBuffer);
        }
        // читаем полное сообщение
        ctx.setMessage(getMessage(messageBuffer));
        return ctx.getInvokeAction();
    }


Боевой код
import java.io.IOException;

import java.nio.ByteOrder;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.BuffersBuffer;

public class RpcMessageParserTCP extends BaseFilter {

    /**
     * RPC fragment record marker mask
     */
    private final static int RPC_LAST_FRAG = 0x80000000;
    /**
     * RPC fragment size mask
     */
    private final static int RPC_SIZE_MASK = 0x7fffffff;

    @Override
    public NextAction handleRead(FilterChainContext ctx) throws IOException {

        Buffer messageBuffer = ctx.getMessage();
        if (messageBuffer == null) {
            return ctx.getStopAction();
        }

        if (!isAllFragmentsArrived(messageBuffer)) {
            return ctx.getStopAction(messageBuffer);
        }

        ctx.setMessage(assembleXdr(messageBuffer));

        final Buffer reminder = messageBuffer.hasRemaining()
                ? messageBuffer.split(messageBuffer.position()) : null;

        return ctx.getInvokeAction(reminder);
    }

    @Override
    public NextAction handleWrite(FilterChainContext ctx) throws IOException {

        Buffer b = ctx.getMessage();
        int len = b.remaining() | RPC_LAST_FRAG;

        Buffer marker = GrizzlyMemoryManager.allocate(4);
        marker.order(ByteOrder.BIG_ENDIAN);
        marker.putInt(len);
        marker.flip();
        marker.allowBufferDispose(true);
        b.allowBufferDispose(true);
        Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);
        composite.allowBufferDispose(true);
        ctx.setMessage(composite);
        return ctx.getInvokeAction();
    }

    private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {
        final Buffer buffer = messageBuffer.duplicate();
        buffer.order(ByteOrder.BIG_ENDIAN);

        while (buffer.remaining() >= 4) {

            int messageMarker = buffer.getInt();
            int size = getMessageSize(messageMarker);

            /*
             * fragmen size bigger than we have received
             */
            if (size > buffer.remaining()) {
                return false;
            }

            /*
             * complete fragment received
             */
            if (isLastFragment(messageMarker)) {
                return true;
            }

            /*
             * seek to the end of the current fragment
             */
            buffer.position(buffer.position() + size);
        }

        return false;
    }

    private static int getMessageSize(int marker) {
        return marker & RPC_SIZE_MASK;
    }

    private static boolean isLastFragment(int marker) {
        return (marker & RPC_LAST_FRAG) != 0;
    }

    private Xdr assembleXdr(Buffer messageBuffer) {

        Buffer currentFragment;
        BuffersBuffer multipleFragments = null;

        boolean messageComplete;
        do {
            int messageMarker = messageBuffer.getInt();

            int size = getMessageSize(messageMarker);
            messageComplete = isLastFragment(messageMarker);

            int pos = messageBuffer.position();
            currentFragment = messageBuffer.slice(pos, pos + size);
            currentFragment.limit(size);

            messageBuffer.position(pos + size);
            if (!messageComplete & multipleFragments == null) {
                /*
                 * we use composite buffer only if required
                 * as they not for free.
                 */
                multipleFragments = GrizzlyMemoryManager.create();
            }

            if (multipleFragments != null) {
                multipleFragments.append(currentFragment);
            }
        } while (!messageComplete);

        return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);
    }
}


Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).

Примитивный сервер выглядит так
    public static void main(String[] args) throws IOException {

        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());
        filterChainBuilder.add(new /* здесь парсер */);
        filterChainBuilder.add(new /* здесь обработчик */);

        final TCPNIOTransport transport =
                TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor(filterChainBuilder.build());
        transport.bind(HOST, PORT);
        transport.start();
        System.in.read();
    }

На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.

Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.

oncrpc4j

Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.

Встроенное приложение

import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.XdrVoid;
import org.dcache.xdr.OncRpcException;

public class Svcd {
    private static final int DEFAULT_PORT = 1717;
    private static final int PROG_NUMBER = 111017;
    private static final int PROG_VERS = 1;

    public static void main(String[] args) throws Exception {
        RpcDispatchable dummy = new RpcDispatchable() {
            @Override
            public void dispatchOncRpcCall(RpcCall call)
                          throws OncRpcException, IOException {
                call.reply(XdrVoid.XDR_VOID);
            }
        };
        OncRpcSvc service = new OncRpcSvcBuilder()
                .withTCP()
                .withAutoPublish()
                .withPort(DEFAULT_PORT)
                .withSameThreadIoStrategy()
                .build();
        service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);
        service.start();
    }
}


Интеграция со Spring

Я не боюсь XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <bean id="my-rpc-svc" class="me.mypackage.Svcd">
        <description>My RPC service</description>
    </bean>

     <bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">
        <description>My RPC program number</description>
        <constructor-arg index="0" value="1110001" />
        <constructor-arg index="1" value="1" />
    </bean>

    <bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">
        <description>Onc RPC service builder</description>
        <property name="port" value="1717"/>
        <property name="useTCP" value="true"/>
    </bean>

    <bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc" init-method="start" destroy-method="stop">
        <description>My RPC service</description>
        <constructor-arg ref="rpcsvc-builder"/>
        <property name="programs">
            <map>
                <entry key-ref="my-rpc" value-ref="my-rpc-svc"/>
            </map>
        </property>
    </bean>
</beans>



Производительность



Как видно из графика, код на яве не только не медленнее написанного на 'C', но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).

To steal and contribute code


Код доступен на гитхабе под LGPL лицензией.
Теги:
Хабы:
Всего голосов 13: ↑13 и ↓0+13
Комментарии0

Публикации

Истории

Работа

Java разработчик
350 вакансий

Ближайшие события