Многопоточность в Ruby

Original author: David Thomas, Chad Fowler, Andrew Hunt
  • Translation
Перевод главы «Multithreading» книги David Thomas и Andrew Hunt «Programming Ruby: The Pragmatic Programmers' Guide, Second Edition».

Часто самым простым способом выполнить одновременно две вещи является использование потоков в Ruby. Они являются внутрипроцессными, встроенными в интерпретатор Ruby. Это делает потоки Ruby полностью переносимыми, т.е. независимыми от операционной системы. Но в то же время вы точно не получите выгоду от использования родных, нативных потоков. Что это значит?

Вы можете столкнуться с голоданием (thread starvation — это когда поток с маленьким приоритетом не имеет шанса запуститься). Если вы хотите заблокировать ваши потоки, то со скрежетом остановится целый процесс. А если возникнет ситуация, что некоторые потоки будут посылать вызовы операционной системе, для выполнения которых требуется немалое время, то все потоки будут висеть, пока интерпретатор не получит контроль обратно. И наконец, если ваша машина имеет больше одного процессора, потоки Ruby не будут это использовать, т.к. они запускаются в одном процессе, а в одиночном родном потоке они будут вынуждены запускаться на одном процессоре единовременно.

Все это звучит страшновато. Тем не менее, на практике во многих случаях выгода от использования потоков во многом перевешивает любые потенциальные проблемы, которые могут возникнуть. Потоки Ruby являются эффективным и легким путем достижения параллельности в вашем коде. Вы просто должны понять основные проблемы реализации, и, соответственно, архитектуру.

Создание потоков Ruby


Создание нового потока довольно прямолинейно. Следующий код — простой пример. Он параллельно скачивает набор Веб-страниц. Для каждого запрашиваемого для закачки URL код создает отдельный поток, который управляет HTTP-транзакцией.

require 'net/http'
pages = %w( www.rubycentral.com slashdot.org www.google.com )
threads = []
for page_to_fetch in pages
    threads << Thread.new(page_to_fetch) do |url|
        h = Net::HTTP.new(url, 80)
        puts "Fetching: #{url}"
        resp = h.get('/'nil )    
        puts "Got #{url}#{resp.message}"
    end
end 
threads.each {|thr| thr.join }


Результат:
Fetching: www.rubycentral.com
Fetching: slashdot.org
Fetching: www.google.com
Got www.google.com: OK
Got www.rubycentral.com: OK
Got slashdot.org: OK 


Давайте взглянем на этот код более внимательно: здесь происходит несколько тонких моментов. Новые потоки создаются вызовом Thread.new. Это задает блок, содержащий код, который будет выполняться в новом потоке. В нашем случае блок использует библиотеку net/http для извлечения главной страницы указанных сайтов. Наша трассировка явно показывает, что эти извлечения выполняются параллельно.

Когда мы создаем поток, мы указываем необходимый URL в качестве параметра. Этот параметр передается в блок в виде переменной url. Почему мы это делаем, когда проще было бы использовать значение переменной page_to_fetch внутри блока?

Поток имеет общий доступ ко всем глобальным переменным, переменным экземпляра и локальным переменным, которые имеются на момент запуска потока. Любой человек, имеющий младшего брата, может вам сказать, что общий доступ или совместное использование не всегда является хорошей вещью. В этом случае, все три потока будут делиться переменной page_to_fetch. Когда запускается первый поток, page_to_fetch принимает значение «www.rubycentral.com». А между тем, цикл, создающий потоки, до сих пор работает. В следующий момент времени page_to_fetch устанавливается в «slashdot.org». Если первый поток еще не закончил использовать переменную page_to_fetch, то он неожиданно начнет использовать ее новое значение. Этот вид ошибки очень трудно отследить.

Однако, локальные переменные, создаваемые внутри блока потока, являются действительно локальными по отношению к этому потоку — каждый поток будет иметь свою копию адреса страницы. Вы можете указать любое число аргументов в блоке с помощью Thread.new.

