Пишем кроссплатформенную библиотеку на C++ для работы с именованными каналами

Предыстория


В процессе разработки одного B2B проекта возникла необходимость обращения к нашему приложению из различных систем вроде 1C, Oracle, MS Sql Server. Первый «универсальный» вариант, который пришел в голову – использовать веб-сервисы. Но, к сожалению, у вышеупомянутых систем несколько разное понимание этого стандарта (например, не факт, что тот же oracle и 1C будут понимать друг друга), кроме того нам не хотелось раздувать проект использованием тяжелых сторонних библиотек (на тот момент мы уже использовали boost и Intel TBB, притягивать же еще Qt или gSoap нам не хотелось).
Поэтому было решено писать свой велосипед.

Вышеупомянутые системы имеют механизм расширения возможностей внутренних языков с помощью dll-плагинов, так называемых внешних компонент. Идея их использования для общения с внешним миром не является новой – на хабре уже были статьи по данной теме:
habrahabr.ru/post/149956
habrahabr.ru/company/Centrobit/blog/165441
habrahabr.ru/post/163859
Итак, мы решили использовать внешние компоненты для решения задачи. Затем нам следовало решить, каким способом, да еще и желательно кроссплатформенным (т.е. чтобы компоненту и приложение под разные платформы можно было бы собирать простой перекомпиляцией), мы будем связывать компоненту с нашим приложением. Проще всего было бы не заморачиваться и найти готовые IPC-решения на эту тему (QLocalSocket в Qt, Named Pipes в ACE), но опять-таки из соображений сокращения числа используемых библиотек было решено написать свой маленький велосипед.
Идеальным решением была бы boost::interprocess::message_queue, но она была не простой очередью, а очередью с приоритетами, что несколько снижало производительность (стремление к высокой производительности также было одной из причин, почему мы отказались от web-сервисов). Кроме того, нам было нужно некое подобие клиент-серверного решения.
От сокетов через loopback мы отказались сразу – у них слишком большой overhead в windows (см. habrahabr.ru/post/81067 ). Поэтому мы стали смотреть в сторону именованных каналов windows и domain sockets в unix.

Часть 1. Составляем ТЗ


Фактически, смысл наших внешних компонент заключался лишь в получении некоего xml-сообщения из системы и отправке ее в канал (конечно, связь должна быть двусторонней, но механизм передачи сообщений назад в систему мы не всегда реализовывали через компоненты). Поэтому мы решили, что не будем погружаться в дебри асинхронной работы с каналами, а нам хватит лишь реализации обычных блокирующих операций чтения-записи (и обработки клиентов). Требования к функциям чтения-записи были следующие:
  • Функции должны принимать два параметра void* указатель на буфер в который надо писать или из которого надо читать и размер данных которые надо прочитать/отправить
  • В случае любой ошибки (например, число принятых или отправленных байт не равно переданному в параметре) функция должна бросить исключение std::runtime_error (да, обработка исключений немного снижает производительность, можно бы использовать коды ошибок)
  • В рамках одного класса реализуется и работа с серверными каналами (акцепт) и работа с клиентскими. В случае создания серверного канала функции чтения-записи не должны работать, а должны бросать исключения

