Распределенные вычисления в Elixir — классический пример MapReduce

    Распределенные вычисления в Elixir


    Elixir и Erlang идеально подходят для создания распределенных приложений, выполняющих параллельно несколько, возможно схожих задач. Поддержка многих конкурентных процессов работающих в изоляции была одним из основных аспектов при разработке виртуальной машины языка Erlang.


    Постараемся проверить эту возможность использовать потенциал многоядерного процессора на простом примере. Подсчитаем сколько раз встечается слово "лошадь" в рассказах писателя О. Генри размещенных в текстовых файлах в одной директории. Технически, мы будем считать количество вхождения последавательности символов "лошадь", а не слова, и только в нижнем регистре.



    Подcчет вхождений подстроки в файлах


    Начнем с функции подсчета количества вхождений подстроки в содержимом текстового файла.


    word_count = fn(file, word) ->
      {:ok, content} = File.read(file)
      length(String.split(content, word)) - 1
    end

    Читаем содержимое файла и возвращаем количество упоминаний слова. Обработка ошибок опущена, для простоты.


    Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.


    word_count = fn(file, word) ->
      :timer.sleep(1000)
      {:ok, content} = File.read(file)
      count = length(String.split(content, word)) - 1
      IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
      count
    end

    Теперь посчитаем количество подстроки в каждом файле и выведем сумму.


    Path.wildcard("/data/OGENRI/*.txt")
    |> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
    |> Enum.reduce(fn(x, acc) -> acc + x end)
    |> IO.puts

    И заодно замерим время выолнения всей программы.


    # sync_word_count.exs
    start_time = :os.system_time(:milli_seconds)
    
    word_count = fn(file, word) ->
      :timer.sleep(1000)
      {:ok, content} = File.read(file)
      count = length(String.split(content, word)) - 1
      IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
      count
    end
    
    Path.wildcard("/data/OGENRI/*.txt")
    |> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
    |> Enum.reduce(fn(x, acc) -> acc + x end)
    |> IO.puts
    
    end_time = :os.system_time(:milli_seconds)
    IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"

    Всего у меня 12 файлов и ждать пришлось около 12-ти секунд, секунда за секундой созерцая как на мониторе появляется результат подсчета для каждого файла.


    iex sync_word_count.exs
    Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
    
    Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
    Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
    Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
    Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
    32
    Finished in 12.053 seconds
    Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)

    Асинхронное выполнение задач


    Для подсчета количества вхождений подстроки асинхронно воспользуемся методом создания процесса spawn (порождать) и методами send and receive для отправки и получения сообщения, соответственно.


    Для каждого файла будем создавать отдельный процесс


    async_word_count = fn(file, word) ->
      caller = self
      spawn(fn ->
        send(caller, {:result, word_count.(file, word)})
      end)
    end

    self — это текущий процесс. Создаем переменную caller с тем же значением, что и self. Порожденный процесс вызывает функцию word_count/2 и посылает результат обратно родительскому процессу.


    Чтобы получить значение обрано, в родительком процессе нужно использовать receive (столько же раз, сколько процессов будет создано). Создадим для этого метод get_result/0.


    get_result = fn ->
      receive do
        {:result, result} -> result
      end
    end

    Обновим программу.


    # async_word_count.exs
    start_time = :os.system_time(:milli_seconds)
    
    word_count = fn(file, word) ->
      :timer.sleep(1000)
      {:ok, content} = File.read(file)
      count = length(String.split(content, word)) - 1
      IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
      count
    end
    
    async_word_count = fn(file, word) ->
      caller = self
      spawn(fn ->
        send(caller, {:result, word_count.(file, word)})
      end)
    end
    
    get_result = fn ->
      receive do
        {:result, result} -> result
      end
    end
    
    Path.wildcard("/data/OGENRI/*.txt")
    |> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
    |> Enum.map(fn(_) -> get_result.() end)
    |> Enum.reduce(fn(x, acc) -> acc + x end)
    |> IO.puts
    
    end_time = :os.system_time(:milli_seconds)
    IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"

    Проверим.


    iex async_word_count.exs
    Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
    
    Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
    Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
    Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
    Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
    Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
    32
    Finished in 1.014 seconds
    Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)

    Вывод


    Приключения и лошади раньше были неотделимы друг от друга, сейчас уже, возможно, это не совсем так.


    Ссылки


    » http://elixir-lang.org/getting-started/processes.html
    » http://culttt.com/2016/07/27/understanding-concurrency-parallelism-elixir/
    » https://elixirschool.com/lessons/advanced/concurrency/
    » Код и текстовые файлы (папка OGENRI)

    Ads
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More

    Comments 15

      0
      Спасибо за интересное исследование. Поэтому возник вопрос, т.к. неинтересные публикации вопросов не вызывают:
      Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.
      Этот момент вызвал сомнение. Значительное время тест не работает, а спит. Т.о. тестируем сон, а не только работу.
      • UFO just landed and posted this here
        +2

        Тема действительно интересная, я сам недавно сталкивался с распараллеливанием вычислений в Elixir, поэтому сделал небольшой рефакторинг и добавил основную основную фичу, значительно облегчающую распараллеливание — Task.


        Читать по ссылке, а пока предлагаю взглянуть на код:


        Весь длинный код работающего скрипта
        defmodule Wordcount do
          @main_word "лошадь"
        
          defp count(text, word) do
            length(String.split(text, word)) - 1
          end
        
          defp do_proceed(file_path) do
            File.read!(file_path)
            |> count(@main_word)
            |> (fn (count) -> IO.puts "Found #{count} occurrence(s) of the word in file #{file_path}" end).()
          end
        
          def proceed(:async) do
            Path.wildcard("./OGENRI/*.txt")
            |> Enum.map(&Task.async(fn -> do_proceed(&1) end))
            |> Enum.map(&Task.await/1)
          end
        
          def proceed(:sync) do
            Path.wildcard("./OGENRI/*.txt")
            |> Enum.map(&do_proceed/1)
          end
        
          ### Example:
          #   benchmark
          #   benchmark(:sync)
          def benchmark(type \\ :async) do
            start_time = :os.system_time(:milli_seconds)
            proceed(type)
            end_time = :os.system_time(:milli_seconds)
            IO.puts "Finished in #{(end_time - start_time)} miliseconds"
          end
        end

        Вот конкретные различия между синхронной и асинхронной версией


        def proceed(:sync) do
          Path.wildcard("./OGENRI/*.txt")
          |> Enum.map(&do_proceed/1)
        end
        
        def proceed(:async) do
          Path.wildcard("./OGENRI/*.txt")
          |> Enum.map(&Task.async(fn -> do_proceed(&1) end))
          |> Enum.map(&Task.await/1)
        end

        Пишем синхронный код — потом вуаля! — и он щелчком пальцев превращается в асинхронный. Добро пожаловать в functional programming!)

          +11
          что будет, если взять презерватив и долго учиться его натягивать на кактус?

          Такая же искусственная конструкция.

          Создание и управление процессами в эрланге — это достаточно нетривиальная штука, потому что надо следить как за ошибками в них, так и за используемыми ресурсами, да и поддерживать целостным дерево процессов что бы можно было остановить всю задачу за корневой.

          Просто бездумно форкаться в ожидании успеха по процессу на каждый элемент списка — это годится только для статьи на хабр, но никак не для чего-то, что можно выложить на продакшн.

          Path.wildcard("/data/OGENRI/*.txt")
          |> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
          


          вы здесь создаете уйму неуправляемых процессов на каждый файл. Вы моментально схлопочете emfile, но узнать об этом не получится, потому что вы даже spawn_link не делаете.

          К сожалению, вы демонстрируете очень вредный и неряшливый подход на неверных примерах.
            0

            Это вообще беда elixir-комьюнити. Я смотрю за статьями на хабре, смотрю в elixir subreddit, и это какой-то ад — толковых постов почти нет, везде какой-то треш и полное непонимание того, как надо писать на Erlang.

            0

            Ваш результат лишь показывает, что спать асинхронно в 12 потоков быстрее, чем спать 12 раз синхронно в одном.


            Игрушечный пример на одной машине не показывает оверхеда от передачи данных по сети (который может быть очень значительным), управления задачами (что если нода упадёт?) и т.п.


            Кроме того, "классический" MapReduce устроен посложнее, чем у вас написано: редьюсеров обычно больше одного и результаты разделяются между редьюсерами по хэшу ключа и сортируются, это как раз одна из ключевых идей. Такой подход позволяет распараллелить фазу редьюса, избежать перегрузки машины, которая выполняет редьюс (все ключи могут просто не помещаться в память), и терять меньше работы в случае аварийного завершения редьюсера.


            Перемножить большие матрицы на небольшом кластере или сделать JOIN независимых датасетов по какому-нибудь общему ключу было бы гораздо более показательно.

              0
              Для этих целей José предлагает использовать https://hexdocs.pm/gen_stage/Experimental.Flow.html

              Подробнее: http://elixir-lang.org/blog/2016/07/14/announcing-genstage/

              На последней конференции он два часа про GenStage говорил.
                0
                интересная может быть штука
                  0

                  Почему «может быть»? Я ее уже в продакшене в хвост и в гриву гоняю. Валим специально оговорился, что Experimental там только из-за возможных коллизий с именами/переименованиями, код — production-ready.

                    0
                    практически всё с чем мы работаем (кроме самой beam.smp и основ OTP) требует допила.

                    Хосе Валим хороший программист, но он из рельс, а там очень и очень принято делать proof of concept, а потом как срастется.

                    Как срастется в случае с эрлангом означает OOM и рассыпающиеся штуки под нагрузкой.

                    Какие у вас цифры на хвосте и гриве? Сколько трафика, сколько событий, как часто срабатывает система защиты от перегрузки внутри пайплайна, какая у вас запланированная реакция системы на перегруз внутренних частей пайплайна?
                      0
                      Да, мне нужно было оговориться, что у нас в этой части нет критичных данных. Мы агрегируем некие данные, идущие из разных источников, примерно 100 запросов в секунду, в пике — 1000. Если запрос потеряется — ничего страшного, поэтому внутри пайплайна защиты нет. Но при этом ничего не теряется, пока, по крайней мере.

                      Запланированной реакции на перегруз в обычном понимании нет, потому что, если я правильно вообще все понимаю, consumer — в модели описанной выше — захлебнуться не может. Узким местом, таким образом, является provider. Чтобы этого избежать, у меня бэкэндом стоит Riak, который специально обещает записать все, что ему всунули, а вот уж читать — как получится. До пиковых значений мне добраться не удавалось пока, поэтому я даже и не знаю, что ответить на «как боремся».

                      В качестве premature optimization я запилил Riak hook на запись, который пишет «разреженный» бакет по данным раз в секунду (из-за того, что первоочередная задача — попасть в сторадж, бывает, что чуть реже, на десятые) и если provider начинает захлебываться, он откатывается на «разреженный» бакет. Но, повторюсь, мы до потолка пока не допрыгивали, и не заметно, что допрыгнем в ближайшее время.

                      _P. S._ на меня Валим производит впечатление человека, который свалил от DHH именно из-за того, что у него совсем другая парадигма мышления.
                        0
                        Простите, но тогда о чём мы говорим?

                        То, что вы делаете, можно сделать вообще как угодно. Хоть на каждый чих запускать процесс и слать ему сообщение.

                        Я говорю о ситуации, когда у нас по 16-24 ядер работают на 80% загрузки и если где-то возникает бесконтрольная передача данных из одной точки в другую, то это место гарантированно взорвется OOM-ом.

                        При этом перед этим оно успеет положить шедулер, потому что всё таки сборка мусора не бесплатная.

                        «for exchanging events with back-pressure» — вот это самое важное. Это очень правильные и ценные слова, но как всегда: хочется услышать, как это в бою.

                        Я знаю, что под gen_server-ом очень много выстраданной боли и даже в нём есть досадные проблемы, которые нельзя устранить (типа удалить запрос от сдохшего клиента). А что тут — интересно.
                          0

                          Я говорим о том, что Experimental.GenStage в моем конкретном случае (там есть back-pressure и просто запускать процессы не получится, они time consuming) позволил бесплатно [в теории] снять головную боль про «что будет, если завтра я найду во входном канале не тысячу, а миллион реквестов в минуту». То есть ровно про то, для чего он был создан.


                          Вы же говорите о том, что у вас есть что-то такое, что нагружает N ядер на M процентов, что в теории может вообще нуждаться в ручном маршаллинге, своем шедулере, или своем умном GenServerе. Я охотно верю, что все задачи в мире GenStage не решит, но конкретно мою — контролировать количество и успех обработчиков — он решил. Как-то так.

                            0
                            Я как раз и хочу понять, почему вы решили, что он дал вам ощущение спокойствия за тысячекратный скачок нагрузки?
                              0

                              Потому что все жизненно важное находится за консумером, который может от провайдера либо получить дополнительные N сущностей, либо упереться в мертвого/полудохлого провайдера, что не так страшно. Ад, который вы описываете с OOM, утянутым на дно шедулером и маршем Шопена, изолирован на уровне «за пределами бизнес логики».


                              Идея и реализация настолько просты, что я просто не понимаю, где оно может поломаться. Аналогия примерно такая: мы накрыли бомбу чугунной ванной. Мы не решили проблему взрыва, но мы гарантировали себе отсутствие разрушений вокруг.

              Only users with full accounts can post comments. Log in, please.