Управление потоками

Другая тонкость происходит на последней строке нашей программы. Почему мы вызываем join для каждого создаваемого потока?

Когда программа Ruby завершается, все потоки убиваются, несмотря на их состояние. Однако, вы можете подождать завершения отдельного потока путем вызова метода Thread#join. Вызывающий поток заблокируется до того момента, как текущий поток не завершится. Вызывая join для каждого потока, вы можете быть уверенными, что все три запроса будут выполнены перед завершением основной программы. Если вы не хотите блокировать поток навсегда, вы можете передать в join параметр лимита времени — если этот лимит закончится перед завершением потока, вызов join вернет значение nil. Другой вариант join'а — метод Thread#value, который возвращает значение последней операции, выполненной в потоке.

Помимо join, для управления потоками используется несколько других удобных операций. Доступ к текущему потоку можно всегда получить, используя Thread.current. Вы можете получить список всех потоков, используя Thread.list, которая возвращает список всех объектов Thread: и работающих, и остановленных. Для определения статуса отдельного потока вы можете использовать Thread#status и Thread#alive?.
Дополнительно вы можете настроить приоритет потока, используя Thread#priority=. Потоки с бОльшим приоритетом будут запускаться перед потоками с меньшим приоритетом. Мы поговорим немного позже о планировании расписания потоков, а также об их запуске и остановке.

Переменные потока

Поток имеет обычный доступ ко всем переменным, находящимся в области видимости во время его запуска. Локальные переменные блока, содержащего код потока, являются локальными по отношению к самому потоку и не имеют к себе общего доступа.

Но что делать, если вам понадобятся в потоке такие переменные, над которыми можно было бы иметь доступ с других потоков — включать их в главный поток? Характерной чертой класса Thread является специальная возможность, позволяющая создавать и иметь доступ по имени к локальным переменным потока. Вы просто обращаетесь с объектом потока как с хэшем, устанавливая значения элементов с помощью [ ]= и читая их с помощью [ ]. В следующем примере каждый поток записывает текущее значение счетчика в локальную переменную потока с ключом mycount. Для осуществления этого код использует строку «mycount» в качестве индекса объекта потока.

count = 0
threads = []
10.times do |i|
    threads[i] = Thread.new do
        sleep(rand(0.1))
        Thread.current["mycount"] = count
        count += 1
    end
end
threads.each {|t| t.join; print t["mycount"]", " } 
puts "count = #{count}" 


Результат:
4, 1, 0, 8, 7, 9, 5, 6, 3, 2, count = 10 


Главный поток ждет, пока завершатся остальные потоки, а затем выводит значения счетчика, зафиксированные каждым потоком. Для интереса мы добавили случайную задержку каждому потоку перед записью значения счетчика.

Потоки и исключения


Что будет, если в потоке возникнет необработанное исключение? Это зависит от значения флага abort_on_exception и от значения флага отладки интерпретатора.
Если abort_on_exception = false и флаг отладки не включен (состояние по умолчанию), то необработанное исключение просто убьет текущий поток, а все остальные продолжат свою работу. В реальности, вы даже ничего не знаете об исключении, пока для потока, выбросившего это исключение, не будет вызван join.

В следующем примере поток 2 раздувается и не может ничего вывести. Однако, вы все еще можете видеть след от остальных потоков.

threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end 
threads.each {|t| t.join } 


Результат:
0 
1
3
prog.rb:4: Boom! (RuntimeError)
from prog.rb:8:in `join'
from prog.rb:8
from prog.rb:8:in `each'
from prog.rb:8


Мы можем перехватить исключение во время выполнения join.
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each do |t|
    begin
        t.join
        rescue RuntimeError => e
        puts "Failed: #{e.message}"
    end
end 


Результат:
0
1
3 
Failed: Boom!

