Search
Write a publication
Pull to refresh

О boost::asio::ip::tcp::iostream

Появилась задача написать некий сервер.
Если уж выбор пал на С++, то надо и библиотеку применить, желательно козырную — boost в самый аккурат. Про неё столько пишут и говорят… да еще с таким трепетом и так восторженно, что я не мог не проникнуться идеей приобщиться.

Мысль, что в бусте нет библиотеки для работы с сокетами даже не возникала — она должна там быть. И правда, есть — это boost::asio. Почитав доки я восхитился.
Самое восхитительное, что есть удивительный класс boost::asio::ip::tcp::iostream! Вот его-то и буду юзать в сервере — в чистом сиплюсплюсном стиле потоковый ввод-вывод… Лепота!
Но следует учесть, что boost::asio::ip::tcp::iostream подчиняется законам жанра и по своему интерпретирует «перевод каретки», «новая строка» и т.д. Следовательно бинарные данные пересылать не получится. Для этого был написан класс base64 для кодирования бинарных данных в текстовые. Оверхед траффика в 33% для моей задачи вполне допустим.

Итак, сервер должен обслуживать коннекты клиентов, авторизовать их и что-то делать. Банально.
Сервер должен быть многопоточным. Чтобы на каждый коннект свой тред. Должен быть такой диспетчер-криэйтор — он будет принимать коннекты и создавать треды, которые будут работать с клиентами. Этот диспетчер не должен следить на жизнью тредов — они сами должны разрешать все ситуации. Количество тредов не ограничено — предполагается, что процесс обмена с клиентом будет не столь длителен. А если нет — отключим газ (с) к.ф.
В документации asio есть много отличных примеров. Немного подпилив получим это:

server.cpp
int main(int argc, char** argv)
{
...
boost::asio::io_service io_service;    
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
boost::asio::ip::tcp::acceptor acceptor(io_service, endpoint);

    // крутимся  
    for ( ;; ) 
    {
      boost::asio::ip::tcp::iostream* stream = new boost::asio::ip::tcp::iostream;
      acceptor.accept(*stream->rdbuf());  // ждем-c...

      std::cout << "\nClient connected" << std::endl;
        
      // создаем тред job, параметр - указатель на стрим, на котором клиент 
      boost::thread thr(boost::bind(&action::job, stream));
      thr.detach(); // пущай себе летает...
    }

...
return 0;
}

Поток создается динамически, и ответственность за его прибитие лежит на порожденном треде.

Работа с клиентом
thread.cpp
namespace action
{
  void job(boost::asio::ip::tcp::iostream* stream)
  {
    std::string from_remote_side;

    *stream >> from_remote_side;  // ждем от клиента
    if (from_remote_side == "me") // явно упрощенная процедура авторизации ;)
    {
       // здесь остальная реализация протокола
    } 
  }
}


Вот тут — *stream >> from_remote_side; засада. Ввод синхронный, значит тред будет ждать когда что-то там прилетит. А если не прилетит, а если вдруг много коннектов и, естественно, создано много тредов и все они ждут… Ни на какие мысли не наводит? =)
Ограничить время ожидания ввода? Да! Надо вкрутить какой-нибудь таймер, который бы делал что-нибудь полезное — разрывал соединение, например.

Сторожевой таймер
Его задача закрыть поток. Это вызовет завершение операции потокового ввода.
Таймер запускается в отдельном треде из треда, который обслуживает запросы клиента.
Таймер, выждав заданное время, закроет стрим, если job() за время ожидания не установит флаг завершения операции done.
С флагом не все гладко. Рассмотрим ситуацию, когда job() завершится раньше, чем таймер.
По завершении job() адрес, по которому хранится done, станет невалидным, вочдог попытается прочесть данные и access violation.
Объявлять done глобальной нельзя. Так как за неё могут конкурировать другие треды, а этот факт потянет за собой мьютексы и прочие локи… Зачем плодить лишние сущности?
Нужно заставить job() дать возможность таймеру проверить done, т.е. не завершаться раньше сторожевого таймера. А значит нужен какой-то механизм синхронизации. В библиотеке boost::thread есть класс barrier. Он создается с параметром количества процессов, которые должны на каком-то этапе ожидать завершения работы остальных процессов, например, для получения всех необходимых результатов работа всех процессов. Как только все процессы подойдут к точке ожидания, все они разблокируются и начнут работать дальше. В нашем случае процессы просто завершатся.
Класс barrier создается в job(), а watch_dog() получает указатель на него в качестве параметра.

Модифицируем job():
namespace action
{
  void job(boost::asio::ip::tcp::iostream* stream)
  {
    int done = 0;
    int time_out = 30;
    boost::barrier barrier(2);  // два треда должны ждать друг друга
    std::string from_remote_side;
    ...

// запускаем сторожевой таймер на время достаточное, например, для 
// аутентификации клиента (в секундах)
    
    boost::thread thr(boost::bind(watch_dog, time_out, stream, &done, &barrier));

    *stream >> from_remote_side;  // ждем от клиента
    if (from_remote_side == "me") // явно упрощенная процедура авторизации ;)
    {
      done = 1;

       // здесь остальная реализация протокола
    } 

    if ( done ) // если нет, то поток уже закрыт сторожевым таймером
    {
      stream->close();
    }

    delete [] stream; // !!!
    barrier.wait();  // если завершаемся раньше, чем сторожевой таймер, то ждем его
    std::cout << "The end" << std::endl;
  }
}

Cторожевой таймер использует класс deadline_timer из библиотеки boost::asio и класс seconds из boost::date_time.

thread.cpp
namespace action
{
  void watch_dog ( int wait_time, boost::asio::ip::tcp::iostream* stream, 
                   int* done, boost::barrier* barrier )
  {
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io, boost::posix_time::seconds(wait_time));

    t.wait();             // за это время должна завершиться операция
 
    if ( !*done )
    {
      stream->close();   // закрыть поток
      std::cout << "Goof!" << std::endl;
    }

    barrier->wait();         // ждем когда отработает родительский тред
  }
}


Важно: job() и watch_dog() должны быть в одном файле.
Tags:
Hubs:
You can’t comment this publication because its author is not yet a full member of the community. You will be able to contact the author only after he or she has been invited by someone in the community. Until then, author’s username will be hidden by an alias.