Кроссплатформенный многопоточный TCP/IP сервер на C++

Как-то раз встала задача по написанию простого и быстрого многопоточного TCP/IP сервера на C++ и при этом, чтобы работал из под Windows и Linux без требования как-либо изменять код за пределами класса самого сервера. Ранее, на чистом C++ без библиотек вроде Qt, Tcp-сервер не писал, и предвещал себе долгое время мучений с платформо-зависимостью. Но как оказалось всё гораздо проще чем казалось на первый взгляд, ведь в основном интерфейсы сокетов обоих систем похожи как две капли воды и различаются лишь в мелких деталях.


Итак класс сервера и клиента выглядит следующим образом:


TcpServer.h


#ifndef TCPSERVER_H
#define TCPSERVER_H

#include <cstdint>
#include <functional>
#include <thread>
#include <list>

#ifdef _WIN32 // Windows NT

#include <WinSock2.h>

#else // *nix

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#endif

//Буффер для приёма данных от клиента
static constexpr uint16_t buffer_size = 4096;

struct TcpServer {
    class Client;
     //Тип Callback-функции обработчика клиента
    typedef std::function<void(Client)> handler_function_t;
     //Статус сервера
    enum class status : uint8_t {
        up = 0,
        err_socket_init = 1,
        err_socket_bind = 2,
        err_socket_listening = 3,
        close = 4
    };

private:
    uint16_t port; //Порт
    status _status = status::close;
    handler_function_t handler;

    std::thread handler_thread;
    std::list<std::thread> client_handler_threads;
    std::list<std::thread::id> client_handling_end;

#ifdef _WIN32 // Windows NT
    SOCKET serv_socket = INVALID_SOCKET;
    WSAData w_data;
#else // *nix
    int serv_socket;
#endif

    void handlingLoop();

public:
    TcpServer(const uint16_t port, handler_function_t handler);
    ~TcpServer();

    //! Set client handler
    void setHandler(handler_function_t handler);

    uint16_t getPort() const;
    uint16_t setPort(const uint16_t port);

    status getStatus() const {return _status;}

    status restart();
    status start();
    void stop();

    void joinLoop();
};

class TcpServer::Client {
#ifdef _WIN32 // Windows NT
    SOCKET socket;
    SOCKADDR_IN address;
    char buffer[buffer_size];
public:
    Client(SOCKET socket, SOCKADDR_IN address);
#else // *nix
    int socket;
    struct sockaddr_in address;
    char buffer[buffer_size];
public:
    Client(int socket, struct sockaddr_in address);
#endif
public:
    Client(const Client& other);
    ~Client();
    uint32_t getHost() const;
    uint16_t getPort() const;

    int loadData();
    char* getData();

    bool sendData(const char* buffer, const size_t size) const;
};

#endif // TCPSERVER_H

Как можно заметить различия минимальны помимо разных подключаемых заголовочных файлов различаются разве что тип сокета — SOCKET для Windows и (как бы странно это не выглядело) int для Linux. Разница здесь лишь в том что Linux использует стандартный int для хранения данных сокета, в то время как в Windows задекларирован собственный тип который относительно архитектуры принимает разный размер и значность целочисленного типа, что можно увидеть в оригинальных заголовочных файлах:


//file _socket_types.h
//...
#if 1
typedef UINT_PTR    SOCKET;
#else
typedef INT_PTR     SOCKET;
#endif
//...

//file BaseTsd.h
//...
#if defined(_WIN64)
 typedef unsigned __int64 UINT_PTR;
#else
 typedef unsigned int UINT_PTR;
#endif
//...
#if defined(_WIN64) 
 typedef __int64 INT_PTR; 
#else 
 typedef int INT_PTR;
#endif
//...

Так же в Windows части TcpServer-хедера присутствует структура для обозначения используемой версии WinSocket — WSAData w_data;(см. WSAData)


Перейдём к реализации сервера:


TcpServer.cpp


#include "../hdr/TcpServer.h"
#include <chrono>

//Конструктор принимает:
//port - порт на котором будем запускать сервер
//handler - callback-функция запускаямая при подключении клиента
//          объект которого и передают первым аргументом в callback
//          (пример лямбда-функции: [](TcpServer::Client){...do something...})
TcpServer::TcpServer(const uint16_t port, handler_function_t handler) : port(port), handler(handler) {}

//Деструктор останавливает сервер если он был запущен
//и вычищает заданную версию WinSocket
TcpServer::~TcpServer() {
  if(_status == status::up)
    stop();
#ifdef _WIN32 // Windows NT
    WSACleanup ();
#endif
}

