Учимся применять Semaphore и Worker Pool на Go
Учимся применять Semaphore и Worker Pool на Go
Привет, Хабр! Я Артём Чаадаев из команды ассортимента размещения в Туту и занимаюсь разработкой на языке Go.
Большое количество статей посвящено простоте конкурентной разработки на Go, однако без практических примеров для начинающих разработчиков бывает трудно понять, как можно это применить. Более того, задачи на конкурентную разработку постоянно фигурируют на собеседованиях.
Поэтому в рамках данной статьи решим задачу, приближенную к реальным условиям, например, как деактивировать большое количество пользователей в стороннем API (обычно это бывает после применения бизнес-логики). Для решения используем два паттерна конкурентной разработки на Go: семафор (Semaphore) и пул обработчиков (Worker pool).
Также выясним плюсы и минусы обоих подходов с помощью небольших бенчмарк-тестов с точки зрения времени выполнения и затрат по памяти.
Постановка задачи
Есть некое количество пользователей, например от 1 до 100 000, по каждому надо выполнить функцию Deactivate. Как правило, это происходит путём отправки одного запроса в сторонний API (который не позволяет обновлять пачками) по данному пользователю с его ID и какими-нибудь данными. Каждый запрос даёт временные затраты. При последовательной отправке это будет приблизительно время тайм-аута запроса * количество пользователей. Это даст серьёзный рост временных затрат.
Для того чтобы ускорить время обработки пользователей, можем воспользоваться преимуществами языка Go, а именно возможностью обработки данных в конкурентном режиме. Однако при этом нужно иметь возможность ограничивать конкурентное выполнение.
В рамках данной статьи мы не будем рассматривать основы конкурентности, поэтому рекомендую к этому моменту ознакомиться с примитивами конкурентности: каналами, горутинами, WaitGroup, мьютексами. Цель данной статьи — познакомить вас с тем, как пользоваться примитивами конкурентности в рамках аналогичных задач.
Для решения данного кейса можем воспользоваться двумя подходами:
семафор (Semaphore);
пул обработчиков (Worker pool).
Для простоты не учтено завершение процессов по контексту, хотя это нужно сделать в реальных условиях.
Но при этом для наглядности (в своё время, когда я разбирался с подходами в аналогичных задачах, это меня интересовало) я оставил завершение процесса при первом же получении ошибки.
Семафор (Semaphore)
В рамках данного подхода создаётся, по сути, неограниченное количество горутин. Их выполнение блокируется с помощью объекта Semaphore, который ограничивает количество одновременно выполняемых горутин с помощью буферизированного канала. В данном подходе на каждую задачу создаётся горутина. Однако если буфер канала семафора переполнен, то при операции Acquire горутина с задачей блокируется до тех пор, пока буфер не освободится с помощью операции Release.
Для сбора результатов используется канал, из которого происходит чтение в основной горутине (здесь, конечно, речь идёт о той, в которой выполняется функция, это совсем не обязательно может быть горутина, в которой выполняется функция main).
Реализация в рамках примера показана ниже:
type Semaphore struct {
C chan struct{}
}
func (s *Semaphore) Acquire() {
s.C <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.C
}
type resultWithError struct {
User users.User
Err error
}
func DeactivateUsers(usrs []users.User, gCount int) ([]users.User, error) {
// создаем семафор и передаем ему канал с размером буфера равным ограничению на количество одновременно выполняемых горутин
sem := Semaphore{
C: make(chan struct{}, gCount),
}
// канал для сбора результатов
outputCh := make(chan resultWithError, len(usrs))
// канал для оповещения горутин, что мы больше не ждем их выполнения
sgnlCh := make(chan struct{})
output := make([]users.User, 0, len(usrs))
for _, v := range usrs {
go func(usr users.User) {
sem.Acquire()
defer sem.Release()
err := usr.Deactivate()
// если ловим закрытие сигнального канала, то завершаем функцию
select {
case outputCh <- resultWithError{
User: usr,
Err: err,
}:
case <-sgnlCh:
return
}
}(v)
}
// ждем и собираем результаты
// либо мы получим все, либо выйдет ошибка, по которой мы перестанем читать
for i := len(usrs); i > 0; i-- {
res := <-outputCh
if res.Err != nil {
close(sgnlCh)
return nil, fmt.Errorf("an error occurred: %w", res.Err)
}
output = append(output, res.User)
}
return output, nil
}
Пул обработчиков (Worker pool)
Здесь же мы создаём настраиваемое заранее фиксированное количество горутин, которое не зависит от того, сколько задач на входе. Таким образом, не нужно создавать сотни тысяч горутин при соответствующем наборе пользователей.
Каждая из горутин читает общий канал с входными данными, куда отправляются задачи, пока он не закроется.
Аналогично предыдущему способу в основной горутине (здесь, конечно, речь идёт о той, в которой выполняется функция) собираются результаты.
Реализация в рамках примера показана ниже:
type resultWithError struct {
User users.User
Err error
}
func deactivateUser(wg *sync.WaitGroup, inCh <-chan users.User, outCh chan<- resultWithError) {
defer wg.Done()
for usr := range inCh {
err := usr.Deactivate()
outCh <- resultWithError{
User: usr,
Err: err,
}
}
}
func DeactivateUsers(usrs []users.User, wgCount int) ([]users.User, error) {
// канал для передачи входных данных горутинам
inputCh := make(chan users.User)
// канал для сбора результатов
outputCh := make(chan resultWithError)
// для ожидания завершения всех горутин
wg := &sync.WaitGroup{}
output := make([]users.User, 0, len(usrs))
go func() {
defer close(inputCh)
for i := range usrs {
inputCh <- usrs[i]
}
}()
go func() {
for i := 0; i < wgCount; i++ {
wg.Add(1)
go deactivateUser(wg, inputCh, outputCh)
}
wg.Wait()
close(outputCh)
}()
// собираем результаты
for res := range outputCh {
if res.Err != nil {
return nil, fmt.Errorf("an error occurred: %w", res.Err)
}
output = append(output, res.User)
}
return output, nil
}
Бенчим
В рамках работы над статьёй мне было интересно сравнить оба этих подхода с помощью бенчмарков.
Условия для бенчмаркинга были следующими:
количество пользователей — от 1 до 100 000;
количество горутин — от 10 до 1000;
количество запусков каждых кейсов —10.
Для этого использовалась утилита benchstat. Она есть тут: https://cs.opensource.google/go/x/perf.
В рамках бенчмаркинга рассмотрены следующие параметры:
затраты по времени (sec/op — секунд на операцию);
затраты по памяти (B/op — байт на операцию);
количество аллокаций (allocs/op — количество операций выделения памяти на операцию).
Сравнение затрат по времени
│ semaphore.txt │ workerpool.txt │
│ sec/op │ sec/op vs base │
DeactivateUsers/input_size_1_goroutines_count_10-10 713.2n ± 0% 4030.5n ± 1% +465.09% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10 5.752µ ± 1% 7.236µ ± 0% +25.80% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10 47.49µ ± 4% 41.69µ ± 0% -12.21% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10 496.2µ ± 20% 386.6µ ± 1% -22.09% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10 4.567m ± 17% 3.979m ± 1% -12.86% (p=0.023 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10 67.03m ± 2% 39.95m ± 1% -40.41% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10 708.9n ± 0% 19625.0n ± 2% +2668.18% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10 5.763µ ± 1% 20.340µ ± 2% +252.96% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10 47.32µ ± 2% 58.72µ ± 1% +24.10% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10 490.4µ ± 8% 466.2µ ± 26% ~ (p=0.436 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10 4.998m ± 19% 4.812m ± 6% ~ (p=0.218 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10 66.58m ± 2% 58.71m ± 5% -11.81% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10 707.2n ± 0% 317225.0n ± 4% +44756.48% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10 5.760µ ± 1% 309.622µ ± 4% +5275.85% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10 48.93µ ± 4% 324.29µ ± 14% +562.82% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10 518.3µ ± 12% 636.7µ ± 14% +22.84% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10 4.301m ± 14% 4.728m ± 16% ~ (p=0.529 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10 67.48m ± 3% 69.69m ± 19% ~ (p=1.000 n=10)
Видим:
огда максимальное количество одновременно выполняемых горутин больше, чем задач, то временные затраты у семафора меньше на 1–3 порядка;
когда максимальное количество одновременно выполняемых горутин меньше, чем задач, то временные затраты в принципе держатся в тех же порядках, хотя и Worker pool быстрее;
при количестве горутин от 100 и больше незначительно растут временные затраты у Worker pool, однако не особо меняются у семафора.
Сравнение затрат по памяти
│ semaphore.txt │ workerpool.txt │
│ B/op │ B/op vs base │
DeactivateUsers/input_size_1_goroutines_count_10-10 552.0 ± 0% 688.0 ± 0% +24.64% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10 2.695Ki ± 0% 1.172Ki ± 0% -56.52% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10 25.227Ki ± 0% 6.609Ki ± 0% -73.80% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10 237.66Ki ± 0% 56.61Ki ± 0% -76.18% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10 2350.1Ki ± 0% 552.6Ki ± 0% -76.48% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10 22.892Mi ± 0% 5.344Mi ± 0% -76.65% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10 552.0 ± 0% 3568.0 ± 0% +546.38% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10 2.695Ki ± 0% 3.984Ki ± 0% +47.83% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10 25.227Ki ± 0% 9.424Ki ± 0% -62.64% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10 237.66Ki ± 0% 59.43Ki ± 0% -74.99% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10 2350.0Ki ± 0% 555.5Ki ± 0% -76.36% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10 22.892Mi ± 0% 5.347Mi ± 0% -76.64% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10 552.0 ± 0% 32371.5 ± 0% +5764.40% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10 2.695Ki ± 0% 32.115Ki ± 0% +1091.52% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10 25.23Ki ± 0% 37.58Ki ± 0% +48.95% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10 237.66Ki ± 0% 87.60Ki ± 0% -63.14% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10 2350.0Ki ± 0% 583.9Ki ± 0% -75.15% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10 22.892Mi ± 0% 5.378Mi ± 0% -76.51% (p=0.000 n=10)
Видим:
когда максимальное количество одновременно выполняемых горутин больше, чем задач, то затраты по памяти у семафора меньше, разница вышла на 1–2 порядка;
когда максимальное количество одновременно выполняемых горутин меньше, чем задач, то затраты по памяти у Worker pool меньше, но разница примерно на 1 порядок.
Сравнение количества аллокаций
│ semaphore.txt │ workerpool.txt │
│ allocs/op │ allocs/op vs base │
DeactivateUsers/input_size_1_goroutines_count_10-10 8.000 ± 0% 16.000 ± 0% +100.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10 26.00 ± 0% 16.00 ± 0% -38.46% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10 206.00 ± 0% 16.00 ± 0% -92.23% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10 2006.00 ± 0% 16.00 ± 0% -99.20% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10 20006.00 ± 0% 16.00 ± 0% -99.92% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10 200006.00 ± 0% 16.00 ± 0% -99.99% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10 8.000 ± 0% 106.000 ± 0% +1225.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10 26.00 ± 0% 106.00 ± 0% +307.69% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10 206.0 ± 0% 106.0 ± 0% -48.54% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10 2006.0 ± 0% 106.0 ± 0% -94.72% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10 20006.0 ± 0% 106.0 ± 1% -99.47% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10 200006.0 ± 0% 108.5 ± 2% -99.95% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10 8.000 ± 0% 1006.000 ± 0% +12475.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10 26.00 ± 0% 1006.00 ± 0% +3769.23% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10 206.0 ± 0% 1006.0 ± 0% +388.35% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10 2.006k ± 0% 1.006k ± 0% -49.85% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10 20.006k ± 0% 1.010k ± 0% -94.95% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10 200.006k ± 0% 1.040k ± 1% -99.48% (p=0.000 n=10)
Видим:
при изменении нагрузки количество аллокаций растёт пропорционально количеству задач (и это логично, на каждую задачу создаём горутину) в семафоре;
при изменении нагрузки количество аллокаций константно для Worker pool и может поменяться, только если поменять заданное количество горутин.
Выводы
По ходу статьи мы познакомились с двумя паттернами конкурентности: семафор (Semaphore) и пул обработчиков (Worker pool).
При решении примера, указанного выше, можно использовать оба этих паттерна на выбор. Исходя из результатов бенчмаркинга, расхождения в затратах по времени и памяти не такие большие (в районе одного порядка). Разница ощутима только для ситуации, когда задач меньше чем горутин и, соответственно, наоборот.
Возможно у кого-то уже были практические примеры, где можно было бы чётко проследить, что конкретно лучше использовать, буду рад, если поделитесь в комментариях.
Подробнее просмотреть репозиторий вместе с тестами и бенчмарками можно здесь.