Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool

Паттерны конкурентности в Go. Подробный разбор. Часть 2. Fan-Out/Fan-In

Pub/Sub

Pub/sub, сокращение от «publish-subscribe» (публикация-подписка), — это паттерн обмена сообщениями между различными частями приложения. Прелесть такого подхода в том, что отправитель сообщения не знает о получателе, а получатель не знает об отправителе. Оба они знают только об одном посреднике — брокере сообщений. Отправитель публикует сообщение, а получатель подписывается на получение сообщений. В итоге это кардинально уменьшает связанность между частями приложения и неминуемую головную боль ею вызванную.

Пример:
В приложении нужно по расписанию выполнять некие действия - например рекламные рассылки или какое-нибудь закрытие дня раз в сутки. Вы реализуете сервис плнировщика задач, при старте задаете ему расписание, по которому он публикует некие уведомления. А потом в любом месте приложения просто подписываетесь на выбранные уведомления и выполняете нужные действия при их получении. Это просто, это удобно, это гибко, это легко масштабировать. Это Pub/Sub.

В Go паттерн Pub/Sub можно реализовать несколькими способами. Один из самых простых и эффективных — использование каналов. Каналы в Go являются потокобезопасными и позволяют передавать данные между горутинами максимально простым способом. Собственно, известные брокеры сообщений (RabbitMQ, Kafka и т.д.) - это предельная реализация паттерна Pub/Sub для обмена между приложениями (микросервисами). Мы же в данной статье рассмотрим, как реализовать Pub/Sub внутри приложения с использованием каналов.

Реализация

Как правило, сообщение издателя имеют два основных поля: тема (topic) и сообщение (message). Подписчики могут подписываться на определенные темы и получать все сообщения, опубликованные в этих темах. В нашем примере структура сообщения представлены типом topics map[string][]chan any. Мапа в данном случае идеально подходит. Ключом мапы является тема, а значением - срез каналов (подписчиков), по которым будем рассылать произвольные данные. В момент подписки для нового подписчика создается канал, который добавляется в срез каналов для соответствующей темы.

Notifier

Начнем с минимальной струтуры брокера сообщений:

type Notifier struct {
	topics map[string][]chan any
}

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
    n.topics[topic] = append(n.topics[topic], ch)

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

func (n *Notifier) Publish(topic string, message any) {
	for _, ch := range n.topics[topic] {
		ch <- message
	}	
}

При подписке мы передаем топик, на который подписываемся и функцию-хэндлер, которая будет вызвана при получении сообщения. У функций есть параметр any, позволяющий передавать произвольные данные сообщения. Внутри метода создается канал, который добавляется в срез каналов для соответствующей темы. Если тема еще не существует, она будет создана мапой автоматически. Затем запускается горутина, которая будет ждать сообщения из канала и вызывать обработчик для каждого сообщения. Горутина завершится, когда канал будет закрыт.

При публикации, мы сначала получаем срез подписчиков (каналов) для темы, а затем отправляем сообщение во все каналы.

Что тут не так, чего не хватает? Ну как минимум - нет инициализации мапы. После объявления карты ее значение по умолчанию равно nil. Чтение из такой карты возвращает пустой результат, но попытка записи в такую карту вызывает ошибку времени выполнения. Добавим:

func NewNotifier() *Notifier {
	return &Notifier{
		topics: make(map[string][]chan any),
	}
}

Далее, в коде есть доступ к общему ресурсу из нескольких горутин без синхронизации, что приводит к data race и непредсказуемому поведению. Общий ресурс в данном случае - это мапа topics.
Добавим мьютекс:

type Notifier struct {
	topics map[string][]chan any
	mu     sync.Mutex
}

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
	
	n.mu.Lock()
    n.topics[topic] = append(n.topics[topic], ch)
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

func (n *Notifier) Publish(topic string, message any) {
	n.mu.Lock()
	chnls := n.topics[topic]
	n.mu.Unlock()

	for _, ch := range chnls {
		ch <- message
	}	
}

Выглядит неплохо на первый взляд. При подписке мы захватываем мьютекс, добавляем канал в мапу, и сразу же отпускаем. При публикации мы перед получаением среза подписчиков (каналов) для темы опять захватываем мьютекс, после получения среза - отпускаем. Пока мьютекс захвачен, код может выполняться только одной горутиной. Таким образом, наш код гарантирует, что мапа не будет одновременно читаться и записываться разными горутинами. Будто бы мы сделали всё правильно. И тем не менее, в этом коде зарыта потенциальная проблема. Чтобы понять, в чем она состоит, сначала вспомним, как устроены срезы в Go:

type slice struct {
    ptr *array
    len int
    cap int
}

