
Процессы в Elixir (ну и в Erlang конечно же) идентифицируются с помощью уникального идентификатора процесса — pid.
Мы используем их, чтобы взаимодействовать с процессами. Сообщения посылаются как бы в pid, а виртуальная машина сама заботится о доставке этих сообщений в правильный процесс.
Иногда, впрочем, чрезмерное доверие к pid может приводить к значительным проблемам.
К примеру, мы можем хранить pid уже мёртвого процесса, или мы можем использовать Supervisor, который абстрагирует создание процессов от нас, поэтому мы даже не знаем, какой у них pid (пер: а ещё Supervisor можете перезапустить упавший процесс с другим pid, и мы об этом не узнаем никак).
Давайте создадим простое приложение и посмотрим: с какими проблемами мы можем столкнуться и как мы эти проблемы будем решать.
Начинаем вообще без реестра
Для первого примера — создадим простой чат. Начнём с создания mix проекта:
$ mix new chat
Создадим абсолютно стандартный GenServer, который будем использовать на протяжении всех примеров в этом статье:
# ./lib/chat/server.ex defmodule Chat.Server do use GenServer # API def start_link do GenServer.start_link(__MODULE__, []) end def add_message(pid, message) do GenServer.cast(pid, {:add_message, message}) end def get_messages(pid) do GenServer.call(pid, :get_messages) end # SERVER def init(messages) do {:ok, messages} end def handle_cast({:add_message, new_message}, messages) do {:noreply, [new_message | messages]} end def handle_call(:get_messages, _from, messages) do {:reply, messages, messages} end end
Если такой код кажется вам незнакомым или не понятным — почитайте начало работы с Elixir, в котором есть отличные параграфы об OTP.Запустим iex сессию с mix окружением и попробуем поработать с нашим сервером:
$ iex -S mix iex> {:ok, pid} = Chat.Server.start_link {:ok, #PID<0.107.0>} iex> Chat.Server.add_message(pid, "foo") :ok iex> Chat.Server.add_message(pid, "bar") :ok iex> Chat.Server.get_messages(pid) ["bar", "foo"]
Код этого этапа — вот в этом коммите
На этом этапе вроде как всё так хорошо, что просто замечательно. Мы получаем pid процесса, затем для каждого сообщения, которое мы хотим послать (add_message/2 и get_messages/1) мы передаём этот pid — и всё работает настолько предсказуемо, что даже скучно.
Впрочем, веселуха начинается тогда, когда мы попробуем добавить Supervisor...
Очень приятно: я — Supervisor!
Итак, по какой-то причине наш процесс Chat.Server умирает. Мы остаёмся одни в пустой и холодной iex сессии, и у нас нету другого выбора, кроме как запустить новый процесс, получить его pid и писать сообщения уже на этот новый pid. Так давайте же создадим Supervisor — и нам не придётся беспокоится о таких мелочах!
# ./lib/chat/supervisor.ex defmodule Chat.Supervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, []) end def init(_) do children = [ worker(Chat.Server, []) ] supervise(children, strategy: :one_for_one) end end
Ну, создать Supervisor очень просто. Но у нас теперь проблема, если модель поведения нашего сервера не изменится. Ведь мы не запускаем процесс Chat.Server сами, Supervisor делает это за нас. И поэтому мы не имеем никакого доступа к pid процесса!
Эта не баг, а фича такого OTP паттерна, как Supervisor. Мы не можем получить доступ к pid его дочерних процессов, потому что он может неожиданно (но, естественно, только в случае необходимости) перезапустить процесс, а фактически убить его и создать новый с новым pid.
Регистрируем имена процессов
Чтобы получить доступ к нашему процессу Chat.Server нам нужно придумать способ указывать на процесс, другой — не pid. нам нужен такой указатель, чтобы он сохранялся даже при рестарте процесса через Supervisor (пер: то есть даже тогда, когда меняется pid).
И такой указатель называется имя!
Для начала, изменим Chat.Server:
# ./lib/chat/server.ex defmodule Chat.Server do use GenServer def start_link do # We now start the GenServer with a `name` option. GenServer.start_link(__MODULE__, [], name: :chat_room) end # And our function doesn't need to receive the pid anymore, # as we can reference the process with its unique name. def add_message(message) do GenServer.cast(:chat_room, {:add_message, message}) end def get_messages do GenServer.call(:chat_room, :get_messages) end # ... end
Изменения — вот в этом коммите
Сейчас всё должно работать так же, но только лучше — ведь мы не должны передавать везде этот pid:
$ iex -S mix iex> Chat.Supervisor.start_link {:ok, #PID<0.94.0>} iex> Chat.Server.add_message("foo") :ok iex> Chat.Server.add_message("bar") :ok iex> Chat.Server.get_messages ["bar", "foo"]
Даже если процесс перезапустится — всё равно мы сможем обратиться к нему тем же способом:
iex> Process.whereis(:chat_room) #PID<0.111.0> iex> Process.whereis(:chat_room) |> Process.exit(:kill) true iex> Process.whereis(:chat_room) #PID<0.114.0> iex> Chat.Server.add_message "foo" :ok iex> Chat.Server.get_messages ["foo"]
Ну, для наших текущих задач вроде бы проблемы решены, но давайте попробуем сделать что-нибудь посложнее (и более приближённое к реальным задачам).
Динамическое создание процессов
Представьте, что нам необходимо поддерживать несколько чат-комнат. Клиент может создать новую комнату с именем, и он ожидает, что сможет посылать сообщения в ту комнату, которую он хочет. Тогда интерфейс должен быть приблизительно такой:
iex> Chat.Supervisor.start_room("first room") iex> Chat.Supervisor.start_room("second room") iex> Chat.Server.add_message("first room", "foo") iex> Chat.Server.add_message("second room", "bar") iex> Chat.Server.get_messages("first room") ["foo"] iex> Chat.Server.get_messages("second room") ["bar"]
Начнём пожалуй сверху, и изменим Supervisor чтобы всё это поддерживать:
# ./lib/chat/supervisor.ex defmodule Chat.Supervisor do use Supervisor def start_link do # We are now registering our supervisor process with a name # so we can reference it in the `start_room/1` function Supervisor.start_link(__MODULE__, [], name: :chat_supervisor) end def start_room(name) do # And we use `start_child/2` to start a new Chat.Server process Supervisor.start_child(:chat_supervisor, [name]) end def init(_) do children = [ worker(Chat.Server, []) ] # We also changed the `strategty` to `simple_one_for_one`. # With this strategy, we define just a "template" for a child, # no process is started during the Supervisor initialization, # just when we call `start_child/2` supervise(children, strategy: :simple_one_for_one) end end
И давайте заставим наш Chat.Server принимать имена в start_link функции:
# ./lib/chat/server.ex defmodule Chat.Server do use GenServer # Just accept a `name` parameter here for now def start_link(name) do GenServer.start_link(__MODULE__, [], name: :chat_room) end #... end
Изменения — вот в этом коммите
А вот и проблема! У нас же может быть несколько процессов Chat.Server, и они не могут все быть с именем :chat_room. Беда...
$ iex -S mix iex> Chat.Supervisor.start_link {:ok, #PID<0.107.0>} iex> Chat.Supervisor.start_room "foo" {:ok, #PID<0.109.0>} iex> Chat.Supervisor.start_room "bar" {:error, {:already_started, #PID<0.109.0>}}
Честно говоря, VM очень красноречива. Мы пытаемся создать второй процесс, но процесс с таким именем уже существует, о чём нам так ехидно напоминает среда. Надо придумать какой то другой способ, но какой?..
К сожалению, тип аргумента name определён достаточно чётко. Мы не можем использовать что-то типа {:chat_room, "room name"}. Давайте обратимся к документации:
Поддерживаемые значения:
atom— в этом случаеGenServerрегистрируется локально с данным именемatomс помощьюProcess.register/2.
{:global, term}— в этом случаеGenServerрегистрируется глобально с данным именемtermс помощью функций в модуле:global.
{:via, module, term}— в этом случаеGenServerрегистрируется с помощью определённого вmoduleмеханизма и имени `term.
The supported values are:
anatom— theGenServeris registered locally with the given name usingProcess.register/2.
{:global, term}— theGenServeris registered globally with the given term using the functions in the:globalmodule.
{:via, module, term}— theGenServeris registered with the given mechanism and name.
Первую опцию — atom, мы уже использовали, и точно знаем, что в нашем хитром случае она не подходит.
Вторая опция используется для регистрации процесса глобально в кластере нод. Она использует локальную ETS таблицу. Кроме того она будет требовать постоянной синхронизации внутри нод в кластере, в связи с чем работа программы будет замедляться. Так что используйте её только когда это действительно нужно.
Третья, и последняя, опция использует в качестве параметра кортеж с :via, и это как раз то, что нам надо для решения нашей проблемы! Вот что говорит по этому поводу документация:
Опция:viaпринимает в качестве параметра модуль, который имеет следующий интерфейс:register_name/2,unregister_name/1,whereis_name/1иsend/2.
The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.
Вообще ничего не понятно? Мне тоже! Так что посмотрим этот метод в деле.
Используем кортеж :via
Итак, кортеж :via — это способ сказать Elixir, что мы собираемся использовать отдельный модуль для регистрации наших процессов. Этот модуль должен делать следующие вещи:
- Регистрировать имя, которым может быть любой
term, с помощью функцииregister_name/2; - Удалять имена из регистра, с помощью функции
unregister_name/1; - Находить
pidпо имени, с помощьюwhereis_name/1; - Посылать сообщения определённому процессу с помощью
send/2.
Чтобы это всё работало, вышеперечисленные функции должны передавать ответ в определённом формате, определённом в OTP — так же как и handle_call/3 и handle_cast/2 подчиняются определённым правилам.
Попробуем определить модуль, которые всё это знает:
# ./lib/chat/registry.ex defmodule Chat.Registry do use GenServer # API def start_link do # We register our registry (yeah, I know) with a simple name, # just so we can reference it in the other functions. GenServer.start_link(__MODULE__, nil, name: :registry) end def whereis_name(room_name) do GenServer.call(:registry, {:whereis_name, room_name}) end def register_name(room_name, pid) do GenServer.call(:registry, {:register_name, room_name, pid}) end def unregister_name(room_name) do GenServer.cast(:registry, {:unregister_name, room_name}) end def send(room_name, message) do # If we try to send a message to a process # that is not registered, we return a tuple in the format # {:badarg, {process_name, error_message}}. # Otherwise, we just forward the message to the pid of this # room. case whereis_name(room_name) do :undefined -> {:badarg, {room_name, message}} pid -> Kernel.send(pid, message) pid end end # SERVER def init(_) do # We will use a simple Map to store our processes in # the format %{"room name" => pid} {:ok, Map.new} end def handle_call({:whereis_name, room_name}, _from, state) do {:reply, Map.get(state, room_name, :undefined), state} end def handle_call({:register_name, room_name, pid}, _from, state) do # Registering a name is just a matter of putting it in our Map. # Our response tuple include a `:no` or `:yes` indicating if # the process was included or if it was already present. case Map.get(state, room_name) do nil -> {:reply, :yes, Map.put(state, room_name, pid)} _ -> {:reply, :no, state} end end def handle_cast({:unregister_name, room_name}, state) do # And unregistering is as simple as deleting an entry # from our Map {:noreply, Map.delete(state, room_name)} end end
Опять же: в наших руках выбрать, каким образом наш реестр будет внутри работать. Здесь мы используем простую Map для связи имени и pid. Этот код абсолютно прост и прямолинеен, особенно если вы хорошо знаете как работает GenServer. Незнакомыми могут казаться только возвращаемые функциями значения.
Пришло время попробовать наш реестр в iex сессии:
$ iex -S mix iex> {:ok, pid} = Chat.Server.start_link("room1") {:ok, #PID<0.107.0>} iex> Chat.Registry.start_link {:ok, #PID<0.109.0>} iex> Chat.Registry.whereis_name("room1") :undefined iex> Chat.Registry.register_name("room1", pid) :yes iex> Chat.Registry.register_name("room1", pid) :no iex> Chat.Registry.whereis_name("room1") #PID<0.107.0> iex> Chat.Registry.unregister_name("room1") :ok iex> Chat.Registry.whereis_name("room1") :undefined
5 секунд — полёт отличный! Реестр работает как надо: и регистрирует, и удаляет регистрацию. Попробуем его использовать в наших чатах.
Наша проблема была в том, что у нас были нескольких запущенных серверов Chat.Server, инициализированных через Supervisor. Чтобы отправить сообщение в определённую комнату, мы хотели бы вызывать Chat.Server.add_message(“room1”, “my message”), поэтому мы должны были бы регистрировать имена серверов как {:chat_room, “room1”} и {:chat_room, “room2”}. Вот как это делается через кортеж :via:
# ./lib/chat/server.ex defmodule Chat.Server do use GenServer # API def start_link(name) do # Instead of passing an atom to the `name` option, we send # a tuple. Here we extract this tuple to a private method # called `via_tuple` that can be reused in every function GenServer.start_link(__MODULE__, [], name: via_tuple(name)) end def add_message(room_name, message) do # And the `GenServer` callbacks will accept this tuple the # same way it accepts a pid or an atom. GenServer.cast(via_tuple(room_name), {:add_message, message}) end def get_messages(room_name) do GenServer.call(via_tuple(room_name), :get_messages) end defp via_tuple(room_name) do # And the tuple always follow the same format: # {:via, module_name, term} {:via, Chat.Registry, {:chat_room, room_name}} end # SERVER (no changes required here) # ... end
Изменения — вот в этом коммите
Вот что здесь происходит: каждый раз, когда мы посылаем сообщение в Chat.Server, передавая имя комнаты, он сам будет находить pid нужного процесса с помощью того модуля, который мы ему передали в кортеже :via (в данном случае это Chat.Registry).
Это решает нашу проблему: теперь мы можем использовать любое количество Chat.Server процессов (ну, пока не закончится фантазия на имена), и нам никогда не надо знать их pid. Совсем.
Впрочем, есть ещё одна проблема в таком решении. Догадались?
Именно! Наш реестр не знает о процессах, которые упали, и должны быть перезапущены через Supervisor. А это значит, что когда такое произойдёт, реестр не даст пересоздать запись с таким же именем, и будет хранить pid мёртвого процесса.
По идее, решение этой проблемы — не слишком сложное. Мы заставим наш реестр осуществлять мониторинг всех процессов, pid которых он хранит. Как только такой "наблюдаемый" процесс упадёт — мы его просто удалим из нашего реестра.
# in lib/chat/registry.ex defmodule Chat.Registry do # ... def handle_call({:register_name, room_name, pid}, _from, state) do case Map.get(state, room_name) do nil -> # When a new process is registered, we start monitoring it. Process.monitor(pid) {:reply, :yes, Map.put(state, room_name, pid)} _ -> {:reply, :no, state} end end def handle_info({:DOWN, _, :process, pid, _}, state) do # When a monitored process dies, we will receive a # `:DOWN` message that we can use to remove the # dead pid from our registry. {:noreply, remove_pid(state, pid)} end def remove_pid(state, pid_to_remove) do # And here we just filter out the dead pid remove = fn {_key, pid} -> pid != pid_to_remove end Enum.filter(state, remove) |> Enum.into(%{}) end end
Изменения — вот в этом коммите
Убедимся в том, что всё работает:
$ iex -S mix iex> Chat.Registry.start_link {:ok, #PID<0.107.0>} iex> Chat.Supervisor.start_link {:ok, #PID<0.109.0>} iex> Chat.Supervisor.start_room("room1") {:ok, #PID<0.111.0>} iex> Chat.Server.add_message("room1", "message") :ok iex> Chat.Server.get_messages("room1") ["message"] iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill) true iex> Chat.Server.add_message("room1", "message") :ok iex> Chat.Server.get_messages("room1") ["message"]
Ну, теперь уже совершенно не важно, сколько раз Supervisor перезапустит процесс Chat.Server: как только мы посылаем сообщение в комнату — оно будет доставлено по верному pid.
Упрощаем с помощью gproc
В принципе с нашим чатом мы на этом и закончим, но хочется рассказать ещё об одной фиче, которая упростит для нас регистрацию с помощью кортежа :via. Это — gproc, — библиотека Erlang.
И научим наш Chat.Server использовать gproc вместо нашего Chat.Registry, а потом мы вообще избавимся от Chat.Registry.
Начнём пожалуй с зависимостей. Для этого добавим gproc в mix.exs:
# ./mix.exs defmodule Chat.Mixfile do # ... def application do [applications: [:logger, :gproc]] end defp deps do [{:gproc, "0.3.1"}] end end
Затем подтянем зависимости с помощью:
$ mix deps.get
Теперь мы можем поменять нашу регистрацию с помощью кортежа :via — пусть использует gproc, а не Chat.Registry:
# ./lib/chat/server.ex defmodule Chat.Server do # ... # The only thing we need to change is the `via_tuple/1` function, # to make it use `gproc` instead of `Chat.Registry` defp via_tuple(room_name) do {:via, :gproc, {:n, :l, {:chat_room, room_name}}} end # ... end
gproc использует ключи-кортежи, состоящие из трёх значений: {type, scope, key}.
В нашем случае мы используем:
:n— это значитимя, то есть не может быть больше одного процесса, зарегистрированного под таким ключом;:l— это значитlocal, то есть процесс регистрируется только на нашей ноде;{:chat_room, room_name}— это сам ключ в виде кортежа.
Дополнительную информацию по возможным настройкам gproc искать тут.
После таких изменений вообще выкинем наш Chat.Registry, и проверим что всё продолжает работать в iex сессии:
$ iex -S mix iex> Chat.Supervisor.start_link {:ok, #PID<0.190.0>} iex> Chat.Supervisor.start_room("room1") {:ok, #PID<0.192.0>} iex> Chat.Supervisor.start_room("room2") {:ok, #PID<0.194.0>} iex> Chat.Server.add_message("room1", "first message") :ok iex> Chat.Server.add_message("room2", "second message") :ok iex> Chat.Server.get_messages("room1") ["first message"] iex> Chat.Server.get_messages("room2") ["second message"] iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill) true iex> Chat.Server.add_message("room1", "first message") :ok iex> Chat.Server.get_messages("room1") ["first message"]
Изменения — вот в этом коммите
Куда плыть дальше, капитан?
Мы с вами разобрались в куче сложных вопросов. Основные выводы:
- Будьте осторожны, работая с
pidнапрямую: они поменяются как только процесс перезапустится. - Если вам нужно получить ссылку только на один процесс ( как у нас было с единственной комнатой в чате), регистрация процесса с именем в виде атома — достаточная мера;
- Если вам нужно создавать процессы динамически (множество чат-комнат), вы можете использовать кортеж
:viaдля предоставления своего собственного реестра; - Подобные реестры уже существуют ( к примеру
gproc), и если вы их используете — не придётся строить свой велосипед;
Конечно, это ещё не всё. Если вам нужна глобальная регистрация на всех нодах в кластере, другие средства тоже могут быть хороши. У Erlang есть глобальные модули для глобальных регистраций, pg2 для групп процессов, да и тот же gprc может вам помочь.
Если эта статья вас заинтересовала — почитайте Saša Jurić. Elixir in Action.
А вот и репка с сыром)
