Привет, Хабр!
Я просмотрел не мало источников, много из них были англоязычными, но хочу выделить отдельное спасибо авторам этих статей на Xабре:
Эта статья является неким сборником многих статей про все, что связано с параллелизмом в Go, например: горутины, каналы, select и многое другое. При создании статьи моя "карта" выглядела так:
Я хотел углубиться в тему горутин с параллелизмом и, слушая на фоне "The Doors", поглощал информацию и выделял интересные мысли из статей
Надеюсь, Вы найдете для себя то, что искали.
Горутины
Появление
Основным источником появления горутин и каналов в Go послужило CSP. Эта модель основывается на коммуникации параллельных программ через каналы.
Объяснение
Горутина - это легковесный тред, управляемый рантаймом Go
В Go приняты такие термины: G (Goroutine) — Горутина M (Machine) — Машина Каждая Машина работает в отдельном потоке и способна выполнять только одну Горутину в момент времени. Планировщик операционной системы, в которой работает программа, переключает Машины. Число работающих Машин ограничено переменной среды GOMAXPROCS
или функцией runtime.GOMAXPROCS(n int)
. По умолчанию она равна количеству ядер процессора компьютера, на которой было запущено приложение.
Для запуска функции как горутину необходимо написать go func()
, где func() - функция, которую хотите запустить. Имейте ввиду, что если вы сделаете так:
package main
import "fmt"
func main() {
// цикл с 5-ю итерациями
for i := 0; i < 5; i++ {
// вызываем функцию greeting() как горутину
go greeting()
}
}
func greeting() {
fmt.Println("Hello World!!!")
}
Go playground: https://go.dev/play/p/xdQFn8gEBFW
Ничего не произойдет, ведь горутина выполняется параллельно с "main()" и "main()" закончится еще до завершения горутины, а если функция "main()" завершается, завершается и вся программа.
Для того, чтобы программа отработала корректно, необходимо дождаться выполнения горутины. Надо использовать WaitGroup, но об этом в следующей главе. Пока что обойдемся time.Sleep(2 * time.Second)
. Функция "main()" "заснет" на две секунды, тем самым дождавшись выполнения "greeting()". Вот исправленный пример:
package main
import (
"fmt"
"time"
)
func main() {
// цикл с 5-ю итерациями
for i := 0; i < 5; i++ {
// вызываем функцию greeting() как горутину
go greeting()
}
// ожидаем незавершившиеся горутины
time.Sleep(2 * time.Second)
}
func greeting() {
fmt.Println("Hello World!!!")
}
Go playground: https://go.dev/play/p/KRwf_oyd0c1
Пакет "sync"
WaitGroup(замена time.Sleep())
WaitGroup - это примитив синхронизации, используемый для ожидания того, как множество горутин завершат свое выполнение. Пакет "sync" предоставляет тип "sync.WaitGroup" и его методы "Add()", "Done()" и "Wait()". По идее, WaitGroup Вы используете вместо того, что бы писать time.Sleep(). Это и более надежно, и ждать перестанете именно в тот момент, как все выполнится, а не когда закончится "sleep()". Давайте посмотрим простой пример с WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// создаем WaitGroup
wg := sync.WaitGroup{}
// цикл с 5-ю итерациями
for i := 0; i < 5; i++ {
// добавляем в список ожидания одну горутину
wg.Add(1)
go func(i int) {
// говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
defer wg.Done()
// засыпаем, имитируя какую-то работу
time.Sleep(time.Duration(i) * time.Second)
fmt.Println("Горутина", i, "завершила свое выполнение")
}(i)
}
// ожидаем незавершившиеся горутины
wg.Wait()
fmt.Println("Все горутины завершили свое выполнение!")
}
Go playground: https://go.dev/play/p/XdsoK-FSPR_m
Здесь мы использовали WaitGroup для ожидания пяти горутин. Перед стартом каждой из них, мы использовали "wg.Add(1)" для увеличения счетчика горутин на 1. Внутри горутин, мы вызывали "wg.Done()"(что отнимает 1 от счетчка горутин) когда горутина заканчивала свое выполнение. Наконец, в "main()" мы вызывали "wg.Wait()" для ожидания завершения всех горутин.
Mutex
Mutex (сокращение "mutual exclusion", что в переводе означает "взаимное исключение") - это примитив синхронизации, который используется для защиты от использования данных, разрешая ими пользоваться только одной горутине. В Go, пакет "sync" предоставляет тип "sync.Mutex", который имеет 2 метода: "Lock()" и "Unlock().
Давайте посмотрим простой пример использования мютекса для защиты счетчика:
package main
import (
"fmt"
"sync"
)
func main() {
var (
counter int // счетчик
lock sync.Mutex // наш mutex
)
// создаем WaitGroup
wg := &sync.WaitGroup{}
// цикл с 5-ю итерациями
for i := 0; i < 5; i++ {
// добавляем в список ожидания одну горутину
wg.Add(1)
go func() {
// говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
defer wg.Done()
// используем mutex для блокировки использования счетчика другими горутинами
lock.Lock()
// увеличиваем счетчик
counter++
fmt.Println("Счетчик:", counter)
// разблокируем счетчик
lock.Unlock()
}()
}
// ожидаем незавершившиеся горутины
wg.Wait()
fmt.Println("Итоговый счетчик:", counter)
}
Go playground: https://go.dev/play/p/eDm4HHbAGEB
В этом примере, мы использовали "sync.Mutex" для защиты от использования переменной "counter". Перед обновлением счетчика, мы вызываем "lock.Lock()" для создания блокировки горутины. Теперь ни одна другая горутина не сможет использовать "counter". После обновления счетчика, мы вызываем "lock.Unlock()" для удаления блокировки. После разблокировки счетчика, им снова может пользоваться кто-угодно.
Atomic operations/атомарные операции
Атомарные операции - это низкоуровневые примитивы синхронизации, которые обеспечивают способ выполнения операций чтения-модификации-записи (read-modify-write) над общими переменными без необходимости в блокировке (Атомарная операция - последовательность инструкций, исполняемых как одна неделимая единица работы. Т.е. это гарантия, что в процессе выполнения не произойдет переключение контекста. ). Пакет "sync/atomic" предоставляет несколько атомарных операций для целых чисел(int), указателей и чисел с плавающей точкой(float). Вот простой пример с использованием атомарных операций:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
// объявляем счетчик
var counter int64
// создаем WaitGroup
wg := &sync.WaitGroup{}
// цикл с 5-ю итерациями
for i := 0; i < 5; i++ {
// добавляем в список ожидания одну горутину
wg.Add(1)
go func() {
// говорим, чтобы в конце анонимной функции одна горутина из списка исчезла
defer wg.Done()
// атомарной операцией добалвяем к счетчику единицу
atomic.AddInt64(&counter, 1)
fmt.Println("Счетчик:", atomic.LoadInt64(&counter))
}()
}
// ожидаем незавершившиеся горутины
wg.Wait()
fmt.Println("Итоговый счетчик:", counter)
}
Go playground: https://go.dev/play/p/cLvSy7Mccdj
В этом примере, мы использовали функцию "atomic.AddInt64()" для атомарного изменения переменной "counter". Эта функция добавляет заданное значение к счетчику и возвращает новое. Мы также использовали функцию "atomic.LoadInt64()" для атомарного считывания текущего значения счетчика(переменная "counter"). Обратите внимание, что нам не нужно использовать мютекс для защиты счетчика, так как атомарные операции обеспечивают его безопасное обновление.
Каналы
Определение
Простыми словами: каналы - это инструменты коммуникации между горутинами.
Технически - это конвейер, откуда можно считывать или помещать данные. То есть одна горутина может отправить данные в канал, а другая — считать помещенные в этот канал данные и это тоже примитив синхронизации.
Использование
Создание
Создается канал так: ch := make(chan int)
. На месте "int" может быть любой тип данных, например: string.
Также можно использовать и "make()" со вторым аргументом - с размером канала. Таким образом он станет буферизорованным.
Длина и ёмкость
Буферизированный канал имеет длину и емкость. Длина канала - это количество значений в очереди в буфере канала, которые не были прочитаны. Емкость - это размер самого буфера канала. Для того, чтобы получить длину, мы используем функцию len(Ваш канал)
, а для получения емкости - cap(Ваш канал)
.
Закрытие
Канал можно и нужно закрыть. Как только записали в канал все, что хотели - закройте канал. Для этого надо вызвать функцию close(Ваш канал)
. Если необходимо проверить, закрыт ли канал, или возможно ли прочитать из него данные - напишите такое выражение: val, ok := <- Ваш канал
, где val - переменная, в которую запишется значение с канала, если это возможно, а ok - булева переменная, где true означает, что из канала можно прочитать данные, а false - что он закрыт(и все значения из него прочитаны, в случае буферизованного канала)
For range
С помощью for range можно читать данные из закрытого буферизированного канала, так как данные остаются в буфере даже после закрытия канала. Для этого необходимо написать такую структуру for range: for elem := range Ваш канал
, где elem - элемент из канала. Вот пример:
package main
import "fmt"
func main() {
// создаем канал
c := make(chan int, 5)
// записываем числа в канал
c <- 1
c <- 2
c <- 101
c <- 102
c <- 1000
// закрываем канал
close(c)
// циклом идем по каналу
for num := range c {
fmt.Println(num)
}
}
Go playground: https://go.dev/play/p/DjMEfLAsyZk
Deadlock
Дедлок - это ситуация, когда все горутины находятся в спящем состоянии и не могут быть "разбужены". Вот пример:
package main
func main() {
// создаем канал
c := make(chan int)
// пытаемся получить данные с канала, которые отсутствуют. Deadlock!
<-c
}
Go playground: https://go.dev/play/p/1CSAVnWNEOf
Select
Определение
select
- это почти что switch
, но без аргументов и работающее только с каналами. Иначе - примитив синхронизации, блокирующий поток исполнения, пока один из case'ов не сможет быть выполнен.
Использование
Select, когда два канала готовы к чтению данных одновременно
package main
import "fmt"
func main() {
// создаем каналы
number := make(chan int)
message := make(chan string)
// вызываем функции как горутины
go channelNumber(number)
go channelMessage(message)
// select
select {
case firstChannel := <- number:
fmt.Println("Данные канала:", firstChannel)
case secondChannel := <- message:
fmt.Println("Данные канала:", secondChannel)
}
}
// горутина, которая записывает число в канал
func channelNumber(number chan int) {
// записываем число в канал
number <- 15
}
// горутина, которая записывает строку в канал
func channelMessage(message chan string) {
// записываем строку в канал
message <- "Погружение в параллелизм в Go"
}
Go playground: https://go.dev/play/p/xlXKMfRVBSh
Здесь в селекте мы выбираем один из каналов. Одна горутина записывает данные в канал "number", а другая - в "message". Так как оба канала подготовлены, результат программы случаен(но на самом деле, в данном случае будет "Погружение в параллелизм в Go")
Select, когда один из каналов готов первым
package main
import (
"fmt"
"time"
)
func main() {
// создаем каналы
number := make(chan int)
message := make(chan string)
// вызываем функции как горутины
go channelNumber(number)
go channelMessage(message)
// select
select {
case firstChannel := <-number:
fmt.Println("Данные канала:", firstChannel)
case secondChannel := <-message:
fmt.Println("Данные канала:", secondChannel)
}
}
// горутина для записи числа в канал
func channelNumber(number chan int) {
// записываем число в канал
number <- 15
}
// горутина для записи строки в канал
func channelMessage(message chan string) {
// функция засыпает на 2 секунды
time.Sleep(2 * time.Second)
// записываем строку в канал
message <- "Погружение в параллелизм в Go"
}
Go playground: https://go.dev/play/p/vSVJT_F_Hn1
Здесь тоже есть две функции, которые записывают данные в каналы "number" и "message", но при одном условии. При записи в канал "message", функция засыпает на 2 секунды, делая "message" не готовым к использованию к тому моменту, как "number" уже будет заполненным.
Select, когда два канала одновременно запаздывают на 2 секунды
package main
import (
"fmt"
"time"
)
func main() {
// создаем каналы
number := make(chan int)
message := make(chan string)
// вызываем функции как горутины
go channelNumber(number)
go channelMessage(message)
// select
select {
case firstChannel := <-number:
fmt.Println("Данные канала:", firstChannel)
case secondChannel := <-message:
fmt.Println("Данные канала:", secondChannel)
}
}
// горутина для записи числа в канал
func channelNumber(number chan int) {
// функция засыпает на 2 секунды
time.Sleep(2 * time.Second)
number <- 15
}
// горутина для записи строки в канал
func channelMessage(message chan string) {
// функция засыпает на 2 секунды
time.Sleep(2 * time.Second)
message <- "Погружение в параллелизм в Go"
}
Go playground: https://go.dev/play/p/2OQOmTMmkt-
В данном случае - обе функции засыпают на 2 секунды. Что же будет делать select? Ответ: ничего. Он будет ждать, пока хоть один канал будет готов к чтению. Так что, здесь как и в первом случае, ответ будет случайным, так как спустя 2 секунды оба канала уже будут готовы
Default
package main
import (
"fmt"
"time"
)
func main() {
// создаем каналы
number := make(chan int)
message := make(chan string)
// вызываем функции как горутины
go channelNumber(number)
go channelMessage(message)
// select
select {
case firstChannel := <-number:
fmt.Println("Данные канала:", firstChannel)
case secondChannel := <-message:
fmt.Println("Данные канала:", secondChannel)
// default case
default:
fmt.Println("Подожди!!! Каналы еще не готовы к чтению!")
}
}
// горутина для записи числа в канал
func channelNumber(number chan int) {
// функция засыпает на 2 секунды
time.Sleep(2 * time.Second)
number <- 15
}
// горутина для записи строки в канал
func channelMessage(message chan string) {
// функция засыпает на 2 секунды
time.Sleep(2 * time.Second)
message <- "Погружение в параллелизм в Go"
}
Go playground: https://go.dev/play/p/S7psSqvugQS
Тут - вместо того, чтобы преостанавливать выполнение функции "main" селектом в ожидании готовности каналов, добавляем default (да, все равно, что в switch-case) и теперь, если ни один из каналов не будет готовен к выполнению select, в консоль выведется сообщение "Подожди!!! Каналы еще не готовы к чтению!"
Паттерны (concurrency patterns)
Fan-out
Fan out используется тогда, когда несколько горутин читают из одного и того же канала. Это чтение закончится только тогда, когда канал закроется.
Пример:
package main
import (
"fmt"
"time"
)
// функция по заполнению канала числами
func generate(nums ...int) <-chan int {
// создаем канал
out := make(chan int)
go func() {
// идем циклом по переданным числам
for _, n := range nums {
// записываем каждое число в канал
out <- n
}
}()
// возвращаем канал
return out
}
func main() {
fmt.Println("Запускаем Fan Out ")
// генерируем канал с числами 1, 2, 3
c1 := generate(1, 2, 3)
// запускаем первую горутину
go func() {
// циклом идем по первому каналу и печатаем каждое число из него
for num := range c1 {
fmt.Println(num)
}
}()
// запускаем вторую горутину
go func() {
// циклом идем по первому каналу и печатаем каждое число из него
for num := range c1 {
fmt.Println(num)
}
}()
// ожидаем незавершившиеся горутины
time.Sleep(time.Second * 2)
}
Здесь мы два раза вызываем функцию "generate()", в которой определяем канал int, запускаем горутину и в неё записываем переданные в функцию числа. После вызова горутины возвращаем канал. Получив два канала, запускаем две горутины, которые будут получать числа из канала 1 и канала 2, а после этого выводить их в консоль. В конце "засыпаем" на 2 секунды для того, чтобы дождаться завершения всех горутин.
Fan-in
Fan in используется тогда, когда одна функция читает с нескольких каналов, пока они не будут закрыты. Это полезно для, например, агрегации результатов параллельно выполняющихся задач.
Давайте напишем Fan-in:
func merge(in ...<-chan int) <-chan int {
// создаем WaitGroup
var wg sync.WaitGroup
// создаем итоговый канал
out := make(chan int)
// записываем функцию в переменную "output"
output := func(c <-chan int) {
// говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
defer wg.Done()
// циклом идем по каналу "c"
for n := range c {
// в итоговый канал "out" записываем числа из канала "c"
out <- n
}
}
// добавляем в список ожидания столько же горутин, сколько каналов "in" было передано
wg.Add(len(in))
// циклом идем по всем каналам "in"
for _, c := range in {
// вызываем "output" как горутину
go output(c)
}
// ожидаем незавершившиеся горутины
wg.Wait()
return out
}
В этой функции мы объеденяем несколько каналов "in" в один канал "out"
Коротко про pipeline
Pipeline - это множество обработчиков и каждый из них принимает какие-либо входные данные, что-то сними делает, обрабатывает их и передает следующему.
Заключение
Я долго писал эту статью, вытаскивая из каждого источника самое важно и, немного переделывая, доделывая, писал все сюда. Надеюсь, Вы прочитали для себя то, что искали. А вот все источники, откуда я брал информацию: