Как стать автором
Обновить

Разбираем паттерны конкурентности

Уровень сложностиПростой
Время на прочтение4 мин
Количество просмотров3.5K

База

Параллельность - выполнение задач в один момент времени на разных логических ядрах.
Конкурентность - выполнение задач последовательно, но со сменой контекста на другую задачу в ожидание завершения иной задачи. У пользователя может возникнуть иллюзия многозадачности даже в однопроцессорной системе, поскольку смена контекста происходит быстро (микросекунды).

Процессы:

  • Раздельная память

  • Раздельные ресурсы

  • Раздельные регистры

Потоки:

  • Общая память

  • Общие ресурсы

  • Раздельные стэк и регистры

Горутины:

  • Общая память

  • Общие ресурсы

  • Общий системный стэк

  • Общие регистры

Go runtime представляет модель P:M:G.
P - представляет логическое ядро процессора.
M - поток ОС по числу процессоров P.
G - структура, которая выполняет переданную функцию, создаётся по необходимости, минимум одна на старте программы (main). Стэк всего 2кб, может расширятся до 1гб для 64x и до 250кб для 32х систем.

Управление горутинами осуществляется планировщиком Go, а не ОС. Планировщик Go работает в пользовательском пространстве. Мы не можем напрямую управлять на каком процессоре будет исполняться горутина, за это отвечает планировщик.

Канал - очередь сообщений, которая умеет работать в многопоточной среде, работает по принципу FIFO.
Есть два типа каналов: буферизованный и небуферизованный.
Первый может хранить несколько сообщений, второй только одно.

Синхронизация

sync.WaitGroup - счётчик, который позволяет подождать завершения горутин.
sync.Mutex - блокирует доступ к ресурсу.
sync.RwMutex - разделяемая блокировка на чтение и запись. Читать могут несколько горутин, но мутировать данные только одна.
sync.Atomic - атомарная операция чтения и записи. Работает только с простыми значениями.
sync.Map - lock-free структура. Работает так же, как и обычная map, но потокобезопасная, можно использовать в многопоточной среде. Хорошо подходит для случаев, где надо много читать и мало писать. Если надо много писать, то лучше использовать обычную map и sync.RwMutex.

Небуферизованный канал

Действие

Открытый канал

Закрытый канал

Неинициализированный канал

Чтение

Блокировка до прихода писателя

Zero value

Блокировка навсегда

Запись

Блокировка до прихода читателя

Panic

Блокировка навсегда

Закрытие

Канал закроется

Panic

Panic

Буферизованный канал

Действие

Открытый и частично заполненный

Открытый и полностью заполненный

Открытый и пустой

Закрытый и частично заполненный

Чтение

Прочитаем значение

Прочитаем значение

Блокировка до прихода писателя

Прочитаем значение

Запись

Запишем значение

Блокировка до прихода читателя

Запишем значение

Panic

Ограничения канала

Действие

Канал только на чтение

Канал только на запись

Чтение

Ошибка компиляции

Запись

Ошибка компиляции

Закрытие

Ошибка компиляции

Важные правила

  • Закрывает канал тот, кто в него пишет.

  • Если пишет несколько продюсеров, то закрывает тот, кто создал продюсеров.

  • Не закрытый канал держит ресурсы. Закрывать надо явно.

Паттерны

Generator - микропаттерн, который наполняет канал. Закрываем канал, чтобы не было проблем.

func generator() <- chan int {
	ch := make(chant int)
	go func(){
		for i := 0; i <= 12; i++ {
			ch <- i + 1
		}
		close(ch)
	}()
	return ch
}

Wrapper - оборачиваем функцию, добавляя функциональность. Если вам что-то говорит слово декоратор, то это тот самый паттерн.

func wrapper(wg *sync.WaitGroup, fn func()) {  
    wg.Add(1)  
  
    go func() {  
       defer wg.Done()  
  
       fmt.Println("Work before func")  
       fn()  
       time.Sleep(1 * time.Second)  
       fmt.Println("Work after func")  
    }()  
}  
  
