Преамбула
Здравствуйте. Я являюсь главным разработчиком крупнейшего в СНГ сервера Minecraft (не буду рекламировать, кому надо, те знают). Уже почти год мы пишем свою реализацию сервера, рассчитанную на больше чем 40 человек (мы хотим видеть цифру в 500 хотя бы). Пока всё было удачно, но последнее время система начала упираться в то, что из-за не самой удачной реализации сети (1 поток на ввод, 1 на вывод + 1 на обработку), при 300 игроках онлайн работает более 980 потоков (+ системные), что в сочетании с производительностью дефолтного io Явы даёт огромное падение производительности, и уже при 100 игроках сервер в основном занимается тем, что пишет/читает в/из сети.
Поэтому я решила переходить на NIO. В руки совершенно случайно попала библиотека Netty, структура которой показалась просто идеально подходящей для того, чтобы встроить её в уже готовое работающее решение. К сожалению, мануалов по Netty мало не только на русском, но и на английском языках, поэтому приходилось много экспериментировать и лазить в код библиотеки, чтобы найти лучший способ.
Здесь я постараюсь расписать серверную часть работы с сетью через Netty, может быть это кому-то будет полезно.
Создание сервера
ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS);
ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(4 /* число рабочих потоков */, 400000000, 2000000000, 60, TimeUnit.SECONDS);
ServerBootstrap networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, 4 /* то же самое число рабочих потоков */));
networkServer.setOption("backlog", 500);
networkServer.setOption("connectTimeoutMillis", 10000);
networkServer.setPipelineFactory(new ServerPipelineFactory());
Channel channel = networkServer.bind(new InetSocketAddress(address, port));
Используется OrderedMemoryAwareThreadPoolExecutor для выполнения задач Netty, по опыту французских коллег они самые эффективные. Можно использовать другие Executor'ы, например Executors.newFixedThreadPool(n). Ни в коем случае не используйте Executors.newCachedThreadPool(), он создаёт неоправданно много потоков и ни какого выигрыша от Netty почти нет. Использовать более 4 рабочих потоков нет смысла, т.к. они более чем справляются с огромной нагрузкой (программисты из Xebia-France на 4 потоках тянули более 100 000 одновременных подключений). Босс-потоки должны быть по одному на каждый слушаемый порт. Channel, который возвращает функция bind, а так же ServerBootsrap необходимо сохранить, чтобы потом можно было остановить сервер.
PipelineFactory
То, как будут обрабатываться подключения и пакеты клиента, определяет PipelineFactory, которая при открытии канала с клиентом создаёт для него pipeline, в котором определены обработчики событий, которые происходят на канале. В нашем случае, это ServerPipelineFactory:
public class ServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
PacketFrameDecoder decoder = new PacketFrameDecoder();
PacketFrameEncoder encoder = new PacketFrameEncoder();
return Channels.pipeline(decoder, encoder, new PlayerHandler(decoder, encoder));
}
}
В данном коде PacketFrameDecoder, PacketFrameEncoder и PlayerHandler — обработчки событий, которые мы определяем. Функция Channels.pipeline() создаёт новый pipeline с переданными ей обработчиками. Будьте внимательны: события проходят обработчики в том порядке, в котором Вы передали из функции pipeline!
Протокол
Немного опишу протокол, чтобы дальше было понятно.
Обмен данными происходит с помощью объектов классов, расширяющих класс Packet, в которых определены две функции, get(ChannelBuffer input) и send(ChannelBuffer output). Соответственно, первая функция читает необходимые данные из канала, вторая — пишет данные пакета в канал.
public abstract class Packet {
public static Packet read(ChannelBuffer buffer) throws IOException {
int id = buffer.readUnsignedShort(); // Получаем ID пришедшего пакета, чтобы определить, каким классом его читать
Packet packet = getPacket(id); // Получаем инстанс пакета с этим ID
if(packet == null)
throw new IOException("Bad packet ID: " + id); // Если произошла ошибка и такого пакета не может быть, генерируем исключение
packet.get(buffer); // Читаем в пакет данные из буфера
return packet;
}
public statuc Packet write(Packet packet, ChannelBuffer buffer) {
buffer.writeChar(packet.getId()); // Отправляем ID пакета
packet.send(buffer); // Отправляем данные пакета
}
// Функции, которые должен реализовать каждый класс пакета
public abstract void get(ChannelBuffer buffer);
public abstract void send(ChannelBuffer buffer);
}
Пример пары пакетов для наглядности:
// Пакет, которым клиент передаёт серверу свой логин
public class Packet1Login extends Packet {
public String login;
public void get(ChannelBuffer buffer) {
int length = buffer.readShort();
StringBuilder builder = new StringBuilder();
for(int i = 0; i < length ++i)
builder.append(buffer.readChar());
login = builder.toString();
}
public void send(ChannelBuffer buffer) {
// Тело отправки пустое, т.к. сервер не посылает этот пакет
}
}
// Пакет, которым сервер выкидывает клиента с указаной причиной, или клиент отключается от сервера
public class Packet255KickDisconnect extends Packet {
public String reason;
public void get(ChannelBuffer buffer) {
int length = buffer.readShort();
StringBuilder builder = new StringBuilder();
for(int i = 0; i < length ++i)
builder.append(buffer.readChar());
reason = builder.toString();
}
public void send(ChannelBuffer buffer) {
buffer.writeShort(reason.length());
for(int i = 0; i < reason.length(); ++i) {
buffer.writeChar(reason.getCharAt(i));
}
}
}
ChannelBuffer очень похож на DataInputStream и DataOutputStream в одном лице. Большинство функций если не такие же, то очень похожи. Заметьте, что я не забочусь о проверке того, хватает ли в буфере байт для чтения, как будто я работаю с блокирующим IO. Об этом далее…
Работа с клиентом
Работа с клиентом в основном определяется классом PlayerHandler:
public class PlayerHandler extends SimpleChannelUpstreamHandler {
private PlayerWorkerThread worker;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Событие вызывается при подключении клиента. Я создаю здесь Worker игрока — объект, который занимается обработкой данных игрока непостредственно.
// Я передаю ему канал игрока (функция e.getChannel()), чтобы он мог в него посылать пакеты
worker = new PlayerWorkerThread(this, e.getChannel());
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Событие закрытия канала. Используется в основном, чтобы освободить ресурсы, или выполнить другие действия, которые происходят при отключении пользователя. Если его не обработать, Вы можете и не заметить, что пользователь отключился, если он напрямую не сказал этого серверу, а просто оборвался канал.
worker.disconnectedFromChannel();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Функция принимает уже готовые Packet'ы от игрока, поэтому их можно сразу посылать в worker. За их формирование отвечает другой обработчик.
if(e.getChannel().isOpen())
worker.acceptPacket((Packet) e.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// На канале произошло исключение. Выводим ошибку, закрываем канал.
Server.logger.log(Level.WARNING, "Exception from downstream", e.getCause());
ctx.getChannel().close();
}
}
Worker может посылать игроку данные просто функцией channel.write(packet), где channel — канал игрока, который передаётся ему при подключении, а packet — объект класса Packet. За кодирование пакетов будет отвечать уже Encoder.
Decoder и Encoder
Собственно, сама важная часть системы — они отвечают за формирование пакетов Packet из потока пользователя и за отправку таких же пакетов в поток.
Encoder очень прост, он отправляет пакеты игроку:
public class PacketFrameEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception {
if(!(obj instanceof Packet))
return obj; // Если это не пакет, то просто пропускаем его дальше
Packet p = (Packet) obj;
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); // Создаём динамический буфер для записи в него данных из пакета. Если Вы точно знаете длину пакета, Вам не обязательно использовать динамический буфер — ChannelBuffers предоставляет и буферы фиксированной длинны, они могут быть эффективнее.
Packet.write(p, buffer); // Пишем пакет в буфер
return buffer; // Возвращаем буфер, который и будет записан в канал
}
}
Decoder уже гораздо сложнее. Дело в том, что в буфере, пришедшем от клиента, может просто не оказаться достаточного количества байт для чтения всего пакета. В этом случае, нам поможет класс ReplayingDecoder. Нам всего лишь нужно реализовать его функцию decode и читать в ней данные из потока, не заботясь не о чём:
public class PacketFrameDecoder extends ReplayingDecoder<VoidEnum> {
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
ctx.sendUpstream(e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
ctx.sendUpstream(e);
}
@Override
protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, VoidEnum e) throws Exception {
return Packet.read(buffer);
}
}
Спрашивается, как это работает? Очень просто, перед вызовом функции decode декодер помечает текущий индекс чтения, если при чтении из буфера в нём не хватит данных, будет сгенерировано исключение. При этом буфер вернётся в начальное положение и decode будет повторён, когда больше данных будет получено от пользователя. В случае успешного чтения (возвращён не null), декодер попытается вызвать функции decode ещё раз, уже на оставшихся в буфере данных, если в нём есть ещё хотя бы один байт.
Не медленно ли всё это работает, если он генерирует исключение? Медленнее, чем, например, проверка количества данных в буфере и оценка, хватит ли их для чтения пакета. Но он использует кэшированное исключение, поэтому не тратится время на заполнения stacktrace и даже создание нового объекта исключения. Подробнее об и некоторых других, повышающих эффективность, функцийя ReplayingDecoder можно почитать здесь
Вы так же можете поэкспериментировать с FrameDecoder'ом, если, например, Вы можете заранее определить размер пакета по его ID.
Кажется, это всё
Результаты получились отличными. Во-первых, сервер больше не сыпет тысячей потоков — 4 потока Netty + 4 потока обработки данных прекрасно справляются с 250+ клиентами (тестирование продолжается). Во-вторых, нагрузка на процессор стала значительно меньшей и перестала линейно расти от числа подключений. В-третьих, время отклика в некоторых случаях стало меньше.
Надеюсь кому-нибудь это будет полезно. Старалась передать как можно больше важных данных, могла переборщить. Примеров ведь много не бывает? Спрашивайте Ваши ответы и не судите строго — первый раз пишу на хабр.
Постскриптум: ещё несколько полезных вещей
У Netty есть ещё несколько интересных особенностей, которые заслуживают отдельного упоминания:
Во-первых, остановка сервера:
ChannelFuture future = channel.close();
future.awaitUninterruptibly();
Где channel — канал, который возвратила функция bind в начале. future.awaitUninterruptibly() дождётся, пока канал закроется и выполнение кода продолжится.
Самое интересное: ChannelFuture. Когда мы отправляем на канал пакет, функцией channel.write(packet), она возвращает ChannelFuture — это особый объект, который отслеживает состояние выполняемого действия. Через него можно проверить, выполнилось ли действие.
Например, мы хотим послать клиенту пакет отключения и закрыть за ним канал. Если мы сделаем
channel.write(new Packet255KickDisconnect("Пока!"));
channel.close();
то с вероятностью 99%, мы получим ChannelClosedException и пакет до клиента не дойдёт. Но можно сделать так:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!"));
try {
future.await(10000); // Ждём не более 10 секунд, пока действие закончится
} catch(InterruptedException e) {}
channel.close();
То всё будет супер, кроме того, что это может заблокировать поток выполнения, пока пакет не отправится пользователю. Поэтому на ChannelFuture можно повесит listener — объект, который будет уведомлён о том, что событие совершилось и выполнит какие-либо действия. Для закрытия соединения есть уже готовый listener ChannelFutureListener.CLOSE. Пример использования:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!"));
furute.addListener(ChannelFutureListener.CLOSE);
Эффект тот же, блокировок нет. Разобраться в том, как создать свой листенер не сложно — там всего одна функция. Откройте любой готовый класс, здесь я не буду приводить пример.
Ещё важная информация
Как правильно было замечено в комментариях, следует предупредить о том, что в обработчиках (handler-ах, которые висят на pipeline) лучше не стоит использовать блокирующие операции или ожидание. В противном случае, Вы рискуете навсегда потерять поток обработки или просто сильно затормозить обработку событий остальных клиентов.
Так же в обработчике ни в коем случае нельзя «ждать будущего», т.е. выполнять .await() или .awaitUninterruptibly() на любом ChannelFuture. Во-первых, у Вас ничего не получится, их нельзя вызывать из обработчиков — система не даст сделать такую глупость и сгенерирует исключение. Во-вторых, если бы этого не было, Ваш поток опять же мог бы умереть оставив других клиентов без обслуживания.
Вообще, все действия, выполняемые в ChannelHandler'ах должны быть как можно более простыми и неблокирующими. Ни в коем случае не обрабатывайте данные прямо в них — кладите пакеты в очередь и обрабатывайте их в другом потоке.