Введение
В это статье освещается работа UDP Tracker Protocol. Все примеры, приведенные в статье, будут на Java с использованием NIO-фреймворка Netty. В качестве БД взята MongoDB.
Обычно торрент-трекеры работают через протокол HTTP, передавая данные посредством GET-запросов. Работа трекера по протоколу UDP позволяет существенно сократить траффик (более чем в 2 раза), а так же избавиться от ограничения на количество одновременных соединений, которое накладывает протокол TCP.
Ссылка на UDP-трекер в клиенте может выглядеть так: udp://tracker.openbittorrent.com:80/announce, где на месте announce может быть что угодно (либо вообще ничего). А вот указание порта обязательно, в отличие от HTTP трекера.
Общие принципы протокола
Теперь о том, как в общих чертах работает UDP-трекер.
1. Сначала клиент посылает трекеру запрос на соединение (пакет 0x00 Connect). В этом запросе поле connection ID равно 0x41727101980 — это идентификатор протокола. Кроме того, клиент передает ID транзакции, выбранный им случайно.
2. Далее сервер создает клиенту уникальный connection ID, который и передает в ответном пакете. При этом сервер обязан передать ID транзакции, который он получил от клиента.
3. Клиент теперь имеет уникальный ID (который, впрочем, не особо и нужен, если это открытый трекер без регистрации пользователей и учета траффика.) и может слать нам пакеты с анонсами.
4. В ответ на анонс сервер отдает список пиров торрента, интервал обращений клиента к серверу и статистику сидов/пиров.
5. Ещё клиент может отправлять нам Scrape-запросы, где передается несколько хешей торрентов, к которым он хочет получить статистику. Количество запрашиваемых торрентов за 1 запрос не может превышать 74 ввиду ограничений протокола UDP.
Разработка сервера
На данном этапе я советую вам скачать исходники трекера, т.к. в статье я опишу только ключевые моменты. Скачать исходники и используемые библиотеки можно здесь: github.com/lafayette/udp-torrent-tracker
Инициализация Netty.
Executor threadPool = Executors.newCachedThreadPool();
DatagramChannelFactory factory = new NioDatagramChannelFactory(threadPool); // Создаем фабрику для UDP.
bootstrap = new ConnectionlessBootstrap(factory);
bootstrap.getPipeline().addLast("handler", new Handler()); // Handler будет принимать все сообщения от клиентов.
bootstrap.setOption("reuseAddress", true); // При нештатном завершении работы сервера остается открытым порт. К сожалению, ничего лучше reuseAddress придумать не удалось. Буду благодарен, если кто-то подскажет более правильное решение.
// В ShutdowHook мы можем задать действия, которые будут произведены по заверешении работы Netty. Например, закрыть канал и освободить ресурсы.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
channel.close();
bootstrap.releaseExternalResources();
}
}));
String host = Config.getInstance().getString("listen.host", "127.0.0.1");
Integer port = Config.getInstance().getInt("listen.port", 8080);
InetSocketAddress address = new InetSocketAddress(host, port); // Создаем объект, который указывает, на каком адресе и порте слушать. Можно указать только порт и тогда Netty будет слушать на всех интерфейсах.
logger.info("Listening on " + host + ":" + port);
// И в заключение мы вызываем метод bind, который запускает позволяет начать слушать заданный порт.
bootstrap.bind(address);
Получение сообщений от клиентов.
public class Handler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(Handler.class);
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage(); // Из ChannelBuffer мы будем читать пришедший нам udp-пакет.
// Каждый пакет, полученный от клиента, содержит как минимум connection ID (long), action ID (int) и transaction ID (int) и не может быть меньше 16 байт.
if (channelBuffer.readableBytes() < 16) {
logger.debug("Incorrect packet received from " + e.getRemoteAddress());
}
long connectionId = channelBuffer.readLong(); // Здесь можно проверять connectionId, но у нас открытый трекер без учета пользователей и мы можем игнорировать это.
int actionId = channelBuffer.readInt(); // ID действия. Может быть: 0x00 Connect; 0x01 Announce; 0x02 Scrape; 0x03: Error. Последний (ошибку) отправляет только сервер.
int transactionId = channelBuffer.readInt(); // ID транзакции. В ответе мы обязаны отправить принятый ID транзакции, иначе клиент нас проигнорирует.
Action action = Action.byId(actionId);
ClientRequest request;
switch (action) {
case CONNECT:
request = new ConnectionRequest();
break;
case ANNOUNCE:
request = new AnnounceRequest();
break;
case SCRAPE:
request = new ScrapeRequest();
break;
default:
logger.debug("Incorrect action supplied");
ErrorResponse.send(e, transactionId, "Incorrect action");
return;
}
// Здесь мы передадим в обработчик запроса все необходимые данные, включая заголовок из идентификаторов соединения, действия и транзакции.
request.setContext(ctx);
request.setMessageEvent(e);
request.setChannelBuffer(channelBuffer);
request.setConnectionId(connectionId);
request.setAction(action);
request.setTransactionId(transactionId);
// Остается лишь вызвать чтение оставшихся данных для притятого пакета.
request.read();
}
}
MongoDB
Для работы с MongoDB я воспользовался замечательной библиотекой для маппинга — Morphia.
Вот как я описал класс для хранения пира:
@Entity("peers")
public class Peer {
public @Id ObjectId id;
public @Indexed byte[] infoHash;
public byte[] peerId;
public long downloaded;
public long left;
public long uploaded;
public @Transient int event;
public int ip;
public short port;
public @Transient int key;
public @Transient int numWant;
public @Transient short extensions;
public long lastUpdate;
@PrePersist
private void prePersist() {
this.lastUpdate = System.currentTimeMillis();
}
}
Аннотация Transient означает, что мы не сохраняем это поле в таблицу. Эти поля нам будут нужны только для обработки запроса. Поле infoHash помечено аннотацией Indexed, т.к. мы будем искать подходящих пиров именно по хэшу торрента.
Так же нам нужно создать подключение к БД. Делается это довольно просто:
morphia = new Morphia();
morphia.map(Peer.class); // Говорим Морфии, что хотим маппить этот класс.
mongo = new Mongo(host, port);
datastore = morphia.createDatastore(mongo, "udptracker");
И пример поиска пиров по info_hash
Query<Peer> peersQuery = Datastore.instance().find(Peer.class);
peersQuery.field("infoHash").equal(peer.infoHash);
peersQuery.field("peerId").notEqual(peer.peerId); // Исключаем себя из списка.
peersQuery.limit(peer.numWant).offset(randomOffset); // Ограничение по numWant и случайный набор пиров.
Для лучшего понимания лучше будет заглянуть в документацию Morphia.
В остальном все довольно просто: из полученного СhannelBuffer читаем данные от клиента, а затем в e.getChannel() отправляем ответ. Реализацию всех пакетов вы можете посмотреть в исходниках.
Кроме того для лучшего понимания протокола советую изучить xbtt.sourceforge.net/udp_tracker_protocol.html
Исходники вышеописанного сервера: github.com/lafayette/udp-torrent-tracker
P.S. Сразу хочу сказать, что это первый мой опыт работы как с Netty, так и с MongoDB. По сути я на этом проекте изучал обе этих замечательных вещи. Поэтому очень приветствуются советы как можно было сделать лучше/красивее/по-джедайски.