Эпиграф в стиле А.П. Чехова по следам оригинального выступления

Иван Семёныч Троицкий, программист сорока лет, застенчивый и робкий, от природы больше склонный к одиночеству, чем к публичным выступлениям, стоял в кулуарах конференции, смущённо улыбаясь и потирая ладонью вспотевший лоб. Доклад его — «Что не так с конкурентностью в Go и как с этим жить?» — принимали тепло, похлопывали по плечу и говорили:

— Знатно выступил, Иван Семёныч, знатно!

Однако он чувствовал, что наговорил массу пустяков. То есть, конечно, всё было верно: горутины, каналы, алгоритмы... Но чувствовал он себя так, словно не выступал с серьёзным докладом, а рассказал анекдот, смеясь вместе с аудиторией над какими-то пустяками, над самим собой, да над тем, как нелепо, право, порой выходит у хороших людей делать хорошие вещи.

Вернувшись домой, Иван Семёныч сел за статью, чтобы изложить всё ещё раз, подробно и без волнения. Но вместо того, чтобы писать, он глядел на экран, и в голове его тянуло какую-то странную горечь.

— Скажите, ради Бога, Варя, — обратился он к жене, — ну зачем я вообще эту конкурентность изучал?

— А вы, Иван Семёныч, всегда так, — спокойно и даже с лёгкой насмешкой сказала Варвара Павловна, не отвлекаясь от вязания. — То вы об одном печалитесь, то о другом. А люди вас любят. Говорят: талант.

— Какой уж там талант, — с тоской отвечал Троицкий, рассматривая пустой экран. — Вечно одно и то же: скажешь умно — никто не слушает, пошутишь — все смеются. Пойди разберись, чего они хотят. Вот и горутины эти: думаешь, сейчас сделаю быстро и просто, а в итоге запутался, наплодил лишних, и вместо работы они дерутся за память. Прямо, как люди…

— Что ж тут плохого? — Варвара Павловна улыбнулась и погладила мужа по плечу. — Людям нужны и смех, и горечь. Ты уж напиши, Ваня. Напиши, как на сердце. Пусть и про горутины, да от души.

И Иван Семёныч сел и написал: коротко, ясно и с лёгкой грустью. Без всяких технических подробностей, а так, словно бы о себе самом говорил — человеке, который всегда хочет сделать лучше, а выходит как всегда. В конце же добавил тихонько и почти неслышно: «Что не так с конкурентностью в Go — это ясно. А вот как с этим жить — уж решайте сами».

Затем потёр глаза, выключил компьютер и отправился пить чай, твёрдо решив, что больше никогда не будет выступать публично. Впрочем, так он решал после каждой конференции.

Конкурентность — одно из базовых преимуществ Go, но на практике она часто становится источником сложностей: гонки, дедлоки, утечки горутин, избыточная сложность. Все потому, что встроенные инструменты Go мощные, но требуют грамотного подхода.

Я Максим Мирошниченко, разработчик бэкенда Почты VK WorkSpace в VK Tech. Эта статья о принципах эффективного применения паттернов конкурентности и построения надежно работающих решений, использующих конкурентность. Вы узнаете, как определить, нужна ли вам конкурентность вообще, как проектировать микроархитектуру, на какие паттерны опираться, как тестировать и бенчмаркать свои решения. Мы пройдем путь от атомарных блоков до продакшен-архитектур, дополнительно отталкиваясь от идей Роба Пайка и практических кейсов, — чтобы результат был не просто рабочим, а надежным и удобным в поддержке. 

Что не так с конкурентностью в Go

Гонки, dead/live-локи, голодание, ложное разделение, соперничество за блокировки

Для меня Go — это про простоту, читаемость и эффективность кода. Но как только мы начинаем внедрять конкурентность в проект, у нас, по сути, остается только эффективность и множество проблем асинхронщины. Их сложно отлаживать, трудно локализовать, и, честно говоря, мне как разработчику совсем не хочется с ними сталкиваться.

Канальное мышление

В Go есть достаточно уникальная для языков концепция передачи данных между горутинами через каналы. Эта особенность предполагает, что у разработчика будет развито так называемое канальное мышление. Без него мы просто перегружаем свое окно внимания. Чтобы развить этот навык, нужно попрактиковаться в написании конкурентного кода. Это тоже может стать барьером.

Трудночитаемый код

