Java: Socks 4 Proxy работа с неблокирующими сокетами

    Начиная с версии 1.4 в j2se появился package java.nio, который позволяет работать с сокетами в неблокирующем режиме, что зачастую повышает производительность, упрощает код и даёт дополнительные возможности и функционал. А начиная с версии j2se 1.6 на серверах под упралением ОС линукс(kernel 2.6) реализация класса Selector выполнена с использованием epoll, что обеспечивает максимально возможную производительность.

    В примере описанном ниже я постараюсь продемонстрироватьь основные принципе работы с неблокирующими сокетами, на примере вполне реальной задачи – реализации Socks 4 прокси сервер.

    Во время жизни с неблокирующим сокетом может приключиться всякое, а именно

    ServerSocketChannel
    • OP_ACCEPT – входящее соединение

    SocketChannel
    • OP_READ – на соске данные или дисконнект
    • OP_WRITE – соска готова к записи или дисконнект
    • OP_CONNECT – соединение или установлено или нет



    Выбираются сокеты на которых что-то случилось при помощи одного из методов
    • select() – блокирующий метод, просыпается по событию или по wakeUp()
    • select(long) – та же тема только с таймаутом
    • selectNow() – ну и неблокирующий вариант


    В нашем случае прокся штука пассивная, поэтому нам больше подходит базовый блокирующий select().
    После этого нужно запросить у селектора ключи которые проявили активность за последнюю выборку и используя методы isAcceptable(), isReadable(), isWriteable(), isConnectable() узнать что с ними произошло.

    Основной алгоритм работы нашего прокси сервера такой:
    1. Принимаем соединение
    2. Парсим заголовок (для упрощения этого шага мы предполагаем что размер заголовка всегда меньше размера буфера)
    3. Устанавливаем соединение с целью
    4. Отвечаем клиенту что всё ОК
    5. Проксируем
    6. Закрываем соединения


    Чтобы избежать проблем с полными сокет буферами проксировать будем следующим образом:
    Пусть у нас два конца A и B при этом A.in=B.out и наоборот, следовательно A.interestOps()|OP_READ!= B.interestOps()|OP_WRITE (чтобы один буфер одновременно не использовался двумя каналами).
    После того как одна из сторон закроет соединение, надо дописать данные из буфера второй стороне и закрыть соединение.

    Ну и собственно сам код, функции старался расположить в порядке действий для упрощения понимания алгоритма, комментарии прилагаются.
    package ru.habrahabr;
     
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.Iterator;
     
    /**
     * Класс реализующий простой неблокирующий Socks 4 Proxy Сервер Реализуюший
     * только команду connect
     * 
     * @author dgreen
     * @date 19.09.2009
     * 
     */

    public class Socks4Proxy implements Runnable {
        int bufferSize = 8192;
        /**
         * Порт
         */

        int port;
        /**
         * Хост
         */

        String host;
     
        /**
         * Дополнительная информация цепляемая к каждому ключу {@link SelectionKey}
         * 
         * @author dgreen
         * @date 19.09.2009
         * 
         */

        static class Attachment {
            /**
             * Буфер для чтения, в момент проксирования становится буфером для
             * записи для ключа хранимого в peer
             * 
             * ВАЖНО: При парсинге Socks4 заголовком мы предполагаем что размер
             * буфера, больше чем размер нормального заголовка, у браузера Mozilla
             * Firefox, размер заголовка равен 12 байт 1 версия + 1 команда + 2 порт +
             * 4 ip + 3 id (MOZ) + 1 \0
             */

     
            ByteBuffer in;
            /**
             * Буфер для записи, в момент проксирования равен буферу для чтения для
             * ключа хранимого в peer
             */

            ByteBuffer out;
            /**
             * Куда проксируем
             */

            SelectionKey peer;
     
        }
     
        /**
         * так выглядит ответ ОК или Сервис предоставлен
         */

        static final byte[] OK = new byte[] { 0x00, 0x5a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
     
        /**
         * Сердце неблокирующего сервера, практически не меняется от приложения к
         * приложению, разве что при использование неблокирующего сервера в
         * многопоточном приложение, и работе с ключами из других потоков, надо
         * будет добавить некий KeyChangeRequest, но нам в этом приложение это без
         * надобности
         */

        @Override
        public void run() {
            try {
                // Создаём Selector
                Selector selector = SelectorProvider.provider().openSelector();
                // Открываем серверный канал
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                // Убираем блокировку
                serverChannel.configureBlocking(false);
                // Вешаемся на порт
                serverChannel.socket().bind(new InetSocketAddress(host, port));
                // Регистрация в селекторе
                serverChannel.register(selector, serverChannel.validOps());
                // Основной цикл работу неблокирующего сервер
                // Этот цикл будет одинаковым для практически любого неблокирующего
                // сервера
                while (selector.select() > -1) {
                    // Получаем ключи на которых произошли события в момент
                    // последней выборки
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isValid()) {
                            // Обработка всех возможнных событий ключа
                            try {
                                if (key.isAcceptable()) {
                                    // Принимаем соединение
                                    accept(key);
                                } else if (key.isConnectable()) {
                                    // Устанавливаем соединение
                                    connect(key);
                                } else if (key.isReadable()) {
                                    // Читаем данные
                                    read(key);
                                } else if (key.isWritable()) {
                                    // Пишем данные
                                    write(key);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                close(key);
                            }
                        }
                    }
                }
     
            } catch (Exception e) {
                e.printStackTrace();
                throw new IllegalStateException(e);
            }
        }
     
        /**
         * Функция принимает соединение, регистрирует ключ с интересуемым действием
         * чтение данных (OP_READ)
         * 
         * @param key
         *            ключ на котором произошло событие
         * @throws IOException
         * @throws ClosedChannelException
         */

        private void accept(SelectionKey key) throws IOExceptionClosedChannelException {
            // Приняли
            SocketChannel newChannel = ((ServerSocketChannel) key.channel()).accept();
            // Неблокирующий
            newChannel.configureBlocking(false);
            // Регистрируем в селекторе
            newChannel.register(key.selector()SelectionKey.OP_READ);
        }
     
        /**
         * Читаем данные доступные в данный момент. Функция бывает в двух состояних -
         * чтение заголовка запроса и непосредственного проксирование
         * 
         * @param key
         *            ключ на котором произошло событие
         * @throws IOException
         * @throws UnknownHostException
         * @throws ClosedChannelException
         */

        private void read(SelectionKey key) throws IOExceptionUnknownHostExceptionClosedChannelException {
            SocketChannel channel = ((SocketChannel) key.channel());
            Attachment attachment = ((Attachment) key.attachment());
            if (attachment == null) {
                // Лениво инициализируем буферы
                key.attach(attachment = new Attachment());
                attachment.in = ByteBuffer.allocate(bufferSize);
            }
            if (channel.read(attachment.in) < 1) {
                // -1 - разрыв 0 - нету места в буфере, такое может быть только если
                // заголовок превысил размер буфера
                close(key);
            } else if (attachment.peer == null) {
                // если нету второго конца :) стало быть мы читаем заголовок
                readHeader(key, attachment);
            } else {
                // ну а если мы проксируем, то добавляем ко второму концу интерес
                // записать
                attachment.peer.interestOps(attachment.peer.interestOps() | SelectionKey.OP_WRITE);
                // а у первого убираем интерес прочитать, т.к пока не записали
                // текущие данные, читать ничего не будем
                key.interestOps(key.interestOps() ^ SelectionKey.OP_READ);
                // готовим буфер для записи
                attachment.in.flip();
            }
        }
     
        private void readHeader(SelectionKey key, Attachment attachment) throws IllegalStateExceptionIOException,
                UnknownHostExceptionClosedChannelException {
            byte[] ar = attachment.in.array();
            if (ar[attachment.in.position() - 1] ==  0 ) {
                // Если последний байт \0 это конец ID пользователя.
                if (ar[ 0 ] !4 && ar[1] !1 || attachment.in.position() < 8) {
                    // Простенькая проверка на версию протокола и на валидность
                    // команды,
                    // Мы поддерживаем только conect
                    throw new IllegalStateException("Bad Request");
                } else {
                    // Создаём соединение
                    SocketChannel peer = SocketChannel.open();
                    peer.configureBlocking(false);
                    // Получаем из пакета адрес и порт
                    byte[] addr = new byte[] { ar[4], ar[5], ar[6], ar[7] };
                    int p = (((0xFF & ar[2]) << 8) + (0xFF & ar[3]));
                    // Начинаем устанавливать соединение
                    peer.connect(new InetSocketAddress(InetAddress.getByAddress(addr), p));
                    // Регистрация в селекторе
                    SelectionKey peerKey = peer.register(key.selector()SelectionKey.OP_CONNECT);
                    // Глушим запрашивающее соединение
                    key.interestOps( 0 );
                    // Обмен ключами :)
                    attachment.peer = peerKey;
                    Attachment peerAttachemtn = new Attachment();
                    peerAttachemtn.peer = key;
                    peerKey.attach(peerAttachemtn);
                    // Очищаем буфер с заголовками
                    attachment.in.clear();
                }
            }
        }
     
        /**
         * Запись данных из буфера
         * 
         * @param key
         * @throws IOException
         */

        private void write(SelectionKey key) throws IOException {
            // Закрывать сокет надо только записав все данные
            SocketChannel channel = ((SocketChannel) key.channel());
            Attachment attachment = ((Attachment) key.attachment());
            if (channel.write(attachment.out) == -1) {
                close(key);
            } else if (attachment.out.remaining() ==  0 ) {
                if (attachment.peer == null) {
                    // Дописали что было в буфере и закрываемся
                    close(key);
                } else {
                    // если всё записано, чистим буфер
                    attachment.out.clear();
                    // Добавялем ко второму концу интерес на чтение
                    attachment.peer.interestOps(attachment.peer.interestOps() | SelectionKey.OP_READ);
                    // А у своего убираем интерес на запись
                    key.interestOps(key.interestOps() ^ SelectionKey.OP_WRITE);
                }
            }
        }
     
        /**
         * Завершаем соединение
         * 
         * @param key
         *            ключ на котором произошло событие
         * @throws IOException
         */

        private void connect(SelectionKey key) throws IOException {
            SocketChannel channel = ((SocketChannel) key.channel());
            Attachment attachment = ((Attachment) key.attachment());
            // Завершаем соединение
            channel.finishConnect();
            // Создаём буфер и отвечаем OK
            attachment.in = ByteBuffer.allocate(bufferSize);
            attachment.in.put(OK).flip();
            attachment.out = ((Attachment) attachment.peer.attachment()).in;
            ((Attachment) attachment.peer.attachment()).out = attachment.in;
            // Ставим второму концу флаги на на запись и на чтение
            // как только она запишет OK, переключит второй конец на чтение и все
            // будут счастливы
            attachment.peer.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
            key.interestOps( 0 );
        }
     
        /**
         * No Comments
         * 
         * @param key
         * @throws IOException
         */

        private void close(SelectionKey key) throws IOException {
            key.cancel();
            key.channel().close();
            SelectionKey peerKey = ((Attachment) key.attachment()).peer;
            if (peerKey !null) {
                ((Attachment)peerKey.attachment()).peer=null;
                if((peerKey.interestOps()&SelectionKey.OP_WRITE)== 0 ) {
                    ((Attachment)peerKey.attachment()).out.flip();
                }
                peerKey.interestOps(SelectionKey.OP_WRITE);
            }
        }
     
        public static void main(String[] args) {
            Socks4Proxy server = new Socks4Proxy();
            server.host = "127.0.0.1";
            server.port = 1080;
            server.run();
        }
    }


    Далее открываем ваш любимый браузер, выбираем socks 4 proxy вводим 127.0.0.1:1080 и проверяем работоспособность.
    Поделиться публикацией
    Комментарии 26
      +3
      Была бы карма :) запостил бы в Java
      • НЛО прилетело и опубликовало эту надпись здесь
          +3
          :) ну постараюсь в ближайщих статьях, осветить создание своего сокет-сервера делающего чего-нить полезное, с неблокирующим коннектором и многопоточными воркерами :)
      • НЛО прилетело и опубликовало эту надпись здесь
          +1
          Спасибо за статью. На сколько я понял у вас только один поток отвечает за работу сокетами. Что бы увеличить их число нужна ли какая либо дополнительная синхронизация?
            +2
            Selector и должен работать в одном потоке, ключам даже interestOps поменять из другого нельзя.
            Пример использования нескольких потоков, планируется в следующем посте, обычно использую блокирующую очередь и N воркеров для этой задачи.

            Если предложите что полезного бы сервер мог делать, сделаю пример на базе этих полезных действий.
              0
              Ну на пример кеш, со стандартным набором команд: положить, взять, удалить, для простого типа String. На политики очистки кеша можно не заморачиваться.
                0
                Ну тогда хранилище с простым протоколом для хранения текстовых данных,
                данные будут складываться например в lucene с возможность полнотекстового поиска.

                Команда на положить:

                >key \n
                data data data\n
                data data data\n
                data 123 data data\n
                \n

                команда на поискать.

                ?123 data\n

                и дальше соответственно ответ.

                Поисковые задачи будут соответственно распаралелены.

                  0
                  Для примера достаточно складывать просто в ConcurrentMap, и доставать только по ключу. Если понадобиться можно легко будет перейти на другое хранилище.
              0
              Вообще для операции проксирования, одного потока вполне достаточно, т.к. все методы неблокируюшие, процессор тратится только на работу с доступными данными, размер которых на одной итерации ограничен размером сокет буфера или байтбуфера в который происходит чтение. Скорее всего такой сервер сможет достаточно безпроблемно прокачать 100мБ канал.

              +1
              Раз у нас Java >= 1.5, то вот это безобразие…

              Iterator <SelectionKey> iterator = selector.selectedKeys().iterator();
              while (iterator.hasNext()) {
              SelectionKey key = iterator.next();
              iterator.remove();

              }

              … наверно можно в одну строчку переписать?
              for(SelectionKey key: selector.selectedKeys()) {...}

              // -----------8K-------------

              Byte[] ar;
              int p = (((0xFF & ar[2]) << 8) + (0xFF & ar[3]));

              «О амперсанд, безсмысленный и беспощадный!»
              Разве в Byte может быть что-то болшее чем 0xFF?

              int p = (ar[2] << 8) | ar[3];

              // -----------8K-------------

              Attachment peerAttachemtn // — опечатка

              // -----------8K-------------

              p.s. вообще такие вещи проще писать на чистом Си с использованием libevent.
                +3
                >Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();

                Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException (http://www.java2s.com/Code/Java/Collections-Data-Structure/IterateaCollectionandremoveanitemExceptionwrongversion.htm)
                Для этих целей и нужен iterator
                  0
                  Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException ну для этих целей ещё можно проходится по коллекции в обратном направлении. Помогает
                    0
                    Блин, только проснулся) Сделаем по — другому…

                    >>Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException
                    Ну, для этих целей ещё можно проходится по коллекции в обратном направлении. Помогает
                      0
                      А можно пример для Сета :)
                        0
                        for (int i = collection.size() — 1; i > 0; i--)
                        collection.remove(i);
                          0
                          java.sun.com/javase/6/docs/api/java/util/Set.html
                          не вижу remove по индексу, в сете вообще нету индексов :) и правда ещё не проснулся :)
                            0
                            >>А можно пример для Сета :)

                            Омг)))) Я понял о чем вы говорите))))) Так, я пошёл делать себе кофе )
                    +1
                    амперсанд тут по делу… рекомендую выполнить

                    byte x = (byte)0xFF;
                    byte y = (byte)0xFF;
                    System.out.println (((0xFF & x) << 8) + (0xFF & y));
                    System.out.println ((x << 8) | y);

                    и удивиться. дело в том, что в Java побитовые операции применяются только к интам и лонгам (в байткоде не предусмотрены побитовые операции для байтов), поэтому перед применением операции сдвига происходит знаковое расширение байта до инта…
                      0
                      Спасибо. Крутое западло, не знал.
                    +3
                    Отличная статья! Хочу лишь заметить, что для реальных задачь все же лучше использовать Apache Mina — обертку над NIO.
                      +1
                      Я смотрел на MINA, на Grizzly и Netty. Остановился в конце-концов на Netty.
                      0
                      Зачем Runnable? Если нужен все таки поток, то вы не через run() start-уйте ;-)
                      Надеюсь у вас появится возможность сделать сравнение linux kern2.6 vs kern2.4 vs xp.
                        +1
                        accept лучше делать в отдельном треде классическим блокирующим способом, lattency от этого сильно выигрывает
                          0
                          Не поспоришь, конечно же аццептить будет намного быстрее, только это может вызвать усложнение кода при регистрации сокета в селекторе, скорее всего это операция возможна только в том же треде, где происходит select(), по крайней мере с установкой interstOps всё именно так и приходится кидать ChangeRequest в очередь на выполнение и кричать selector.wakeUp(), т.е. после accept скорее всего надо будет сделать тоже самое, только RegisterRequest.
                            0
                            да, но выигрышь того стоит. Плюс еще в acceptor'е можно разбрасывать соединения на несколько реакторов (чтобы лучше загружать ядра)

                        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                        Самое читаемое