func main() {  
    var wg sync.WaitGroup  
  
    wrapper(&wg, func() {  
       time.Sleep(1 * time.Second)  
       fmt.Println("heavy work")  
    })  
  
    wg.Wait()  
}

Fan-in - собирает результаты из нескольких каналов в один.

func fanIn(input1, input2 <-chan string) <-chan string {
	ch := make(chan string)
	go func(){
		for {
			select {
				case s := <-input1: ch <- s
				case s := <-input2: ch <- s
			}
		}
	}()
	return ch
}

Fan-out - одна или несколько горутин пишут в канал, с другой стороны рабочие горутины читают канал, делают работу и умирают.

func worker(ch <-chan int, wg *sync.WaitGroup) {  
    wg.Done()  
    for v := range ch {  
       fmt.Println(v)  
       time.Sleep(1 * time.Second)  
    }  
}  
  
func sender() {  
    ch := make(chan int)  
    var wg sync.WaitGroup  
    for i := 0; i < 2; i++ {  
       wg.Add(1)  
       go worker(ch, &wg)  
    }  
  
    for i := 0; i < 10; i++ {  
       ch <- i  
    }  
  
    close(ch)  
  
    wg.Wait()  
    fmt.Println("done")  
}

Pipeline - данные обрабатываются цепочкой. Producer -> Producer/Consumer -> Consumer. Стадий обработки может быть сколько угодно.

func producer() <-chan int {  
    c := make(chan int)  
  
    go func() {  
       for i := 0; i <= 10; i++ {  
          c <- i + 1  
       }  
       close(c)  
    }()  
  
    return c  
}  
  
func producerConsumer(c <-chan int) <-chan int {  
    out := make(chan int)  
  
    go func() {  
       for v := range c {  
          out <- v * 2  
       }  
       close(out)  
    }()  
  
    return out  
}  
  
func consumer(ch <-chan int) {  
    for v := range ch {  
       fmt.Println(v)  
    }  
}

Cancellation - Способ прерывания горутин. Необходим, чтобы избегать висящих горутин, останавливать слишком долгие операции.

// 1. WithCancel
func worker(ctx context.Context) {  
    for {  
       select {  
       case <-ctx.Done():  
          fmt.Println("Done")  
          return  
       default:  
          fmt.Println("Working...")  
          time.Sleep(500 * time.Millisecond)  
       }  
    }  
}  
  
func main() {  
    ctx, cancel := context.WithCancel(context.Background())  
    go worker(ctx)  
  
    time.Sleep(1 * time.Second) // работаем  
    cancel()  // отменяем
    time.Sleep(1 * time.Second)  // время на завершение
}

// 2. WithTimeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // спустя 2 секунды воркер перестанет работать 

// 3. WithDeadline. Можно указать точное время остановки.
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // прибавляем к текущему времени две секунды

Worker pool - каждый воркер берёт задачу, делает работу и отправляет результат в канал, другая горутина, в нашем случае main, читает результат из канала.

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {  
    defer wg.Done()  
  
    for j := range jobs {  
       time.Sleep(1 * time.Second)  
       fmt.Println("job", j)  
       results <- j * j  
    }  
  
}  
  
func main() {  
    jobs := make(chan int)  
    results := make(chan int)  
  
    var wg sync.WaitGroup  
  
    for i := 0; i < 3; i++ {  
       wg.Add(1)  
       go worker(jobs, results, &wg)  
    }  
  
    go func() {  
       for i := 0; i < 10; i++ {  
          jobs <- i  
       }  
       close(jobs)  
    }()  
  
    go func() {  
       wg.Wait()  
       close(results)  
    }()  
  
    for result := range results {  
       fmt.Println(result)  
    }  
}

Теги:
Хабы:
+7
Комментарии1

Публикации

Истории

Работа

Go разработчик
77 вакансий

Ближайшие события

4 – 5 апреля
Геймтон «DatsCity»
Онлайн
8 апреля
Конференция TEAMLY WORK MANAGEMENT 2025
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань
20 – 22 июня
Летняя айти-тусовка Summer Merge
Ульяновская область