Последний пункт требует пояснения – серверный канал нужен лишь для того, чтобы в цикле выполнять блокирующий метод ожидания клиента, возвращаемым значением которого является объект-канал с подключенным клиентом (по такой идеологии работает напиример функция accept для POSIX сокетов. В windows API же с каналами все наоборот – в случае блокирующей работы функция ожидания просто останавливает поток до тех пор, пока на том конце канала не появится клиент, затем его нужно обработать, а для следующих клиентов создать новый слушающий канал и т.д. )
В результате был сочинен следующий абстрактный класс INamedPipe от которого затем наследуются классы WinNamedPipe и POSIXNamedPipe
#pragma once
#include <stdexcept>
	class INamedPipe
	{
	protected:
                std::string _name;
		bool _server;
		INamedPipe(){};
		virtual void  internalReadBytes(void* buf,size_t size)=0;
		virtual void internalWriteBytes(const void* buf,size_t size)=0;
		virtual void internalFlush()=0;
	public:

		INamedPipe(const std::string prefix, const std::string& name, bool server):_name(prefix),_server(server)
		{
			_name.append(name);
		}
		void ConnectOrOpen()
		{
				if(_server)
					open();
				else
					connect();
		}
		virtual void ReadBytes(void* buf,size_t size)
		{
			if(!_server)
			{
				if(size<1)
					throw std::out_of_range("Size is 0 or less");
				internalReadBytes(buf,size);
			}
			else
				throw std::runtime_error("This operation is not supported on server pipe");
		}
		virtual void WriteBytes(const void* buf,size_t size)
		{
			if(!_server)
			{
				if(size<1)
					throw std::out_of_range("Size is 0 or less");
				internalWriteBytes(buf,size);

Конструктор INamedPipe принимает три параметра – префикс пути, где будет расположен канал в системе, имя канала, и параметр указывающий тип канала – клиентский или серверный (было решено объединить оба типа в одном классе).
Функция ConnectOrOpen() вызывает необходимый метод для открытия канала в зависимости от его типа (для сервера – open(), для клиента — connect()).

Часть 2. Пишем windows-реализацию WinNamedPipe


Перед реализацией мы решили погуглить примеры. Первым делом нагуглились туториалы из MSDN ( msdn.microsoft.com/ru-ru/library/windows/desktop/aa365592, msdn.microsoft.com/ru-ru/library/windows/desktop/aa365588 ) и документация на русском — www.frolov-lib.ru/books/bsp/v27/ch2_3.htm. С помощью этих статей и была написана первая версия класса WinNamedPipe. Спустя некоторое время мы нашли хорошую статью по написанию класса работы с именованными каналами на примере создания внешней компоненты для MetaTrader ( www.mql5.com/ru/articles/115 ), которую тоже рекомедуем к прочтению.
В конструкторе WinNamedPipe уже задан префикс – согласно требованиям MSDN полный путь к именованному каналу должен иметь вид «\\имя_компьютера\pipe\имя_канала» и в случае работы с именованным каналом на локальной машине имеет вид «\\.\pipe\pipe_name» (да, именованные каналы в Windows позволяют обмениваться информацией и по сети).
В классе также присутствует конструктор, принимающий на вход параметр типа HANDLE – число, идентифицирующее в операционной системе какую-нибудь структуру или объект (с помощью данного конструктора мы сможем создавать новый экземпляр класса WinNamedPipe зная только HANDLE уже существующего именованного канала). Данный конструктор будет использоваться в функции WaitForConnection() (см. далее)
Реализация open() выглядит следующим образом:
void WinNamedPipe::open(){
_hPipe = CreateNamedPipe( 
		(LPSTR)_name.data(),             // имя создаваемого канала
		PIPE_ACCESS_DUPLEX,       // разрешен доступ на чтение и запись 
		PIPE_TYPE_BYTE |   //читаем побайтово
		PIPE_WAIT,                // блокирующий режим 
		PIPE_UNLIMITED_INSTANCES, // число экземпляров канала неограниченно  
		BUFFER_PIPE_SIZE,                  // размер буфера исходящих сообщений 
		BUFFER_PIPE_SIZE,                  // размер буфера входящих сообщений 
		0,                        // тайм-аут ожидания (0=бесконечно) 
		NULL);                    // атрибут безопасности по умолчанию – доступ разрешен всем

	if (_hPipe == INVALID_HANDLE_VALUE) 
	{
		THROW_LAST_ERROR("CreateNamedPipe failed");
	}

Заметим, что размер буфера BUFFER_PIPE_SIZE мы задаем в самом начале файла. Фактически, его размер не влияет на максимальный размер передаваемых данных – в нашем проекте мы успешно передавали куски, которые были больше буфера в несколько раз.
Особое внимание следует обратить на первый параметр: (LPSTR)_name.data()
Поскольку имя канала в нашем классе хранится в std::string, то для корректной его передачи в WinAPI функции надо устанавливать в свойствах проекта в Visual Studio «Use Multi-Byte character Set» (подробнее см. статью habrahabr.ru/post/164193 ).
Подробное описание параметров функции CreateNamedPipe можно прочитать в MSDN ( msdn.microsoft.com/en-us/library/windows/desktop/aa365150 и на www.frolov-lib.ru/books/bsp/v27/ch2_3.htm ).
Клиентское подключение к каналу осуществляется с помощью функции connect(), которая имеет следующую реализацию:
void WinNamedPipe::connect()
	for(;;)
	{
		WaitNamedPipe((LPSTR)_name.data(), NMPWAIT_WAIT_FOREVER);
		_hPipe = CreateFile( 
			(LPSTR)_name.data(),   // имя канала 
			GENERIC_READ |  // доступ на чтение и запись 
			GENERIC_WRITE, 
			0,              
			NULL,           // доступ по умолчанию разрешен всем
			OPEN_EXISTING,  // открываем существующий канал 
			0,               
			NULL);           
			// Break if the pipe handle is valid or error!=232
		 if (_hPipe != INVALID_HANDLE_VALUE||GetLastError() != ERROR_PIPE_BUSY) 
         break; 
	}
	if (_hPipe == INVALID_HANDLE_VALUE) 
			THROW_LAST_ERROR("Could not open pipe");
	DWORD dwMode = PIPE_TYPE_BYTE; 
	BOOL fSuccess = SetNamedPipeHandleState( 
		_hPipe,    // указатель на созданный канал  
		&dwMode,  //  параметры чтения-записи
		NULL,     // максимальный размер чтения и записи неограничен 
		NULL);    // таймаут не ограничен 
	if ( ! fSuccess) 
	{
		THROW_LAST_ERROR("SetNamedPipeHandleState failed"); 
	}
}

Сначала с помощью функции WaitNamedPipe мы встаем на «вечное» ожидание свободного экземпляра серверного канала, затем с помощью CreateFile соединяемся с ним. После этого мы проверяем, удачно ли прошло соединение. Если указатель на объект-канал валиден (или произошла ошибка отличная от ERROR_PIPE_BUSY – ошибки, обозначающей, что свободный канал отсутствует) мы выходим из цикла и настраиваем нужный нам режим работы с каналом. В противном случае цикл повторяется до тех пор, пока подключиться нормально не удастся или же произойдет ошибка, отличная от ERROR_PIPE_BUSY. Если серверный именованный канал с данным именем не существует, то WaitNamedPipe завершится сразу, GetLastError вернет код ошибки 2 (The system cannot find the file specified) и connect() бросит исключение.
Согласно нашей задумке WaitForConnection() должна возвращать указатель на новый WinNamedPipe, к которому уже присоединен клиент и к которому можно применять функции чтения-записи. Однако в Windows стандартный механизм приема клиентов осуществляется по-другому – функция ConnectNamedPipe (получающая на вход HANDLE созданного канала) просто блокирует поток до тех пор, пока на другом конце канала не появится клиент. Затем, чтобы не потерять других клиентов, нужно создать новый слушающий канал и передать его в данную функцию и т.д. Поэтому для реализации нашей задумки приходится из WaitForConnection вызывать open() еще раз.
Функция WaitForConnection() выглядит следующим образом:
WinNamedPipe* WinNamedPipe::WaitForConnection()
{
	if(_server)	 
	{
		DWORD error;
		if (ConnectNamedPipe(_hPipe, NULL)||(error=GetLastError())==ERROR_PIPE_CONNECTED)
		{
			HANDLE client=_hPipe;
			open();
			return new WinNamedPipe(client);
		}
		else
		{
			THROW_LAST_ERROR("WaitForConnection failed");
		}
	}
	else
	{
		throw std::runtime_error("WaitForConnection is not supported on server pipe\n");
	}
}

Отметим важный момент – если клиентских каналов создается множество, то возможна ситуация, когда клиент подключится к каналу еще до вызова ConnectNamedPipe. В этом случае GetLastError вернет код ошибки 535 (но это не будет являться ошибкой)
Функции чтения и записи и просты и понятны без дополнительных комментариев:
void  WinNamedPipe::internalReadBytes(void* buf,size_t size)
{
	DWORD cbBytesRead = 0;
	BOOL fSuccess = FALSE;
	// Начинаем читать из канала 
	fSuccess = ReadFile( 
		_hPipe,        // указатель на канал
		buf,    // буфер-приемник для данных 
		size, // размер буфера 
		&cbBytesRead, // ссылка на переменную, куда будет записано число прочитанных байт 
		NULL);        // не используем асинхронные операции 

	if (!fSuccess || cbBytesRead == 0 ||cbBytesRead!=size)
	{   
		if (GetLastError() == ERROR_BROKEN_PIPE)
		{
			THROW_LAST_ERROR("pipe disconnected"); 
		}
		else
		{
			THROW_LAST_ERROR("read failed"); 
		}
	}

}

void  WinNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
	DWORD cbWritten;
	BOOL fSuccess = FALSE;

	fSuccess = WriteFile( 
		_hPipe,        // указатель на канал 
		buf,     // буфер из которого читаем данные для отправки  
		size, // число байт для отправки 
		&cbWritten,   // ссылка  на переменную куда записать число отправленных байт
		NULL);        // не используем асинхронные операции 

	if (!fSuccess || size != cbWritten)
	{   
		THROW_LAST_ERROR("WriteFile failed"); 
	}
}

На функции Close() следует остановиться поподробней. Для того, чтобы закрыть серверный канал и сообщить клиенту на другом его конце об этом используют DisconnectNamedPipe(HANDLE pipe). После этого на нем можно снова ожидать новых клиентов, а можно сообщить операционной системе, что он не нужен, используя CloseHandle(HANDLE pipe). CloseHandle можно также вызывать и на клиентских каналах, однако данный метод должен вызываться только один раз – либо на серверном пайпе с подключенным клиентом, либо на клиенте. Поэтому было решено, что вызывать DisconnectNamedPipe и CloseHandle мы будем только на серверных экземплярах каналов, а на клиентских Close будет методом-пустышкой.
void WinNamedPipe::Close()
{
	if(_server||_server_with_client)
	{
		DisconnectNamedPipe(_hPipe); 
		CloseHandle(_hPipe); //May throw an exception if a debugger is attached to the process and handle is not valid
	}
}

Также следует помнить, что при отправке или получении больших объемов данных клиент должен уведомить противоположную сторону о том, что все данные прочитаны, а сервер не должен закрывать канал до получения этого уведомления.
Для уменьшения копипасты был создан макрос, получающий GetLastError и бросающий исключения:
#define THROW_LAST_ERROR(e){			\
	int error=GetLastError();				\
	std::stringstream err;					\
	err<<e<<", GLE="<<error;				\
	throw std::runtime_error(err.str().data());		\
}


Часть 3. Пишем unix-реализацию – PosixNamedPipe


Для реализации posix-совместимой части библиотеки мы решили использовать Unix domain sockets (подробнее про работу с сокетами в unix можно прочитать тут — wiki.linuxformat.ru/index.php/LXF83:Unix_API ).
В отличие от windows, файл-указатель на локальный сокет может быть расположен в любом месте файловой системы. Мы решили передавать в качестве префикса пути /tmp/. Следует отметить, что если в качестве префикса передать пустую строку, то клиент не сможет подключиться к сокету и будет возвращать ошибку file not found. Кроме этого, если файл с именем локального сокета будет существовать перед функцией, создающей локальный сокет, то ни один клиент не сможет подключиться к серверу (ошибка Connection refused). Поэтому перед созданием сокета следует проверить наличие данного файла и попытаться его удалить. Отсюда же и возникла идея с фиксированным префиксом пути – надо же как-то ликвидировать потенциальную уязвимость, дающую возможность затереть файл в любом месте системы (и тут неплохо бы вспомнить о правах доступа и использовании функции basename). Также отсюда следует тот факт, что серверных экземпляров класса PosixNamedPipe с одинаковыми именами более одного быть не должно – для корректной работы каждому имени должен соответствовать только один экземпляр класса.
Код функции open() выглядит следующим образом:
void PosixNamedPipe::open()
{
	sock= socket(AF_UNIX, SOCK_STREAM, 0);
	if(sock == -1) {
		THROW_ERROR("Create_socket failed: ");
	}
	unlink(_name.c_str());
	desc.sun_family = AF_UNIX;
	strcpy(desc.sun_path, _name.c_str());
	if (bind(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1) {
		THROW_ERROR("Connection failed(bind): ");
	}
	if (listen(sock,SOMAXCONN) == -1) {
    THROW_ERROR("Connection failed(listen): ");
  }
}

Функция connect() почти идентична:
void PosixNamedPipe::connect()
{
	sock= socket(AF_UNIX, SOCK_STREAM, 0);
	if(sock == -1)
	{
		THROW_ERROR("Create_socket failed: ");
	}
	desc.sun_family = AF_UNIX;
	strcpy(desc.sun_path, _name.c_str());
	if (::connect(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1)
	{
		THROW_ERROR("Connection failed(connect): ");
	}
}

Для того, чтобы написать WaitForConnection() согласно нашему ТЗ, в отличие от windows-версии кода тут даже не пришлось извращаться.
PosixNamedPipe* PosixNamedPipe::WaitForConnection()
{
	int client=accept(sock,NULL,NULL);
	if(client!=-1)
		return new PosixNamedPipe(client);
	else {
		THROW_ERROR("Accept error: ");
	}
}

Конструктор, вызываемый в ней также банален:
PosixNamedPipe::PosixNamedPipe(int pipe)
{
	sock=pipe;
	_server=false;
	memset(&desc, 0, sizeof(struct sockaddr_un));
}

Тут следует заметить, что для корректной работы функций bind и ::connect рекомендуется обнулять экземпляр структуры sockaddr_un перед инициализацией ее полей, в связи с чем в конструкторы класса была добавлена строчка memset(&desc, 0, sizeof(struct sockaddr_un)).
Функции чтения-записи для сокетов также довольно просты:
void PosixNamedPipe::internalReadBytes(void* buf,size_t size)
{
  size_t ret=-1;
  if ((recv(sock, buf, size, MSG_WAITALL)) == -1) {
    THROW_ERROR("Error while reading: ");
  }
}

void PosixNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
  size_t ret=-1;
  if ((ret = send(sock, buf, size, 0)) == -1||ret!=size) {
    THROW_ERROR("Error while sending: ");
  }
}

В функции закрытия канала на всякий случай предусмотрено удаление файла при закрытии серверного экземпляра класса (и вызов функции закрытия также автоматически осуществляется в деструкторе):
void PosixNamedPipe::Close()
{
	if(_server)
		unlink(desc.sun_path);
	close(sock);
}
PosixNamedPipe::~PosixNamedPipe()
{
  this->Close();
}


Часть 4. Пишем многопоточный сервер для обработки клиентов


Итак, классы для работы с именованными каналами созданы. Теперь сделаем небольшой трюк, чтобы не писать #ifdef _WIN32 и подобные вещи при создании экземпляров данных классов в наших приложениях:

//NamedPipe.h
#pragma once
#ifdef _WIN32
	#include "WinNamedPipe.h"
	typedef WinNamedPipe NamedPipe;
#else
	#include <unistd.h>
	#include "PosixNamedPipe.h"
	typedef PosixNamedPipe NamedPipe;
#endif

Теперь в любом проекте делаем #include «NamedPipe.h» и можем писать код вида NamedPipe* client=new NamedPipe(«NamedPipe.h»,0) не заботясь о том, под какую из операционных систем будет собираться наше приложение.
Для обработки входящих запросов из внешних компонент в нашем проекте был написан простой многопоточный класс-сервер на основе паттерна thread pool с использованием библиотек boost::thread и Intel TBB. Основная идея была такой – в одном потоке в бесконечном цикле идет вызов WaitForConnection(), а указатели на объекты NamedPipe с подключенными клиентами складываются в очередь tbb::concurrent_queue, откуда их забирают потоки, непосредственно занятые чтением-записью и обработкой входящих сообщений.
Код функции, занимающейся приемом входящих соединений выглядит так:
void NamedPipeServer::run()
{
	NamedPipe* client;
	_pipe=new NamedPipe(_name, 1);
	try
	{
		_pipe->ConnectOrOpen();
	}
	catch(const std::exception& e)
	{
	}
	while(_active)
	{
		try
		{
			client=_pipe->WaitForConnection();
			_clients.push(client);
		}
		catch(const std::exception& e)
		{
		}
	}
	delete _pipe;
}

Заметим, что на месте пустых скобок после catch у нас стоял вызов макросов логирования, но в коде, переписанном для данной статьи, данные вызовы были убраны, чтобы не перегружать нашу библиотеку зависимостями.
По нашему замыслу работа с клиентами должна описываться в функции handleClient(NamedPipe* client), которая в классе сервера сделана виртуальной и должна переопределяться в классе-наследнике.
В каждом потоке-обработчике крутится следующий цикл:
void NamedPipeServer::workerProc()
{
	NamedPipe* client;
	while(_active)
		if(_clients.try_pop(client))
		{
			handleClient(client);
		}
		else
			boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

В цикле поток пытается достать клиента из очереди и в случае успеха вызывает handleClient, иначе – засыпает на некоторое время, чтобы избежать холостой загрузки процессора.
Для запуска всех потоков вызывается метод Start(), который вызывает метод startWorkers(), создающий потоки:
void NamedPipeServer::Start()
{
	_active=true;
	startWorkers();
}
void NamedPipeServer::startWorkers()
{
	for (size_t i = 0; i < _thread_pool_size; ++i)
	{
		boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&NamedPipeServer::workerProc, this)));
		_threads.push_back(thread);
	}
	boost::shared_ptr<boost::thread> dispatcher(new boost::thread(boost::bind(&NamedPipeServer::run,this)));
	_threads.push_back(dispatcher);
}

Отметим, что метод Start() не блокирует поток выполнения. Для блокировки его до завершения всех потоков сервера следует использовать метод JoinWorkers:
void NamedPipeServer::JoinWorkers()
{
	size_t size=_threads.size();
	for (std::size_t i = 0; i < size; ++i)
		_threads[i]->join();
	for (std::size_t i = 0; i < size; ++i)
		_threads[i].reset();
	_threads.clear();
}

На практике в нашем приложении считается, что запущенные серверы работают до тех пор, пока приложение не закроется. Это избавило нас от проблем, возникающих при попытке остановки потоков сервера именованных каналов.
Для того, чтобы попробовать остановить потоки сервера можно написать, например, такой метод Stop():
void NamedPipeServer::Stop()
{
	_active=false;
	this->JoinWorkers();
}

Однако при вызове данного метода поток «встанет». Причина тому – тот факт, что один из потоков заблокирован функцией WaitForConnection().
И выход возможен только после того, как к нам присоединится очередной клиент (соответственно после такого соединения клиент обломается).
Самый простой (но не самый хороший) вариант решения проблемы возможного блокирования – создать в функции Stop клиентский именованный канал и подключиться к нашему серверу, сбросив тем самым блокировку потока WaitForConnection.
Для более правильного решения этой проблемы нам следует изменить поведение функции WaitForConnection(), добавив в нее таймаут.
Новая функция для Windows-версии выглядит так:
WinNamedPipe*  WinNamedPipe::WaitForConnection(unsigned int timeout)
{
	if(_server)	 
	{
		OVERLAPPED lpOverlapped = {0};
		lpOverlapped.hEvent = CreateEvent(0,1,1,0);
		if(ConnectNamedPipe(_hPipe, &lpOverlapped)==0)
		{
			if(GetLastError()==ERROR_PIPE_CONNECTED)
				if (!SetEvent(lpOverlapped.hEvent)) 
					THROW_LAST_ERROR("AsyncWaitForConnection failed");
			int result = WaitForSingleObject(lpOverlapped.hEvent,timeout);
			if (WAIT_OBJECT_0 == result)
			{
				HANDLE client=_hPipe;
				open();
				return new WinNamedPipe(client);
			}
			else
			{
				return NULL;
			}
		}
		else
		{
			THROW_LAST_ERROR("AsyncWaitForConnection failed");
		}
	}
	else
	{
		throw std::runtime_error("WaitForConnection is not supported on client pipe\n");
	}
}

Для того, чтобы данный метод корректно работал, надо изменить вызов функции CreateNamedPipe в open() следующим образом:
_hPipe = CreateNamedPipe( 
		(LPSTR)_name.data(),           
		PIPE_ACCESS_DUPLEX |     
             FILE_FLAG_OVERLAPPED,       // Вот тут мы включили поддержку неблокирующих операций
		PIPE_TYPE_BYTE |  
		PIPE_WAIT,             
		PIPE_UNLIMITED_INSTANCES, 
		BUFFER_PIPE_SIZE,              
		BUFFER_PIPE_SIZE,               
		0,              
		NULL);                    // default security attribute 

Реализация для linux выглядит так:
PosixNamedPipe* PosixNamedPipe::WaitForConnection(unsigned int timeout)
{ 
	int nsock;              
	int retour;             
	fd_set readf;          
	fd_set writef;           
	struct timeval to;      

	FD_ZERO(&readf);
	FD_ZERO(&writef);
	FD_SET(sock, &readf);
	FD_SET(sock, &writef);
	to.tv_usec = timeout*1000;

	retour = select(sock+1, &readf, &writef, 0, &to);

	if (retour == 0)  
	{
		return NULL;
	}

	if ( (FD_ISSET(sock, &readf)) || (FD_ISSET(sock,&writef))) 
	{
			nsock = accept(sock, NULL, NULL);
			return new PosixNamedPipe(nsock);
	}
	else
	{
		throw std::runtime_error("invalid socket descriptor!\n");
	}
}