В дополнение ко всему этому вишенкой на торте будет сам конкурентный код — его тяжело читать:

go func() {
	defer close(outStream)

	for {
		select {
		case <-ctx.Done():
			return
		case val, ok := <-input:
			if !ok {
				return
			}
			select {
			case <-ctx.Done():
				return
			case outStream <- val:
			}
		}
	}
}()

Я привел достаточно простую функцию, которая завершается при закрытии канала, из которого она читает, либо по сигналу из контекста. И мне кажется, чтобы такое быстро распознать и понять, нужно быть Магнусом Карлсеном в мире конкурентного проектирования.

Иными словами, я хочу сказать, что конкурентность в Go — это boilerplate. Мы как разработчики и проектировщики систем должны задать себе вопрос: зачем мы хотим внедрить конкурентность в продакшен-код? И действительно ли она нужна нам для решения конкретной задачи?

Зачем нужна конкурентность

Даже если ты Go-разработчик, который ни разу не свитчил контекст, не создавал свои каналы и ни разу не писал go func, то, скорее всего, ты уже использовал конкурентность. Только в ее скрытом виде — например, в http-сервере.

http-сервер → Скрытое использование

Когда мы пишем хендлеры, то на подкорке понимаем: они будут обрабатываться конкурентно. Но при этом не перегружаем свое внимание этой конкурентностью — просто воспринимаем ее как нечто само собой разумеющееся. Это и есть классная реализация: конкурентность есть, но скрытая.

ticker → Простейшее использование

Другой пример — ticker из пакета time. Он знаком каждому, кто читал хотя бы пару книг по Go. Мы используем его, чтобы реализовать внутри кода Cron-джобы, которые будут выполняться через определенные промежутки времени. Почему мы не сопротивляемся его использованию? Потому что это просто, понятно и мы этому доверяем.

errorgroup → Оптимизационное использование

Третий кейс — errorgroup. Они появились сравнительно недавно, но отлично иллюстрируют мою мысль. Допустим, есть пять независимых сетевых запросов. Гонять их последовательно нерационально, это трата ресурсов системы. А errorgroup — удобный инструмент для доступа к конкурентному выполнению.

Мой пойнт: давайте учиться у лучших и строить конкурентный продакшен-код на тех же принципах, что уже заложены в стандартной библиотеке Go.

Настал час писать for { select { ... } }

Теперь о том, какие требования можно предъявить к конкурентному коду.

Отделение от бизнес-логики

Первое и самое важное — конкурентный код должен быть отделен от бизнес-логики. В идеале его использование должно напоминать работу с http-сервером, когда реализация скрыта.

Атомарность функций

Далее мы хотим взять лучшее от тикера: это должны быть простые, функциональные, конкурентные блоки с понятными названиями. Чтобы разработчик, открывший код, сразу понял, что они делают. У каждой функции четкая задача и понятный сценарий использования.

Выдерживаем уровень абстракции

Следующий шаг — объединение этих атомарных блоков. Они должны стыковаться между собой в соответствии с гайдлайнами проекта — в слой микроархитектуры, в котором мы описываем, как именно взаимодействуют части системы.

Простота и гибкость

Если блоки будут гибкими и универсальными, другие разработчики, ранее знакомые с кодом, смогут их переиспользовать, не тратя время на погружение заново.

Доверие за счет тестирования

Чтобы уменьшить страх использования конкурентного кода, мы покрываем его тестами: ищем утечки горутин, проверяем на дедлоки и прочие проблемы конкурентности. Чем больше тестов, тем лучше, но даже базовое покрытие уже дает уверенность.

Эти тезисы можно встретить, в принципе, в каждой второй книге по программированию. Посмотрим, как это выглядит на практике.

Утилитарные блоки

У нас есть пул конкурентных паттернов языка Go. Они сослужат отличную службу и станут базой для построения конкурентной архитектуры и удобной работы с каналами. Что их отличает — это похожая сигнатура функции. Это метод, который на вход принимает один канал, что-то делает и отдает результат в другой. 

Знание паттернов — это мультитул разработчика конкурентных систем. В идеале хочется, чтобы мы доставали нужный паттерн буквально на автомате: увидели задачу — вспомнили нужный блок — вставили. Достаточно хотя бы знать, что такие паттерны существуют, а уже потом отточим, как и где их применять.

Давайте посмотрим на основные из них.

Конвертер (Pipeline)

func Pipeline[IN, OUT any](fn func(IN) OUT) func(<-chan IN) <-chan OUT

Паттерн конвертер, он же pipeline, работает как переходник. По его сигнатуре видно, что он принимает какую-то функцию конвертации одного объекта в другой, например из типа A в тип B. Далее мы загоняем в него канал типа A, и он возвращает канал типа B один к одному. Это удобный переходник нашей микроархитектуры.

FanIn / FanOut

func FanIn[T any](streams ...<-chan T) <-chan T

Группа паттернов FanIn / FanOut — антонимы. FanIn собирает слайс однотипных каналов в один. FanOut, наоборот, берет один канал и распределяет значения по нескольким — например, с помощью Round-robin.

Батчирование

func Batch[T any](ch <-chan T, size int) <-chan []T

Паттерн группировки значений в каналах в качестве аргументов принимает канал с типом T, размер (size), на который будет группировать, и возвращает канал типа слайса из T. Если size = 256, то на каждые 256 значений входного канала мы получим одно значение слайса размером 256 значений исходящего канала.

Я не привожу реализации, потому что для каждого проекта они будут разными. Паттерны можно комбинировать в более сложные структуры, например в воркерпул.

Воркерпул

func Parallel[IN, OUT any](stream <-chan IN, fn func(IN) OUT, count int) <-chan OUT

Выше приведена простейшая реализация. В функцию Parallel мы передаем:

  • канал входных данных;

  • функцию обработки;

  • количество воркеров count.

На выходе — канал с результатами.

Не перегружая свое когнитивное окно, мы понимаем: эта функция будет выполняться, предположим, на пяти воркерах и у нас будет приятный интерфейс взаимодействия с ней через тот же канал.

Это все утилитарные паттерны, которые, как правило, не делают никакой настоящей работы. Когда мы переходим к работе с настоящими системами, они обычно не заточены под канальное взаимодействие.

Выход в последовательный мир

Когда мы внедряем конкурентность, сталкиваемся с тем, что не все подсистемы к ней готовы. Клиент базы, например, ожидает простого вызова методов, а не взаимодействие через каналы. Поэтому, если мы внедряем конкурентность, нам нужно адаптировать подсистемы, с которыми мы работаем, — делать их совместимыми с каналами. Это добавляет сложности: нужно глубже погружаться в работу подсистемы и понимать принципы ее работы.

Вот несколько параметров, которые часто оказываются критичными.

Характер нагрузки: CPU/Network

Нам может быть интересен характер нагрузки системы, с которой работаем. Если это процессорная нагрузка, мы, скорее всего, захотим использовать параллелизм и выполнять задачу на разных ядрах системы. Если это нагрузка input-output, то достаточно конкурентности, которая не даст ресурсам простаивать.

Соотношение чтения/записи (R/W)

При работе с базой данных важно понимать, какая нагрузка преобладает: чтение или запись. Например, в случае высокой записи нужно продумать, как грамотно оптимизироваться под такую нагрузку.

Время обработки операции

Далее на систему и построенную микроархитектуру мы будем писать имитационные бенчмарки. Чтобы сделать это правильно, нужно знать, сколько в среднем занимает операция. Также это поможет лучше понять особенности системы.

Все это я говорю вот зачем: внедряя конкурентность, мы должны гораздо глубже разбираться в задаче и системе, с которыми работаем. Конкурентность требует более высокого уровня понимания. Это важно учитывать.

Дальше посмотрим на разные хаки при работе с такими системами.

Разделение на уровне данных

Задача:

Наш KISS-кэш перешел в состояние техдолга — начал тормозить.

Итак, у нас есть простой кэш, написанный год назад и, возможно, даже не нами. В какой-то момент он начинает тормозить. Приходит продакт и говорит, что нужно чинить.

В этом случае начинаем с анализа — смотрим, как все устроено.

type sMap struct {
	mu *sync.RWMutex
	m  map[int]struct{}
}

func (s *sMap) Set(key int) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.m[key] = struct{}{}
}

Перед нами классическая структура: Mutex + map. Ничего нового, действительно KISS.
Выдвигаем гипотезу, почему система тормозит. Смотрим на профиль нагрузки, анализируем характеристики и видим:

  • мапа большая;

  • нагрузка на запись превышает 80% от общего количества операций.

