Введение
В это статье освещается работа UDP Tracker Protocol. Все примеры, приведенные в статье, будут на Java с использованием NIO-фреймворка Netty. В качестве БД взята MongoDB.
Обычно торрент-трекеры работают через протокол HTTP, передавая данные посредством GET-запросов. Работа трекера по протоколу UDP позволяет существенно сократить траффик (более чем в 2 раза), а так же избавиться от ограничения на количество одновременных соединений, которое накладывает протокол TCP.
Ссылка на UDP-трекер в клиенте может выглядеть так: udp://tracker.openbittorrent.com:80/announce, где на месте announce может быть что угодно (либо вообще ничего). А вот указание порта обязательно, в отличие от HTTP трекера.
Общие принципы протокола
Теперь о том, как в общих чертах работает UDP-трекер.
1. Сначала клиент посылает трекеру запрос на соединение (пакет 0x00 Connect). В этом запросе поле connection ID равно 0x41727101980 — это идентификатор протокола. Кроме того, клиент передает ID транзакции, выбранный им случайно.
2. Далее сервер создает клиенту уникальный connection ID, который и передает в ответном пакете. При этом сервер обязан передать ID транзакции, который он получил от клиента.
3. Клиент теперь имеет уникальный ID (который, впрочем, не особо и нужен, если это открытый трекер без регистрации пользователей и учета траффика.) и может слать нам пакеты с анонсами.
4. В ответ на анонс сервер отдает список пиров торрента, интервал обращений клиента к серверу и статистику сидов/пиров.
5. Ещё клиент может отправлять нам Scrape-запросы, где передается несколько хешей торрентов, к которым он хочет получить статистику. Количество запрашиваемых торрентов за 1 запрос не может превышать 74 ввиду ограничений протокола UDP.
Разработка сервера
На данном этапе я советую вам скачать исходники трекера, т.к. в статье я опишу только ключевые моменты. Скачать исходники и используемые библиотеки можно здесь: github.com/lafayette/udp-torrent-tracker
Инициализация Netty.
Executor threadPool = Executors.newCachedThreadPool(); DatagramChannelFactory factory = new NioDatagramChannelFactory(threadPool); // Создаем фабрику для UDP. bootstrap = new ConnectionlessBootstrap(factory); bootstrap.getPipeline().addLast("handler", new Handler()); // Handler будет принимать все сообщения от клиентов. bootstrap.setOption("reuseAddress", true); // При нештатном завершении работы сервера остается открытым порт. К сожалению, ничего лучше reuseAddress придумать не удалось. Буду благодарен, если кто-то подскажет более правильное решение. // В ShutdowHook мы можем задать действия, которые будут произведены по заверешении работы Netty. Например, закрыть канал и освободить ресурсы. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { channel.close(); bootstrap.releaseExternalResources(); } })); String host = Config.getInstance().getString("listen.host", "127.0.0.1"); Integer port = Config.getInstance().getInt("listen.port", 8080); InetSocketAddress address = new InetSocketAddress(host, port); // Создаем объект, который указывает, на каком адресе и порте слушать. Можно указать только порт и тогда Netty будет слушать на всех интерфейсах. logger.info("Listening on " + host + ":" + port); // И в заключение мы вызываем метод bind, который запускает позволяет начать слушать заданный порт. bootstrap.bind(address);
Получение сообщений от клиентов.
public class Handler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger(Handler.class); public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage(); // Из ChannelBuffer мы будем читать пришедший нам udp-пакет. // Каждый пакет, полученный от клиента, содержит как минимум connection ID (long), action ID (int) и transaction ID (int) и не может быть меньше 16 байт. if (channelBuffer.readableBytes() < 16) { logger.debug("Incorrect packet received from " + e.getRemoteAddress()); } long connectionId = channelBuffer.readLong(); // Здесь можно проверять connectionId, но у нас открытый трекер без учета пользователей и мы можем игнорировать это. int actionId = channelBuffer.readInt(); // ID действия. Может быть: 0x00 Connect; 0x01 Announce; 0x02 Scrape; 0x03: Error. Последний (ошибку) отправляет только сервер. int transactionId = channelBuffer.readInt(); // ID транзакции. В ответе мы обязаны отправить принятый ID транзакции, иначе клиент нас проигнорирует. Action action = Action.byId(actionId); ClientRequest request; switch (action) { case CONNECT: request = new ConnectionRequest(); break; case ANNOUNCE: request = new AnnounceRequest(); break; case SCRAPE: request = new ScrapeRequest(); break; default: logger.debug("Incorrect action supplied"); ErrorResponse.send(e, transactionId, "Incorrect action"); return; } // Здесь мы передадим в обработчик запроса все необходимые данные, включая заголовок из идентификаторов соединения, действия и транзакции. request.setContext(ctx); request.setMessageEvent(e); request.setChannelBuffer(channelBuffer); request.setConnectionId(connectionId); request.setAction(action); request.setTransactionId(transactionId); // Остается лишь вызвать чтение оставшихся данных для притятого пакета. request.read(); } }
MongoDB
Для работы с MongoDB я воспользовался замечательной библиотекой для маппинга — Morphia.
Вот как я описал класс для хранения пира:
@Entity("peers") public class Peer { public @Id ObjectId id; public @Indexed byte[] infoHash; public byte[] peerId; public long downloaded; public long left; public long uploaded; public @Transient int event; public int ip; public short port; public @Transient int key; public @Transient int numWant; public @Transient short extensions; public long lastUpdate; @PrePersist private void prePersist() { this.lastUpdate = System.currentTimeMillis(); } }
Аннотация Transient означает, что мы не сохраняем это поле в таблицу. Эти поля нам будут нужны только для обработки запроса. Поле infoHash помечено аннотацией Indexed, т.к. мы будем искать подходящих пиров именно по хэшу торрента.
Так же нам нужно создать подключение к БД. Делается это довольно просто:
morphia = new Morphia(); morphia.map(Peer.class); // Говорим Морфии, что хотим маппить этот класс. mongo = new Mongo(host, port); datastore = morphia.createDatastore(mongo, "udptracker");
И пример поиска пиров по info_hash
Query<Peer> peersQuery = Datastore.instance().find(Peer.class); peersQuery.field("infoHash").equal(peer.infoHash); peersQuery.field("peerId").notEqual(peer.peerId); // Исключаем себя из списка. peersQuery.limit(peer.numWant).offset(randomOffset); // Ограничение по numWant и случайный набор пиров.
Для лучшего понимания лучше будет заглянуть в документацию Morphia.
В остальном все довольно просто: из полученного СhannelBuffer читаем данные от клиента, а затем в e.getChannel() отправляем ответ. Реализацию всех пакетов вы можете посмотреть в исходниках.
Кроме того для лучшего понимания протокола советую изучить xbtt.sourceforge.net/udp_tracker_protocol.html
Исходники вышеописанного сервера: github.com/lafayette/udp-torrent-tracker
P.S. Сразу хочу сказать, что это первый мой опыт работы как с Netty, так и с MongoDB. По сути я на этом проекте изучал обе этих замечательных вещи. Поэтому очень приветствуются советы как можно было сделать лучше/красивее/по-джедайски.
