В статьe о dCache рассказано о том, как использовать его в качестве NFS сервера. Но функциональной совместимости с существующими клиентами недостаточно, чтобы системой можно было пользоваться. Производительность тоже должна быть на высоте. Рабочей лошадкой NFS протокола является ONCRPC протокол. В dCache мы используем собственную реализацию, основанную на grizzly nio framework.
ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.
Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.
В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.
Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.
Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).
Примитивный сервер выглядит так
На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.
Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.
Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.
Как видно из графика, код на яве не только не медленнее написанного на 'C', но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).
Код доступен на гитхабе под LGPL лицензией.
Немного истории для молодых
ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.
Реализация
Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.
Grizzly-NIO
В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.
package org.glassfish.grizzly.filterchain;
import java.io.IOException;
public interface Filter {
public void onAdded(FilterChain fc);
public void onRemoved(FilterChain fc);
public void onFilterChainChanged(FilterChain fc);
public NextAction handleRead(FilterChainContext fcc) throws IOException;
public NextAction handleWrite(FilterChainContext fcc) throws IOException;
public NextAction handleConnect(FilterChainContext fcc) throws IOException;
public NextAction handleAccept(FilterChainContext fcc) throws IOException;
public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;
public NextAction handleClose(FilterChainContext fcc) throws IOException;
public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl);
}
Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.
@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {
Buffer messageBuffer = ctx.getMessage();
if (!isMessageArrived(messageBuffer)) {
// пришла только часть сообщения
// ждём остальную часть
return ctx.getStopAction(messageBuffer);
}
// читаем полное сообщение
ctx.setMessage(getMessage(messageBuffer));
return ctx.getInvokeAction();
}
Боевой код
import java.io.IOException;
import java.nio.ByteOrder;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.BuffersBuffer;
public class RpcMessageParserTCP extends BaseFilter {
/**
* RPC fragment record marker mask
*/
private final static int RPC_LAST_FRAG = 0x80000000;
/**
* RPC fragment size mask
*/
private final static int RPC_SIZE_MASK = 0x7fffffff;
@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {
Buffer messageBuffer = ctx.getMessage();
if (messageBuffer == null) {
return ctx.getStopAction();
}
if (!isAllFragmentsArrived(messageBuffer)) {
return ctx.getStopAction(messageBuffer);
}
ctx.setMessage(assembleXdr(messageBuffer));
final Buffer reminder = messageBuffer.hasRemaining()
? messageBuffer.split(messageBuffer.position()) : null;
return ctx.getInvokeAction(reminder);
}
@Override
public NextAction handleWrite(FilterChainContext ctx) throws IOException {
Buffer b = ctx.getMessage();
int len = b.remaining() | RPC_LAST_FRAG;
Buffer marker = GrizzlyMemoryManager.allocate(4);
marker.order(ByteOrder.BIG_ENDIAN);
marker.putInt(len);
marker.flip();
marker.allowBufferDispose(true);
b.allowBufferDispose(true);
Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);
composite.allowBufferDispose(true);
ctx.setMessage(composite);
return ctx.getInvokeAction();
}
private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {
final Buffer buffer = messageBuffer.duplicate();
buffer.order(ByteOrder.BIG_ENDIAN);
while (buffer.remaining() >= 4) {
int messageMarker = buffer.getInt();
int size = getMessageSize(messageMarker);
/*
* fragmen size bigger than we have received
*/
if (size > buffer.remaining()) {
return false;
}
/*
* complete fragment received
*/
if (isLastFragment(messageMarker)) {
return true;
}
/*
* seek to the end of the current fragment
*/
buffer.position(buffer.position() + size);
}
return false;
}
private static int getMessageSize(int marker) {
return marker & RPC_SIZE_MASK;
}
private static boolean isLastFragment(int marker) {
return (marker & RPC_LAST_FRAG) != 0;
}
private Xdr assembleXdr(Buffer messageBuffer) {
Buffer currentFragment;
BuffersBuffer multipleFragments = null;
boolean messageComplete;
do {
int messageMarker = messageBuffer.getInt();
int size = getMessageSize(messageMarker);
messageComplete = isLastFragment(messageMarker);
int pos = messageBuffer.position();
currentFragment = messageBuffer.slice(pos, pos + size);
currentFragment.limit(size);
messageBuffer.position(pos + size);
if (!messageComplete & multipleFragments == null) {
/*
* we use composite buffer only if required
* as they not for free.
*/
multipleFragments = GrizzlyMemoryManager.create();
}
if (multipleFragments != null) {
multipleFragments.append(currentFragment);
}
} while (!messageComplete);
return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);
}
}
Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).
Примитивный сервер выглядит так
public static void main(String[] args) throws IOException {
FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
filterChainBuilder.add(new TransportFilter());
filterChainBuilder.add(new /* здесь парсер */);
filterChainBuilder.add(new /* здесь обработчик */);
final TCPNIOTransport transport =
TCPNIOTransportBuilder.newInstance().build();
transport.setProcessor(filterChainBuilder.build());
transport.bind(HOST, PORT);
transport.start();
System.in.read();
}
На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.
Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.
oncrpc4j
Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.
Встроенное приложение
import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.XdrVoid;
import org.dcache.xdr.OncRpcException;
public class Svcd {
private static final int DEFAULT_PORT = 1717;
private static final int PROG_NUMBER = 111017;
private static final int PROG_VERS = 1;
public static void main(String[] args) throws Exception {
RpcDispatchable dummy = new RpcDispatchable() {
@Override
public void dispatchOncRpcCall(RpcCall call)
throws OncRpcException, IOException {
call.reply(XdrVoid.XDR_VOID);
}
};
OncRpcSvc service = new OncRpcSvcBuilder()
.withTCP()
.withAutoPublish()
.withPort(DEFAULT_PORT)
.withSameThreadIoStrategy()
.build();
service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);
service.start();
}
}
Интеграция со Spring
Я не боюсь XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="my-rpc-svc" class="me.mypackage.Svcd">
<description>My RPC service</description>
</bean>
<bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">
<description>My RPC program number</description>
<constructor-arg index="0" value="1110001" />
<constructor-arg index="1" value="1" />
</bean>
<bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">
<description>Onc RPC service builder</description>
<property name="port" value="1717"/>
<property name="useTCP" value="true"/>
</bean>
<bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc" init-method="start" destroy-method="stop">
<description>My RPC service</description>
<constructor-arg ref="rpcsvc-builder"/>
<property name="programs">
<map>
<entry key-ref="my-rpc" value-ref="my-rpc-svc"/>
</map>
</property>
</bean>
</beans>
Производительность
Как видно из графика, код на яве не только не медленнее написанного на 'C', но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).
To steal and contribute code
Код доступен на гитхабе под LGPL лицензией.