Переходим к методу Set — все по классике: блокируется Mutex и значение пишется в map. Ничего необычного.

И тут рождается гипотеза: бутылочным горлышком становится именно Mutex. Самое простое решение в этом случае — расширить бутылочное горлышко за счет шардирования структуры.

Сейчас у нас есть структура sMap. Мы объявим эту структуру в качестве слайса []sMap в шардированной мапе. Количество шардов определяется шард-каунтом — пусть будет 6 для примера.

Теперь, чтобы записать значение в эту мапу, определим номер шарда как хэш ключа, по остатку разделенный на количество шардов: hash(key) % s.shardCount.

type shardedMap struct {
	m          []sMap
	shardCount int
}

func (s *shardedMap) Set(key int) {
	shardNum := hash(key) % s.shardCount
	shard := s.m[shardNum]
  
	shard.mu.Lock()
	defer shard.mu.Unlock()

	shard.m[key] = struct{}{}
}

Получим номер шарда, достанем нужную sMap из слайса, которая организована ровно так же, как и старая, и заблокируем Mutex конкретно этого шарда. Таким образом снимем с него нагрузку.

Это наша гипотеза. Проверим ее бенчмарками, которые я называю имитационными, потому что они описывают систему, в рамках которой мы оптимизируем профиль нагрузки и прочие моменты.

Они покажут, что шардированная мапа работает примерно в два раза быстрее, чем предыдущая структура. Хак сработал, задача решена.

Дальше можно пойти в сторону оптимизации количества шардов. Это уже вопрос настройки, как stable-тесты: нужно точнее воспроизвести условия продакшена.

Проверка перед блокировкой

Задача:

Синхронизация кэша с редко меняющимися значениями

Есть кэш, в котором значения почти не меняются, но изменения мы должны отслеживать практически сразу. То есть проверка происходит постоянно. Как это выглядит в коде:

func LockCheck(key string) {
    mu.Lock()
    defer mu.Unlock()

    if _, ok := check(key); ok {
        return
    }

    do()
    set(key)

    return
}

Мы вызываем функцию LockCheck, чтобы проверить значение. Первое, что она делает, это лочит Mutex — mu.Lock(). После блокировки — проверка значения, соответствует ли оно ожиданиям. Если да, мы Defer’ом разблокируем Mutex и выходим. А если нет, выполняем нужную работу, выставляем ключ и выходим также с разблокировкой Mutex.

Нетрудно заметить, что тут есть проблема. Блокировка Mutex происходит зря, когда мы выходим из чека с помощью return. Нужно улучшить код, для этого добавим предварительную проверку под RLock:

func CheckLockCheck(key string) {
    rwmu.RLock()
    _, ok := check(key)
    rwmu.RUnlock()
    if ok {
        return
    }

    rwmu.Lock()
    defer rwmu.Unlock()
  
    if _, ok := check(key); ok {
        return
    }

    do()
    set(key)

    return
}

Мы заменили обычный Mutex на RWMutex — теперь в коде две проверки. Первая проверка происходит под RLock. Если значение не устроило те самые редкие операции обновления, захватываем Lock, чтобы обновить, с повторной проверкой внутри, чтобы не было гонки, и делаем ту самую работу, которую делали раньше.

Отлично. Мы выдвинули гипотезу, оптимизировали код. Следующим шагом проверим гипотезу с имитационным бенчмарком.

Бенчмарк говорит, что первая, более простая версия, работает в два раза быстрее.

Вывод:

Наше оптимизированное решение хуже текущего.

Это удивляет. Мы ведь сократили число блокировок — что пошло не так? Вариантов несколько:

  • Некорректно задали условия работы. Проблема может быть в том, что мы неправильно написали имитационные бенчмарки и они не то имитируют.

  • Гипотеза изначально неверна. Возможно, Mutex в этом месте вообще не был проблемой. Мы оптимизировали то, что не мешало.

  • Мы переусложнили без необходимости. По сути, это классический Overengineering. Мы усложнили код ради мнимой пользы — и в итоге навредили.

Получается, даже хорошие паттерны проектирования могут сделать хуже. Если мы внедряем их, а они усложняют код, то гипотезу об оптимизации нужно проверить. Интуиция может нас сильно подвести, поэтому всегда проверяйте гипотезу.

Мы посмотрели работу с мапоподобными кэш-структурами, а теперь разберем, как работать с интенсивной процессорной нагрузкой.

