WaitGroup — это, по сути, способ дождаться, пока несколько горутин закончат свою работу. Мы начнем с основ, а затем разберемся, как это устроено под капотом.
Есть классные статьи, где рассматривается несколько примитивов синхронизации в Go:
Каждый из них имеет свой набор проблем, и этот не отличается. Мы сосредоточимся на проблемах выравнивания WaitGroup, и как ее внутренняя структура менялась в разных версиях.
Эта статья основана на версии Go 1.23.
Что такое sync.WaitGroup
Давайте сначала погрузимся в проблему. Представьте, что у вас есть большая задача , и вы решили разбить ее на более мелкие подзадачи, которые могут выполняться одновременно, они не зависят друг от друга.
Чтобы справиться с этим, мы используем горутины, потому что они позволяют этим маленьким задачам выполняться параллельно:
func main() {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println("Task", i)
}(i)
}
fmt.Println("Done")
}
// Output:
// Done
Но вот в чем дело: велика вероятность того, что главная горутина завершит работу и выйдет из нее раньше, чем другие горутины закончат свою работу.
Когда мы отделяем множество программ для выполнения своих задач, мы хотим отслеживать их, чтобы главная горутина не завершила свою работу и не вышла из нее раньше всех остальных. Вот тут-то и приходит на помощь WaitGroup. Каждый раз, когда одна из наших горутин завершает свою задачу, она сообщает об этом WaitGroup.
После того, как все горутины помечены, как «выполненные», главная горутина завершает работу, и все аккуратно сворачивается.
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
defer wg.Done()
fmt.Println("Task", i)
}(i)
}
wg.Wait()
fmt.Println("Done")
}
// Output:
// Task 0
// Task 1
// Task 2
// Task 3
// Task 4
// Task 5
// Task 6
// Task 7
// Task 8
// Task 9
// Done
Итак, вот как это обычно происходит:
Добавление горутин: Прежде чем запустить свои горутины, вы сообщаете WaitGroup, сколько их ждать. Для этого используется WaitGroup.Add(n)
, где n — количество планируемых к запуску Goroutines.
Гороутины запущены: Каждая горутина запускается и делает свое дело. Когда она закончит, она должна сообщить об этом WaitGroup
, вызвав WaitGroup.Done()
, чтобы уменьшить счетчик на единицу.
Ожидание для всех горутин: В главной горутине, которая не занимается тяжелой работой, вы вызываете WaitGroup.Wait()
. Это приостанавливает выполнение главной программы до тех пор, пока счетчик в WaitGroup не достигнет нуля. Проще говоря, она ждет, пока все остальные горутины не завершат свою работу и не подадут сигнал об окончании.
Обычно WaitGroup.Add(1)
используется при запуске какой-либо горутины:
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
...
}()
}
Технически оба способа подходят, но использование wg.Add(1)
немного снижает производительность. Тем не менее, это менее опасно по сравнению с использованием wg.Add(1)
.
Почему
wg.Add(n)
считается ошибкоопасным?
Дело в том, что если в дальнейшем логика цикла изменится, например, кто-то добавит оператор continue, который будет пропускать некоторые итерации, все может пойти наперекосяк:
wg.Add(10)
for i := 0; i < 10; i++ {
if someCondition(i) {
continue
}
go func() {
defer wg.Done()
...
}()
}
В этом примере мы используем wg.Add(n)
перед циклом, предполагая, что цикл всегда будет запускать ровно n горутин.
Но если это не произойдет , например, если некоторые итерации будут пропущены, то ваша программа может застрять в ожидании горутин, которые так и не были запущены. Если честно, именно такая ошибка может стать настоящей болью для отслеживания.
В этом случае больше подходит wg.Add(1)
. Это может быть связано с небольшим снижением производительности, но это гораздо лучше, чем иметь дело с человеческими ошибками.
Также есть распространенная ошибка, которую люди допускают при использовании sync.WaitGroup
:
for i := 0; i < 10; i++ {
go func() {
wg.Add(1)
defer wg.Done()
...
}()
}
Вот к чему это приводит: wg.Add(1)
вызывается внутри горутиной. Это может стать проблемой, потому что горутина может начать выполняться после того, как главная горутина уже вызвала wg.Wait().
Это может вызвать всевозможные проблемы со временем. Также, если вы заметили, во всех приведенных выше примерах defer
используется вместе с wg.Done()
. Его действительно следует использовать вместе с defer
, чтобы избежать проблем с множественными путями возврата или паническим восстановлением, чтобы убедиться, что он всегда вызывается и не блокирует вызывающего на неопределенное время.
Это должно покрыть все основные кейсы. Давайте перейдем к следующему разделу.
Как выглядит sync.WaitGroup
Для начала давайте посмотрим исходный код sync.WaitGroup
. Вы заметите похожую схему в sync.Mutex.
Опять же, если вы не знаете, как работает Mutex, мы рекомендуем вам сначала ознакомиться с этой статьей.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64
sema uint32
}
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
В Go легко скопировать структуру, просто присвоив ее другой переменной. Но некоторые структуры, например WaitGroup, лучше не копировать.
Копирование WaitGroup может испортить все, потому что внутреннее состояние, которое отслеживает горутины и их синхронизацию, может рассинхронизироваться между копиями.
Такие же проблемы могут возникнуть и с WaitGroup.
noCopy
Структура noCopy
включена в WaitGroup как способ предотвратить ошибки копирования, но не путем выброса ошибок, а в качестве предупреждения. Она была внесена Александром Валялкиным, техническим директором VictoriaMetrics, и представлена в изменении #22015.
Структура noCopy
на самом деле не влияет на то, как выполняется ваша программа. Вместо этого она служит маркером, по которому такие инструменты, как go vet
, могут определить, когда структура была скопирована таким образом, что этого делать не следует.
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
Структура очень проста:
У него нет полей, поэтому он не занимает никакого значимого места в памяти.
У него есть два метода, Lock и Unlock, которые ничего не делают (no-op). Эти методы существуют только для того, чтобы работать с программой проверки —copylocks в инструменте go vet.
Когда вы запускаете go vet на своем коде, он проверяет, не были ли структуры с полем noCopy, например WaitGroup, скопированы таким образом, что это может вызвать проблемы.
Она выдаст ошибку, чтобы сообщить вам о возможной проблеме. Это даст вам возможность исправить ее до компиляции:
func main() {
var a sync.WaitGroup
b := a
fmt.Println(a, b)
}
// go vet:
// assignment copies lock value to b: sync.WaitGroup contains sync.noCopy
// call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
// call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
В этом случае go vet предупредит вас о трех различных местах, где происходит копирование. Вы можете попробовать сами на Go Playground.
Обратите внимание, что это всего лишь мера предосторожности, когда мы пишем и тестируем наш код, мы все еще можем запускать его как обычно.
Внутреннее состояние
Состояние WaitGroup
хранится в переменной atomic.Uint64
. В этом единственном значении упаковано несколько вещей.
Вот как это выглядит:
Counter (старшие 32 бита): Эта часть отслеживает количество горутин, которых ожидает группа WaitGroup. Когда вы вызываете wg.Add()
с положительным значением, он увеличивает этот счетчик, а когда вы вызываете wg.Done()
, он уменьшает счетчик на единицу.
Waiter (младшие 32 бита): Отслеживает количество горутин, ожидающих, пока этот счетчик (старшие 32 бита) достигнет нуля. Каждый раз, когда вы вызываете wg.Wait(), он увеличивает этот счетчик «ожидания». Как только счетчик достигнет нуля, он освободит все горутины, которые ждали.
И последнее поле, sema uint32
, которое является внутренним semaphore, управляемым средой выполнения Go.
Когда горутина вызывает wg.Wait()
и счетчик не равен нулю, она увеличивает счетчик ожидающих, а затем блокирует его, вызывая runtime_Semacquire(&wg.sema)
. Этот вызов функции переводит горутину в спящий режим, пока она не будет “разбужена” соответствующим вызовом runtime_Semrelease(&wg.sema)
.
Проблема выравнивания
Разговоры о прошлом скучны, особенно когда хочется перейти сразу к делу. Но знание прошлого — лучший способ понять настоящее.
Давайте посмотрим, как WaitGroup развивалась на протяжении нескольких версий Go:
Могу сказать, что ядро WaitGroup (the counter, waiter, and semaphore) в разных версиях Go практически не менялось. Однако струтктура этих элементов менялась много раз.
Когда мы говорим о выравнивании, мы имеем в виду необходимость хранения типов данных по определенным адресам памяти для обеспечения эффективного доступа.
Например, в 64-битной системе 64-битное значение, такое как uint64
, в идеале должно храниться по адресу памяти, кратному 8 байтам. Причина в том, что процессор может получить выровненные данные за один раз, но если данные не выровнены, то для доступа к ним может потребоваться несколько операций.
Вот тут-то и возникают сложности:
На 32-битных архитектурах компилятор не гарантирует, что 64-битные значения будут выровнены по 8-байтовой границе. Вместо этого они могут быть выровнены только по 4-байтовой границе.
Это становится проблемой, когда мы используем пакет atomic для выполнения операций над переменной состояния. Пакет atomic специально отмечает:
«На ARM, 386 и 32-битных MIPS ответственность за выравнивание 64-битных слов, доступ к которым осуществляется атомарно с помощью примитивных атомарных функций, лежит на вызывающей стороне», — примечание к пакету atomic.
Это означает, что если мы не выровняем переменную state uint64 по 8-байтовой границе на этих 32-битных архитектурах, то это может привести к аварийному завершению программы.
Итак, что же можно исправить? Давайте посмотрим, как это решалось в разных версиях.
Go 1.5: state1 [12]байт
Я бы порекомендовал вам угадать логику этого решения, пока вы читаете приведенный ниже код, а затем мы вместе разберемся с ним.
type WaitGroup struct {
state1 [12]byte
sema uint32
}
func (wg *WaitGroup) state() *uint64 {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1))
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[4]))
}
}
Вместо того чтобы напрямую использовать uint64 для состояния, WaitGroup отводит 12 байт в массиве (state1 [12]byte). Может показаться, что это больше, чем нужно, но на это есть своя причина.
Цель использования 12 байт заключается в том, чтобы обеспечить достаточное пространство для поиска 8-байтового сегмента с правильным выравниванием.
Выравнивание структуры зависит от типа ее поля, поэтому в зависимости от архитектуры оно может быть 8- или 4-байтовым. Если начальный адрес state1
не выровнен, код может просто сместиться на несколько байт (в данном случае на 4 байта), чтобы найти участок в этих 12 байтах, который выровнен.
Go 1.11: state1 [3]uint32
Using 12 bytes was overkill, with 4 bytes essentially do nothing. In Go 1.11, the solution was to streamline things by merging the state (which includes the counter and waiter) and sema into just 3 uint32 fields.
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
Итак, как же нам теперь избежать проблемы выравнивания состояний? Ответ кроется в методе state(), но если вы не знакомы с пакетом unsafe, он может показаться немного сложным.
Поскольку state1
теперь является массивом uint32, он начинается с адреса, кратного 4 байтам.
Для переменной типа массив ее выравнивание соответствует выравниванию типа ее элемента.
Вот как работает метод state()
, если адрес wg.state1
не выровнен по 8 байтам:
Первый элемент
(state1[0])
используется для sema.Второй элемент
(state1[1])
используется для отслеживания количества waiter.Третий элемент
(state1[2]
) используется для counter.
А если адрес wg.state1
выровнен по 8 байт, то элементы в state1
переставляются между state и sema, как показано на рисунке выше.
Go 1.18: state1 uint64; state2 uint32
Проблема предыдущего подхода заключалась в том, что независимо от того, имеем ли мы дело с 64-битным или 32-битным выравниванием, нам все равно приходилось жонглировать state и sema в 12-байтном массиве, верно? Но поскольку большинство систем сегодня 64-битные, имело смысл оптимизировать под более распространенный сценарий.
Идея в Go 1.18 проста: на 64-битной системе нам не нужно делать ничего особенного, state1
хранит состояние, а state2
служит для semaphore:
type WaitGroup struct {
noCopy noCopy
state1 uint64
state2 uint32
}
// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// state1 is 64-bit aligned: nothing to do.
return &wg.state1, &wg.state2
} else {
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}
Однако на системе с 4-байтовым выравниванием код возвращается к предыдущему решению Go 1.11. Метод state()
преобразует state1
и state2
в трехэлементный массив uint32 и переставляет элементы, чтобы сохранить правильное выравнивание state
и sema
.
Самое интересное происходит в Go 1.20, где они решили использовать atomic.Uint64
для работы с состоянием, полностью устранив необходимость в методе state()
.
Go 1.20: state atomic.Uint64
В Go 1.19 была введена ключевая оптимизация, чтобы значения uint64, используемые в атомарных операциях, всегда выравнивались по 8-байтовым границам, даже на 32-битных архитектурах.
Для этого Расс Кокс ввел специальную структуру atomic.Uint64, которая, по сути, является оберткой вокруг uint64:
type Uint64 struct {
_ noCopy
_ align64
v uint64
}
// align64 may be added to structs that must be 64-bit aligned.
// This struct is recognized by a special case in the compiler
// and will not work if copied to any other package.
type align64 struct{}
Так в чем же дело с align64
Когда вы включаете align64
в структуру, это сигнализирует компилятору Go, что вся структура должна быть выровнена по 8-байтовой границе в памяти.
Но как именно
align64
это делает?
Получается, align64
— это обычная, пустая структура. У нее нет никаких методов или специального поведения, как у структуры noCopy
, о которой мы говорили ранее.
Но вот где происходит волшебство. Само поле align64 не занимает места, но оно выступает в качестве «маркера», который указывает компилятору Go, что со структурой нужно работать по-другому. Компилятор Go распознает align64
и автоматически корректирует расположение памяти struct, добавляя необходимые прокладки, чтобы вся struct начиналась с 8-байтовой границы.
После этого структура WaitGroup становится намного проще:
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64
sema uint32
}
Благодаря atomic.Uint64 состояние гарантированно выравнивается по 8 байтам, поэтому вам не придется беспокоиться о проблемах выравнивания, которые могут испортить атомарные операции над 64-битными переменными.
Внутренняя работа sync.WaitGroup
Мы рассмотрели внутреннюю структуру и проблемы выравнивания uint64 на 32-битных архитектурах, особенно при работе с атомарными операциями. Но есть один важный вопрос, который мы еще не решили — почему бы нам просто не разделить counter и waiter на две отдельные переменные uint32?
Кажется, что это могло бы упростить дело.
Если бы мы использовали мьютекс для управления параллелизмом, нам не нужно было бы беспокоиться о том, на какой системе мы работаем — 32-битной или 64-битной, не нужно было бы решать проблемы выравнивания.
Однако здесь есть компромисс. Использование блокировок, как и мьютексов, увеличивает накладные расходы, особенно если операции выполняются часто. Каждый раз, когда вы блокируете и разблокируете, вы добавляете немного задержки, которая может накапливаться, если речь идет о высокочастотных операциях.
С другой стороны, мы используем атомарные операции для безопасного изменения этой 64-битной переменной, без необходимости блокировать и разблокировать мьютекс.
Теперь давайте разберем, как работает каждый метод в WaitGroup, и поймем, как Go реализует алгоритм без блокировок, используя state.
wg.Add(delta int)
Когда мы передаем значение в метод wg.Add(?)
, он соответствующим образом корректирует counter.
Если вы передаете положительную дельту, она прибавляется к counter. Интересно, что можно передать и отрицательную дельту, которая вычтет из counter. Как вы уже догадались, wg.Done()
— это просто сокращение для wg.Add(-1)
:
// Done decrements the [WaitGroup] counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Однако если отрицательная дельта приведет к тому, что счетчик упадет ниже нуля, программа запаникует.
WaitGroup не проверяет счетчик перед обновлением, она сначала обновляет его, а затем проверяет. Это означает, что если счетчик станет отрицательным после вызова wg.Add(?)
, он останется отрицательным до тех пор, пока не случится паника.
Поэтому, если и планируете повторно использовать WaitGroup, будьте внимательны.
Не стесняйтесь пропустить приведенный ниже фрагмент кода, если вам нужен только обзор, но он здесь, если вы хотите глубже понять, как все работает.
func (wg *WaitGroup) Add(delta int) {
... // we excludes the race stuffs
// Add the delta to the counter
state := wg.state.Add(uint64(delta) << 32)
// Extract the counter and waiter count from the state
v := int32(state >> 32)
w := uint32(state)
...
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
Обратите внимание, что вы можете увеличить счетчик атомарно, вызвав wg.state.Add(uint64(delta) << 32)
.
Вот что, возможно, не очень известно: когда вы добавляете положительную дельту для обозначения начала новых задач или горутин, это должно происходить до вызова wg.Wait()
.
Поэтому, если вы повторно используете группу WaitGroup или хотите дождаться одной партии задач, сбросить ее, а затем дождаться другой партии, вам нужно убедиться, что все вызовы wg.Wait
завершены, прежде чем начинать новые вызовы wg.Add(positive)
для следующей партии задач.
Это позволит избежать путаницы в том, какими задачами в данный момент управляет группа WaitGroup.
С другой стороны, вы можете вызвать wg.Add(?)
с отрицательной дельтой в любое время, если это не приведет к переходу счетчика в отрицательную область.
Wait()
О функции wg.Wait()
можно сказать не так уж много. В основном она выполняет цикл и пытается увеличить счетчик ожиданий с помощью атомарной операции CAS (Compare-And-Swap).
// Wait blocks until the [WaitGroup] counter is zero.
func (wg *WaitGroup) Wait() {
... // we excludes the race stuffs
for {
// Load the state of the WaitGroup
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
...
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
...
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
...
return
}
}
}
Если операция CAS завершилась неудачно, это означает, что другая горутина изменила состояние, возможно, счетчик достиг нуля или был увеличен/уменьшен. В этом случае wg.Wait() не может просто считать, что все осталось как было, поэтому она повторяет попытку.
Когда CAS завершается успешно, wg.Wait()
увеличивает счетчик ожидания, а затем переводит горутину в спящий режим с помощью semaphore.
Если вы помните, в конце wg.Add()
проверяет два условия: равен ли счетчик 0 и больше ли счетчик ожидания 0. Если оба условия верны, она будит все ожидающие горутины и сбрасывает состояние в 0.
Вот и вся история с sync.WaitGroup
. Если вы дошли до конца статьи, значит, вы явно вникли в детали.