Pull to refresh

Пишем кроссплатформенную библиотеку на C++ для работы с именованными каналами

Reading time16 min
Views32K

Предыстория


В процессе разработки одного B2B проекта возникла необходимость обращения к нашему приложению из различных систем вроде 1C, Oracle, MS Sql Server. Первый «универсальный» вариант, который пришел в голову – использовать веб-сервисы. Но, к сожалению, у вышеупомянутых систем несколько разное понимание этого стандарта (например, не факт, что тот же oracle и 1C будут понимать друг друга), кроме того нам не хотелось раздувать проект использованием тяжелых сторонних библиотек (на тот момент мы уже использовали boost и Intel TBB, притягивать же еще Qt или gSoap нам не хотелось).
Поэтому было решено писать свой велосипед.

Вышеупомянутые системы имеют механизм расширения возможностей внутренних языков с помощью dll-плагинов, так называемых внешних компонент. Идея их использования для общения с внешним миром не является новой – на хабре уже были статьи по данной теме:
habrahabr.ru/post/149956
habrahabr.ru/company/Centrobit/blog/165441
habrahabr.ru/post/163859
Итак, мы решили использовать внешние компоненты для решения задачи. Затем нам следовало решить, каким способом, да еще и желательно кроссплатформенным (т.е. чтобы компоненту и приложение под разные платформы можно было бы собирать простой перекомпиляцией), мы будем связывать компоненту с нашим приложением. Проще всего было бы не заморачиваться и найти готовые IPC-решения на эту тему (QLocalSocket в Qt, Named Pipes в ACE), но опять-таки из соображений сокращения числа используемых библиотек было решено написать свой маленький велосипед.
Идеальным решением была бы boost::interprocess::message_queue, но она была не простой очередью, а очередью с приоритетами, что несколько снижало производительность (стремление к высокой производительности также было одной из причин, почему мы отказались от web-сервисов). Кроме того, нам было нужно некое подобие клиент-серверного решения.
От сокетов через loopback мы отказались сразу – у них слишком большой overhead в windows (см. habrahabr.ru/post/81067 ). Поэтому мы стали смотреть в сторону именованных каналов windows и domain sockets в unix.

Часть 1. Составляем ТЗ


Фактически, смысл наших внешних компонент заключался лишь в получении некоего xml-сообщения из системы и отправке ее в канал (конечно, связь должна быть двусторонней, но механизм передачи сообщений назад в систему мы не всегда реализовывали через компоненты). Поэтому мы решили, что не будем погружаться в дебри асинхронной работы с каналами, а нам хватит лишь реализации обычных блокирующих операций чтения-записи (и обработки клиентов). Требования к функциям чтения-записи были следующие:
  • Функции должны принимать два параметра void* указатель на буфер в который надо писать или из которого надо читать и размер данных которые надо прочитать/отправить
  • В случае любой ошибки (например, число принятых или отправленных байт не равно переданному в параметре) функция должна бросить исключение std::runtime_error (да, обработка исключений немного снижает производительность, можно бы использовать коды ошибок)
  • В рамках одного класса реализуется и работа с серверными каналами (акцепт) и работа с клиентскими. В случае создания серверного канала функции чтения-записи не должны работать, а должны бросать исключения