//Задаёт callback-функцию запускаямую при подключении клиента
void TcpServer::setHandler(TcpServer::handler_function_t handler) {this->handler = handler;}

//Getter/Setter порта
uint16_t TcpServer::getPort() const {return port;}
uint16_t TcpServer::setPort( const uint16_t port) {
    this->port = port;
    restart(); //Перезапустить если сервер был запущен
    return port;
}

//Перезапуск сервера
TcpServer::status TcpServer::restart() {
    if(_status == status::up)
      stop ();
    return start();
}

// Вход в поток обработки соединений
void TcpServer::joinLoop() {handler_thread.join();}

//Загружает в буфер данные от клиента и возвращает их размер
int TcpServer::Client::loadData() {return recv(socket, buffer, buffer_size, 0);}
//Возвращает указатель на буфер с данными от клиента
char* TcpServer::Client::getData() {return buffer;}
//Отправляет данные клиенту
bool TcpServer::Client::sendData(const char* buffer, const size_t size) const {
  if(send(socket, buffer, size, 0) < 0) return false;
  return true;
}

#ifdef _WIN32 // Windows NT
//Запуск сервера
TcpServer::status TcpServer::start() {
    WSAStartup(MAKEWORD(2, 2), &w_data); //Задаём версию WinSocket

    SOCKADDR_IN address; //Структура хост/порт/протокол для инициализации сокета
    address.sin_addr.S_un.S_addr = INADDR_ANY; //Любой IP адресс
    address.sin_port = htons(port); //Задаём порт
    address.sin_family = AF_INET; //AF_INET - Cемейство адресов для IPv4

    //Инициализируем наш сокет и проверяем корректно ли прошла инициализация
    //в противном случае возвращаем статус с ошибкой
    if(static_cast<int>(serv_socket = socket(AF_INET, SOCK_STREAM, 0)) == SOCKET_ERROR) return _status = status::err_socket_init;
    //Присваиваем к сокету адресс и порт и проверяем на коректность сокет
    //в противном случае возвращаем статус с ошибкой
    if(bind(serv_socket, (struct sockaddr*)&address, sizeof(address)) == SOCKET_ERROR) return _status = status::err_socket_bind;
    //Запускаем прослушку и проверяем запустилась ли она
    //в противном случае возвращаем статус с ошибкой
    if(listen(serv_socket, SOMAXCONN) == SOCKET_ERROR) return _status = status::err_socket_listening;

    //Меняем статус, запускаем обработчик соединений и возвращаем статус
    _status = status::up;
    handler_thread = std::thread([this]{handlingLoop();});
    return _status;
}

//Остановка сервера
void TcpServer::stop() {
    _status = status::close; //Изменение статуса
    closesocket (serv_socket); //Закрытие сокета
    joinLoop(); //Ожидание завершения
    for(std::thread& cl_thr : client_handler_threads) //Перебор всех клиентских потоков
        cl_thr.join(); // Ожидание их завершения
    client_handler_threads.clear (); // Очистка списка клиентских потоков
    client_handling_end.clear (); // Очистка списка идентификаторов завершённых клиентских потоков
}

// Функиця обработки соединений
void TcpServer::handlingLoop() {
    while(_status == status::up) {
        SOCKET client_socket; //Сокет клиента
        SOCKADDR_IN client_addr; //Адресс клиента
        int addrlen = sizeof(client_addr); //Размер адреса клиента
        //Получение сокета и адреса клиента
        //(если сокет коректен и сервер зарущен запуск потока обработки)
        if ((client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, &addrlen)) != 0 && _status == status::up){
            client_handler_threads.push_back(std::thread([this, &client_socket, &client_addr] {
                handler(Client(client_socket, client_addr)); //Запуск callback-обработчика
                //Добавление идентификатора в список идентификаторов завершённых клиентских потоков
                client_handling_end.push_back (std::this_thread::get_id()); 
            }));
        }
        //Очистка отработанных клиентских потоков
        if(!client_handling_end.empty())
          for(std::list<std::thread::id>::iterator id_it = client_handling_end.begin (); !client_handling_end.empty() ; id_it = client_handling_end.begin())
            for(std::list<std::thread>::iterator thr_it = client_handler_threads.begin (); thr_it != client_handler_threads.end () ; ++thr_it)
              if(thr_it->get_id () == *id_it) {
                thr_it->join();
                client_handler_threads.erase(thr_it);
                client_handling_end.erase (id_it);
                break;
              }

        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

// Конструктор клиента по сокету и адресу
TcpServer::Client::Client(SOCKET socket, SOCKADDR_IN address) : socket(socket), address(address) {}
// Конструктор копирования
TcpServer::Client::Client(const TcpServer::Client& other) : socket(other.socket), address(other.address) {}

TcpServer::Client::~Client() {
    shutdown(socket, 0); //Обрыв соединения сокета
    closesocket(socket); //Закрытие сокета
}

// Геттеры хоста и порта
uint32_t TcpServer::Client::getHost() const {return address.sin_addr.S_un.S_addr;}
uint16_t TcpServer::Client::getPort() const {return address.sin_port;}

#else // *nix

//Запуск сервера (по аналогии с реализацией для Windows)
TcpServer::status TcpServer::start() {
    struct sockaddr_in server;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons( port );
    server.sin_family = AF_INET;
    serv_socket = socket(AF_INET, SOCK_STREAM, 0);

    if(serv_socket == -1) return _status = status::err_socket_init;
    if(bind(serv_socket,(struct sockaddr *)&server , sizeof(server)) < 0) return _status = status::err_socket_bind;
    if(listen(serv_socket, 3) < 0)return _status = status::err_socket_listening;

    _status = status::up;
    handler_thread = std::thread([this]{handlingLoop();});
    return _status;
}

//Остановка сервера
void TcpServer::stop() {
    _status = status::close;
    close(serv_socket);
    joinLoop();
    for(std::thread& cl_thr : client_handler_threads)
        cl_thr.join();
    client_handler_threads.clear ();
    client_handling_end.clear ();
}

// Функиця обработки соединений (по аналогии с реализацией для Windows)
void TcpServer::handlingLoop() {
    while (_status == status::up) {
        int client_socket;
        struct sockaddr_in client_addr;
        int addrlen = sizeof (struct sockaddr_in);
        if((client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, (socklen_t*)&addrlen)) >= 0 && _status == status::up)
            client_handler_threads.push_back(std::thread([this, &client_socket, &client_addr] {
                handler(Client(client_socket, client_addr));
                client_handling_end.push_back (std::this_thread::get_id());
            }));

        if(!client_handling_end.empty())
          for(std::list<std::thread::id>::iterator id_it = client_handling_end.begin (); !client_handling_end.empty() ; id_it = client_handling_end.begin())
            for(std::list<std::thread>::iterator thr_it = client_handler_threads.begin (); thr_it != client_handler_threads.end () ; ++thr_it)
              if(thr_it->get_id () == *id_it) {
                thr_it->join();
                client_handler_threads.erase(thr_it);
                client_handling_end.erase (id_it);
                break;
              }

        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

// Конструктор клиента по сокету и адресу
TcpServer::Client::Client(int socket, struct sockaddr_in address) : socket(socket), address(address) {}
// Конструктор копирования
TcpServer::Client::Client(const TcpServer::Client& other) : socket(other.socket), address(other.address) {}

TcpServer::Client::~Client() {
    shutdown(socket, 0); //Обрыв соединения сокета
    close(socket); //Закрытие сокета
}

// Геттеры хоста и порта
uint32_t TcpServer::Client::getHost() {return address.sin_addr.s_addr;}
uint16_t TcpServer::Client::getPort() {return address.sin_port;}

#endif

Реализация для Linux и Windows практически идентична за исключением некотрых мест, обусловленных разве что различными структурами хранящим адреса(struct sockaddr_in/SOCKADDR_IN, struct sockaddr/SOCKADDR) и сокеты(int/SOCKET), а так же наличием у Windows объекта версии WinSocket(WSAData).


Пример использования:


main.cpp


#include "server/hdr/TcpServer.h"

#include <iostream>

//Парсер ip в std::string
std::string getHostStr(const TcpServer::Client& client) {
  uint32_t ip = client.getHost ();
  return std::string() + std::to_string(int(reinterpret_cast<char*>(&ip)[0])) + '.' +
         std::to_string(int(reinterpret_cast<char*>(&ip)[1])) + '.' +
         std::to_string(int(reinterpret_cast<char*>(&ip)[2])) + '.' +
         std::to_string(int(reinterpret_cast<char*>(&ip)[3])) + ':' +
         std::to_string( client.getPort ());
}

int main() {
  //Создание объекта TcpServer с передачей аргументами порта и лябда-фунции для обработк клиента
  TcpServer server( 8080,

      [](TcpServer::Client client){

          //Вывод адреса подключившего клиента в консоль
          std::cout<<"Connected host:"<<getHostStr(client)<<std::endl;

          //Ожидание данных от клиента
          int size = 0;
          while (size == 0) size = client.loadData ();

          //Вывод размера данных и самих данных в консоль
          std::cout
              <<"size: "<<size<<" bytes"<<std::endl
              << client.getData() << std::endl;

          //Отправка ответа клиенту
          const char answer[] = "Hello World from Server";
          client.sendData(answer, sizeof (answer));
      }

  );

  //Запуск серевера
  if(server.start() == TcpServer::status::up) {
    //Если сервер запущен вывести сообщение и войти в поток ожиданий клиентов
      std::cout<<"Server is up!"<<std::endl;
      server.joinLoop();
  } else {
    //Если сервер не запущен вывод кода ошибки и заверешение программы
      std::cout<<"Server start error! Error code:"<< int(server.getStatus()) <<std::endl;
      return -1;
  }

}

Ссылка на GitHub


Использовавашиеся статьи:


Похожие публикации

AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 35

    +4
    while(_status == status::up) {


    Это UB
      0
      Что-то не улавливаю. Поясните, плс?
        0

        Гонка

          0

          Во-первых, у компилятора могут оказаться основания полагать, что поле _status в цикле не меняется, из-за чего цикл превратится в вечный. Кстати, бесконечный цикл — тоже UB, что разрешает компилятору корёжить программу дальше.


          Во-вторых, значение status::up может "застрять" в кеше процессора, из-за чего цикл сделает кучу холостых операций.


          Если один поток записал в переменную некоторое значение — это ещё не означает, что другой поток именно это значение и прочитает.

            +3
            Во-вторых, значение status::up может «застрять» в кеше процессора, из-за чего цикл сделает кучу холостых операций.


            Это, кстати, распространённый миф, кэши процессора когерентны и если вы в одну кэш линию что-то записали, то она «разъедется» на все ядра.
            Другое дело что никто не гарантирует что запись будет осуществлена в том месте где вы написали (процессор/компилятор вольны переставлять инструкции) или вообще осуществлена — никто же не сказал что это атомик, можно вообще запись в кэш/память выкинуть или на регистрах оставить.
              +1
              Другое дело что никто не гарантирует что запись будет осуществлена в том месте где вы написали

              Соббсно это и будет наблюдаться как эффект "застревания" в кэше. Инструкции, меняющие переменную могут выполнится хз когда, и все это время цикл в другом потоке будет в холостую гонять.

            +2
            Нельзя менять/читать значение неатомарной переменной из разных потоков. Если компилятор сможет доказать что в этом потоке переменная не меняется, он может выкинуть проверку нафиг или считать значение из памяти один раз, заменив проверку в цикле на if перед бесконечным циклом.
              –1
              да хватит, хватит уже, первого комента достаточно.
              Я уже так привык, что race condition всегда называют своим термином, что и думать забыл о нём, как об UB.
                0

                Ну, для такого и придумали volatile. Что впрочем не отменяет необходимость синхронизации чтения/записи

                  +1

                  Так-то volatile != atomic. Но да, вы правы, использовался он чаще именно для этого

                    +2
                    Ну нет же, не для этого придумали volatile. И даже deprecated ему собираются прикрутить в таких сценариях.
                      0
                      Вроде только в C++, в нормальном языке вроде оставляют, нет?
                        0
                        Хз что там в «нормальном» языке, но использование volatile для многопоточки — вещь, за которую надо увольнять. Да, volatile защищает от оптимизаций компилятора, но он не делает UB код меньшим UB, просто заметает баги под ковер. Ровно до тех пор пока вы не запустите свой «офигенный» «быстрый» алгоритм на volatile и reinterpret_cast'ах на каком-нибудь ARM.
                        К сожалению, x86 и друзья являются «strong ordered» архитектурами и поймать instruction reorder там сложно (вот хорошая статья, демонстрирующая, как его всё-таки словить), а значит ваш супер-мега «локфри» на волатайлах будет успешно «работать», возможно в продакшне и не один год.
                          0
                          Вы такой категоричный! Не поверите, мой код работает как раз на «каких-то» ARM'ах, и volatile там вполне достаточно. На более продвинутых процессорах — там да, нужны барьеры памяти, даже в MMIO регистр записать. Но на тех платформах, работа с которыми заполняет мои дни — нет. Поэтому я использую «нормальный язык», в котором reintrpret_cast — имя переменной. И из него работающий инструмент не выбрасывают.
                            +1
                            Вы же сейчас о ключевом слове volatile, который вешают на переменную, а не о штуках типа asm volatile("" ::: «memory»);?
                            Если да, то у меня для вас плохие новости — либо у вас где-то есть барьер, который маскирует проблему, либо вы просто ещё не нашли багу.
                            Ключевое слово volatile не спасает от проблем многопоточности на relaxed-ordered архитектурах, потому что инструкции может переставлять процессор, а не компилятор (точнее, это могут делать оба, но volatile запрещает только второму).
                            Другое дело, что стандарт «нормального языка» ничего не говорит о многопоточности и memory ordering (поправьте, если ошибаюсь), а значит оставляет вас и компилятор в серой зоне — компилятор может вам помочь и напихать барьеров памяти, видя volatile, а может и не помогать и тогда ваш код будет содержать трудноотлаживаемую багу.
                              0
                              Я дико рекомендую вот этот цикл статей на Хабре, развеивает кучу мифов и даёт представление о том куда и на что смотреть, если поймали data race habr.com/ru/post/195770.
              +10

              Почему в многопоточном сервере я не вижу ни одного мьютекса или другого примитива синхронизации?


              Зато я сходу вижу гонку между TcpServer::stop и TcpServer::handlingLoop...

                0
                Если отвечать на вопрос в общем (не относительно кода автора поста), то это не всегда нужно, например при параллельной обработке запросов может быть что просто ничего не надо синхронизировать.
                  +1
                  Почему в многопоточном сервере я не вижу ни одного мьютекса или другого примитива синхронизации?


                  Блокировки вообще не обязательны для многопоточного приложения. Достаточно CAS и других атомарных операций.
                    +2

                    Но CAS и других атомарных операций я тоже не вижу...

                  +5
                  Как-то раз встала задача по написанию простого и быстрого многопоточного TCP/IP сервера на C++ и при этом, чтобы работал из под Windows и Linux без требования как-либо изменять код за пределами класса самого сервера.

                  Можно нескромный вопрос: "встала задача" по учебе (в качестве лабораторной или курсовой) или же по работе?

                    0
                    bool TcpServer::Client::sendData(const char* buffer, const size_t size) const {
                    if(send(socket, buffer, size, 0) < 0) return false;
                    return true;
                    }


                    А если send вернет больше 0 или 0?
                      +3

                      return true;

                        +2
                        А если при этом send послал меньше чем просили?
                          +3

                          Всё равно true!

                            +3
                            Да. Но это ведь ошибка, вы не послали все данные в сеть и вернули, что «всё хорошо».
                              +8

                              Это не я!

                      –5
                      К сожалению, в современном мире такие сервера уже почти никому не нужны по причине отсутствия шифрования данных.
                        +4

                        это далеко не самая боььшая проблема, плюс есть много мест, где шифрование делается на уровнях выше

                        +1
                        for(std::list<std::thread::id>::iterator id_it = client_handling_end.begin (); !client_handling_end.empty(); id_it = client_handling_end.begin())
                        for(std::list<std::thread>::iterator thr_it = client_handler_threads.begin (); thr_it != client_handler_threads.end (); ++thr_it)
                        if(thr_it->get_id () == *id_it) {…

                        Может лучше map юзать? Квадратичная сложность — так себе идея
                          +1

                          Там на самом деле надо в список client_handling_end сразу поток класть, а не его id. И из client_handler_threads поток исключать сразу же, а не в конце.

                          +23
                          Каждый такое писал, но не каждый рискнул выложить.
                            0
                            Согласен, но замечу, что самое интересное начинается, кода это действительно становится рабочим проектом, и ты начинаешь смотреть в сторону pool, epool, WSAPool, разбираешься с реактором и проактором… И для следующего проекта берешь либо boost либо poco :-)
                              +1
                              есть же libuv, где при желании можно запустить N лупов по количеству ядер и получить многопоточный сервер вместо асинхронного event-based.
                              0

                              100-пудово. Больше 10 лет тому назад в качестве домашнего задания при собеседовании получил подобное задание. К тому времени во всю использовал уже boost.asio.
                              Начал делать, думал за пару вечеров управиться. Но писать плохо не хотелось. Гляжу, сроки задания едут, а выходит что-то типа упрощенного asio. А тут и другой работодатель меня и нашел :)

                            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                            Самое читаемое