Конкурентность — одна из самых мощных возможностей Go, и её освоение критически важно для создания масштабируемых и эффективных приложений. В этой статье мы рассмотрим 7 шаблонов конкурентности в Go, которые помогут вам писать надёжный код.
1. Пул воркеров
Описание: Пул воркеров создаёт фиксированное количество горутин, которые обрабатывают задачи из общей очереди. Этот шаблон полезен для управления количеством одновременно выполняемых задач и оптимизации использования ресурсов.
package main import ( "fmt" "sync" "time" ) func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { fmt.Printf("Воркер %d начал задачу %d\n", id, job) time.Sleep(time.Second) fmt.Printf("Воркер %d завершил задачу %d\n", id, job) results <- job * 2 } } func main() { const numJobs = 5 const numWorkers = 3 jobs := make(chan int, numJobs) results := make(chan int, numJobs) var wg sync.WaitGroup for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, jobs, results, &wg) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) wg.Wait() close(results) for result := range results { fmt.Println("Результат:", result) } }
Реальный сценарий: Веб-сервер, обрабатывающий входящие HTTP-запросы, где каждый запрос обрабатывается воркером из пула.
2. Fan-Out / Fan-In
Описание: Fan-Out запускает несколько горутин для параллельной обработки данных, а Fan-In собирает результаты в единый канал. Этот шаблон полезен для параллельной обработки с последующей агрегацией.
package main import ( "fmt" "sync" ) func producer(id int, ch chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 5; i++ { ch <- i fmt.Printf("Производитель %d создал %d\n", id, i) } } func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for v := range in { out <- v * 2 fmt.Printf("Потребитель %d обработал %d\n", id, v) } } func main() { numProducers := 2 numConsumers := 2 input := make(chan int, 10) output := make(chan int, 10) var wg sync.WaitGroup for i := 1; i <= numProducers; i++ { wg.Add(1) go producer(i, input, &wg) } wg.Wait() close(input) for i := 1; i <= numConsumers; i++ { wg.Add(1) go consumer(i, input, output, &wg) } wg.Wait() close(output) for result := range output { fmt.Println("Результат:", result) } }
Реальный сценарий: Конвейер обработки данных, где разные этапы выполняются разными наборами воркеров.
3. Пайплайн
Описание: Пайплайн объединяет несколько этапов обработки, где каждый этап выполняет преобразование данных и передаёт их следующему этапу. Подходит для последовательной обработки данных.
package main import "fmt" func stage1(nums []int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func stage2(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * 2 } close(out) }() return out } func stage3(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n + 1 } close(out) }() return out } func main() { nums := []int{1, 2, 3, 4, 5} c1 := stage1(nums) c2 := stage2(c1) c3 := stage3(c2) for result := range c3 { fmt.Println(result) } }
Реальный сценарий: Система обработки изображений, где изображение проходит через этапы масштабирования, фильтрации и кодирования.
4. Публикация-Подписка
Описание: Шаблон "Публикация-Подписка" позволяет публиковать сообщения для нескольких подписчиков. Полезен в системах, где разные сервисы должны независимо реагировать на события.
package main import ( "fmt" "sync" "time" ) type PubSub struct { mu sync.Mutex channels map[string][]chan string } func NewPubSub() *PubSub { return &PubSub{ channels: make(map[string][]chan string), } } func (ps *PubSub) Subscribe(topic string) <-chan string { ch := make(chan string) ps.mu.Lock() ps.channels[topic] = append(ps.channels[topic], ch) ps.mu.Unlock() return ch } func (ps *PubSub) Publish(topic, msg string) { ps.mu.Lock() for _, ch := range ps.channels[topic] { ch <- msg } ps.mu.Unlock() } func (ps *PubSub) Close(topic string) { ps.mu.Lock() for _, ch := range ps.channels[topic] { close(ch) } ps.mu.Unlock() } func main() { ps := NewPubSub() subscriber1 := ps.Subscribe("news") subscriber2 := ps.Subscribe("news") var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for msg := range subscriber1 { fmt.Println("Подписчик 1 получил:", msg) } }() go func() { defer wg.Done() for msg := range subscriber2 { fmt.Println("Подписчик 2 получил:", msg) } }() ps.Publish("news", "Срочные новости!") ps.Publish("news", "Ещё новости!") time.Sleep(time.Second) ps.Close("news") wg.Wait() }
Реальный сценарий: Система обмена сообщениями, где сервисы подписываются на определённые типы событий.
5. Select с таймаутом
Описание: Использование select с таймаутом позволяет избежать бесконечных блокировок. Полезно, когда нужно выполнить действие или прервать операцию, если она занимает слишком много времени.
package main import ( "fmt" "time" ) func main() { c := make(chan string) go func() { time.Sleep(2 * time.Second) c <- "результат" }() select { case res := <-c: fmt.Println("Получено:", res) case <-time.After(1 * time.Second): fmt.Println("Таймаут") } }
Реальный сценарий: Сетевой клиент, который пытается подключиться к серверу и останавливается, если сервер не отвечает вовремя.
6. Семафор
Описание: Семафор ограничивает количество горутин, которые могут одновременно обращаться к ресурсу. Полезен для управления конкурентностью и предотвращения перегрузки ресурсов.
package main import ( "fmt" "sync" "time" ) func worker(id int, sem chan struct{}, wg *sync.WaitGroup) { defer wg.Done() sem <- struct{}{} // Захват семафора fmt.Printf("Воркер %d начал\n", id) time.Sleep(time.Second) fmt.Printf("Воркер %d завершил\n", id) <-sem // Освобождение семафора } func main() { const numWorkers = 5 const maxConcurrent = 2 sem := make(chan struct{}, maxConcurrent) var wg sync.WaitGroup for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, sem, &wg) } wg.Wait() }
Реальный сценарий: Пул подключений к базе данных, где одновременно допускается ограниченное количество подключений.
7. Ограничение частоты
Описание: Ограничение частоты управляет скоростью обработки событий с помощью тикера. Полезно, когда нужно контролировать частоту выполнения задач, например, запросов к API.
package main import ( "fmt" "time" ) func main() { rate := time.Second ticker := time.NewTicker(rate) defer ticker.Stop() requests := make(chan int, 5) for i := 1; i <= 5; i++ { requests <- i } close(requests) for req := range requests { <-ticker.C // Ожидание следующего тика fmt.Println("Обработка запроса", req) } }
Реальный сценарий: Шлюз API, ограничивающий количество запросов, которые пользователь может сделать за определённый период.
Заключение
Шаблоны конкурентности в Go необходимы для создания эффективных и масштабируемых приложений. Освоение этих шаблонов позволит вам эффективно управлять конкурентностью, оптимизировать использование ресурсов и повысить производительность ваших приложений.
