Введение
В замечательной статье с trapexit «Building a Non-blocking TCP server using OTP principles» рассказывается, как построить неблокирующий TCP сервер используя принципы OTP. Думаю, каждый, кто начинал изучать elrlang рано или поздно сталкивался с этой статьей. Для построения неблокирующего TCP сервера в указанной выше статье используется недокументированный функционал из модуля prim_inet.
Не буду филосовствовать хорошо или плохо использовать undocumented features, в некоторых «костыльных» решениях это действительно нужно, в продакшене я бы предпочел использовать проверенные средства. На заметку, даже в самой статье автор предупреждает: "Examining prim_inet module reveals an interesting fact that the actual call to inet driver to accept a client socket is asynchronous. While this is a non-documented property, which means that the OTP team is free to change this implementation, we will exploit this functionality in the construction of our server [1]."
Под неблокирующим сервером мы подразумеваем, что слушающий процесс и FSM не должны делать каких-либо блокирующих вызовов и быстро реагировать на входящие сообщения (например, изменения в конфигурации, перезапуск и т.д.), не вызывая таймауты [2].
По поводу вырезки выше: проблемы могут возникнуть (со слушающим процессом), в том случае, если он несет дополнительную функциональную нагрузку (например содержит какие-либо дополнительные пользовательские API, которые нужно «дёргать» в процессе работы), FSM по архитектурным соображениям вообще не должен содержать блокирующих вызовов. Поэтому, если единственная функция слушателя — слушать, то нет ничего страшного, что его поток будет заблокирован ожиданием соединения, в том случае, если потребуется перезапустить данный элемент системы, он будет принудительно остановлен супервизором, по заранее установленному таймауту и затем запущен вновь (если не прав поправьте). Проблемы могут возникнуть при горячем обновлении кода (автор не проверял с какими граблями можно в данном случае столкнуться, кто пробовал поделитесь опытом).
Поставим задачу реализовать неблокирующий TCP сервер только документированными методами.
Структура сервера
Первое, что приходит в голову по поставленной задачи — это реализовать ожидание соединения в отдельном процессе. Таким образом структуру сервера можно представить следующим образом.
Рисунок 1
1. application_master:main_loop/2
2. application_master:loop_it/4
Когда запускается приложение создается процесс application_master, в логической структуре — это один процесс, но на физическом уровне создается два процесса. Application master — это лидер группы всех процессов в приложении.
3. Супервизор нашего TCP сервера (supervisor)
4. Слушатель (gen_server), поражающий процесс-слушатель (простой процесс)
5. Супервизор клиентских процессов (supervisor)
6. Процесс-слушатель (простой процесс)
7. Клиентские процессы (gen_fsm)
Исходный код
Думаю, смысла нет приводить исходный код для всех частей системы, остановимся лишь на модуле tcp_listener и процессе который он запускает.
-module(tcp_listener).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([accept_func/1]).
-define(SERVER, ?MODULE).
1. -define(LOGIC_MODULE, tcp_fsm).
2. -record(state, {
listener, %% Listening socket
module %% FSM handling module
}).
start_link(Port) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Port], []).
init([Port]) ->
Options = [{packet, raw}, {active, once}, {reuseaddr, true}],
case gen_tcp:listen(Port, Options) of
{ok, LSocket} ->
%% Create first accepting process
3. spawn_link(?MODULE, accept_func, [LSocket]),
{ok, #state{listener = LSocket, module = ?LOGIC_MODULE}};
{error, Reason} ->
error_logger:error_msg("Error: ~p~n", [Reason]), {stop, Reason}
end.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{listener = LSocket} = _State) ->
gen_tcp:close(LSocket),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
accept_func(LSocket) ->
4. {ok, Socket} = gen_tcp:accept(LSocket),
error_logger:info_msg("Accept connection: ~p.\n", [Socket]),
5. {ok, Pid} = tcp_client_sup:start_child(),
6. ok = gen_tcp:controlling_process(Socket, Pid),
7. tcp_fsm:set_socket(Pid, Socket),
8. accept_func(LSocket).
1. Макрос объявляющий модуль обработки соединения с клиентом.
2. Структура для хранения состояние gen-server.
3. Порождаем дополнительный процесс, который будет «слушать».
4. Ждем коннект.
5. Создаем gen_fsm (модуль tcp_fsm) для обработки соединения с клиентом.
6. Меняем контролирующий сокет процесс на только что созданный процесс в п. 5.
7. Передаем сокет модулю tcp_fsm.
8. Заново начинаем «слушать».
Тестируем
(emacs@host)2> make:all(). # компилируем
Recompile: tcp_server_sup
Recompile: tcp_listener
Recompile: tcp_fsm
Recompile: tcp_client_sup
Recompile: erltcps
up_to_date
(emacs@host)3> code:add_path("../ebin"). # добавляем путь к ebin
true
(emacs@host)4> application:load(erltcps). # загружаем приложение
ok
(emacs@host)5> application:start(erltcps). # запускаем приложение
ok
(emacs@host)6>
=INFO REPORT==== 22-Jun-2011::13:10:07 ===
Accept connection: #Port<0.2353>. # есть коннект
(emacs@host)6>
=INFO REPORT==== 22-Jun-2011::13:10:07 ===
IP: {127,0,0,1} # IP адрес
(emacs@host)6>
=INFO REPORT==== 22-Jun-2011::13:10:15 ===
<<"hello\r\n">> # получено сообщение
(emacs@host)6>
=INFO REPORT==== 22-Jun-2011::13:10:23 ===
{127,0,0,1} Client disconnected. # клиент отключился
(emacs@host)6>
Выводы
В итоге мы построили каркас TCP сервера «как-бы» не блокирующего. В нашей реализации заблокированным остается специальный процесс, единственная функция которого ждать соединения и создавать процесс для его обработки. В сам же модуль tcp_listener можно добавить дополнительную логику (например запуск/прекращение приема соединений, путем остановки слушающего процесса).
Плюсы:
- Мы не использовали undocumented features, которые в продакшене могут нам стоить очень дорого.
- Заблокированным остается специально созданный для этого процесс.
Минусы:
- В нашем OTP приложении присутствует процесс созданные не по принципам OTP.
- Если падает слушающий процесс (accept_func/1 в модуле tcp_listener), то сигнал распространяется, и tcp_listener тоже падает, благо супервизор перезапускает tcp_listener, а он в свою очередь заново создает слушающий процесс из функции accept_func/1.
Эти два минуса между собой связаны. Для каждого есть свое решение. Вот пара задачек для читателей:
1. Что нужно сделать, чтобы tcp_lictener не падал, если упадет слушающий процесс (accept_func/1)?
2. Что нужно добавить, для более безопасного использования простых процессов в OTP приложении?
Скачать
Исходный код для статьи можно скачать на github.
Что почитать?
1. Building a Non-blocking TCP server using OTP principles
2. Создание неблокирующего TCP сервера с использованием принципов OTP
3. Erlang questions mailing list ~ prim_inet
4. Отличная документация
5. Erlang applications