Эта заметка о том, зачем нам нужен пакет coroutine для Go и как он будет выглядеть. Но прежде всего, что такое корутины?
Сегодня каждый программист знаком с вызовами функций (подпрограмм): F вызывает G, которая останавливает F и запускает G. G выполняет свою работу, потенциально вызывая и ожидая другие функции, и в конце концов возвращается. Когда G возвращается, G уже нет, а F продолжает работать. В этой схеме одновременно выполняется только одна функция, а ее вызывающие ожидают, поднимаясь вверх по стеку вызовов.
В отличие от подпрограмм, корутины выполняются конкурентно на разных стеках, но все равно верно, что одновременно выполняется только одна функция, а ее вызывающая сторона ждет. F запускает G, но G не выполняется немедленно. Вместо этого F должен явно возобновить (resume) выполнение G, который затем начинает выполняться. В любой момент G может развернуться и вернуться (yield) назад к F. Это приостановит G и продолжит F операцию возобновления. В конце концов F снова вызывает resume, который приостанавливает F и продолжает G после выхода. И так далее, туда-сюда, пока G не вернется, что приведет к очистке G и продолжению F с последней операции возобновления, с некоторым сигналом для F, что G закончена и F больше не должна пытаться возобновить G. В этом паттерне одновременно выполняется только одна корутина, а ее вызывающая сторона ждет на другом стеке. Они выполняются по очереди в четко определенном, скоординированном порядке.
Это несколько абстрактно. Давайте посмотрим на реальные программы.
Корутины в Lua
В качестве примера можно привести сравнение двух двоичных деревьев на предмет наличия в них одинаковой последовательности значений, даже если их структуры различны. Например, здесь приведен код на языке Lua 5 для генерации некоторых двоичных деревьев:
function T(l, v, r)
return {left = l, value = v, right = r}
end
e = nil
t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
t2 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 5, e)))))
t3 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 6, e)))))
Деревья t1
и t2
содержат значения 1, 2, 3, 4, 5; t3
содержит 1, 2, 3, 4, 6.
Мы можем написать корутину для обхода дерева и получения каждого значения:
function visit(t)
if t ~= nil then -- note: ~= is "not equal"
visit(t.left)
coroutine.yield(t.value)
visit(t.right)
end
end
Тогда для сравнения двух деревьев мы можем создать две корутины visit
и попеременно считывать и сравнивать последовательные значения:
function cmp(t1, t2)
co1 = coroutine.create(visit)
co2 = coroutine.create(visit)
while true
do
ok1, v1 = coroutine.resume(co1, t1)
ok2, v2 = coroutine.resume(co2, t2)
if ok1 ~= ok2 or v1 ~= v2 then
return false
end
if not ok1 and not ok2 then
return true
end
end
end
Аргументы t1
и t2
в coroutine.resume
используются только на первой итерации в качестве аргумента visit
. Последующие возобновления возвращают это значение из coroutine.yield
, но в коде оно игнорируется.
Более идиоматичная версия на языке Lua предполагает использование функции coroutine.wrap
, которая возвращает функцию, скрывающую объект coroutine
:
function cmp(t1, t2)
next1 = coroutine.wrap(function() visit(t1) end)
next2 = coroutine.wrap(function() visit(t2) end)
while true
do
v1 = next1()
v2 = next2()
if v1 ~= v2 then
return false
end
if v1 == nil and v2 == nil then
return true
end
end
end
После завершения работы корутины, next
функция возвращает nil
(полный код).
Генераторы в Python (итераторы в CLU)
Python предоставляет генераторы, которые очень похожи на корутины Lua, но они не являются корутинами, поэтому стоит указать на различия. Главное отличие заключается в том, что "очевидные" программы не работают. Например, вот прямой перевод нашего дерева Lua и посетителя в Python:
def T(l, v, r):
return {'left': l, 'value': v, 'right': r}
def visit(t):
if t is not None:
visit(t['left'])
yield t['value']
visit(t['right'])
Но этот очевидный перевод не работает:
>>> e = None
>>> t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
>>> for x in visit(t1):
... print(x)
...
4
>>>
Мы потеряли 1, 2, 3 и 5. Что произошло?
В Python этот def visit
не определяет обычную функцию. Поскольку тело содержит оператор yield
, результатом является генератор:
>>> type(visit(t1))
<class 'generator'>
>>>
Вызов visit(t['left'])
вообще не выполняет код в visit
. Он только создает и возвращает новый генератор, который затем отбрасывается. Чтобы избежать отбрасывания этих результатов, необходимо выполнить цикл по генератору и повторно выдать их:
def visit(t):
if t is not None:
for x in visit(t['left']):
yield x
yield t['value']
for x in visit(t['right'])
yield x
В Python 3.3 появилась функция yield from
, позволяющая:
def visit(t):
if t is not None:
yield from visit(t['left']):
yield t['value']
yield from visit(t['right'])
Объект генератора содержит состояние единственного вызова visit
, то есть значения локальных переменных и то, какая строка выполняется. Это состояние выталкивается в стек вызовов при каждом возобновлении работы генератора и затем заталкивается обратно в объект генератора при каждом выходе (yield
), который может произойти только в самом верхнем кадре вызова. Таким образом, генератор использует тот же стек, что и исходная программа, избегая необходимости в полной реализации корутин, но вводя вместо этого непонятные ограничения.
Генераторы в Python, похоже, почти полностью скопированы с CLU, который был пионером этой абстракции (и многих других вещей), хотя в CLU они называются итераторами, а не генераторами. Итератор дерева CLU выглядит следующим образом:
visit = iter (t: cvt) yields (int):
tagcase t
tag empty: ;
tag non_empty(t: node):
for x: int
in tree$visit(t.left) do
yield(x);
end;
yield(t.value);
for x: int
in tree$visit(t.right) do
yield(x);
end;
end;
end visit;
Синтаксис отличается, особенно tagcase
, который рассматривает представление дерева в виде тегированного объединения, но основная структура, включая вложенные циклы for
, точно такая же, как и в нашей первой рабочей версии на Python. Кроме того, поскольку CLU был статически типизирован, visit
четко обозначен как итератор (iter
), а не как функция (proc
в CLU). Благодаря этой информации о типе, неправильное использование visit
в качестве обычного вызова функции, как в нашем глючном примере на Python, компилятор может диагностировать (и, как я полагаю, диагностировал).
О реализации CLU первоначальные разработчики писали: "Итераторы - это разновидность корутин; однако их использование достаточно ограничено, и они реализуются с использованием только стека программы. Поэтому использование итератора лишь немного дороже, чем использование процедуры". Это звучит точно так же, как объяснение, которое я дал выше для генераторов Python. Подробнее об этом см. в работе Барбары Лисков и др. 1977 года "Механизмы абстракции в CLU", в частности, разделы 4.2, 4.3 и 6.
Корутины, потоки и генераторы
На первый взгляд, корутины, потоки и генераторы выглядят одинаково. Все они обеспечивают конкурентность в той или иной форме, но имеют существенные отличия.
Корутины обеспечивают конкурентность без параллелизма: когда выполняется одна корутина, возобновляющая ее или уступающая ей не выполняется. Поскольку корутины выполняются по очереди и переключаются только в определенных точках программы, корутины могут обмениваться данными между собой без гонок. Явные переключения (
coroutine.resume
в первом примере Lua или вызов функцииnext
во втором примере Lua) служат точками синхронизации, создавая happens-before edges. Поскольку планирование осуществляется в явном виде (без какого-либо прерывания) и полностью без участия операционной системы, переключение корутины занимает не более десяти наносекунд, а обычно и того меньше. Запуск и завершение работы также значительно дешевле, чем у потоков.Потоки предоставляют больше возможностей, чем корутины, но и обходятся дороже. Дополнительная мощность - это параллелизм, а стоимость - это накладные расходы на планирование, включая более дорогие контекстные переключения и необходимость добавления преимуществ в той или иной форме. Обычно операционная система предоставляет потоки, и переключение потоков занимает несколько микросекунд. Для данной таксономии горутины Go являются дешевыми потоками: переключение горутины занимает ближе к нескольким сотням наносекунд, поскольку среда выполнения Go берет на себя часть работы по планированию, но горутины все равно обеспечивают полный параллелизм и вытеснение потоков. (Новые облегченные потоки Java по сути являются тем же самым, что и горутины).
Генераторы обеспечивают меньшую мощность, чем корутины, поскольку в корутине разрешен выход (
yield
) только самого верхнего кадра. Этот кадр перемещается туда-сюда между объектом и стеком вызовов для его приостановки и возобновления.
Корутины являются полезным строительным блоком для написания программ, которым нужна конкурентность для структурирования программы, но не для параллелизма. Подробный пример такого подхода приведен в моей предыдущей заметке "Хранилище данных в потоке управления". Другие примеры см. в статье Аны Лусии Де Моуры и Роберто Иерусалимши "Пересмотр корутин", опубликованной в 2009 году. Оригинальный пример приведен в работе Мелвина Конвея "Design of a Separable Transition-Diagram Compiler", опубликованной в 1963 году.
Зачем нужны корутины в Go?
Корутины - это паттерн конкурентности, который не обслуживается напрямую существующими библиотеками конкурентности Go. Горутины часто достаточно близки, но, как мы видели, это не одно и то же, и иногда это различие имеет значение.
Например, в докладе Роба Пайка "Лексическое сканирование в Go", опубликованном в 2011 году, представлены оригинальные лексер и парсер для пакета text/template. Они выполнялись в отдельных горутинах, соединенных каналом, несовершенно имитируя пару корутин: лексер и парсер работали параллельно, причем лексер просматривал следующую лексему, а парсер обрабатывал последнюю. Генераторов было бы недостаточно - лексер выдает значения из множества различных функций, но полноценные горутины оказались слишком сложными. Параллелизм, обеспечиваемый горутинами, вызвал гонки и в конечном итоге привел к отказу от этой конструкции в пользу хранилища состояния лексера в объекте, что было более точной имитацией корутин. Правильные корутины позволили бы избежать гонок и были бы более эффективны, чем горутины.
Предполагается, что в будущем корутины в Go будут использоваться для итераций над общими коллекциями. Мы обсуждали возможность добавления в Go поддержки ranging over functions, что побудило бы авторов коллекций и других абстракций предоставлять CLU-подобные функции-итераторы. Итераторы можно реализовать уже сегодня, используя значения функций, без каких-либо изменений в языке. Например, несколько упрощенный итератор деревьев в Go может выглядеть следующим образом:
func (t *Tree[V]) All(yield func(v V)) {
if t != nil {
t.left.All(yield)
yield(t.value)
t.right.All(yield)
}
}
Сегодня этот итератор может быть вызван как:
t.All(func(v V) {
fmt.Println(v)
})
и, возможно, его вариант может быть вызван в будущей версии Go как:
for v := range t.All {
fmt.Println(v)
}
Иногда, однако, мы хотим выполнить итерацию по коллекции таким образом, что это не укладывается в рамки одного цикла for
. Примером может служить сравнение бинарных деревьев: две итерации должны быть как-то чередованы. Как мы уже видели, ответ на этот вопрос дают корутины, позволяющие превратить функцию типа (*Tree).All
(итератор "push") в функцию, возвращающую поток значений, по одному за вызов (итератор "pull").
Как реализовать корутины в Go
Если мы хотим добавить корутины в Go, мы должны стремиться сделать это без изменения языка. Это означает, что определение корутин должно быть доступно для реализации и понимания в терминах обычного кода Go. Позже я выскажусь за оптимизированную реализацию, предоставляемую непосредственно средой выполнения, но эта реализация должна быть неотличима от чистого определения Go.
Начнем с очень простой версии, в которой операция yield
полностью игнорируется. Она просто запускает функцию в другой горутине:
package coro
func New[In, Out any](f func(In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
go func() { cout <- f(<-cin) }()
return resume
}
New
принимает функцию f
, которая должна иметь один аргумент и один результат. New
выделяет каналы, определяет resume
, создает горутину для выполнения f
и возвращает функцию resume
. Новая горутина блокируется на <-cin
, поэтому возможности для параллелизма отсутствуют. Функция resume
разблокирует новую горутину, посылая значение in
, а затем блокирует получение значения out
. Эта пара "посылка-получение" образует корутинный переключатель. Мы можем использовать coro.New
следующим образом (полный код):
func main() {
resume := coro.New(strings.ToUpper)
fmt.Println(resume("hello world"))
}
Пока что coro.New
- это просто неуклюжий способ вызова функции. Нам необходимо добавить yield
, который мы можем передать в качестве аргумента в f
:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() { cout <- f(<-cin, yield) }()
return resume
}
Заметим, что параллелизма здесь все равно нет: yield
- это еще одна пара send-receive. Эти горутины ограничены коммуникационным паттерном и действуют неотличимо от корутин.
Пример: Парсер строк
Прежде чем перейти к преобразованию итераторов, рассмотрим несколько более простых примеров. В разделе "Хранилище данных в потоке управления" мы рассматривали проблему взятия функции
func parseQuoted(read func() byte) bool
и запустить ее в отдельном потоке управления, чтобы байты по одному передавались в метод Write
. Вместо специальной канальной реализации, приведенной в этом посте, мы можем использовать:
type parser struct {
resume func(byte) Status
}
func (p *parser) Init() {
coparse := func(_ byte, yield func(Status) byte) Status {
read := func() byte { return yield(NeedMoreInput) }
if !parseQuoted(read) {
return BadInput
}
return Success
}
p.resume = coro.New(coparse)
p.resume(0)
}
func (p *parser) Write(c byte) Status {
return p.resume(c)
}
Функция Init
выполняет всю работу, и даже не очень. Она определяет функцию coparse
, которая имеет сигнатуру, необходимую для coro.New
, что означает добавление отбрасываемого входа типа byte
. Эта функция определяет read
, которое выдает NeedMoreInput
, а затем возвращает байт, предоставленный вызывающей стороной. Затем выполняется parseQuoted(read)
, преобразующая bool-результат в обычный код состояния. Создав с помощью coro.New
корутину для coparse
, Init
вызывает p.resume(0)
, чтобы позволить coparse
перейти к первому чтению в parseQuoted
. Наконец, метод Write
представляет собой тривиальную обертку вокруг p.resume
(полный код).
Эта установка абстрагирует нас от пары каналов, которые мы поддерживали вручную в предыдущем посте, позволяя работать на более высоком уровне при написании программы.
Пример: Простое сито
В качестве несколько более крупного примера рассмотрим Doug McIlroy's concurrent prime sieve. Он состоит из конвейера корутин, по одному для каждого простого числа p
, каждая из которых выполняется:
loop:
n = get a number from left neighbor
if (p does not divide n)
pass n to right neighbor
Счетная корутина, расположенная в левой части конвейера, передает в левый конец конвейера числа 2, 3, 4, .... Корутины печати, расположенные в крайней правой части, могут считывать простые числа, печатать их и создавать новые корутины фильтрации. Первый фильтр в конвейере удаляет кратные 2, следующий - кратные 3, следующий - кратные 5 и так далее.
Созданный нами примитив coro.New
позволяет взять прямой цикл, выдающий значения, и преобразовать его в функцию, которая может быть вызвана для получения каждого значения по очереди. Вот счетчик:
func counter() func(bool) int {
return coro.New(func(more bool, yield func(int) bool) int {
for i := 2; more; i++ {
more = yield(i)
}
return 0
})
}
Логика счетчика - это литерал функции, передаваемый в New
. Он принимает yield
функцию типа func(int) bool
. Код выдает значение, передавая его в yield
, а затем получает обратно булево значение, говорящее о том, следует ли продолжать генерировать новые числа. При получении команды на остановку, либо потому, что при входе значение more
было false
, либо потому, что вызов yield
вернул false
, цикл завершается. Он возвращает конечное, игнорируемое значение, удовлетворяющее типу функции, требуемому New
.
New
превращает это в функцию цикла, обратную yield: func(bool) int
, которая может быть вызвана с true
для получения следующего значения или с false
для остановки генератора. Корутина фильтрации лишь немного сложнее:
func filter(p int, next func(bool) int) (filtered func(bool) int) {
return coro.New(func(more bool, yield func(int) bool) int {
for more {
n := next(true)
if n%p != 0 {
more = yield(n)
}
}
return next(false)
})
}
Она принимает простое p
и функцию next
, подключенную к корутине слева, а затем возвращает отфильтрованный выходной поток для подключения к корутине справа.
Наконец, у нас есть корутина печати:
func main() {
next := counter()
for i := 0; i < 10; i++ {
p := next(true)
fmt.Println(p)
next = filter(p, next)
}
next(false)
}
Начиная со счетчика, main
сохраняет в next
вывод построенного на данный момент конвейера. Затем выполняется цикл: чтение простого числа p
, печать p
, а затем добавление нового фильтра на правом конце конвейера для удаления кратных p
(полный код).
Обратите внимание, что отношения вызова между корутинами могут меняться со временем: любая корутина C может вызвать функцию next
другой корутины D и стать корутиной, к которой D передает управление. Первый yield
счетчика идет к main
, в то время как его последующие yield
идут ко 2-ому фильтру. Аналогично, каждый p
-фильтр передает свой первый вывод (следующее простое число) в main
, в то время как его последующие yield
идут к фильтру для этого следующего простого числа.
Корутины и горутины
В определенном смысле неправильно называть эти потоки управления корутинами. Это полноценные горутины, и они могут делать все, что может обычная горутина, включая блок ожидания мьютексов, каналов, системных вызовов и т.д. Что делает coro.New
, так это создает горутины с доступом к операциям переключения корутин внутри функций yield
и resume
(которые сито называет next
). Возможность использования этих операций может быть даже передана разным горутинам, что и происходит при передаче main
каждого своего next
потока каждой следующей горутине filter
. В отличие от оператора go
, coro.New
добавляет новую конкурентность в программу без нового параллелизма. Горутина, которую создает coro.New(f)
, может выполняться только тогда, когда какая-либо другая горутина явно одолжит ей возможность выполнения с помощью resume
; этот заем возвращается с помощью yield
или возврата f
. Если у вас есть только одна главная горутина и выполняется 10 операторов go
, то все 11 горутин могут быть запущены одновременно. Напротив, если у вас есть одна главная горутина и выполняется 10 вызовов coro.New
, то теперь есть 11 потоков управления, но параллельность программы остается прежней: одновременно выполняется только один. То, какие именно горутины приостанавливаются в операциях корутин, может меняться по мере выполнения программы, но параллелизм при этом не увеличивается.
Короче говоря, go
создает новый конкурентный параллельный поток управления, а coro.New
- новый конкурентный непараллельный поток управления. Удобно продолжать говорить о непараллельных потоках управления как о корутинах, но следует помнить, что то, какие именно корутины являются "непараллельными", может меняться в процессе выполнения программы, точно так же, как может меняться в процессе выполнения программы то, какие горутины принимают или отправляют из каналов.
Надежные резюме
Есть несколько улучшений, которые мы можем внести в coro.New
, чтобы он лучше работал в реальных программах. Первое - разрешить вызывать resume
после завершения работы функции: сейчас это приводит к deadlock-у. Добавим bool-результат, указывающий, был ли результат resume
получен в результате yield
. Реализация coro.New
, которую мы имеем на данный момент, выглядит следующим образом:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() {
cout <- f(<-cin, yield)
}()
return resume
}
Чтобы добавить этот дополнительный результат, нам нужно отследить, выполняется ли f
, и вернуть этот результат из resume
:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
cin := make(chan In)
cout := make(chan Out)
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- in
out = <-cout
return out, running
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() {
out := f(<-cin, yield)
running = false
cout <- out
}()
return resume
}
Обратите внимание, что поскольку resume
может выполняться только тогда, когда вызывающая горутина заблокирована, и наоборот, совместное использование переменной running
не является гонкой. Они синхронизируются, выполняясь по очереди. Если resume
вызывается после выхода горутины, то resume
возвращает нулевое значение и false
.
Теперь мы можем определить, когда горутина завершила свою работу (полный код):
func main() {
resume := coro.New(func(_ int, yield func(string) int) string {
yield("hello")
yield("world")
return "done"
})
for i := 0; i < 4; i++ {
s, ok := resume(0)
fmt.Printf("%q %v\n", s, ok)
}
}
$ go run cohello.go
"hello" true
"world" true
"done" false
"" false
$
Пример: Преобразование итератора
В примере с простым ситом было показано прямое использование coro.New
, но аргумент more bool
был несколько неудобен и не соответствовал рассмотренным ранее функциям итераторов. Рассмотрим преобразование любого push-итератора в pull-итератор с помощью coro.New
. Нам понадобится способ завершить работу корутины, выполняющей push-итератор, если мы хотим остановиться раньше, поэтому мы добавим в yield
булевый результат, указывающий, стоит ли продолжать, как в простом сите:
push func(yield func(V) bool)
Цель новой функции coro.Pull
- превратить эту push-функцию в pull-итератор. Итератор будет возвращать следующее значение и булево число, указывающее, закончилась ли итерация, подобно приему канала или просмотру карты:
pull func() (V, bool)
Если мы хотим остановить push-итерацию раньше времени, нам нужно как-то сигнализировать об этом, поэтому Pull
будет возвращать не только pull-функцию, но и stop-функцию:
stop func()
Если сложить эти функции вместе, то полная сигнатура Pull
будет выглядеть следующим образом:
func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
...
}
Первое, что необходимо сделать Pull
, - это запустить корутину для выполнения итератора push
, а для этого нужна функция-обертка с нужным типом, а именно функция, которая принимает more bool
, чтобы соответствовать результату bool
из yield
, и возвращает конечное значение V
. Функция pull
может вызывать resume(true)
, а функция stop
- resume(false)
:
func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
copush := func(more bool, yield func(V) bool) V {
if more {
push(yield)
}
var zero V
return zero
}
resume := coro.New(copush)
pull = func() (V, bool) {
return resume(true)
}
stop = func() {
resume(false)
}
return pull, stop
}
Вот и вся реализация. Благодаря возможностям coro.New
нам потребовалось совсем немного кода и усилий для создания хорошего преобразователя итераторов.
Чтобы использовать coro.Pull
, нам нужно переопределить метод All
дерева, чтобы он ожидал и использовал новый результат bool
из yield
:
func (t *Tree[V]) All(yield func(v V) bool) {
t.all(yield)
}
func (t *Tree[V]) all(yield func(v V) bool) bool {
return t == nil ||
t.Left.all(yield) && yield(t.Value) && t.Right.all(yield)
}
Теперь у нас есть все необходимое для написания функции сравнения деревьев на языке Go (полный код):
func cmp[V comparable](t1, t2 *Tree[V]) bool {
next1, stop1 := coro.Pull(t1.All)
next2, stop2 := coro.Pull(t2.All)
defer stop1()
defer stop2()
for {
v1, ok1 := next1()
v2, ok2 := next2()
if v1 != v2 || ok1 != ok2 {
return false
}
if !ok1 && !ok2 {
return true
}
}
}
Распространение паники
Еще одним улучшением является передача паники от корутины обратно ее вызывающей стороне, то есть той корутине, которая в последний раз вызывала resume
для ее выполнения (и, следовательно, сидит заблокированной в resume
в ожидании ее). Очень распространенным запросом является создание некоторого механизма для информирования одной горутины о панике другой, но в общем случае это может быть затруднительно, поскольку мы не знаем, какую горутину информировать и готова ли она услышать это сообщение. В случае с корутинами у нас вызывающая сторона заблокирована в ожидании новостей, поэтому имеет смысл передать сообщение о панике.
Чтобы сделать это, мы можем добавить defer
, чтобы перехватить панику в новой корутине и снова вызвать ее в ожидающем resume
.
type msg[T any] struct {
panic any
val T
}
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
cin := make(chan In)
cout := make(chan msg[Out])
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- in
m := <-cout
if m.panic != nil {
panic(m.panic)
}
return m.val, running
}
yield := func(out Out) In {
cout <- msg[Out]{val: out}
return <-cin
}
go func() {
defer func() {
if running {
running = false
cout <- msg[Out]{panic: recover()}
}
}()
out := f(<-cin, yield)
running = false
cout <- msg[Out]{val: out}
}()
return resume
}
Протестируем его (полный код):
func main() {
defer func() {
if e := recover(); e != nil {
fmt.Println("main panic:", e)
panic(e)
}
}()
next, _ := coro.Pull(func(yield func(string) bool) {
yield("hello")
panic("world")
})
for {
fmt.Println(next())
}
}
Новая корутина выдает hello
, а затем вызывает панику world
. Эта паника передается обратно в главную горутину, которая печатает значение и восстанавливается. Видно, что паника возникает в вызове resume
:
% go run coro.go
hello true
main panic: world
panic: world [recovered]
panic: world
goroutine 1 [running]:
main.main.func1()
/tmp/coro.go:9 +0x95
panic({0x108f360?, 0x10c2cf0?})
/go/src/runtime/panic.go:1003 +0x225
main.coro_New[...].func1()
/tmp/coro.go.go:55 +0x91
main.Pull[...].func2()
/tmp/coro.go.go:31 +0x1c
main.main()
/tmp/coro.go.go:17 +0x52
exit status 2
%
Отмена
Распространение паники позаботилось о том, чтобы сообщить вызывающей стороне о досрочном выходе из корутины, но как сообщить корутине о досрочном выходе вызывающей стороны? По аналогии с функцией stop
в pull-итераторе, нам нужно каким-то образом сигнализировать корутине, что она больше не нужна, возможно, потому, что вызывающий паникует, а возможно, потому, что вызывающий просто возвращается.
Для этого мы можем изменить coro.New
так, чтобы он возвращал не только resume
, но и функцию cancel
. Вызов cancel
будет похож на resume
, только вместо возврата значения yield
будет паниковать. Если во время отмены корутина паникует по-другому, мы хотим, чтобы cancel
распространяла эту панику, как это делает resume
. Но, конечно, мы не хотим, чтобы cancel
распространял свою собственную панику, поэтому мы создаем уникальное значение паники, которое можно проверить. Мы также должны обработать отмену перед началом выполнения f
.
var ErrCanceled = errors.New("coroutine canceled")
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func()) {
cin := make(chan msg[In])
cout := make(chan msg[Out])
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- msg[In]{val: in}
m := <-cout
if m.panic != nil {
panic(m.panic)
}
return m.val, running
}
cancel = func() {
e := fmt.Errorf("%w", ErrCanceled) // unique wrapper
cin <- msg[In]{panic: e}
m := <-cout
if m.panic != nil && m.panic != e {
panic(m.panic)
}
}
yield := func(out Out) In {
cout <- msg[Out]{val: out}
m := <-cin
if m.panic != nil {
panic(m.panic)
}
return m.val
}
go func() {
defer func() {
if running {
running = false
cout <- msg[Out]{panic: recover()}
}
}()
var out Out
m := <-cin
if m.panic == nil {
out = f(m.val, yield)
}
running = false
cout <- msg[Out]{val: out}
}()
return resume, cancel
}
Мы могли бы изменить Pull
, чтобы использовать паники и для отмены итераторов, но в этом контексте явный bool
кажется более понятным, тем более что остановка итератора не является исключением.
Пример: Пересмотр простого сита
Давайте посмотрим, как распространение и отмена паники позволяют очистке простого сита "просто работать". Сначала обновим сито, чтобы оно использовало новый API. Функции counter
и filter
уже являются "однострочными" вызовами return coro.New(...)
. Изменим сигнатуру, чтобы включить в нее дополнительную cancel
-функцию, возвращаемую из coro.New
:
func counter() (func(bool) (int, bool), func()) {
return coro.New(...)
}
func filter(p int, next func(bool) (int, bool)) (func(bool) (int, bool), func()) {
return coro.New(...)
}
Затем преобразуем функцию main
в функцию primes
, которая выводит n
простых чисел (полный код):
func primes(n int) {
next, cancel := counter()
defer cancel()
for i := 0; i < n; i++ {
p, _ := next(true)
fmt.Println(p)
next, cancel = filter(p, next)
defer cancel()
}
}
При выполнении этой функции, получив n
простых чисел, она возвращается. Каждый из отложенных вызовов cancel
очищает созданные корутины. А что если в одной из этих корутин произошла ошибка и она запаниковала? Если эта корутина была возобновлена next
-вызовом в primes
, то паника возвращается в primes
, и отложенные cancel
-вызовы primes
очищают все остальные корутины. Если же работа была возобновлена next
-вызовом в корутине filter
, то паника распространяется до ожидающей корутины filter
, затем до следующей ожидающей корутины filter
и так далее, пока не дойдет до p := next(true)
в primes
, который снова очистит оставшиеся корутины.
API
Мы пришли к следующему API:
New
создает новую приостановленную корутину, готовую выполнить функциюf
. Новая корутина - это горутина, которая никогда не выполняется сама по себе: она выполняется только в то время, когда какая-либо другая горутина вызывает ее и ожидает, вызываяresume
илиcancel
.Горутина может приостановить свою работу и переключиться на новую корутину, вызвав
resume(in)
. Первый вызовresume
запускаетf(in, yield)
. Вызовresume
блокирует выполнениеf
до тех пор, покаf
не вызоветyield(out)
или не вернется в исходное состояние. Когдаf
вызываетyield
, тоyield
блокируется, аresume
возвращаетout, true
. Когдаf
возвращается,resume
возвращаетout, false
. Еслиresume
вернулся из-заyield
, то следующийresume(in)
переключается обратно наf
, аyield
возвращаетсяin
.cancel
прекращает выполнениеf
и закрывает корутину. Если вызовresume
не был произведен, тоf
вообще не выполняется. В противном случае отмена приводит к панике заблокированного вызоваyield
с ошибкой, удовлетворяющейerrors.Is(err, ErrCanceled)
.Если
f
паникует и не восстанавливает панику, то паника останавливается в корутинеf
и перезапускается в ожидающейf
горутине, вызывая повторную панику заблокированного ожиданияresume
илиcancel
с тем же значением паники.cancel
не выполняет повторную панику, если паникаf
- это паника, которую вызвала самаcancel
.После возврата
f
или паники корутины больше не существует. Последующие вызовыresume
возвращают нулевое значение иfalse
. Последующие вызовыcancel
просто возвращаются.Функции
resume
,cancel
иyield
могут передаваться между различными горутинами и использоваться ими, тем самым динамически меняя, какая из горутин является "корутиной". ХотяNew
создает новую горутину, она также устанавливает инвариант, согласно которому одна горутина всегда заблокирована - либо вresume
,cancel
,yield
, либо (сразу послеNew
) в ожиданииresume
, которая вызоветf
. Этот инвариант сохраняется до возвращенияf
, после чего новая горутина завершается. В результатеcoro.New
создает новую конкурентность в программе без какого-либо нового параллелизма.Если несколько горутин вызывают
resume
илиcancel
, то эти вызовы сериализуются. Аналогичным образом, если несколько горутин вызываютyield
, эти вызовы сериализуются.
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func())
Эффективность
Как я уже говорил в начале, хотя важно иметь определение корутин, которое можно понять по ссылке на чистую реализацию Go, я считаю, что нам следует использовать оптимизированную реализацию во время выполнения. На моем MacBook Pro 2019 года передача значений туда и обратно с помощью основанного на каналах coro.New
, описанного в этом посте, требует примерно 190 нс на переключение, или 380 нс на значение в coro.Pull
. Помните, что coro.Pull
не является стандартным способом использования итератора: стандартным способом является прямой вызов итератора, который вообще не имеет накладных расходов на корутину. coro.Pull
нужен только в том случае, если требуется обрабатывать итерируемые значения инкрементально, а не с помощью одного цикла for
. Тем не менее, мы хотим сделать coro.Pull
настолько быстрым, насколько это возможно.
Сначала я попробовал заставить компилятор помечать пары send-receive и оставлять подсказки для среды выполнения, чтобы объединить их в одну операцию. Это позволило бы времени выполнения канала обойти планировщик и перейти непосредственно к другой корутине. Такая реализация требует около 118 нс на одно переключение, или 236 нс на одно pulled-значение (на 38% быстрее). Это уже лучше, но все равно не так быстро, как хотелось бы. Вся общность каналов добавляет слишком много накладных расходов.
Далее я добавил прямой переключатель корутин во время выполнения, полностью избегая каналов. Это позволило сократить переключение между корутинами до трех атомарных сравнений (одно в структуре данных корутины, одно для статуса планировщика блокирующей корутины и одно для статуса планировщика возобновляющей корутины), что я считаю оптимальным с учетом инвариантов безопасности, которые необходимо поддерживать. Такая реализация занимает 20 нс на переключение, или 40 нс на pulled-значение. Это примерно в 10 раз быстрее, чем исходная реализация канала. Возможно, более важно то, что 40 нс на pulled-значение кажутся достаточно малыми в абсолютном выражении, чтобы не стать узким местом для кода, которому нужен coro.Pull
.
Расс Кокс - это один из ключевых разработчиков языка программирования Go. Он внес значительный вклад в развитие Go и активно участвует в его поддержке. Расс также выступает с докладами на конференциях, например, на GopherCon 2022, где он говорил о совместимости и том, как программы на Go продолжают работать. Кроме того, он ведет исследования в области моделей памяти. Вы можете найти его работы на GitHub.
Трудности перевода. В контексте программирования, yield
обычно не переводится как "выход". Это потому, что yield
не завершает функцию, в отличие от return
. Вместо этого, yield
приостанавливает выполнение функции и "производит" или "генерирует" значение. Когда функция возобновляет свое выполнение, она продолжает с того места, где был вызван yield
. Таким образом, yield
больше похож на "паузу" или "производство", а не "выход".