Однако, если установить abort_on_exception в true или использовать -d для отключения флага отладки, то необработанное исключение убьет все работающие потоки. Как только поток 2 умрет, больше не будет произведено никакого вывода.

Thread.abort_on_exception = true
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each {|t| t.join }


Результат:
0
1
prog.rb:5: Boom! (RuntimeError)
from prog.rb:4:in `initialize'
from prog.rb:4:in `new'
from prog.rb:4
from prog.rb:3:in `times'
from prog.rb:3


Данный пример также иллюстрирует глюк. Внутри цикла предпочтительней использовать print для вывода числа, чем puts. Почему? Потому что puts тайно разбивает свою работу на две составляющие: выводит свой аргумент, а затем выводит символ новой строки. Между ними двумя может запуститься поток, и вывод будет чередоваться. Вызывая print одной строки, которая уже содержит символ новой строки, мы можем обойти данную проблему.

Управление планировщиком потоков


В хорошо спроектированном приложении вы будете просто давать потокам делать свою работу. Создание временных зависимостей в многопоточном приложении, как правило считается дурным тоном, т.к. это делает код намного труднее для восприятия, а также делает невозможной оптимизацию выполнения вашей программы планировщиком потоков.

Однако, иногда вам будет необходимо управлять потоками явно. Например, музыкальный автомат, показывающий светомузыку. Мы должны остановить его на время, когда останавливается музыка. Вы можете использовать два потока в виде схемы производитель-потребитель, где потребитель должен подождать, если производитель имеет незавершенные заказы.

Класс Thread предоставляет набор методов для управления планировщиком потоков. Вызов Thread.stop останавливает текущий поток, а вызов Thread#run запускает отдельный поток. Thread.pass запускает планировщик для передачи выполнения другому потоку, а Thread#join и Thread#value приостанавливают вызывающий поток, пока заданные потоки на завершатся.

Мы можем продемонстрировать эту особенность в следующей совершенно бессмысленной программе. Она создает два дочерних потока: t1 и t2, каждый из которых является экзмепляром класса Chaser. Метод chase инкрементирует счетчик, но не дает ему стать большим, чем на два по сравнению со счетчиком в другом потоке. Для остановки этого увеличения метод вызывает Thread.pass, которые позволяет запуститься методу chase в другом потоке. Для интереса мы сразу после старта приостанавливаем потоки, а затем в случайном порядке запускаем.

class Chaser
    attr_reader :count
    def initialize(name)
        @name = name
        @count = 0
    end
    def chase(other)
        while @count < 5
            while @count - other.count > 1
                Thread.pass
            end
            @count += 1
            print "#@name#{count}\n"
        end
    end 
end 

c1 = Chaser.new("A")
c2 = Chaser.new("B")
threads = [
    Thread.new { Thread.stop; c1.chase(c2) },
    Thread.new { Thread.stop; c2.chase(c1) }
]
start_index = rand(2)
threads[start_index].run
threads[1 - start_index].run 
threads.each {|t| t.join }


Результат:
B: 1
B: 2
A: 1
B: 3
A: 2
B: 4
A: 3
B: 5
A: 4
A: 5


Однако, использовать такие элементарные действия для достижения синхронизации в реальном коде не так просто — состояние гонок будет вас постоянно преследовать. А когда вы будете работать с общими данными, состояние гонок точно гарантирует вам долгую и разочаровывающую отладку. Вообще-то предыдущий пример содержит ошибку: существует возможность инкрементировать счетчик в одном потоке, но перед выводом его значения, запускается другой поток и выводит значение своего счетчика. В результате вывод будет в неправильном порядке.

К счастью, потоки имеют одну дополнительную возможность — идея взаимного исключения (mutual exclusion). Используя это, мы можем создавать безопасные схемы синхронизации.
Share post

Similar posts

AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 30

    +1
    потоки в Ruby очень похожи на потоки Python, в них есть достаточно много ограничений, в общем смысле их стоит использовать по количеству либо ядер (либо ядер х 2, если smp), но никак не больше, к тому же в вашем примере (хоть это довольно незначительно) если соединение диал-ап, то потоки не выполнятся параллельно (это и не только, в зависимости от задачи, может стать bottle neck в приложении), к тому же присутствует GIL (Global Interpreter Lock), про ограничения можно неплохо почитать вот тут www.infoq.com/news/2007/05/ruby-threading-futures

    пс. не принимайте как критику, это дополнение к изложенному :-)
      –1
      Кто определяет, какими цветами подсвечивать синтаксис?
        0
        Интересные заявления Вы делаете. Вы предлагаете забить на многопоточность, если в системе одно ядро? Откуда Вы это правило взяли: количество ядер = количество потоков?
          –1
          Нужно знать свою платформу а не использовать все подряд
            –1
            Current stable releases of Ruby use user space threads (also called «green threads»), which means that the Ruby interpreter takes care of everything to do with threads.… User space threads, on the other hand, can not make use of multiple cores or multiple CPUs (because the OS doesn't know about them and thus can't schedule them on these cores/CPUs).

            здесь уже сама ось распараллелит задачи по ядрам или потокам, но Ruby сам этого не сможет сделать, это кстати цитата из ссылки, которую я дал, в противном случае (если у вас к примеру 2 ресурсоемкие задачи на одном ядре, то выполняться они будут точно НЕ параллельно, а либо с переключением контекста, либо друг за другом).
              +1
              Во-первых, информация в цитате, которую Вы приводите, устарела. Текущий стабильный релиз (1.9.1) уже не использует userspace threads. Используются нативные потоки. Хотя все равно присутсвует GIL.

              Во-вторых, возвращаясь к моему вопросу. О каком распараллеливании Вы говорите, если Вы утверждаете, что потоки надо использовать по количеству ядер? Т.е. на одноядерной машине, не более одного потока.
                +1
                GIL все еще присутствует, а это означает, что ваши треды на одноядерной машине будут работать скорее всего через переключение контекста (т.е. сначала одна квант, грубо говоря, потом другая),

                насчет 1.9.1 прекрасно написано в комментариях вот тут, они не параллелятся
                blog.reverberate.org/2009/01/31/ruby-191-released/

                разница в 1.9.1 и 1.8 заключается в том, что треды стали нативными, это повлияло и на возможности, и на скорость, но никак не решило задачу параллельности
                  0
                  Слушайте, это все здорово, но я же Вас не о том спрашиваю. Я хорошо представляю, как устроена поточная модель и в 1.8, и в 1.9.1. Т.ч. перестаньте, пожалуйста, сыпать ссылками и цитатами.

                  Меня интересует простая вещь (наверное, простая) относительно Вашего первого комментария. Что это за правило такое: количество потоков = количество ядер.

                  Просто Вам вот там плюсы ставят за этот комментарий. А я, честно говоря, не понимаю за что. Вы мне объясните.
                    +1
                    я вам и объясняю, вы видимо только на плюсы смотрите, а не на суть.

                    правило количество потоков (тредов) = количество ядер возникло потому, что на одном ядре нормально (без переключения контекста или блокировок) можно запустить 1 тред, который ГАРАНТИРОВАНО ПОЛУЧИТ нужное количество процессорного времени, а следующий тред, запущенный на этом ядре сможет работать только тогда, когда первый тред будет выполнен.

                    я понятно объясняю?
                      –1
                      Т.е. тот факт, что на одном ядре одновременно может выполняться только один поток, по-вашему является достаточным основанием для отказа от многопоточного программирования в Ruby? Объясните мне тогда, что в других языках это как по-другому? Может от него вообще в принципе стоит отказаться?

                      И интересно, что выражение «гарантированно получит» значит. Вы его как-то особенно выделили. Т.е. другие треды получат меньше процессорного времени, чем им нужно или они просто не выполнятся. Что с ними по-вашему произойдет?
                        +1
                        давайте еще раз по азбуке пройдемся, если уж так.

                        представьте, что потоки в Ruby это рабочие, у рабочего есть пространство 1 квадратный метр, ему надо выкопать метровую яму, вы можете разместить на этом пространстве и 2 и 3 рабочих, но ЭФФЕКТИВНО эту работу будет выполнять только один

                        представьте, что потоки Ruby это кассир, как думаете, будет лучше, если она обслужит по одному покупателю, или будет пытаться обслужить всю толпу? ее внимание не сможет быть занято более чем 1 человеком в квант времени

                        так же и процессоры (ядра)

                        многоядерность — сравнительно недавнее изобретение, к которому еще далеко не все привыкли и научились работать, для него требуется новая архитектура (от операционных систем к приложениям), которая бы делала на 4 ядрах столько же работы, сколько могут делать 4 процессора, одинаковые по производительности, но увеличение количества пока что не повлияло на качество в той же мере

                        что касается других языков программирования, в Python'e, который я упоминал ситуация схожая с Ruby, Perl просто делает копию (клонирует) себя, так получается новый поток

                        и напоследок, я как-то заморачивался с написанием многопоточных приложений, пытаясь найти оптимальное количество тредов, приложение было подобным тому, что привел автор топика, но фишка в том, что вы запускаете несколько потоков, которые в свою очередь запускают несколько wget'ов, которые в свою очередь используют ваш интернет канал, ОСи и Ruby не сложно запустить несколько экземпляров одной программы, и тут треды — выигрыш, но bottle neck не процессор, а интернет-канал

                        если же вы в тредах пытаетесь рассчитывать числа Фибоначчи, то 1 тред будет кушать уже процессор, съедая его без остатка на расчеты (если ось не будет пытаться распараллеливать), не оставляя остальным ничего, либо ось будет таки распараллеливать ваши расчеты, но тогда каждый тред получит образно по 1 кванту времени и так будет по кругу, пока расчет не будет закончен
                          –1
                          Похоже вы действительно не понимаете, зачем нужны потоки и как их можно использовать помимо задействования CPU.

                          Простейший пример:
                          вы — вебсервер.
                          У вас десять клиентов. Но вы не обслуживаете клиентов одновременно, пока вы заняты одним, другой готовит/передает данные. По мимо состояния «что-то делаю» потоки могут (и чаще всего) находяться в состоянии «жду».

                          Второй простейший пример — софтина с графической оболочкой:
                          Пока вычислительный поток делает задачу, GUI поток спит и ждет или ответа от вычислительного или действий пользователя.

                          Также не учитываете архитектуры современного CPU которые способны выполнять несколько комманд за один такт и т.д.

                          Теперь поясните еще раз почему «количество потоков = ядер»
                            +1
                            вы по-моему что-то курите, пример с вебсервером логично было бы описать в контексте socket select, но никак не многопоточности, за сим кланяюсь
                              –1
                              А Вы уже, наверное, все, что у Вас было выкурили :)

                              Kernel#select и потоки — это альтернативные способы решения приведенной задачи (организации веб-сервера, да и сервера любого другого типа вообще). И Вы еще попробуйте с использованием голого select (без фреймворка типа EventMachine) реализовать более-менее сложный сервер. Чтобы он как минимум был также эффективен, как аналогичный сервер, реализованный через потоки, который и реализовать, и поддерживать куда проще.
                                0
                                Детской лопаточкой колодец копать
                                В предметке разберитесь
                                  0
                                  А Вы не могли бы поконкретнее? А то я, может быть, и рад бы разобраться в том, в чем я с Вашей точки зрения не разбираюсь. Но из Вашего коммента совершенно не ясно.
                                –1
                                А после Ваших торгово-строительных аналогий, я бы вообще поосторожней чужие примеры (особенно вполне удачные, как в данном случае) называл нелогичными :)
                          0
                          Если каждый поток ест 100% ядра (ну там — бесконечные циклы гоняет например) то выигрыша не будет, да.
                          Однако на высокоуровневых языках принято дергать более высокооуровневые же примитивы, а не только считать что-то, от использования потоков одновременно с кучей разного IO выирыш будет очевидный — пока один поток на своем квадратном метре ждет завершения операции, второй вполне может еще что-то поделать

                          В свое время hyper-threading так появился
                            0
                            я это написал прямо над вашим комментарием в 2х последних абзацах
                      +1
                      В рамках текущей архитектуры 1.8/1.9 GIL не уберешь :(
                  0
                  Бред.

                  1. Потоки в питоне не имеет смысла использовать ядерx2 ну никак — наличие GIL заставляет один из потоков «приостановиться» во время работы другого.

                  2. Для питона есть multiprocessing который решает проблему GIL и позволяет задействовать ядра.

                  3. «потоки встроенные в интерпретатор» о которых пишет автор сгинули в 1.8, и теперь в рубях полновесные OS потоки.

                  4. Использование потоков может преследовать кучу целей помимо «задействовать побольше ядер», да.
                  +1
                  круто! Спaсибо aвтору! :*
                    +2
                    Хорошая статья. Касательно «threads << Thread.new» есть более правильные способы делать join потоков.
                    Во-первых, можно

                    Thread.list.each { |t| t.join if t != Thread.current }
                     

                    Условие здесь вставлено потому что никто не может вызвать join для самого себя.

                    А во-вторых потоки можно группировать:

                    requests = ThreadGroup.new
                    page_to_fetch.each do |page|
                      requests.add Thread.new(page_to_fetch) do |url|
                        # …
                      end
                    end
                     

                    Ну и потом пройти по группt и сделать join.
                      0
                      Хороший материал и грамотно подан. Спасибо!
                        +6
                        Многопоточность в Ruby
                        вRuby многопоточность!
                          0
                          «внутрипроцессные» потоки в рубях сгинули в 1.8! Автор, поправься!
                            0
                            Да, в тексте описываются потоки в Ruby версии 1.8. Но с другой стороны, по сути ничего не изменилось. В 1.9 хоть и используются нативные потоки, но одно приложение может оперировать только одним потоком в одно время из-за все той же thread-safety.
                            Вот что сказано в последней версии книги:
                            Prior to Ruby 1.9, these were implemented as so-called green threads — threads were switched totally within the interpreter. In Ruby 1.9, threading is now performed by the operating system. This is an improvement, but not quite as big an improvement as you might want. Although threads can now take advantage of multiple processors (and multiple cores in a single processor), there’s a major catch. Many Ruby extension libraries are not thread safe (because they were written for the old threading model). So, Ruby compromises: it uses native operating system threads but operates only a single thread at a time. You’ll never see two threads in the same application running Ruby code truly concurrently. (You will, however, see threads busy doing (say) I/O while another thread executes Ruby code. That’s part of the point....)

                            Да и много ли сейчас людей используют 1.9 для разработки?
                              –3
                              трололо. Создайте сотню «зеленых» потоков и сотню нативных и сравните нагрузку. В юниксах, поток практически = процесс со всеми вытекающими затратами на запуск и т.д.

                              Ну и часть страшилок статьи теряет актуальность когда контроль над потоками принимает ОС
                                +1
                                А еще в 1.9 появились Fibers. Облегченные зеленые потоки с кооперативным шедулингом.

                                ruby-doc.org/core-1.9/classes/Fiber.html

                                Так что есть из чего выбирать.

                                А вообще мне больше по душе EventMachine. Хотя писать через колбеки иногда не особо приятно.
                              0
                              Спасибо за детальное описание!
                              Попробую на деле в своих грабберах и парсерах (Mechanize + proxy)

                              Only users with full accounts can post comments. Log in, please.