
Процессы в Elixir (ну и в Erlang конечно же) идентифицируются с помощью уникального идентификатора процесса — pid.
Мы используем их, чтобы взаимодействовать с процессами. Сообщения посылаются как бы в pid, а виртуальная машина сама заботится о доставке этих сообщений в правильный процесс.
Иногда, впрочем, чрезмерное доверие к pid может приводить к значительным проблемам.
К примеру, мы можем хранить pid уже мёртвого процесса, или мы можем использовать Supervisor, который абстрагирует создание процессов от нас, поэтому мы даже не знаем, какой у них pid (пер: а ещё Supervisor можете перезапустить упавший процесс с другим pid, и мы об этом не узнаем никак).
Давайте создадим простое приложение и посмотрим: с какими проблемами мы можем столкнуться и как мы эти проблемы будем решать.
Начинаем вообще без реестра
Для первого примера — создадим простой чат. Начнём с создания mix проекта:
$ mix new chatСоздадим абсолютно стандартный GenServer, который будем использовать на протяжении всех примеров в этом статье:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link do
GenServer.start_link(__MODULE__, [])
end
def add_message(pid, message) do
GenServer.cast(pid, {:add_message, message})
end
def get_messages(pid) do
GenServer.call(pid, :get_messages)
end
# SERVER
def init(messages) do
{:ok, messages}
end
def handle_cast({:add_message, new_message}, messages) do
{:noreply, [new_message | messages]}
end
def handle_call(:get_messages, _from, messages) do
{:reply, messages, messages}
end
endЕсли такой код кажется вам незнакомым или не понятным — почитайте начало работы с Elixir, в котором есть отличные параграфы об OTP.Запустим iex сессию с mix окружением и попробу��м поработать с нашим сервером:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Server.add_message(pid, "foo")
:ok
iex> Chat.Server.add_message(pid, "bar")
:ok
iex> Chat.Server.get_messages(pid)
["bar", "foo"]Код этого этапа — вот в этом коммите
На этом этапе вроде как всё так хорошо, что просто замечательно. Мы получаем pid процесса, затем для каждого сообщения, которое мы хотим послать (add_message/2 и get_messages/1) мы передаём этот pid — и всё работает настолько предсказуемо, что даже скучно.
Впрочем, веселуха начинается тогда, когда мы попробуем добавить Supervisor...
Очень приятно: я — Supervisor!
Итак, по какой-то причине наш процесс Chat.Server умирает. Мы остаёмся одни в пустой и холодной iex сессии, и у нас нету другого выбора, кроме как запустить новый процесс, получить его pid и писать сообщения уже на этот новый pid. Так давайте же создадим Supervisor — и нам не придётся беспокоится о таких мелочах!
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
supervise(children, strategy: :one_for_one)
end
endНу, создать Supervisor очень просто. Но у нас теперь проблема, если модель поведения нашего сервера не изменится. Ведь мы не запускаем процесс Chat.Server сами, Supervisor делает это за нас. И поэтому мы не имеем никакого доступа к pid процесса!
Эта не баг, а фича такого OTP паттерна, как Supervisor. Мы не можем получить доступ к pid его дочерних процессов, потому что он может неожиданно (но, естественно, только в случае необходимости) перезапустить процесс, а фактически убить его и создать новый с новым pid.
Регистрируем имена процессов
Чтобы получить доступ к нашему процессу Chat.Server нам нужно придумать способ указывать на процесс, другой — не pid. нам нужен такой указатель, чтобы он сохранялся даже при рестарте процесса через Supervisor (пер: то есть даже тогда, когда меняется pid).
И такой указатель называется имя!
Для начала, изменим Chat.Server:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
def start_link do
# We now start the GenServer with a `name` option.
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
# And our function doesn't need to receive the pid anymore,
# as we can reference the process with its unique name.
def add_message(message) do
GenServer.cast(:chat_room, {:add_message, message})
end
def get_messages do
GenServer.call(:chat_room, :get_messages)
end
# ...
endИзменения — вот в этом коммите
Сейчас всё должно работать так же, но только лучше — ведь мы не должны передавать везде этот pid:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.94.0>}
iex> Chat.Server.add_message("foo")
:ok
iex> Chat.Server.add_message("bar")
:ok
iex> Chat.Server.get_messages
["bar", "foo"]Даже если процесс перезапустится — всё равно мы сможем обратиться к нему тем же способом:
iex> Process.whereis(:chat_room)
#PID<0.111.0>
iex> Process.whereis(:chat_room) |> Process.exit(:kill)
true
iex> Process.whereis(:chat_room)
#PID<0.114.0>
iex> Chat.Server.add_message "foo"
:ok
iex> Chat.Server.get_messages
["foo"]Ну, для наших текущих задач вроде бы проблемы решены, но давайте попробуем сделать что-нибудь посложнее (и более приближённое к реальным задачам).
Динамическое создание процессов
Представьте, что нам необходимо поддерживать несколько чат-комнат. Клиент может создать новую комнату с именем, и он ожидает, что сможет посылать сообщения в ту комнату, которую он хочет. Тогда интерфейс должен быть приблизительно такой:
iex> Chat.Supervisor.start_room("first room")
iex> Chat.Supervisor.start_room("second room")
iex> Chat.Server.add_message("first room", "foo")
iex> Chat.Server.add_message("second room", "bar")
iex> Chat.Server.get_messages("first room")
["foo"]
iex> Chat.Server.get_messages("second room")
["bar"]Начнём пожалуй сверху, и изменим Supervisor чтобы всё это поддерживать:
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
# We are now registering our supervisor process with a name
# so we can reference it in the `start_room/1` function
Supervisor.start_link(__MODULE__, [], name: :chat_supervisor)
end
def start_room(name) do
# And we use `start_child/2` to start a new Chat.Server process
Supervisor.start_child(:chat_supervisor, [name])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
# We also changed the `strategty` to `simple_one_for_one`.
# With this strategy, we define just a "template" for a child,
# no process is started during the Supervisor initialization,
# just when we call `start_child/2`
supervise(children, strategy: :simple_one_for_one)
end
endИ давайте заставим наш Chat.Server принимать имена в start_link функции:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# Just accept a `name` parameter here for now
def start_link(name) do
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
#...
endИзменения — вот в этом коммите
А вот и проблема! У нас же может быть несколько процессов Chat.Server, и они не могут все быть с именем :chat_room. Беда...
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_room "foo"
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room "bar"
{:error, {:already_started, #PID<0.109.0>}}Честно говоря, VM очень красноречива. Мы пытаемся создать второй процесс, но процесс с таким именем уже существует, о чём нам так ехидно напоминает среда. Надо придумать какой то другой способ, но какой?..
К сожалению, тип аргумента name определён достаточно чётко. Мы не можем использовать что-то типа {:chat_room, "room name"}. Давайте обратимся к документации:
Поддерживаемые значения:
atom— в этом случаеGenServerрегистрируется локально с данным именемatomс помощьюProcess.register/2.
{:global, term}— в этом случаеGenServerрегистрируется глобально с данным именемtermс помощью функций в модуле:global.
{:via, module, term}— в этом случаеGenServerрегистрируется с помощью определённого вmoduleмеханизма и имени `term.
The supported values are:
anatom— theGenServeris registered locally with the given name usingProcess.register/2.
{:global, term}— theGenServeris registered globally with the given term using the functions in the:globalmodule.
{:via, module, term}— theGenServeris registered with the given mechanism and name.
Первую опцию — atom, мы уже использовали, и точно знаем, что в нашем хитром случае она не подходит.
Вторая опция используется для регистрации процесса глобально в кластере нод. Она использует локальную ETS таблицу. Кроме того она будет требовать постоянной синхронизации внутри нод в кластере, в связи с чем работа программы будет замедляться. Так что используйте её только когда это действительно нужно.
Третья, и последняя, опция использует в качестве параметра кортеж с :via, и это как раз то, что нам надо для решения нашей проблемы! Вот что говорит по этому поводу документация:
Опция:viaпринимает в качестве параметра модуль, который имеет следующий интерфейс:register_name/2,unregister_name/1,whereis_name/1иsend/2.
The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.
Вообще ничего не понятно? Мне тоже! Так что посмотрим этот метод в деле.
Используем кортеж :via
Итак, кортеж :via — это способ сказать Elixir, что мы собираемся использовать отдельный модуль для регистрации наших процессов. Этот модуль должен делать следующие вещи:
- Регистрировать имя, которым может быть любой
term, с помощью функцииregister_name/2; - Удалять имена из регистра, с помощью функции
unregister_name/1; - Находить
pidпо имени, с помощьюwhereis_name/1; - Посылать сообщения определённому процессу с помощью
send/2.
Чтобы это всё работало, вышеперечисленные функции должны передавать ответ в определённом формате, определённом в OTP — так же как и handle_call/3 и handle_cast/2 подчиняются определённым правилам.
Попробуем определить модуль, которые всё это знает:
# ./lib/chat/registry.ex
defmodule Chat.Registry do
use GenServer
# API
def start_link do
# We register our registry (yeah, I know) with a simple name,
# just so we can reference it in the other functions.
GenServer.start_link(__MODULE__, nil, name: :registry)
end
def whereis_name(room_name) do
GenServer.call(:registry, {:whereis_name, room_name})
end
def register_name(room_name, pid) do
GenServer.call(:registry, {:register_name, room_name, pid})
end
def unregister_name(room_name) do
GenServer.cast(:registry, {:unregister_name, room_name})
end
def send(room_name, message) do
# If we try to send a message to a process
# that is not registered, we return a tuple in the format
# {:badarg, {process_name, error_message}}.
# Otherwise, we just forward the message to the pid of this
# room.
case whereis_name(room_name) do
:undefined ->
{:badarg, {room_name, message}}
pid ->
Kernel.send(pid, message)
pid
end
end
# SERVER
def init(_) do
# We will use a simple Map to store our processes in
# the format %{"room name" => pid}
{:ok, Map.new}
end
def handle_call({:whereis_name, room_name}, _from, state) do
{:reply, Map.get(state, room_name, :undefined), state}
end
def handle_call({:register_name, room_name, pid}, _from, state) do
# Registering a name is just a matter of putting it in our Map.
# Our response tuple include a `:no` or `:yes` indicating if
# the process was included or if it was already present.
case Map.get(state, room_name) do
nil ->
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_cast({:unregister_name, room_name}, state) do
# And unregistering is as simple as deleting an entry
# from our Map
{:noreply, Map.delete(state, room_name)}
end
endОпять же: в наших руках выбрать, каким образом наш реестр будет внутри работать. Здесь мы используем простую Map для связи имени и pid. Этот код абсолютно прост и прямолинеен, особенно если вы хорошо знаете как работает GenServer. Незнакомыми могут казаться только возвращаемые функциями значения.
Пришло время попробовать наш реестр в iex сессии:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link("room1")
{:ok, #PID<0.107.0>}
iex> Chat.Registry.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Registry.whereis_name("room1")
:undefined
iex> Chat.Registry.register_name("room1", pid)
:yes
iex> Chat.Registry.register_name("room1", pid)
:no
iex> Chat.Registry.whereis_name("room1")
#PID<0.107.0>
iex> Chat.Registry.unregister_name("room1")
:ok
iex> Chat.Registry.whereis_name("room1")
:undefined5 секунд — полёт отличный! Реестр работает как надо: и регистрирует, и удаляет регистрацию. Попробуем его использовать в наших чатах.
Наша проблема была в том, что у нас были нескольких запущенных серверов Chat.Server, инициализированных через Supervisor. Чтобы отправить сообщение в определённую комнату, мы хотели бы вызывать Chat.Server.add_message(“room1”, “my message”), поэтому мы должны были бы регистрировать имена серверов как {:chat_room, “room1”} и {:chat_room, “room2”}. Вот как это делается через кортеж :via:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link(name) do
# Instead of passing an atom to the `name` option, we send
# a tuple. Here we extract this tuple to a private method
# called `via_tuple` that can be reused in every function
GenServer.start_link(__MODULE__, [], name: via_tuple(name))
end
def add_message(room_name, message) do
# And the `GenServer` callbacks will accept this tuple the
# same way it accepts a pid or an atom.
GenServer.cast(via_tuple(room_name), {:add_message, message})
end
def get_messages(room_name) do
GenServer.call(via_tuple(room_name), :get_messages)
end
defp via_tuple(room_name) do
# And the tuple always follow the same format:
# {:via, module_name, term}
{:via, Chat.Registry, {:chat_room, room_name}}
end
# SERVER (no changes required here)
# ...
endИзменения — вот в этом коммите
Вот что здесь происходит: каждый раз, когда мы посылаем сообщение в Chat.Server, передавая имя комнаты, он сам будет находить pid нужного процесса с помощью того модуля, который мы ему передали в кортеже :via (в данном случае это Chat.Registry).
Это решает нашу проблему: теперь мы можем использовать любое количество Chat.Server процессов (ну, пока не закончится фантазия на имена), и нам никогда не надо знать их pid. Совсем.
Впрочем, есть ещё одна проблема в таком решении. Догадались?
Именно! Наш реестр не знает о процессах, которые упали, и должны быть перезапущены через Supervisor. А это значит, что когда такое произойдёт, реестр не даст пересоздать запись с таким же именем, и будет хранить pid мёртвого процесса.
По идее, решение этой проблемы — не слишком сложное. Мы заставим наш реестр осуществлять мониторинг всех процессов, pid которых он хранит. Как только такой "наблюдаемый" процесс упадёт — мы его просто удалим из нашего реестра.
# in lib/chat/registry.ex
defmodule Chat.Registry do
# ...
def handle_call({:register_name, room_name, pid}, _from, state) do
case Map.get(state, room_name) do
nil ->
# When a new process is registered, we start monitoring it.
Process.monitor(pid)
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_info({:DOWN, _, :process, pid, _}, state) do
# When a monitored process dies, we will receive a
# `:DOWN` message that we can use to remove the
# dead pid from our registry.
{:noreply, remove_pid(state, pid)}
end
def remove_pid(state, pid_to_remove) do
# And here we just filter out the dead pid
remove = fn {_key, pid} -> pid != pid_to_remove end
Enum.filter(state, remove) |> Enum.into(%{})
end
endИзменения — вот в этом коммите
Убедимся в том, что всё работает:
$ iex -S mix
iex> Chat.Registry.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.111.0>}
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]Ну, теперь уже совершенно не важно, сколько раз Supervisor перезапустит процесс Chat.Server: как только мы посылаем сообщение в комнату — оно будет доставлено по верному pid.
Упрощаем с помощью gproc
В принципе с нашим чатом мы на этом и закончим, но хочется рассказать ещё об одной фиче, которая упростит для нас регистрацию с помощью кортежа :via. Это — gproc, — библиотека Erlang.
И научим наш Chat.Server использовать gproc вместо нашего Chat.Registry, а потом мы вообще избавимся от Chat.Registry.
Начнём пожалуй с зависимостей. Для этого добавим gproc в mix.exs:
# ./mix.exs
defmodule Chat.Mixfile do
# ...
def application do
[applications: [:logger, :gproc]]
end
defp deps do
[{:gproc, "0.3.1"}]
end
endЗатем подтянем зависимости с помощью:
$ mix deps.getТеперь мы можем поменять нашу регистрацию с помощью кортежа :via — пусть использует gproc, а не Chat.Registry:
# ./lib/chat/server.ex
defmodule Chat.Server do
# ...
# The only thing we need to change is the `via_tuple/1` function,
# to make it use `gproc` instead of `Chat.Registry`
defp via_tuple(room_name) do
{:via, :gproc, {:n, :l, {:chat_room, room_name}}}
end
# ...
endgproc использует ключи-кортежи, состоящие из трёх значений: {type, scope, key}.
В нашем случае мы используем:
:n— это значитимя, то есть не может быть больше одного процесса, зарегистрированного под таким ключом;:l— это значитlocal, то есть процесс регистрируется только на нашей ноде;{:chat_room, room_name}— это сам ключ в виде кортежа.
Дополнительную информацию по возможным настройкам gproc искать тут.
После таких изменений вообще выкинем наш Chat.Registry, и проверим что всё продолжает работать в iex сессии:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.190.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.192.0>}
iex> Chat.Supervisor.start_room("room2")
{:ok, #PID<0.194.0>}
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.add_message("room2", "second message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
iex> Chat.Server.get_messages("room2")
["second message"]
iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]Изменения — вот в этом коммите
Куда плыть дальше, капитан?
Мы с вами разобрались в куче сложных вопросов. Основные выводы:
- Будьте осторожны, работая с
pidнапрямую: они поменяются как только процесс перезапустится. - Если вам нужно получить ссылку только на один процесс ( как у нас было с единственной комнатой в чате), регистрация процесса с именем в виде атома — достаточная мера;
- Если вам нужно создавать процессы динамически (множество чат-комнат), вы можете использовать кортеж
:viaдля предоставления своего собственного реестра; - Подобные реестры уже существуют ( к примеру
gproc), и если вы их используете — не придётся строить свой велосипед;
Конечно, это ещё не всё. Если вам нужна глобальная регистрация на всех нодах в кластере, другие средства тоже могут быть хороши. У Erlang есть глобальные модули для глобальных регистраций, pg2 для групп процессов, да и тот же gprc может вам помочь.
Если эта статья вас заинтересовала — почитайте Saša Jurić. Elixir in Action.
А вот и репка с сыром)