В завершение отметим, что деструктор класса сервера для корректной работы должен вызывать метод Stop() и затем очищать всю очередь _clients при наличии там необработанных клиентов. Так как очередь клиентов в сервере состоит из указателей, то ее также надо очистить вручную:

NamedPipeServer::~NamedPipeServer(void)
{
	this->Stop();
	while(!_clients.empty())
	{
		if(_clients.try_pop(_pipe))
			delete _pipe;
	}
}

Часть 5. Пример использования библиотеки.


Для тестирования библиотеки была написана спамилка, в бесконечном цикле создающая два потока, которые пытаются соединиться с сервером и отправить ему 10 строчек. После завершения потоков цикл повторяется.
Каждый поток выглядит так:
void spamming_thread()
{
	std::vector<std::string> words;
	words.push_back(std::string("one "));
	words.push_back(std::string("two "));
	words.push_back(std::string("three "));
	words.push_back(std::string("four "));
	words.push_back(std::string("five "));
	words.push_back(std::string("six "));
	words.push_back(std::string("seven "));
	words.push_back(std::string("eight "));
	words.push_back(std::string("nine "));
	words.push_back(std::string("ten "));

	NamedPipe client("NamedPipeTester",0);
	try
	{
		client.ConnectOrOpen();
		for(int i=0;i<words.size();++i)
		{
			std::cout<<"sending "<<words[i]<<"\n";
			size_t size=words[i].size();
			client.WriteBytes(&size,sizeof(size));
			client.WriteBytes(words[i].data(),size);
		}
		client.Close();
	}
	catch(const std::runtime_error &e)
	{
		std::cout<<"Exception: "<<e.what()<<"\n";
	}
}

Теперь напишем сервер, который принимает подключения:
#pragma once
#include "../NamedPipeServer.h"

class SimpleServer: public NamedPipeServer
{
protected:
	void handleClient(NamedPipe* client);
public:
	SimpleServer(const std::string &name):NamedPipeServer(name,1){};
	~SimpleServer(){};
};
void SimpleServer::handleClient(NamedPipe* client)
{
	for(int i=0;i<10;++i)
	{
		try
		{
			size_t size;
			client->ReadBytes(&size,sizeof(size));
			if(size>0)
			{
				char* message=new char[size];
				client->ReadBytes(message,size);
				//Using std::cout is bad in multi-threading apps
				//std::string msg(message,size);
				//std::cout<<"Message from pipe: "<<msg<<"\n";
				delete[] message;
			}
		}
		catch(const std::exception& e)
		{
			std::cout<<"Exception!:"<<e.what()<<"\n";
		}
	}
	client->Close();
}

Запустим его, подождем полминуты и попробуем остановить:
		SimpleServer* s=new SimpleServer("NamedPipeTester");
		s->Start();
		boost::this_thread::sleep(boost::posix_time::milliseconds(30000));
		delete s;
		system("pause");