Последний пункт требует пояснения – серверный канал нужен лишь для того, чтобы в цикле выполнять блокирующий метод ожидания клиента, возвращаемым значением которого является объект-канал с подключенным клиентом (по такой идеологии работает напиример функция accept для POSIX сокетов. В windows API же с каналами все наоборот – в случае блокирующей работы функция ожидания просто останавливает поток до тех пор, пока на том конце канала не появится клиент, затем его нужно обработать, а для следующих клиентов создать новый слушающий канал и т.д. )
В результате был сочинен следующий абстрактный класс INamedPipe от которого затем наследуются классы WinNamedPipe и POSIXNamedPipe
#pragma once
#include <stdexcept>
	class INamedPipe
	{
	protected:
                std::string _name;
		bool _server;
		INamedPipe(){};
		virtual void  internalReadBytes(void* buf,size_t size)=0;
		virtual void internalWriteBytes(const void* buf,size_t size)=0;
		virtual void internalFlush()=0;
	public:

		INamedPipe(const std::string prefix, const std::string& name, bool server):_name(prefix),_server(server)
		{
			_name.append(name);
		}
		void ConnectOrOpen()
		{
				if(_server)
					open();
				else
					connect();
		}
		virtual void ReadBytes(void* buf,size_t size)
		{
			if(!_server)
			{
				if(size<1)
					throw std::out_of_range("Size is 0 or less");
				internalReadBytes(buf,size);
			}
			else
				throw std::runtime_error("This operation is not supported on server pipe");
		}
		virtual void WriteBytes(const void* buf,size_t size)
		{
			if(!_server)
			{
				if(size<1)
					throw std::out_of_range("Size is 0 or less");
				internalWriteBytes(buf,size);

Конструктор INamedPipe принимает три параметра – префикс пути, где будет расположен канал в системе, имя канала, и параметр указывающий тип канала – клиентский или серверный (было решено объединить оба типа в одном классе).
Функция ConnectOrOpen() вызывает необходимый метод для открытия канала в зависимости от его типа (для сервера – open(), для клиента — connect()).

Часть 2. Пишем windows-реализацию WinNamedPipe


Перед реализацией мы решили погуглить примеры. Первым делом нагуглились туториалы из MSDN ( msdn.microsoft.com/ru-ru/library/windows/desktop/aa365592, msdn.microsoft.com/ru-ru/library/windows/desktop/aa365588 ) и документация на русском — www.frolov-lib.ru/books/bsp/v27/ch2_3.htm. С помощью этих статей и была написана первая версия класса WinNamedPipe. Спустя некоторое время мы нашли хорошую статью по написанию класса работы с именованными каналами на примере создания внешней компоненты для MetaTrader ( www.mql5.com/ru/articles/115 ), которую тоже рекомедуем к прочтению.
В конструкторе WinNamedPipe уже задан префикс – согласно требованиям MSDN полный путь к именованному каналу должен иметь вид «\\имя_компьютера\pipe\имя_канала» и в случае работы с именованным каналом на локальной машине имеет вид «\\.\pipe\pipe_name» (да, именованные каналы в Windows позволяют обмениваться информацией и по сети).
В классе также присутствует конструктор, принимающий на вход параметр типа HANDLE – число, идентифицирующее в операционной системе какую-нибудь структуру или объект (с помощью данного конструктора мы сможем создавать новый экземпляр класса WinNamedPipe зная только HANDLE уже существующего именованного канала). Данный конструктор будет использоваться в функции WaitForConnection() (см. далее)
Реализация open() выглядит следующим образом:
void WinNamedPipe::open(){
_hPipe = CreateNamedPipe( 
		(LPSTR)_name.data(),             // имя создаваемого канала
		PIPE_ACCESS_DUPLEX,       // разрешен доступ на чтение и запись 
		PIPE_TYPE_BYTE |   //читаем побайтово
		PIPE_WAIT,                // блокирующий режим 
		PIPE_UNLIMITED_INSTANCES, // число экземпляров канала неограниченно  
		BUFFER_PIPE_SIZE,                  // размер буфера исходящих сообщений 
		BUFFER_PIPE_SIZE,                  // размер буфера входящих сообщений 
		0,                        // тайм-аут ожидания (0=бесконечно) 
		NULL);                    // атрибут безопасности по умолчанию – доступ разрешен всем

	if (_hPipe == INVALID_HANDLE_VALUE) 
	{
		THROW_LAST_ERROR("CreateNamedPipe failed");
	}

Заметим, что размер буфера BUFFER_PIPE_SIZE мы задаем в самом начале файла. Фактически, его размер не влияет на максимальный размер передаваемых данных – в нашем проекте мы успешно передавали куски, которые были больше буфера в несколько раз.
Особое внимание следует обратить на первый параметр: (LPSTR)_name.data()
Поскольку имя канала в нашем классе хранится в std::string, то для корректной его передачи в WinAPI функции надо устанавливать в свойствах проекта в Visual Studio «Use Multi-Byte character Set» (подробнее см. статью habrahabr.ru/post/164193 ).
Подробное описание параметров функции CreateNamedPipe можно прочитать в MSDN ( msdn.microsoft.com/en-us/library/windows/desktop/aa365150 и на www.frolov-lib.ru/books/bsp/v27/ch2_3.htm ).
Клиентское подключение к каналу осуществляется с помощью функции connect(), которая имеет следующую реализацию:
void WinNamedPipe::connect()
	for(;;)
	{
		WaitNamedPipe((LPSTR)_name.data(), NMPWAIT_WAIT_FOREVER);
		_hPipe = CreateFile( 
			(LPSTR)_name.data(),   // имя канала 
			GENERIC_READ |  // доступ на чтение и запись 
			GENERIC_WRITE, 
			0,              
			NULL,           // доступ по умолчанию разрешен всем
			OPEN_EXISTING,  // открываем существующий канал 
			0,               
			NULL);           
			// Break if the pipe handle is valid or error!=232
		 if (_hPipe != INVALID_HANDLE_VALUE||GetLastError() != ERROR_PIPE_BUSY) 
         break; 
	}
	if (_hPipe == INVALID_HANDLE_VALUE) 
			THROW_LAST_ERROR("Could not open pipe");
	DWORD dwMode = PIPE_TYPE_BYTE; 
	BOOL fSuccess = SetNamedPipeHandleState( 
		_hPipe,    // указатель на созданный канал  
		&dwMode,  //  параметры чтения-записи
		NULL,     // максимальный размер чтения и записи неограничен 
		NULL);    // таймаут не ограничен 
	if ( ! fSuccess) 
	{
		THROW_LAST_ERROR("SetNamedPipeHandleState failed"); 
	}
}

Сначала с помощью функции WaitNamedPipe мы встаем на «вечное» ожидание свободного экземпляра серверного канала, затем с помощью CreateFile соединяемся с ним. После этого мы проверяем, удачно ли прошло соединение. Если указатель на объект-канал валиден (или произошла ошибка отличная от ERROR_PIPE_BUSY – ошибки, обозначающей, что свободный канал отсутствует) мы выходим из цикла и настраиваем нужный нам режим работы с каналом. В противном случае цикл повторяется до тех пор, пока подключиться нормально не удастся или же произойдет ошибка, отличная от ERROR_PIPE_BUSY. Если серверный именованный канал с данным именем не существует, то WaitNamedPipe завершится сразу, GetLastError вернет код ошибки 2 (The system cannot find the file specified) и connect() бросит исключение.
Согласно нашей задумке WaitForConnection() должна возвращать указатель на новый WinNamedPipe, к которому уже присоединен клиент и к которому можно применять функции чтения-записи. Однако в Windows стандартный механизм приема клиентов осуществляется по-другому – функция ConnectNamedPipe (получающая на вход HANDLE созданного канала) просто блокирует поток до тех пор, пока на другом конце канала не появится клиент. Затем, чтобы не потерять других клиентов, нужно создать новый слушающий канал и передать его в данную функцию и т.д. Поэтому для реализации нашей задумки приходится из WaitForConnection вызывать open() еще раз.
Функция WaitForConnection() выглядит следующим образом:
WinNamedPipe* WinNamedPipe::WaitForConnection()
{
	if(_server)	 
	{
		DWORD error;
		if (ConnectNamedPipe(_hPipe, NULL)||(error=GetLastError())==ERROR_PIPE_CONNECTED)
		{
			HANDLE client=_hPipe;
			open();
			return new WinNamedPipe(client);
		}
		else
		{
			THROW_LAST_ERROR("WaitForConnection failed");
		}
	}
	else
	{
		throw std::runtime_error("WaitForConnection is not supported on server pipe\n");
	}
}

Отметим важный момент – если клиентских каналов создается множество, то возможна ситуация, когда клиент подключится к каналу еще до вызова ConnectNamedPipe. В этом случае GetLastError вернет код ошибки 535 (но это не будет являться ошибкой)
Функции чтения и записи и просты и понятны без дополнительных комментариев:
void  WinNamedPipe::internalReadBytes(void* buf,size_t size)
{
	DWORD cbBytesRead = 0;
	BOOL fSuccess = FALSE;
	// Начинаем читать из канала 
	fSuccess = ReadFile( 
		_hPipe,        // указатель на канал
		buf,    // буфер-приемник для данных 
		size, // размер буфера 
		&cbBytesRead, // ссылка на переменную, куда будет записано число прочитанных байт 
		NULL);        // не используем асинхронные операции 

	if (!fSuccess || cbBytesRead == 0 ||cbBytesRead!=size)
	{   
		if (GetLastError() == ERROR_BROKEN_PIPE)
		{
			THROW_LAST_ERROR("pipe disconnected"); 
		}
		else
		{
			THROW_LAST_ERROR("read failed"); 
		}
	}

}

void  WinNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
	DWORD cbWritten;
	BOOL fSuccess = FALSE;

	fSuccess = WriteFile( 
		_hPipe,        // указатель на канал 
		buf,     // буфер из которого читаем данные для отправки  
		size, // число байт для отправки 
		&cbWritten,   // ссылка  на переменную куда записать число отправленных байт
		NULL);        // не используем асинхронные операции 

	if (!fSuccess || size != cbWritten)
	{   
		THROW_LAST_ERROR("WriteFile failed"); 
	}
}

На функции Close() следует остановиться поподробней. Для того, чтобы закрыть серверный канал и сообщить клиенту на другом его конце об этом используют DisconnectNamedPipe(HANDLE pipe). После этого на нем можно снова ожидать новых клиентов, а можно сообщить операционной системе, что он не нужен, используя CloseHandle(HANDLE pipe). CloseHandle можно также вызывать и на клиентских каналах, однако данный метод должен вызываться только один раз – либо на серверном пайпе с подключенным клиентом, либо на клиенте. Поэтому было решено, что вызывать DisconnectNamedPipe и CloseHandle мы будем только на серверных экземплярах каналов, а на клиентских Close будет методом-пустышкой.
void WinNamedPipe::Close()
{
	if(_server||_server_with_client)
	{
		DisconnectNamedPipe(_hPipe); 
		CloseHandle(_hPipe); //May throw an exception if a debugger is attached to the process and handle is not valid
	}
}

Также следует помнить, что при отправке или получении больших объемов данных клиент должен уведомить противоположную сторону о том, что все данные прочитаны, а сервер не должен закрывать канал до получения этого уведомления.
Для уменьшения копипасты был создан макрос, получающий GetLastError и бросающий исключения:
#define THROW_LAST_ERROR(e){			\
	int error=GetLastError();				\
	std::stringstream err;					\
	err<<e<<", GLE="<<error;				\
	throw std::runtime_error(err.str().data());		\
}


Часть 3. Пишем unix-реализацию – PosixNamedPipe


Для реализации posix-совместимой части библиотеки мы решили использовать Unix domain sockets (подробнее про работу с сокетами в unix можно прочитать тут — wiki.linuxformat.ru/index.php/LXF83:Unix_API ).
В отличие от windows, файл-указатель на локальный сокет может быть расположен в любом месте файловой системы. Мы решили передавать в качестве префикса пути /tmp/. Следует отметить, что если в качестве префикса передать пустую строку, то клиент не сможет подключиться к сокету и будет возвращать ошибку file not found. Кроме этого, если файл с именем локального сокета будет существовать перед функцией, создающей локальный сокет, то ни один клиент не сможет подключиться к серверу (ошибка Connection refused). Поэтому перед созданием сокета следует проверить наличие данного файла и попытаться его удалить. Отсюда же и возникла идея с фиксированным префиксом пути – надо же как-то ликвидировать потенциальную уязвимость, дающую возможность затереть файл в любом месте системы (и тут неплохо бы вспомнить о правах доступа и использовании функции basename). Также отсюда следует тот факт, что серверных экземпляров класса PosixNamedPipe с одинаковыми именами более одного быть не должно – для корректной работы каждому имени должен соответствовать только один экземпляр класса.
Код функции open() выглядит следующим образом:
void PosixNamedPipe::open()
{
	sock= socket(AF_UNIX, SOCK_STREAM, 0);
	if(sock == -1) {
		THROW_ERROR("Create_socket failed: ");
	}
	unlink(_name.c_str());
	desc.sun_family = AF_UNIX;
	strcpy(desc.sun_path, _name.c_str());
	if (bind(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1) {
		THROW_ERROR("Connection failed(bind): ");
	}
	if (listen(sock,SOMAXCONN) == -1) {
    THROW_ERROR("Connection failed(listen): ");
  }
}

Функция connect() почти идентична:
void PosixNamedPipe::connect()
{
	sock= socket(AF_UNIX, SOCK_STREAM, 0);
	if(sock == -1)
	{
		THROW_ERROR("Create_socket failed: ");
	}
	desc.sun_family = AF_UNIX;
	strcpy(desc.sun_path, _name.c_str());
	if (::connect(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1)
	{
		THROW_ERROR("Connection failed(connect): ");
	}
}

Для того, чтобы написать WaitForConnection() согласно нашему ТЗ, в отличие от windows-версии кода тут даже не пришлось извращаться.
PosixNamedPipe* PosixNamedPipe::WaitForConnection()
{
	int client=accept(sock,NULL,NULL);
	if(client!=-1)
		return new PosixNamedPipe(client);
	else {
		THROW_ERROR("Accept error: ");
	}
}

Конструктор, вызываемый в ней также банален:
PosixNamedPipe::PosixNamedPipe(int pipe)
{
	sock=pipe;
	_server=false;
	memset(&desc, 0, sizeof(struct sockaddr_un));
}

Тут следует заметить, что для корректной работы функций bind и ::connect рекомендуется обнулять экземпляр структуры sockaddr_un перед инициализацией ее полей, в связи с чем в конструкторы класса была добавлена строчка memset(&desc, 0, sizeof(struct sockaddr_un)).
Функции чтения-записи для сокетов также довольно просты:
void PosixNamedPipe::internalReadBytes(void* buf,size_t size)
{
  size_t ret=-1;
  if ((recv(sock, buf, size, MSG_WAITALL)) == -1) {
    THROW_ERROR("Error while reading: ");
  }
}

void PosixNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
  size_t ret=-1;
  if ((ret = send(sock, buf, size, 0)) == -1||ret!=size) {
    THROW_ERROR("Error while sending: ");
  }
}

В функции закрытия канала на всякий случай предусмотрено удаление файла при закрытии серверного экземпляра класса (и вызов функции закрытия также автоматически осуществляется в деструкторе):
void PosixNamedPipe::Close()
{
	if(_server)
		unlink(desc.sun_path);
	close(sock);
}
PosixNamedPipe::~PosixNamedPipe()
{
  this->Close();
}


Часть 4. Пишем многопоточный сервер для обработки клиентов


Итак, классы для работы с именованными каналами созданы. Теперь сделаем небольшой трюк, чтобы не писать #ifdef _WIN32 и подобные вещи при создании экземпляров данных классов в наших приложениях:

//NamedPipe.h
#pragma once
#ifdef _WIN32
	#include "WinNamedPipe.h"
	typedef WinNamedPipe NamedPipe;
#else
	#include <unistd.h>
	#include "PosixNamedPipe.h"
	typedef PosixNamedPipe NamedPipe;
#endif

Теперь в любом проекте делаем #include «NamedPipe.h» и можем писать код вида NamedPipe* client=new NamedPipe(«NamedPipe.h»,0) не заботясь о том, под какую из операционных систем будет собираться наше приложение.
Для обработки входящих запросов из внешних компонент в нашем проекте был написан простой многопоточный класс-сервер на основе паттерна thread pool с использованием библиотек boost::thread и Intel TBB. Основная идея была такой – в одном потоке в бесконечном цикле идет вызов WaitForConnection(), а указатели на объекты NamedPipe с подключенными клиентами складываются в очередь tbb::concurrent_queue, откуда их забирают потоки, непосредственно занятые чтением-записью и обработкой входящих сообщений.
Код функции, занимающейся приемом входящих соединений выглядит так:
void NamedPipeServer::run()
{
	NamedPipe* client;
	_pipe=new NamedPipe(_name, 1);
	try
	{
		_pipe->ConnectOrOpen();
	}
	catch(const std::exception& e)
	{
	}
	while(_active)
	{
		try
		{
			client=_pipe->WaitForConnection();
			_clients.push(client);
		}
		catch(const std::exception& e)
		{
		}
	}
	delete _pipe;
}

Заметим, что на месте пустых скобок после catch у нас стоял вызов макросов логирования, но в коде, переписанном для данной статьи, данные вызовы были убраны, чтобы не перегружать нашу библиотеку зависимостями.
По нашему замыслу работа с клиентами должна описываться в функции handleClient(NamedPipe* client), которая в классе сервера сделана виртуальной и должна переопределяться в классе-наследнике.
В каждом потоке-обработчике крутится следующий цикл:
void NamedPipeServer::workerProc()
{
	NamedPipe* client;
	while(_active)
		if(_clients.try_pop(client))
		{
			handleClient(client);
		}
		else
			boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

В цикле поток пытается достать клиента из очереди и в случае успеха вызывает handleClient, иначе – засыпает на некоторое время, чтобы избежать холостой загрузки процессора.
Для запуска всех потоков вызывается метод Start(), который вызывает метод startWorkers(), создающий потоки:
void NamedPipeServer::Start()
{
	_active=true;
	startWorkers();
}
void NamedPipeServer::startWorkers()
{
	for (size_t i = 0; i < _thread_pool_size; ++i)
	{
		boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&NamedPipeServer::workerProc, this)));
		_threads.push_back(thread);
	}
	boost::shared_ptr<boost::thread> dispatcher(new boost::thread(boost::bind(&NamedPipeServer::run,this)));
	_threads.push_back(dispatcher);
}