Разделение на уровне работы

Задача:

Ускорить работу однопоточной программы по поиску простых чисел.

Есть программа, которая считает простые числа, например числа Мерсена. Мы хотим ускорить ее работу. Заглядываем в код и видим, что она выполняется в один поток. Очевидный шаг — попробовать многопоточное выполнение, а многопоточные алгоритмы для расчета простых чисел ускорят работу программы.

Верхнеуровнево логика такая:

Есть поток чисел. Есть Black box, который определяет, является ли число простым. И выходной поток — простые числа, прошедшие через эту проверку. Хочется добавить сюда параллелизм. Поэтому добавляем воркерпул.

Теперь поток любых чисел будет запускаться в паттерн FanOut, который возвращает слайс каналов по размеру воркеров. Этот слайс загружается в воркеры, которые будут производить те самые расчеты простых чисел и отдавать слайс каналов. Эти результаты объединяем обратно в один канал с помощью паттерна FanIn.

Какое количество воркеров использовать?

Хороший старт, если в системе нет другой большой работы, — использовать значение runtime.GOMAXPROCS(0). И дальше смотреть, будет ли снижаться количество воркеров, эмпирическим путем в рамках системы. Если проблем нет, оставляем, если проблемы есть, то понижаем их количество.

Код реализации последней блок-схемы будет выглядеть так:

func Parallel[IN, OUT any](stream <-chan IN, fn func(IN) OUT, count int) <-chan OUT {
    processStreams := make([]<-chan OUT, count)
    for i := range count {
        processStreams[i] = Pipeline(fn)(stream)
    }

    return FanIn(processStreams...)
}

Верхнеуровневое взаимодействие нужно описывать кодом. Несмотря на то, что мы реализуем воркерпул и это конкурентность, в коде нет атрибутов конкурентного кода. Только два паттерна:

  • Pipeline, куда мы передаем функцию fn для расчета простых чисел;

  • и FanIn, который собирает все каналы результатов в один и возвращает конечному пользователю.

После рассмотрения кода и имплементации архитектуры проверяем гипотезу, чтобы узнать, получили ли мы ускорение. Запускаем имитационные бенчмарки и видим:

В этом случае получили почти двукратное ускорение и подтвердили гипотезу.

В примерах выше мы рассматривали достаточно атомарные задачи. В реальности они встречаются нечасто. Дальше покажу пример, который вы реально сможете вспомнить, когда столкнетесь с чем-то похожим.

Микроархитектурный кейс

Задача:

В систему поступают задания. 

Выполнение заданий — CPU-intensive.

Результат обработки пишем в базу данных.

Допустим, у нас есть система, в которую поступают задания. Эти задания требуют процессорного времени и дают CPU-нагрузку. После обработки результат нужно записать в базу данных.

На верхнем уровне архитектура будет выглядеть так:

Архитектура бизнес-логики и наш код должны на нее откликаться, то есть выглядеть так:

  • канал Task — входящие задания;

  • черный ящик, который производит CPU-обработку и возвращает канал Result;

  • Result идет дальше — в систему, которая записывает данные удаленно.

Способы оптимизации этой схемы перекликаются с предыдущим кейсом. Мы можем сделать параллельную обработку заданий, к тому же писать в базу батчами и группировать данные. Это особенно критично, если в нашей системе они порождаются очень часто.

Теперь блок-схема будет выглядеть так:

Канал заданий обрабатывается через блок параллельной обработки, порождает канал Result. Канал Result пропускается через паттерн группировки, и формируются слайсы из Result. Дальше происходит запись в базу.

Реализация в коде выглядит так:

func Benchmark_AddBatchAndWorkers(b *testing.B) {
    const maxBatchSize = 256
    const workerCount = 10

    dataStream := genData(b.N)
    resultStream := co.Parallel(dataStream, process, workerCount)
    batchStream := co.Batch(resultStream, maxBatchSize)

    b.ResetTimer()
    for vs := range batchStream {
        update(vs...)
    }
}

Нейминг здесь отличается специально — чтобы, читая код, разработчик визуализировал блок-схему. Это код взаимодействия, не просто логика — как блоки системы общаются между собой.

Что здесь происходит:

  • dataStream — канал с потоком входных заданий.

  • co.Parallel — паттерн для параллельной обработки:
    Мы передаем:

    • канал с заданиями;

    • функцию process, имеющую сигнатуру превращения типа канала dataStream в тип результата;

    • количество воркеров, выполняющих обработку.

  • На выходе получаем поток результатов — resultStream.

  • co.Batch — паттерн для группировки. Мы превращаем поток результатов в поток слайсов результатов ([]Result) с заданным размером батча (maxBatchSize).

Я не показываю реализации методов батчирования, потому что каждая реализация — это изобретение под конкретный проект. В этом батчировании можно сделать так, чтобы через секунду, если у нас не набрался батч, мы отдавали бы в базу данных. А в другом проекте в батчировании мы хотим строго 256. Это можно решить только кастомизацией.

Далее берем батчи и пишем в базу данных функции update().

Теперь нужно проверить гипотезу о том, что данные оптимизации в нашем конкретном кейсе нам помогли. Для этого пишем имитационные бенчмарки:

  • без добавления оптимизации;

  • только с добавлением группировки;

  • только с добавлением воркеров;

  • с добавлением обеих оптимизаций.

Видим, что каждая оптимизация дает ощутимый прирост производительности. Их комбинация — тоже, но уже не так ощутимо. Можно задаться вопросом: почему комбинация дает только минус 84 наносекунды? Ответ в деталях конкретной системы. Почему-то именно эта система при имитации дает такие результаты, и мы в целом им верим.

Фреймворк для разработки конкурентных приложений

Ранее я уже упоминал фреймворк — подход к проектированию конкурентных решений. Сейчас самое время подвести итоги и сформулировать ключевые выводы.

Тип проблемы

Начинаем с проблемы, которую нужно решить. Она может быть связана с оптимизацией. Но прежде чем браться за код, важно определить тип проблемы:

  • I/O-bound. Если это сетевая или input/output-нагрузка и мы хотим ее оптимизировать, скорее всего, нам подойдет конкурентность в чистом виде. И асинхронное выполнение позволяет избежать простаивания ресурсов.

  • CPU-bound. Если же основное ограничение — это процессорная нагрузка, то здесь мы говорим уже о параллелизме. В таких случаях используются воркерпулы (worker pools) и особое внимание уделяется железу, на котором гоняем код.

Тестирование и бенчмаркинг

Когда мы переходим к реализации, важно двигаться сверху вниз — начиная с конкурентных блоков и заканчивая общей архитектурой их взаимодействия.

Конкурентные методы

Мы должны понять, какие конкурентные блоки для взаимодействия нам нужно написать или найти в проекте, если мы уже давно следуем этому пути, и покрыть их конкурентными тестами. Я имею в виду тесты, которые будут проверять работу кода в асинхронной среде. Они помогут развить доверие к этому коду за счет того, что мы будем знать, нет ли утечек горутин или дедлоков, как ведет себя функция при множественных одновременных вызовах.

Взаимодействие блоков

Далее объединяем эти строительные блоки в слой абстракции — в конкурентную микроархитектуру, где будем их соединять различными методами. Важно написать на эту микроархитектуру бенчмарки, чтобы проверить гипотезу и убедиться, что код работает корректно и делает то, что ожидаем. Если на этом этапе найдем проблемы, то за счет предыдущего тестирования будет гораздо проще локализовывать и дебажить код.

Слои абстракции

Чтобы писать конкурентный код, который приятно читать и удобно поддерживать, важно разделять три слоя абстракции. Причем конкурентную логику нужно максимально изолировать.

1. Бизнес-логика

Самый верхний слой. Он должен быть синхронным и последовательным.

2. Конкурентная микроархитектура

Конкурентная микроархитектура должна объяснять разработчикам, как устроено взаимодействие. Здесь разработчик должен представлять себе блок-схему. Никаких деталей реализации и for { select { ... } } не нужно. Они будут на следующем уровне абстракции.

3. Реализация отдельных блоков

На этом слое уже пишем запутанный конкурентный код, но он будет изолирован и атомарен. Мы читаем одну такую функцию — да, может быть, немного нагружаем когнитивное окно. Но оно ограничено, и его хватает, чтобы понять, что делает эта часть.

Итак, если у нас будет три слоя абстракции, то наш конкурентный код в продакшене будет приятно читаемым и приятно поддерживаемым, а также будет следовать лучшим практикам стандартной библиотеки языка Go.