
База
Параллельность - выполнение задач в один момент времени на разных логических ядрах.
Конкурентность - выполнение задач последовательно, но со сменой контекста на другую задачу в ожидание завершения иной задачи. У пользователя может возникнуть иллюзия многозадачности даже в однопроцессорной системе, поскольку смена контекста происходит быстро (микросекунды).
Процессы:
Раздельная память
Раздельные ресурсы
Раздельные регистры
Потоки:
Общая память
Общие ресурсы
Раздельные стэк и регистры
Горутины:
Общая память
Общие ресурсы
Общий системный стэк
Общие регистры
Go runtime представляет модель P:M:G.
P - представляет логическое ядро процессора.
M - поток ОС по числу процессоров P.
G - структура, которая выполняет переданную функцию, создаётся по необходимости, минимум одна на старте программы (main). Стэк всего 2кб, может расширятся до 1гб для 64x и до 250кб для 32х систем.
Управление горутинами осуществляется планировщиком Go, а не ОС. Планировщик Go работает в пользовательском пространстве. Мы не можем напрямую управлять на каком процессоре будет исполняться горутина, за это отвечает планировщик.
Канал - очередь сообщений, которая умеет работать в многопоточной среде, работает по принципу FIFO.
Есть два типа каналов: буферизованный и небуферизованный.
Первый может хранить несколько сообщений, второй только одно.
Синхронизация
sync.WaitGroup - счётчик, который позволяет подождать завершения горутин.
sync.Mutex - блокирует доступ к ресурсу.
sync.RwMutex - разделяемая блокировка на чтение и запись. Читать могут несколько горутин, но мутировать данные только одна.
sync.Atomic - атомарная операция чтения и записи. Работает только с простыми значениями.
sync.Map - lock-free структура. Работает так же, как и обычная map, но потокобезопасная, можно использовать в многопоточной среде. Хорошо подходит для случаев, где надо много читать и мало писать. Если надо много писать, то лучше использовать обычную map и sync.RwMutex.
Небуферизованный канал
Действие | Открытый канал | Закрытый канал | Неинициализированный канал |
|---|---|---|---|
Чтение | Блокировка до прихода писателя | Zero value | Блокировка навсегда |
Запись | Блокировка до прихода читателя | Panic | Блокировка навсегда |
Закрытие | Канал закроется | Panic | Panic |
Буферизованный канал
Действие | Открытый и частично заполненный | Открытый и полностью заполненный | Открытый и пустой | Закрытый и частично заполненный |
|---|---|---|---|---|
Чтение | Прочитаем значение | Прочитаем значение | Блокировка до прихода писателя | Прочитаем значение |
Запись | Запишем значение | Блокировка до прихода читателя | Запишем значение | Panic |
Ограничения канала
Действие | Канал только на чтение | Канал только на запись |
|---|---|---|
Чтение | Ошибка компиляции | |
Запись | Ошибка компиляции | |
Закрытие | Ошибка компиляции |
Важные правила
Закрывает канал тот, кто в него пишет.
Если пишет несколько продюсеров, то закрывает тот, кто создал продюсеров.
Не закрытый канал держит ресурсы. Закрывать надо явно.
Паттерны
Generator - микропаттерн, который наполняет канал. Закрываем канал, чтобы не было проблем.
func generator() <- chan int { ch := make(chant int) go func(){ for i := 0; i <= 12; i++ { ch <- i + 1 } close(ch) }() return ch }
Wrapper - оборачиваем функцию, добавляя функциональность. Если вам что-то говорит слово декоратор, то это тот самый паттерн.
func wrapper(wg *sync.WaitGroup, fn func()) { wg.Add(1) go func() { defer wg.Done() fmt.Println("Work before func") fn() time.Sleep(1 * time.Second) fmt.Println("Work after func") }() } func main() { var wg sync.WaitGroup wrapper(&wg, func() { time.Sleep(1 * time.Second) fmt.Println("heavy work") }) wg.Wait() }
Fan-in - собирает результаты из нескольких каналов в один.
func fanIn(input1, input2 <-chan string) <-chan string { ch := make(chan string) go func(){ for { select { case s := <-input1: ch <- s case s := <-input2: ch <- s } } }() return ch }
Fan-out - одна или несколько горутин пишут в канал, с другой стороны рабочие горутины читают канал, делают работу и умирают.
func worker(ch <-chan int, wg *sync.WaitGroup) { wg.Done() for v := range ch { fmt.Println(v) time.Sleep(1 * time.Second) } } func sender() { ch := make(chan int) var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go worker(ch, &wg) } for i := 0; i < 10; i++ { ch <- i } close(ch) wg.Wait() fmt.Println("done") }
Pipeline - данные обрабатываются цепочкой. Producer -> Producer/Consumer -> Consumer. Стадий обработки может быть сколько угодно.
func producer() <-chan int { c := make(chan int) go func() { for i := 0; i <= 10; i++ { c <- i + 1 } close(c) }() return c } func producerConsumer(c <-chan int) <-chan int { out := make(chan int) go func() { for v := range c { out <- v * 2 } close(out) }() return out } func consumer(ch <-chan int) { for v := range ch { fmt.Println(v) } }
Cancellation - Способ прерывания горутин. Необходим, чтобы избегать висящих горутин, останавливать слишком долгие операции.
// 1. WithCancel func worker(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("Done") return default: fmt.Println("Working...") time.Sleep(500 * time.Millisecond) } } } func main() { ctx, cancel := context.WithCancel(context.Background()) go worker(ctx) time.Sleep(1 * time.Second) // работаем cancel() // отменяем time.Sleep(1 * time.Second) // время на завершение } // 2. WithTimeout ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // спустя 2 секунды воркер перестанет работать // 3. WithDeadline. Можно указать точное время остановки. ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // прибавляем к текущему времени две секунды
Worker pool - каждый воркер берёт задачу, делает работу и отправляет результат в канал, другая горутина, в нашем случае main, читает результат из канала.
func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for j := range jobs { time.Sleep(1 * time.Second) fmt.Println("job", j) results <- j * j } } func main() { jobs := make(chan int) results := make(chan int) var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go worker(jobs, results, &wg) } go func() { for i := 0; i < 10; i++ { jobs <- i } close(jobs) }() go func() { wg.Wait() close(results) }() for result := range results { fmt.Println(result) } }