Заключение.


Данная библиотека вот уже полгода успешно используется в нашем проекте. Для хабра код библиотеки был исправлен и дополнен. Тем не менее, так как статья написана начинающим программистом, то в коде возможны ошибки. Автор будет благодарен за конструктивную критику, замечания и предложения по улучшению и за найденные ошибки.
Данная статья не претендует на полноту. В ней не рассмотрены такие вещи, как неблокирующее чтение и запись, установка прав доступа для именованных каналов и альтернатива побайтовому режиму чтения-записи — режим обмена сообщениями.
Весь исходный код к статье вы можете найти на github.com/xpavlov/libNamedPipe
AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 36

    +1
    Достаточно было примера использования и пары картинок + описание ньюансов использования. Ссылка на github очень полезна
      0
      >>В рамках одного класса реализуется и работа с серверными каналами (акцепт) и работа с клиентскими.
      А почему не было разделено на два класса Читатель\Писатель?
        0
        Во время проектирования мы решили, что не будем создавать отдельный класс только ради акцепта новых клиентов. Хотя такое разделение было бы неплохо сделать. В .NET оно есть. Но там в серверный пайп можно и писать и читать.
          0
          Ваш сервер практически не содержит общего кода с клиентом да и используется только внутри библиотеки. Явно неплохо бы было разбить на 2. Плюс вместо наследованния лучше-бы было использовать агрегацию. Кажется что код от этого бы стал легче.
            0
            Насчет агрегации — не уверен, но про разбивку подумаем, действительно, смешивать функционал в одном классе нехорошо.
              0
              хотите я вам пул реквест с агрегацией запилю? :)
                0
                Было бы здорово, именно для этого я и сделал репозиторий на гитхабе, а не просто выложил zip с сорцами
        +3
        А почему не использовали boost.asio, там это уже все есть:
        Для unix
        Для win32: 1 или 2

        Также, если у вас текстовый протокол, то у вас из коробки бы работали asio streams, вместо чтения записи побайтно
          0
          Мы хотели максимально избавить внешние компоненты от сторонних зависимостей. Но, честно говоря, наличие указанных вами классов в asio мы проглядели. Спасибо за наводку.
          +3
          По замечаниям: вы пишете что у вам требовалась высокая производительность, хотя по факту в коде сервера встречаются строки вида: boost::this_thread::sleep(boost::posix_time::milliseconds(100)); Что убивает ваше стремление к производительности полностью.

          Для того чтобы избежать ненужного поллинга можно заменить тип очереди на concurrent_bounded_queue Тогда вы избавить от постоянной долбежки функции try_pop и будете дожидаться сообщения без поллинга.

          Еще очень смущает наличие кода вида:
          catch(const std::exception& e)
          {
          }
          catch(...)
          {
          }

          Теряется всякая информация об ошибках, что не есть хорошо.

          Также есть несколько логических ошибок в коде (NamedPipeServer.cpp:12) — почему если будет выброшено исключение, то вы все равно продолжаете работу и пытаетесь обрабатывать сообщения. По моему гораздо логичней передавать в объект класса NamedPipeServer уже созданное соединение (или создавать в конструкторе), тогда бы вы не нарушали внутренних инвариантов класса (ваш объект _pipe был бы всегда создан и активен).

          Ну вот вроде все.

          P.S. Рекомендую прочесть книгу Энтони Вильямся concurrency in action c++. Там все эти моменты тщательно разобраны.
            –1
            Спасибо за замечания и книгу.

            >если будет выброшено исключение, то вы все равно продолжаете работу и пытаетесь обрабатывать сообщения.

            Так надо. Одно исключение не должно валить весь сервер и обламывать других клиентов.

            >Теряется всякая информация об ошибках, что не есть хорошо.

            В статье написано, что внутри catch у нас было логирование. Но для статьи я убрал его, чтобы убрать зависимость итогового варианта от log4cxx. Но пустышки catch оставить пришлось, чтобы один эксепшн не валил приложения.

            >в коде сервера встречаются строки вида: boost::this_thread::sleep(boost::posix_time::milliseconds(100)); Что убивает ваше стремление к производительности полностью.

            Хорошо, сделаем засыпание на 1мс. Но оно все равно должно быть — иначе поедание ресуров процессора на холостом цикле будет слишком большим
              +1
              Так, давайте еще раз.
              Не делайте засыпание на 1 мс. Это еще хуже, да — вы уменьшите время отклика, но если ваша очередь будет много времени пустой, то все равно будет тратиться время на вызовы try_pop. В лучшем случае вы не просадите производительность. Рекомендую сделать упор именно на concurrent_bounded_queue

              Что касается исключений, давайте разберемся почему оно происходит. Вы создаете объект NamedPipeServer, и у вас все хорошо, но когда вы вызываете метод run, то в нем происходит исключение, и ваш метод уже не знает что с ним делать. По факту, ваша программа остается в неконсистентном состоянии (вот здесь как раз нарушается инвариант), и вы снаружи никогда не узнаете, о том что произошла ошибка.

              Теперь рассмотрим другой вариант, конструктор NamedPipeServer принимает один аргумент — pipe (который должен быть создан выше по коду). Таким образом в момент создания объекта NamedPipeServer, вы уже точно знаете, что пайп открыт и метод run можно спокойно вызывать, и более того, вся эта чехарда с исключениями из метода run просто выкинется (как и вызов ConnectOrOpen. И теперь нам уже не страшно исключение, потому что мы можем написать код такого вида:
              try {
                std::auto_ptr<NamedPipe> pipe(new NamedPipe(name, 1));
                NamedPipeServer server(pipe);
                server.Start();
              } catch(const std::exception & e) {
                //logging, shutting down, anything else
              }
              


              Кстати, что касается исключений, то опять же, можно запускать не сами boost::thread, а использовать boost::future + boost::packaged_task, таким образом вы всегда сможете переместить исключение из потока в основной (main) поток программы.
                0
                Да, спасибо, как я написал ниже про засыпание — я просто невнимательно прочитал ваш комментарий.

                Отдельное спасибо за boost::future и packaged_task — мы давно хотели найти более удобный планировщик задач.
                  0
                  Мне кажется, что в случае использования данной очереди возможна следующая проблема.

                  В документации написано, что метод pop() «Block until an item becomes available, and then dequeue it.».

                  Давайте представим, что все потоки-обработчики клиентов встали на методе pop() и ждут, когда в очереди что-то появится. А в это время мы хотим остановить сервер. Поток с WaitForConnection() мы уже остановили, новых клиентов нет, а другие потоки по-прежнему стоят на блокирующем pop() и, следовательно, вызов join() применительно к ним просто повесит главный поток выполнения.
                    0
                    UPD: Нашел метод abort(), с помощью которого можно остановить все потоки корректно.
                0
                >Для того чтобы избежать ненужного поллинга можно заменить тип очереди на concurrent_bounded_queue Тогда вы избавить от постоянной долбежки функции try_pop и будете дожидаться сообщения без поллинга.

                Извиняюсь, проглядел данный совет. В принципе я изначально был за, но полгода назад, во время рождения данной библиотеки, начальство было против данной замены (
                  +1
                  > начальство было против данной замены (
                  Не повезло, сочувствую.

                  Кстати я как-то на работе нечаянно провел тест: я написал одну программу — демон, и забыл ее выключить (там тоже были sleep и все такое) и она крутилась на тестовом сервере и все про нее забыли (случайно наткнулся на нее), я посмотрел, что за несколько месяцев работы, она сожрала день процессорного времени — а это был только простой, по факту она ничего не делала. При этом висящий рядом демон ftpd сожрал не более нескольких минут процессорного времени. Меня эта ситуация в свое время очень мотивировала на обучение параллельному программированию, возможно это вам поможет
                    0
                    >Не повезло, сочувствую.

                    Ну так все прогеры уже разбежались, и остался лишь «сам себе IT директор»
                +1
                Поскольку имя канала в нашем классе хранится в std::string, то для корректной его передачи в WinAPI функции надо устанавливать в свойствах проекта в Visual Studio «Use Multi-Byte character Set»

                Шел суровый 2013 год… программисты бились с кодировками как могли… а могли они немного…

                //и дело даже не в том что std::string-у никто UCS-2 хранить не запрещает (ибо оно использует доп поле для определения свой длины)… но, ребята, это снежный зверек…
                  0
                  А как бы сделали вы?
                    0
                    ну точно не так. У вас еще нет проблем с русскими именами компьютеров (к примеру) при использовании вашей «либы» с компьютера, где нет русской локали (не настроена как «по умолчанию для программ не поддерживающих unicode» (как-то так называется опция))? Тогда mr. ЗлойБаг идёт к вам…
                      0
                      С именами машин проблемы и не должно быть — по ТЗ и система и наше приложение должны быть развернуты на одной машине. Иначе бы мы использовали только сокеты и этой статьи не было бы.

                      Кроме этого, есть соглашение что имена пайпов должны быть на английском и хардкодятся в бинарниках. Ну, есть, конечно, опция с добавлением префиксов (если одна копия приложения на машине слушает, скажем 1ску а вторая — оракл), но там мы явно применяем конвертацию юникода в ascII, а в случае с 1с — еще и транслит.
                  +1
                  Задаю этот вопрос на хабре уже второй раз — первый был оставлен сообществом без ответа. Почему так популярна среди разработчиков логическая цепочка, подобная этой:
                  кроме того нам не хотелось раздувать проект использованием тяжелых сторонних библиотек (на тот момент мы уже использовали boost и Intel TBB, притягивать же еще Qt или gSoap нам не хотелось).
                  Поэтому было решено писать свой велосипед.

                  Они, простите, кусаются? Библиотеки, я имею в виду. Существует диктуемое Великим Эфиром (Большим Братом / <любым другим божеством по вкусу>) ограничение на число используемых в проекте библиотек? Их затем и создают, библиотеки, чтобы можно было для решения какой-либо задачи ограничиться затратами на изучение этой самой библиотеки, а не вбухать всегда, чтобы там ни думали сами разработчики, всегда большее время на продумывание, разработку, отладку и поддержку своего решения. «тяжелых… библиотек» — вам их носить куда-то? Или пара мегабайт дискового пространства пользователя стоит дороже недель/месяцев/лет труда команды профессиональных программистов? Непонятна мне эта философия. Поясните, пожалуйста, в чем я до сих пор заблуждаюсь.
                  К частной сути самого поста этот комментарий большого отношения не имеет, он относится именно к общей идее, почему-то крайне распространенной в ИТ. Заранее спасибо.
                    +2
                    Не берусь судить за автора, но имею пару своих мыслей на этот счет. Использовать большую чужую библиотеку с одной стороны хорошо и привлекательно, особенно если она решает твою проблему. С другой стороны еще привлекательней попробовать решить проблему самому, дабы научиться чему-то новому и столкнуться с новыми трудностями, и потом уже, с высоты приобретенного опыта смотреть на что-то реализованное в boost-e или другой библиотеке. Сравнивать подходы, смотреть на красивые или не очень решения. Без опыта решения задач, которые решает библиотека тяжело впитать понимание как она должна использоваться, а это уже сильно повышает риск прострелить себе ногу в нескольких местах.
                      0
                      Очень хороший ответ, спасибо.
                      Исключительно с целью стимуляции развития мысли:

                      Возьмем в качестве примера функциональность, реализуемую библиотекой Boost.Asio. Пусть разработчик не имеет ни большого опыта в области сетевого программировния, ни исчерпывающих знаний предмета (но базовые представления он имеет). У него есть, на мой взгляд, три пути решения сетевой задачи:
                      1. Просто начать использовать Boost.Asio, опираясь на ее прекрасную документацию.
                      2. Изучить У. Стивенса («UNIX. Разработка сетевых приложений»), изучить второй том POSA, попытаться самостоятельно реализовать библиотеку для решения своих задач, попытаться с ее помощью, собственно, решить задачу, обжечься, итеративно повторить процесс несколько раз. Затем выполнить пункт 1.
                      3. Прочесть несколько первых глав Стивенса. Изучить документацию Boost.Asio, при каждом подходящем случае (и даже чаще) обращаясь за детальными разъяснениями к Стивенсу и POSA. Обеспечить себе таким образом a) глубокое понимание архитектуры используемой библиотеки b) хорошее представление об ограничениях библиотеки и местах, в которых её абстракции могут дать течь. Выполнить пункт 1.

                      Мне представляется, что путь 3) наиболее разумен в большинстве случаев.

                      И да. Вы попытались объяснить, зачем изобретают велосипеды. Но пока остается совсем непонятным, почему это желание сами изобретатели объясняют не так, как вы, а так, как автор. Ссылаясь на противоестественность использования более, чем n библиотек в проекте, «тяжесть» чего-либо, измеряемую непонятно в чём, и пр.
                        0
                        Для ответа на этот вопрос мне проще всего восстановить по памяти диалог полугодовой давности:

                        — как мы будем связываться из внешней компоненты с нашим проектом?

                        — сокеты

                        — не айс, loopback это — медленно, посмотри что-нибудь вроде shared memory

                        — вот есть межпроцессная очередь сообщений в бусте

                        — она с приоритетами, это тоже медленно.

                        — ну вот когда я делал компоненту для MSSQL, то там я использовал именованные каналы из .NET

                        — поищи такое же для плюсов

                        — нашел только QLocalsocket в Qt

                        (пауза)

                        — это слишком толстая либа, ставить ее на все компы заказчика не айс. Может, попробуем свою написать?

                        Собственно, как-то так оно и было. Заказчик системы тоже «шарит» и обладает чертой, присущей большинству заказчиков — не называет проблему, а требует запрогать то решение, которое пришло ему в голову, пытается контролировать процесс разработки.
                        С одной стороны это хорошо. С другой — когда влияние заказчика распространяется на выбор библиотек — нет.

                        Теперь мое видение велосипедостроения. Для того чтобы обучаться — писать велосипеды очень даже хорошо. И набивать шишки. Но с другой стороны, впаривать велосипеды заказчику совсем не хорошо. Хотя бы сточки зрения поддержки и уровня покрытия тестами кода. За 10мб Qt-библиотеки стоит большое коммьюнити разработчиков, сама библиотека использовалась в миллионах проектов и шанс словить в ней ошибку гораздо меньше шанса словить ошибки от велосипеда.

                        Только при написании этой статьи я убрал 6 багов из той версии, что сейчас крутится у заказчика. Ошибки были не критичные — при определенных условиях сервер посылал клиент после соединения, но клиент умел переподключаться, да пара утечек памяти (одна в деструкторе сервера, который никогда не вызывался).
                      0
                      Большие библиотеки как минимум увеличиваю время сборки проекта. Да и баги (если найдутся) в них тяжело фиксить.
                      В этом отношении можно понять любителей маленьких велосипедов.
                      +2
                      К вороху советов выше хочу добавить, что использовать xml для коммуникации и заботиться о производительности одновременно — это как-то не правильно. :) Мы для обмена данными начали использовать MessagePack вместо xml и остались очень довольны: он компактный, быстро парсится, легко читается и легко пишется. Есть код для работы с MessagePack из основных языков.
                        0
                        Заказчику очень хочется посылать именно xml. Причем, по замыслу проекта, он просто гоняет эти xml между разными системами, никак их не меняя и даже не читая.
                        Messagepack мы используем для мериализации класса сообщений, которые передаем по сети.
                          0
                          Расскажите ему про BSON.
                            0
                            Он в курсе. MessagePack, кстати, вполне может заменить BSON.
                            Проблема тут в том, что тот же 1с или MSSQL средствами встроенного языка умеет формировать и парсить только xml-ки. Поэтому в наше приложение из обозначенных систем приходит именно xml. Так как наше приложение фактически без изменений засылает его в другую (но удаленную) систему, то особого резона преобразовывать xml<->BSON мы не видели. Разве что трафик экономить.
                        0
                        Простите мозги уже в кучу, но как вы прерываете работу клинта если он висит на операции read если это Linux система?
                          0
                          В принципе, в boost есть возможность получить нативный указатель на поток. После этого можно сделать pthread_kill() или CancelSynchronousIo() в Windows Vista и выше, но у заказчика еще местами XP стоит :(.
                          Еще один вариант — крутить в цикле select() перед вызовом recv().
                          Но по-хорошему надо работать с неблокирующими (асинхронными) операциями.
                            0
                            Есть еще один вариант, без убийства потоков.Надо сделать возможность получения указателя на канал, который обрабатывается потоком сервера. Тогда достаточно закрыть данный канал для того чтобы прервать операции чтения. Нехорошего тут только одно — пострадают те клиенты, с которыми происходит обмен данными.
                            0
                            Почему вы не рассматривали ZeroMQ например?
                              0
                              Честно говоря, тогда (полгода назад) никто из программистов, включая начальника, не предложил такую идею. Только стандартные варианты — sockets, shared memory, pipes.
                              Но сейчас работы по переходу на MQ уже ведутся

                            Only users with full accounts can post comments. Log in, please.