
База
Параллельность - выполнение задач в один момент времени на разных логических ядрах.
Конкурентность - выполнение задач последовательно, но со сменой контекста на другую задачу в ожидание завершения иной задачи. У пользователя может возникнуть иллюзия многозадачности даже в однопроцессорной системе, поскольку смена контекста происходит быстро (микросекунды).
Процессы:
Раздельная память
Раздельные ресурсы
Раздельные регистры
Потоки:
Общая память
Общие ресурсы
Раздельные стэк и регистры
Горутины:
Общая память
Общие ресурсы
Общий системный стэк
Общие регистры
Go runtime представляет модель P:M:G.
P - представляет логическое ядро процессора.
M - поток ОС по числу процессоров P.
G - структура, которая выполняет переданную функцию, создаётся по необходимости, минимум одна на старте программы (main). Стэк всего 2кб, может расширятся до 1гб для 64x и до 250кб для 32х систем.
Управление горутинами осуществляется планировщиком Go, а не ОС. Планировщик Go работает в пользовательском пространстве. Мы не можем напрямую управлять на каком процессоре будет исполняться горутина, за это отвечает планировщик.
Канал - очередь сообщений, которая умеет работать в многопоточной среде, работает по принципу FIFO.
Есть два типа каналов: буферизованный и небуферизованный.
Первый может хранить несколько сообщений, второй только одно.
Синхронизация
sync.WaitGroup - счётчик, который позволяет подождать завершения горутин.
sync.Mutex - блокирует доступ к ресурсу.
sync.RwMutex - разделяемая блокировка на чтение и запись. Читать могут несколько горутин, но мутировать данные только одна.
sync.Atomic - атомарная операция чтения и записи. Работает только с простыми значениями.
sync.Map - lock-free структура. Работает так же, как и обычная map, но потокобезопасная, можно использовать в многопоточной среде. Хорошо подходит для случаев, где надо много читать и мало писать. Если надо много писать, то лучше использовать обычную map и sync.RwMutex.
Небуферизованный канал
Действие | Открытый канал | Закрытый канал | Неинициализированный канал |
---|---|---|---|
Чтение | Блокировка до прихода писателя | Zero value | Блокировка навсегда |
Запись | Блокировка до прихода читателя | Panic | Блокировка навсегда |
Закрытие | Канал закроется | Panic | Panic |
Буферизованный канал
Действие | Открытый и частично заполненный | Открытый и полностью заполненный | Открытый и пустой | Закрытый и частично заполненный |
---|---|---|---|---|
Чтение | Прочитаем значение | Прочитаем значение | Блокировка до прихода писателя | Прочитаем значение |
Запись | Запишем значение | Блокировка до прихода читателя | Запишем значение | Panic |
Ограничения канала
Действие | Канал только на чтение | Канал только на запись |
---|---|---|
Чтение | Ошибка компиляции | |
Запись | Ошибка компиляции | |
Закрытие | Ошибка компиляции |
Важные правила
Закрывает канал тот, кто в него пишет.
Если пишет несколько продюсеров, то закрывает тот, кто создал продюсеров.
Не закрытый канал держит ресурсы. Закрывать надо явно.
Паттерны
Generator - микропаттерн, который наполняет канал. Закрываем канал, чтобы не было проблем.
func generator() <- chan int {
ch := make(chant int)
go func(){
for i := 0; i <= 12; i++ {
ch <- i + 1
}
close(ch)
}()
return ch
}
Wrapper - оборачиваем функцию, добавляя функциональность. Если вам что-то говорит слово декоратор, то это тот самый паттерн.
func wrapper(wg *sync.WaitGroup, fn func()) {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Work before func")
fn()
time.Sleep(1 * time.Second)
fmt.Println("Work after func")
}()
}
func main() {
var wg sync.WaitGroup
wrapper(&wg, func() {
time.Sleep(1 * time.Second)
fmt.Println("heavy work")
})
wg.Wait()
}
Fan-in - собирает результаты из нескольких каналов в один.
func fanIn(input1, input2 <-chan string) <-chan string {
ch := make(chan string)
go func(){
for {
select {
case s := <-input1: ch <- s
case s := <-input2: ch <- s
}
}
}()
return ch
}
Fan-out - одна или несколько горутин пишут в канал, с другой стороны рабочие горутины читают канал, делают работу и умирают.
func worker(ch <-chan int, wg *sync.WaitGroup) {
wg.Done()
for v := range ch {
fmt.Println(v)
time.Sleep(1 * time.Second)
}
}
func sender() {
ch := make(chan int)
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go worker(ch, &wg)
}
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
wg.Wait()
fmt.Println("done")
}
Pipeline - данные обрабатываются цепочкой. Producer -> Producer/Consumer -> Consumer. Стадий обработки может быть сколько угодно.
func producer() <-chan int {
c := make(chan int)
go func() {
for i := 0; i <= 10; i++ {
c <- i + 1
}
close(c)
}()
return c
}
func producerConsumer(c <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range c {
out <- v * 2
}
close(out)
}()
return out
}
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v)
}
}
Cancellation - Способ прерывания горутин. Необходим, чтобы избегать висящих горутин, останавливать слишком долгие операции.
// 1. WithCancel
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Done")
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(1 * time.Second) // работаем
cancel() // отменяем
time.Sleep(1 * time.Second) // время на завершение
}
// 2. WithTimeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // спустя 2 секунды воркер перестанет работать
// 3. WithDeadline. Можно указать точное время остановки.
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // прибавляем к текущему времени две секунды
Worker pool - каждый воркер берёт задачу, делает работу и отправляет результат в канал, другая горутина, в нашем случае main, читает результат из канала.
func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
time.Sleep(1 * time.Second)
fmt.Println("job", j)
results <- j * j
}
}
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(jobs, results, &wg)
}
go func() {
for i := 0; i < 10; i++ {
jobs <- i
}
close(jobs)
}()
go func() {
wg.Wait()
close(results)
}()
for result := range results {
fmt.Println(result)
}
}