Как стать автором
Обновить

Неблокирующий TCP сервер без использования undocumented features

Время на прочтение5 мин
Количество просмотров5K

Введение


В замечательной статье с trapexit «Building a Non-blocking TCP server using OTP principles» рассказывается, как построить неблокирующий TCP сервер используя принципы OTP. Думаю, каждый, кто начинал изучать elrlang рано или поздно сталкивался с этой статьей. Для построения неблокирующего TCP сервера в указанной выше статье используется недокументированный функционал из модуля prim_inet.

Не буду филосовствовать хорошо или плохо использовать undocumented features, в некоторых «костыльных» решениях это действительно нужно, в продакшене я бы предпочел использовать проверенные средства. На заметку, даже в самой статье автор предупреждает: "Examining prim_inet module reveals an interesting fact that the actual call to inet driver to accept a client socket is asynchronous. While this is a non-documented property, which means that the OTP team is free to change this implementation, we will exploit this functionality in the construction of our server [1]."

Под неблокирующим сервером мы подразумеваем, что слушающий процесс и FSM не должны делать каких-либо блокирующих вызовов и быстро реагировать на входящие сообщения (например, изменения в конфигурации, перезапуск и т.д.), не вызывая таймауты [2].

По поводу вырезки выше: проблемы могут возникнуть (со слушающим процессом), в том случае, если он несет дополнительную функциональную нагрузку (например содержит какие-либо дополнительные пользовательские API, которые нужно «дёргать» в процессе работы), FSM по архитектурным соображениям вообще не должен содержать блокирующих вызовов. Поэтому, если единственная функция слушателя — слушать, то нет ничего страшного, что его поток будет заблокирован ожиданием соединения, в том случае, если потребуется перезапустить данный элемент системы, он будет принудительно остановлен супервизором, по заранее установленному таймауту и затем запущен вновь (если не прав поправьте). Проблемы могут возникнуть при горячем обновлении кода (автор не проверял с какими граблями можно в данном случае столкнуться, кто пробовал поделитесь опытом).

Поставим задачу реализовать неблокирующий TCP сервер только документированными методами.

Структура сервера


Первое, что приходит в голову по поставленной задачи — это реализовать ожидание соединения в отдельном процессе. Таким образом структуру сервера можно представить следующим образом.


Рисунок 1

1. application_master:main_loop/2
2. application_master:loop_it/4

Когда запускается приложение создается процесс application_master, в логической структуре — это один процесс, но на физическом уровне создается два процесса. Application master — это лидер группы всех процессов в приложении.

3. Супервизор нашего TCP сервера (supervisor)
4. Слушатель (gen_server), поражающий процесс-слушатель (простой процесс)
5. Супервизор клиентских процессов (supervisor)
6. Процесс-слушатель (простой процесс)
7. Клиентские процессы (gen_fsm)

Исходный код


Думаю, смысла нет приводить исходный код для всех частей системы, остановимся лишь на модуле tcp_listener и процессе который он запускает.

-module(tcp_listener).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
		 terminate/2, code_change/3]).
-export([accept_func/1]).

-define(SERVER, ?MODULE). 
1. -define(LOGIC_MODULE, tcp_fsm).

2. -record(state, {
		  listener,       %% Listening socket
		  module          %% FSM handling module
		 }).

start_link(Port) ->
	gen_server:start_link({local, ?SERVER}, ?MODULE, [Port], []).

init([Port]) ->
	Options = [{packet, raw}, {active, once}, {reuseaddr, true}],
	case gen_tcp:listen(Port, Options) of
	{ok, LSocket} ->
	   %% Create first accepting process
3.   spawn_link(?MODULE, accept_func, [LSocket]),
	   {ok, #state{listener = LSocket, module   = ?LOGIC_MODULE}};
	{error, Reason} ->
	   error_logger:error_msg("Error: ~p~n", [Reason]), {stop, Reason}
   end.


handle_call(_Request, _From, State) ->
	Reply = ok,
	{reply, Reply, State}.


handle_cast(_Msg, State) ->
	{noreply, State}.


handle_info(_Info, State) ->
	{noreply, State}.

terminate(_Reason, #state{listener = LSocket} = _State) ->
	gen_tcp:close(LSocket),
	ok.

code_change(_OldVsn, State, _Extra) ->
	{ok, State}.

accept_func(LSocket) ->
4. 	{ok, Socket} = gen_tcp:accept(LSocket),
 	error_logger:info_msg("Accept connection: ~p.\n", [Socket]),
5. 	{ok, Pid} = tcp_client_sup:start_child(),
6. 	ok = gen_tcp:controlling_process(Socket, Pid),	
7. 	tcp_fsm:set_socket(Pid, Socket),
8. 	accept_func(LSocket).

1. Макрос объявляющий модуль обработки соединения с клиентом.
2. Структура для хранения состояние gen-server.
3. Порождаем дополнительный процесс, который будет «слушать».
4. Ждем коннект.
5. Создаем gen_fsm (модуль tcp_fsm) для обработки соединения с клиентом.
6. Меняем контролирующий сокет процесс на только что созданный процесс в п. 5.
7. Передаем сокет модулю tcp_fsm.
8. Заново начинаем «слушать».

Тестируем


(emacs@host)2> make:all(). # компилируем
Recompile: tcp_server_sup
Recompile: tcp_listener
Recompile: tcp_fsm
Recompile: tcp_client_sup
Recompile: erltcps
up_to_date
(emacs@host)3> code:add_path("../ebin"). # добавляем путь к ebin
true
(emacs@host)4> application:load(erltcps). # загружаем приложение
ok
(emacs@host)5> application:start(erltcps). # запускаем приложение
ok
(emacs@host)6> 
=INFO REPORT==== 22-Jun-2011::13:10:07 ===
Accept connection: #Port<0.2353>. # есть коннект
(emacs@host)6> 
=INFO REPORT==== 22-Jun-2011::13:10:07 ===
IP: {127,0,0,1} # IP адрес
(emacs@host)6> 
=INFO REPORT==== 22-Jun-2011::13:10:15 ===
<<"hello\r\n">> # получено сообщение
(emacs@host)6> 
=INFO REPORT==== 22-Jun-2011::13:10:23 ===
{127,0,0,1} Client disconnected. # клиент отключился
(emacs@host)6> 


Выводы


В итоге мы построили каркас TCP сервера «как-бы» не блокирующего. В нашей реализации заблокированным остается специальный процесс, единственная функция которого ждать соединения и создавать процесс для его обработки. В сам же модуль tcp_listener можно добавить дополнительную логику (например запуск/прекращение приема соединений, путем остановки слушающего процесса).
Плюсы:
  • Мы не использовали undocumented features, которые в продакшене могут нам стоить очень дорого.
  • Заблокированным остается специально созданный для этого процесс.

Минусы:
  • В нашем OTP приложении присутствует процесс созданные не по принципам OTP.
  • Если падает слушающий процесс (accept_func/1 в модуле tcp_listener), то сигнал распространяется, и tcp_listener тоже падает, благо супервизор перезапускает tcp_listener, а он в свою очередь заново создает слушающий процесс из функции accept_func/1.

Эти два минуса между собой связаны. Для каждого есть свое решение. Вот пара задачек для читателей:
1. Что нужно сделать, чтобы tcp_lictener не падал, если упадет слушающий процесс (accept_func/1)?
2. Что нужно добавить, для более безопасного использования простых процессов в OTP приложении?

Скачать


Исходный код для статьи можно скачать на github.

Что почитать?


1. Building a Non-blocking TCP server using OTP principles
2. Создание неблокирующего TCP сервера с использованием принципов OTP
3. Erlang questions mailing list ~ prim_inet
4. Отличная документация
5. Erlang applications
Теги:
Хабы:
Всего голосов 29: ↑25 и ↓4+21
Комментарии15

Публикации