Отметим, что метод Start() не блокирует поток выполнения. Для блокировки его до завершения всех потоков сервера следует использовать метод JoinWorkers:
void NamedPipeServer::JoinWorkers()
{
	size_t size=_threads.size();
	for (std::size_t i = 0; i < size; ++i)
		_threads[i]->join();
	for (std::size_t i = 0; i < size; ++i)
		_threads[i].reset();
	_threads.clear();
}

На практике в нашем приложении считается, что запущенные серверы работают до тех пор, пока приложение не закроется. Это избавило нас от проблем, возникающих при попытке остановки потоков сервера именованных каналов.
Для того, чтобы попробовать остановить потоки сервера можно написать, например, такой метод Stop():
void NamedPipeServer::Stop()
{
	_active=false;
	this->JoinWorkers();
}

Однако при вызове данного метода поток «встанет». Причина тому – тот факт, что один из потоков заблокирован функцией WaitForConnection().
И выход возможен только после того, как к нам присоединится очередной клиент (соответственно после такого соединения клиент обломается).
Самый простой (но не самый хороший) вариант решения проблемы возможного блокирования – создать в функции Stop клиентский именованный канал и подключиться к нашему серверу, сбросив тем самым блокировку потока WaitForConnection.
Для более правильного решения этой проблемы нам следует изменить поведение функции WaitForConnection(), добавив в нее таймаут.
Новая функция для Windows-версии выглядит так:
WinNamedPipe*  WinNamedPipe::WaitForConnection(unsigned int timeout)
{
	if(_server)	 
	{
		OVERLAPPED lpOverlapped = {0};
		lpOverlapped.hEvent = CreateEvent(0,1,1,0);
		if(ConnectNamedPipe(_hPipe, &lpOverlapped)==0)
		{
			if(GetLastError()==ERROR_PIPE_CONNECTED)
				if (!SetEvent(lpOverlapped.hEvent)) 
					THROW_LAST_ERROR("AsyncWaitForConnection failed");
			int result = WaitForSingleObject(lpOverlapped.hEvent,timeout);
			if (WAIT_OBJECT_0 == result)
			{
				HANDLE client=_hPipe;
				open();
				return new WinNamedPipe(client);
			}
			else
			{
				return NULL;
			}
		}
		else
		{
			THROW_LAST_ERROR("AsyncWaitForConnection failed");
		}
	}
	else
	{
		throw std::runtime_error("WaitForConnection is not supported on client pipe\n");
	}
}

Для того, чтобы данный метод корректно работал, надо изменить вызов функции CreateNamedPipe в open() следующим образом:
_hPipe = CreateNamedPipe( 
		(LPSTR)_name.data(),           
		PIPE_ACCESS_DUPLEX |     
             FILE_FLAG_OVERLAPPED,       // Вот тут мы включили поддержку неблокирующих операций
		PIPE_TYPE_BYTE |  
		PIPE_WAIT,             
		PIPE_UNLIMITED_INSTANCES, 
		BUFFER_PIPE_SIZE,              
		BUFFER_PIPE_SIZE,               
		0,              
		NULL);                    // default security attribute 

Реализация для linux выглядит так:
PosixNamedPipe* PosixNamedPipe::WaitForConnection(unsigned int timeout)
{ 
	int nsock;              
	int retour;             
	fd_set readf;          
	fd_set writef;           
	struct timeval to;      

	FD_ZERO(&readf);
	FD_ZERO(&writef);
	FD_SET(sock, &readf);
	FD_SET(sock, &writef);
	to.tv_usec = timeout*1000;

	retour = select(sock+1, &readf, &writef, 0, &to);

	if (retour == 0)  
	{
		return NULL;
	}

	if ( (FD_ISSET(sock, &readf)) || (FD_ISSET(sock,&writef))) 
	{
			nsock = accept(sock, NULL, NULL);
			return new PosixNamedPipe(nsock);
	}
	else
	{
		throw std::runtime_error("invalid socket descriptor!\n");
	}
}

В завершение отметим, что деструктор класса сервера для корректной работы должен вызывать метод Stop() и затем очищать всю очередь _clients при наличии там необработанных клиентов. Так как очередь клиентов в сервере состоит из указателей, то ее также надо очистить вручную:

NamedPipeServer::~NamedPipeServer(void)
{
	this->Stop();
	while(!_clients.empty())
	{
		if(_clients.try_pop(_pipe))
			delete _pipe;
	}
}

Часть 5. Пример использования библиотеки.


Для тестирования библиотеки была написана спамилка, в бесконечном цикле создающая два потока, которые пытаются соединиться с сервером и отправить ему 10 строчек. После завершения потоков цикл повторяется.
Каждый поток выглядит так:
void spamming_thread()
{
	std::vector<std::string> words;
	words.push_back(std::string("one "));
	words.push_back(std::string("two "));
	words.push_back(std::string("three "));
	words.push_back(std::string("four "));
	words.push_back(std::string("five "));
	words.push_back(std::string("six "));
	words.push_back(std::string("seven "));
	words.push_back(std::string("eight "));
	words.push_back(std::string("nine "));
	words.push_back(std::string("ten "));

	NamedPipe client("NamedPipeTester",0);
	try
	{
		client.ConnectOrOpen();
		for(int i=0;i<words.size();++i)
		{
			std::cout<<"sending "<<words[i]<<"\n";
			size_t size=words[i].size();
			client.WriteBytes(&size,sizeof(size));
			client.WriteBytes(words[i].data(),size);
		}
		client.Close();
	}
	catch(const std::runtime_error &e)
	{
		std::cout<<"Exception: "<<e.what()<<"\n";
	}
}

