Как стать автором
Обновить
588.3
OTUS
Цифровые навыки от ведущих экспертов

Fan-Out, Fan-In с динамической балансировкой горутин в Golang

Уровень сложностиПростой
Время на прочтение7 мин
Количество просмотров2.1K

Привет, Хабр!

Когда речь заходит о конкурентности в Go, паттерн Fan‑Out, Fan‑In — одна из важных концепций на мой взгляд. Он позволяет распределять задачи между несколькими горутинами и затем собирать результаты обратно в один поток.

Что такое Fan-Out, Fan-In?

Fan‑Out означает, что одна горутина отправляет задачи нескольким другим. Это позволяет распараллелить вычисления, что полезно при работе с IO‑операциями, загрузкой данных или обработкой запросов.

Fan‑In — это обратный процесс. Когда несколько параллельно работающих горутин отправляют свои результаты в один канал, из которого их читает главная горутина.

Базовая реализация Fan-Out, Fan-In

Простейший вариант — фиксированное количество воркеров.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// worker получает задачи из jobs и отправляет результаты в results.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Эмуляция работы
		fmt.Printf("Worker %d обработал задачу %d\n", id, job)
		results <- job * 2 // Возвращаем результат
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	const numWorkers = 5
	const numJobs = 10

	jobs := make(chan int, numJobs)     // Канал для задач
	results := make(chan int, numJobs)  // Канал для результатов

	var wg sync.WaitGroup

	// Запускаем 5 воркеров
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	// Fan-Out: Отправляем задачи в канал
	for j := 0; j < numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Закрываем канал, чтобы воркеры знали, что задач больше не будет

	// Ожидаем завершения всех воркеров
	go func() {
		wg.Wait()
		close(results) // Закрываем канал результатов
	}()

	// Fan-In: Собираем результаты
	for result := range results {
		fmt.Println("Результат:", result)
	}
}

Создаём N горутин‑воркеров, которые читают задачи из jobs. Каждый воркер выполняет свою работу и отправляет результат в results. Главная горутина читает результаты и завершает выполнение.

Жёстко заданное число воркеров (numWorkers = 5) создаёт проблему: если задач больше, очередь переполняется, а если меньше — воркеры простаивают. Неравномерная нагрузка усугубляет ситуацию — если одна из задач выполняется дольше, чем другие, баланс нарушается, а часть потоков просто ждёт. При этом система не умеет адаптироваться к изменяющейся нагрузке, так как количество воркеров фиксировано. Решение? Динамическая балансировка воркеров.

Динамическая балансировка воркеров

Чтобы улучшить производительность, нужно адаптивно управлять числом горутин. Если в очереди слишком много задач — добавляем воркеров. Если задач мало — сокращаем их число.

Реализация:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// worker обрабатывает задачи и отправляет результаты
func worker(id int, jobs <-chan int, results chan<- int, activeWorkers *int32, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range jobs {
		time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
		fmt.Printf("Worker %d обработал задачу %d\n", id, job)
		results <- job * 2
	}

	atomic.AddInt32(activeWorkers, -1) // Уменьшаем счётчик активных воркеров
}

func main() {
	rand.Seed(time.Now().UnixNano())

	const numJobs = 50
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup
	var activeWorkers int32 = 0

	// Горутина-менеджер следит за нагрузкой
	go func() {
		for {
			time.Sleep(500 * time.Millisecond) // Проверяем нагрузку каждые 500 мс

			// Если в очереди накопилось больше 5 задач, добавляем воркера
			if len(jobs) > 5 && atomic.LoadInt32(&activeWorkers) < 20 {
				wg.Add(1)
				atomic.AddInt32(&activeWorkers, 1)
				go worker(int(atomic.LoadInt32(&activeWorkers)), jobs, results, &activeWorkers, &wg)
			}
		}
	}()

	// Отправляем задачи
	for j := 0; j < numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Ожидаем завершения всех воркеров
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan-In: собираем результаты
	for result := range results {
		fmt.Println("Результат:", result)
	}
}

Горутина отслеживает нагрузку в jobs и при увеличении числа задач динамически добавляет воркеров, обеспечивая равномерное распределение работы. Если задач становится мало, ненужные воркеры завершают выполнение, освобождая ресурсы. Число активных воркеров регулируется автоматически, без необходимости ручного вмешательства.


Где применять?

Обработка входящих HTTP-запросов с динамическим пулом воркеров

Веб‑серверы сталкиваются с проблемой переменной нагрузки. В часы пик они могут получать тысячи запросов в секунду, а ночью трафик падает. Если задать фиксированное число воркеров, сервер либо будет простаивать, либо не справится с нагрузкой.

Решение — использовать Fan‑Out, Fan‑In с динамической балансировкой. Когда количество запросов растёт, сервер создаёт дополнительные горутины для обработки. Когда запросов становится меньше — ненужные горутины завершают работу, освобождая ресурсы.

Пример кода:

package main

import (
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"
	"time"
)

var activeWorkers int32