Когда выполняется chnls := n.topics[topic] копируется заголовок среза, но не сам массив. Это значит, что chnls и n.topics[topic] указывают на один и тот же массив. Что тут может пойти не так? Пример:

  1. Горутина A (Publish) читает chnls в цикле.

  2. Горутина B (Subscribe) одновременно делает append в тот же массив.

Если при добавлении capasity среза не хватает, то append создаёт новый массив, копирует в него данные из старого массива, добавляет новый элемент и возвращает новый срез. При этом старый массив не модифицируется, поэтому конкурентного доступа к одной и той же памяти не происходит.

Если же при добавлении срезу хватает capacity, то append модифицирует тот же массив, который в данный момент читается другой горутиной, что может привести к неопределённому поведению при итерации - data race. Читать и изменять не атомарно одни и те же данные из разных потоков нельзя.

Самое простое решение - держать мютекс захваченным на время итерации по срезу в методе Publish. Но это существенно снизит производительность, Publish будет блокировать Subscribe и наоборот.

Давайте это поправим иначе:

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
	
	n.mu.Lock()
	oldChnls := n.topics[topic]

	// copy-on-write to avoid data races with concurrent readers
	newChnls := make([]chan any, len(oldChnls)+1)
	copy(newChnls, oldChnls)
	newChnls[len(oldChnls)] = ch

	n.topics[topic] = newChnls
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

Теперь при добавлении элемента в срез всегда создаётся новый массив, который копируется из старого, и только после этого новый массив присваивается n.topics[topic]. Это гарантирует, что читающие горутины не будут читать массив, который в данный момент модифицируется, поскольку это уже разные массивы. Такой прием называется copy-on-write.

Для начала полноценного использования нашего кода нам необходим метод корректной финализации нашего Notifier, который будет закрывать все каналы и освобождать ресурсы. Добавим метод Close. Метод должен вызываться в конце работы приложения. После его вызова, использовать Notifier уже будет нельзя.

type Notifier struct {
	topics map[string][]chan any
	mu     sync.Mutex
	closed     bool
}

func (n *Notifier) Close() {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.closed {
		return
	}

	for _, chs := range n.topics {
		for _, ch := range chs {
			close(ch)
		}
	}

	n.topics = nil
	n.closed = true
}

func (n *Notifier) Subscribe(topic string, handler func(any)) error {
	if n.closed {
		return fmt.Errorf("notifier is closed")
	}

	ch := make(chan any)
	
	n.mu.Lock()
	oldChnls := n.topics[topic]

	// copy-on-write to avoid data races with concurrent readers
	newChnls := make([]chan any, len(oldChnls)+1)
	copy(newChnls, oldChnls)
	newChnls[len(oldChnls)] = ch

	n.topics[topic] = newChnls
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()

	return nil
}

func (n *Notifier) Publish(topic string, message any) error {
	n.mu.Lock()
	if n.closed {
		n.mu.Unlock()
		return fmt.Errorf("notifier is closed")
	}

	chnls := n.topics[topic]
	n.mu.Unlock()

	for _, ch := range chnls {
		ch <- message
	}
	
	return nil
}

В принципе, текущую реализацию уже можно считать неким минимальным MVP, иллюстрирующим использование паттерна Pub/Sub.

Что можно было бы добавить к коду Notifier, чтобы получить из него полноценную библиотеку?

  1. В первую очередь не хватает метода Unsubscribe, который позволит отписаться от топика. Хотя в реальных кейсах отписка используется редко, но для полноценной библиотеки это было бы логично.

  2. Главным недостатком данной реализации является использование одной простейшей стратегии отправки сообщений подписчикам.

     for _, ch := range chnls {
     	ch <- message
     }
    

    При публикации, мы последовательно отправляем сообщение в каждый канал. Поскольку мы используем небуферизированные каналы, нам приходится каждый раз ждать, пока подписчик не прочитает сообщение. Это может привести к задержкам в обработке сообщений, если хэндлер подписчика будет обрабатывать сообщение долго. Универсального способа решения этой проблемы нет. Есть несколько стратегий, у каждой из которых есть свои преимущества и недостатки. Первое и самое эффективное решение - использовать буферизированные каналы. Второе - ограничить время обработки сообщения подписчиком таймаутом. Вариантов много и их можно комбинировать для каждого конкретного случая. Подобные стратегии используются в работе и “взрослых” брокеров сообщений (NATS, Kafka и пр.).

  3. Не будет лишним добавить recovery в функцию хэндлера подписчика. Если хэндлер подписчика упадет с паникой, это приведет к падению горутины, которая читает канал.

Полный код готовой библиотеки, пригодной для полноценного использования доступен здесь.