Теперь напишем сервер, который принимает подключения:
#pragma once
#include "../NamedPipeServer.h"

class SimpleServer: public NamedPipeServer
{
protected:
	void handleClient(NamedPipe* client);
public:
	SimpleServer(const std::string &name):NamedPipeServer(name,1){};
	~SimpleServer(){};
};
void SimpleServer::handleClient(NamedPipe* client)
{
	for(int i=0;i<10;++i)
	{
		try
		{
			size_t size;
			client->ReadBytes(&size,sizeof(size));
			if(size>0)
			{
				char* message=new char[size];
				client->ReadBytes(message,size);
				//Using std::cout is bad in multi-threading apps
				//std::string msg(message,size);
				//std::cout<<"Message from pipe: "<<msg<<"\n";
				delete[] message;
			}
		}
		catch(const std::exception& e)
		{
			std::cout<<"Exception!:"<<e.what()<<"\n";
		}
	}
	client->Close();
}

Запустим его, подождем полминуты и попробуем остановить:
		SimpleServer* s=new SimpleServer("NamedPipeTester");
		s->Start();
		boost::this_thread::sleep(boost::posix_time::milliseconds(30000));
		delete s;
		system("pause");

Заключение.


Данная библиотека вот уже полгода успешно используется в нашем проекте. Для хабра код библиотеки был исправлен и дополнен. Тем не менее, так как статья написана начинающим программистом, то в коде возможны ошибки. Автор будет благодарен за конструктивную критику, замечания и предложения по улучшению и за найденные ошибки.
Данная статья не претендует на полноту. В ней не рассмотрены такие вещи, как неблокирующее чтение и запись, установка прав доступа для именованных каналов и альтернатива побайтовому режиму чтения-записи — режим обмена сообщениями.
Весь исходный код к статье вы можете найти на github.com/xpavlov/libNamedPipe
Tags:
Hubs:
+24
Comments36

Articles

Change theme settings