func worker(id int, requests <-chan *http.Request, wg *sync.WaitGroup) {
	defer wg.Done()
	for req := range requests {
		fmt.Printf("Worker %d обработал запрос %s\n", id, req.URL.Path)
		time.Sleep(200 * time.Millisecond) // Имитация обработки
	}
	atomic.AddInt32(&activeWorkers, -1)
}

func main() {
	requests := make(chan *http.Request, 100)
	var wg sync.WaitGroup

	go func() {
		for {
			time.Sleep(500 * time.Millisecond)
			if len(requests) > 10 && atomic.LoadInt32(&activeWorkers) < 50 {
				wg.Add(1)
				atomic.AddInt32(&activeWorkers, 1)
				go worker(int(atomic.LoadInt32(&activeWorkers)), requests, &wg)
			}
		}
	}()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		requests <- r
		w.Write([]byte("Запрос принят\n"))
	})

	fmt.Println("Сервер запущен на порту 8080")
	http.ListenAndServe(":8080", nil)

	close(requests)
	wg.Wait()
}

Горутина‑менеджер следит за размером очереди requests. Если в очереди много запросов, сервер динамически создаёт воркеров. Если запросов мало — горутины завершают работу.

Обработка сообщений в очередях Kafka/RabbitMQ с динамическим числом воркеров

В системах на основе Kafka, RabbitMQ или SQS нагрузка на потребителей сообщений может меняться в зависимости от времени суток, маркетинговых акций и других факторов. Если запустить фиксированное количество обработчиков сообщений, система либо будет тратить ресурсы впустую, либо не справится с пиковыми нагрузками.

Решение — динамическое масштабирование потребителей. Когда в очереди много сообщений, система создаёт дополнительные обработчики. Когда сообщений становится меньше — ненужные воркеры завершаются.

Пример кода для потребления сообщений из Kafka с динамическим числом воркеров:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

var activeConsumers int32

func consumer(id int, messages <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range messages {
		fmt.Printf("Consumer %d обработал сообщение: %s\n", id, msg)
		time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
	}
	atomic.AddInt32(&activeConsumers, -1)
}

func main() {
	messages := make(chan string, 100)
	var wg sync.WaitGroup

	go func() {
		for {
			time.Sleep(500 * time.Millisecond)
			if len(messages) > 10 && atomic.LoadInt32(&activeConsumers) < 20 {
				wg.Add(1)
				atomic.AddInt32(&activeConsumers, 1)
				go consumer(int(atomic.LoadInt32(&activeConsumers)), messages, &wg)
			}
		}
	}()

	// Имитация поступления сообщений в очередь
	go func() {
		for i := 0; i < 100; i++ {
			messages <- fmt.Sprintf("Сообщение %d", i)
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
		}
		close(messages)
	}()

	wg.Wait()
}

Горутина‑менеджер следит за количеством сообщений в очереди. Если сообщений много, запускаются новые потребители. Когда сообщений становится меньше, лишние потребители завершают работу.

Массовая обработка файлов (логов, видео, изображений)

Когда нужно обработать большой массив файлов — например, преобразовать изображения, сжать видео или разбирать логи, — Fan‑Out, Fan‑In с динамической балансировкой становится идеальным решением.

Допустим, у нас есть папка с тысячами файлов, и мы не хотим перегружать систему запуском слишком большого количества процессов. Нам нужен адаптивный обработчик, который будет сам регулировать количество воркеров.

Пример кода:

package main

import (
	"fmt"
	"math/rand"
	"os"
	"sync"
	"sync/atomic"
	"time"
)

var activeWorkers int32

func processFile(id int, files <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for file := range files {
		fmt.Printf("Worker %d обрабатывает файл %s\n", id, file)
		time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond) // Имитация работы
	}
	atomic.AddInt32(&activeWorkers, -1)
}

func main() {
	files := make(chan string, 100)
	var wg sync.WaitGroup

	// Горутиина-менеджер следит за загрузкой
	go func() {
		for {
			time.Sleep(500 * time.Millisecond)
			if len(files) > 5 && atomic.LoadInt32(&activeWorkers) < 10 {
				wg.Add(1)
				atomic.AddInt32(&activeWorkers, 1)
				go processFile(int(atomic.LoadInt32(&activeWorkers)), files, &wg)
			}
		}
	}()

	// Имитация чтения файлов из папки
	go func() {
		for i := 0; i < 50; i++ {
			files <- fmt.Sprintf("file_%d.log", i)
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		}
		close(files)
	}()

	wg.Wait()
}

Горутина следит за количеством файлов в очереди и добавляет воркеров при необходимости. Когда файлы заканчиваются, ненужные горутины автоматически завершаются.


Чтобы лучше понять механизмы распределённой обработки данных, которые применимы в динамическом балансировании воркеров, приходите на открытый урок 10 марта. Тема: «Системы обмена сообщениями: RabbitMQ и Kafka». Записаться

Все уроки по IT-архитектуре, программированию, тестированию и другим IT-направлениям смотрите в календаре.

Теги:
Хабы:
Всего голосов 6: ↑4 и ↓2+2
Комментарии8

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS