Преамбула
Здравствуйте. Я являюсь главным разработчиком крупнейшего в СНГ сервера Minecraft (не буду рекламировать, кому надо, те знают). Уже почти год мы пишем свою реализацию сервера, рассчитанную на больше чем 40 человек (мы хотим видеть цифру в 500 хотя бы). Пока всё было удачно, но последнее время система начала упираться в то, что из-за не самой удачной реализации сети (1 поток на ввод, 1 на вывод + 1 на обработку), при 300 игроках онлайн работает более 980 потоков (+ системные), что в сочетании с производительностью дефолтного io Явы даёт огромное падение производительности, и уже при 100 игроках сервер в основном занимается тем, что пишет/читает в/из сети.
Поэтому я решила переходить на NIO. В руки совершенно случайно попала библиотека Netty, структура которой показалась просто идеально подходящей для того, чтобы встроить её в уже готовое работающее решение. К сожалению, мануалов по Netty мало не только на русском, но и на английском языках, поэтому приходилось много экспериментировать и лазить в код библиотеки, чтобы найти лучший способ.
Здесь я постараюсь расписать серверную часть работы с сетью через Netty, может быть это кому-то будет полезно.
Создание сервера
ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS); ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(4 /* число рабочих потоков */, 400000000, 2000000000, 60, TimeUnit.SECONDS); ServerBootstrap networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, 4 /* то же самое число рабочих потоков */)); networkServer.setOption("backlog", 500); networkServer.setOption("connectTimeoutMillis", 10000); networkServer.setPipelineFactory(new ServerPipelineFactory()); Channel channel = networkServer.bind(new InetSocketAddress(address, port));
Используется OrderedMemoryAwareThreadPoolExecutor для выполнения задач Netty, по опыту французских коллег они самые эффективные. Можно использовать другие Executor'ы, например Executors.newFixedThreadPool(n). Ни в коем случае не используйте Executors.newCachedThreadPool(), он создаёт неоправданно много потоков и ни какого выигрыша от Netty почти нет. Использовать более 4 рабочих потоков нет смысла, т.к. они более чем справляются с огромной нагрузкой (программисты из Xebia-France на 4 потоках тянули более 100 000 одновременных подключений). Босс-потоки должны быть по одному на каждый слушаемый порт. Channel, который возвращает функция bind, а так же ServerBootsrap необходимо сохранить, чтобы потом можно было остановить сервер.
PipelineFactory
То, как будут обрабатываться подключения и пакеты клиента, определяет PipelineFactory, которая при открытии канала с клиентом создаёт для него pipeline, в котором определены обработчики событий, которые происходят на канале. В нашем случае, это ServerPipelineFactory:
public class ServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { PacketFrameDecoder decoder = new PacketFrameDecoder(); PacketFrameEncoder encoder = new PacketFrameEncoder(); return Channels.pipeline(decoder, encoder, new PlayerHandler(decoder, encoder)); } }
В данном коде PacketFrameDecoder, PacketFrameEncoder и PlayerHandler — обработчки событий, которые мы определяем. Функция Channels.pipeline() создаёт новый pipeline с переданными ей обработчиками. Будьте внимательны: события проходят обработчики в том порядке, в котором Вы передали из функции pipeline!
Протокол
Немного опишу протокол, чтобы дальше было понятно.
Обмен данными происходит с помощью объектов классов, расширяющих класс Packet, в которых определены две функции, get(ChannelBuffer input) и send(ChannelBuffer output). Соответственно, первая функция читает необходимые данные из канала, вторая — пишет данные пакета в канал.
public abstract class Packet { public static Packet read(ChannelBuffer buffer) throws IOException { int id = buffer.readUnsignedShort(); // Получаем ID пришедшего пакета, чтобы определить, каким классом его читать Packet packet = getPacket(id); // Получаем инстанс пакета с этим ID if(packet == null) throw new IOException("Bad packet ID: " + id); // Если произошла ошибка и такого пакета не может быть, генерируем исключение packet.get(buffer); // Читаем в пакет данные из буфера return packet; } public statuc Packet write(Packet packet, ChannelBuffer buffer) { buffer.writeChar(packet.getId()); // Отправляем ID пакета packet.send(buffer); // Отправляем данные пакета } // Функции, которые должен реализовать каждый класс пакета public abstract void get(ChannelBuffer buffer); public abstract void send(ChannelBuffer buffer); }
Пример пары пакетов для наглядности:
// Пакет, которым клиент передаёт серверу свой логин public class Packet1Login extends Packet { public String login; public void get(ChannelBuffer buffer) { int length = buffer.readShort(); StringBuilder builder = new StringBuilder(); for(int i = 0; i < length ++i) builder.append(buffer.readChar()); login = builder.toString(); } public void send(ChannelBuffer buffer) { // Тело отправки пустое, т.к. сервер не посылает этот пакет } } // Пакет, которым сервер выкидывает клиента с указаной причиной, или клиент отключается от сервера public class Packet255KickDisconnect extends Packet { public String reason; public void get(ChannelBuffer buffer) { int length = buffer.readShort(); StringBuilder builder = new StringBuilder(); for(int i = 0; i < length ++i) builder.append(buffer.readChar()); reason = builder.toString(); } public void send(ChannelBuffer buffer) { buffer.writeShort(reason.length()); for(int i = 0; i < reason.length(); ++i) { buffer.writeChar(reason.getCharAt(i)); } } }
ChannelBuffer очень похож на DataInputStream и DataOutputStream в одном лице. Большинство функций если не такие же, то очень похожи. Заметьте, что я не забочусь о проверке того, хватает ли в буфере байт для чтения, как будто я работаю с блокирующим IO. Об этом далее…
Работа с клиентом
Работа с клиентом в основном определяется классом PlayerHandler:
public class PlayerHandler extends SimpleChannelUpstreamHandler { private PlayerWorkerThread worker; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // Событие вызывается при подключении клиента. Я создаю здесь Worker игрока — объект, который занимается обработкой данных игрока непостредственно. // Я передаю ему канал игрока (функция e.getChannel()), чтобы он мог в него посылать пакеты worker = new PlayerWorkerThread(this, e.getChannel()); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // Событие закрытия канала. Используется в основном, чтобы освободить ресурсы, или выполнить другие действия, которые происходят при отключении пользователя. Если его не обработать, Вы можете и не заметить, что пользователь отключился, если он напрямую не сказал этого серверу, а просто оборвался канал. worker.disconnectedFromChannel(); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { // Функция принимает уже готовые Packet'ы от игрока, поэтому их можно сразу посылать в worker. За их формирование отвечает другой обработчик. if(e.getChannel().isOpen()) worker.acceptPacket((Packet) e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { // На канале произошло исключение. Выводим ошибку, закрываем канал. Server.logger.log(Level.WARNING, "Exception from downstream", e.getCause()); ctx.getChannel().close(); } }
Worker может посылать игроку данные просто функцией channel.write(packet), где channel — канал игрока, который передаётся ему при подключении, а packet — объект класса Packet. За кодирование пакетов будет отвечать уже Encoder.
Decoder и Encoder
Собственно, сама важная часть системы — они отвечают за формирование пакетов Packet из потока пользователя и за отправку таких же пакетов в поток.
Encoder очень прост, он отправляет пакеты игроку:
public class PacketFrameEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception { if(!(obj instanceof Packet)) return obj; // Если это не пакет, то просто пропускаем его дальше Packet p = (Packet) obj; ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); // Создаём динамический буфер для записи в него данных из пакета. Если Вы точно знаете длину пакета, Вам не обязательно использовать динамический буфер — ChannelBuffers предоставляет и буферы фиксированной длинны, они могут быть эффективнее. Packet.write(p, buffer); // Пишем пакет в буфер return buffer; // Возвращаем буфер, который и будет записан в канал } }
Decoder уже гораздо сложнее. Дело в том, что в буфере, пришедшем от клиента, может просто не оказаться достаточного количества байт для чтения всего пакета. В этом случае, нам поможет класс ReplayingDecoder. Нам всего лишь нужно реализовать его функцию decode и читать в ней данные из потока, не заботясь не о чём:
public class PacketFrameDecoder extends ReplayingDecoder<VoidEnum> { @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, VoidEnum e) throws Exception { return Packet.read(buffer); } }
Спрашивается, как это работает? Очень просто, перед вызовом функции decode декодер помечает текущий индекс чтения, если при чтении из буфера в нём не хватит данных, будет сгенерировано исключение. При этом буфер вернётся в начальное положение и decode будет повторён, когда больше данных будет получено от пользователя. В случае успешного чтения (возвращён не null), декодер попытается вызвать функции decode ещё раз, уже на оставшихся в буфере данных, если в нём есть ещё хотя бы один байт.
Не медленно ли всё это работает, если он генерирует исключение? Медленнее, чем, например, проверка количества данных в буфере и оценка, хватит ли их для чтения пакета. Но он использует кэшированное исключение, поэтому не тратится время на заполнения stacktrace и даже создание нового объекта исключения. Подробнее об и некоторых других, повышающих эффективность, функцийя ReplayingDecoder можно почитать здесь
Вы так же можете поэкспериментировать с FrameDecoder'ом, если, например, Вы можете заранее определить размер пакета по его ID.
Кажется, это всё
Результаты получились отличными. Во-первых, сервер больше не сыпет тысячей потоков — 4 потока Netty + 4 потока обработки данных прекрасно справляются с 250+ клиентами (тестирование продолжается). Во-вторых, нагрузка на процессор стала значительно меньшей и перестала линейно расти от числа подключений. В-третьих, время отклика в некоторых случаях стало меньше.
Надеюсь кому-нибудь это будет полезно. Старалась передать как можно больше важных данных, могла переборщить. Примеров ведь много не бывает? Спрашивайте Ваши ответы и не судите строго — первый раз пишу на хабр.
Постскриптум: ещё несколько полезных вещей
У Netty есть ещё несколько интересных особенностей, которые заслуживают отдельного упоминания:
Во-первых, остановка сервера:
ChannelFuture future = channel.close(); future.awaitUninterruptibly();
Где channel — канал, который возвратила функция bind в начале. future.awaitUninterruptibly() дождётся, пока канал закроется и выполнение кода продолжится.
Самое интересное: ChannelFuture. Когда мы отправляем на канал пакет, функцией channel.write(packet), она возвращает ChannelFuture — это особый объект, который отслеживает состояние выполняемого действия. Через него можно проверить, выполнилось ли действие.
Например, мы хотим послать клиенту пакет отключения и закрыть за ним канал. Если мы сделаем
channel.write(new Packet255KickDisconnect("Пока!")); channel.close();
то с вероятностью 99%, мы получим ChannelClosedException и пакет до клиента не дойдёт. Но можно сделать так:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!")); try { future.await(10000); // Ждём не более 10 секунд, пока действие закончится } catch(InterruptedException e) {} channel.close();
То всё будет супер, кроме того, что это может заблокировать поток выполнения, пока пакет не отправится пользователю. Поэтому на ChannelFuture можно повесит listener — объект, который будет уведомлён о том, что событие совершилось и выполнит какие-либо действия. Для закрытия соединения есть уже готовый listener ChannelFutureListener.CLOSE. Пример использования:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!")); furute.addListener(ChannelFutureListener.CLOSE);
Эффект тот же, блокировок нет. Разобраться в том, как создать свой листенер не сложно — там всего одна функция. Откройте любой готовый класс, здесь я не буду приводить пример.
Ещё важная информация
Как правильно было замечено в комментариях, следует предупредить о том, что в обработчиках (handler-ах, которые висят на pipeline) лучше не стоит использовать блокирующие операции или ожидание. В противном случае, Вы рискуете навсегда потерять поток обработки или просто сильно затормозить обработку событий остальных клиентов.
Так же в обработчике ни в коем случае нельзя «ждать будущего», т.е. выполнять .await() или .awaitUninterruptibly() на любом ChannelFuture. Во-первых, у Вас ничего не получится, их нельзя вызывать из обработчиков — система не даст сделать такую глупость и сгенерирует исключение. Во-вторых, если бы этого не было, Ваш поток опять же мог бы умереть оставив других клиентов без обслуживания.
Вообще, все действия, выполняемые в ChannelHandler'ах должны быть как можно более простыми и неблокирующими. Ни в коем случае не обрабатывайте данные прямо в них — кладите пакеты в очередь и обрабатывайте их в другом потоке.
