Как стать автором
Обновить
3122.42
RUVDS.com
VDS/VPS-хостинг. Скидка 15% по коду HABR15

Параллельное программирование на Go

Уровень сложностиСредний
Время на прочтение21 мин
Количество просмотров4.7K
Автор оригинала: Daniel Lemire

На практике, создаваемое нами ПО выполняется на множестве процессоров. К сожалению, многие наши допущения, справедливые для одного процессора, в случае нескольких процессоров становятся ложными. Например, каким будет состояние памяти, если два процессора изменяют один блок памяти? В общем случае на этот вопрос ответить сложно. Может случиться так, что внесённое одним процессом изменение перепишет внесённое другим. Справедливо может быть и обратное: может «победить» изменение другого процессора. Или оба процесса могут попытаться внести изменение одновременно, в результате чего возникнет неопределённое состояние, не соответствующее ни одному ожидаемому. Мы называем такие операции доступа «гонками данных» — ситуацией, в которой два или более процессоров в программе одновременно получают доступ к одной области памяти, и хотя бы одна из этих операций доступа выполняет запись без должной синхронизации.

Всё становится сложнее, если вам нужно, чтобы несколько процессоров изменяли одну и ту же память намеренно. Например, предположим, что у нас есть переменная, отслеживающая количество проданных товаров. В программе инкремент этой одной переменной могут выполнять несколько процессоров.

▍ Потоки и горутины


Поток (thread) — это наименьшая единица исполнения в рамках процесса, которую может независимо планировать и запускать операционная система. Поток представляет собой единую последовательность исполняемых CPU команд, позволяющую программе одновременно выполнять множественные задачи в рамках одного процесса. Поток существует внутри более масштабной сущности под названием «процесс», который, по сути, является выполняемой программой со своим пространством в памяти, ресурсами и состоянием. Процесс может содержать один или несколько потоков, использующих одну память и ресурсы (например, открытые файлы или глобальные переменные), выделенные для этого процесса.

Существует ограничение на количество потоков, которыми программа может управлять эффективно. Чтобы обеспечить возможность ещё большего параллелизма, в языке программирования Go есть собственная концепция потока, называемая горутиной (goroutine). Хоть горутина и не является потоком в традиционном смысле, внутренне она отображается в обычные потоки. Среда исполнения Go использует планировщик для отображения множества горутин в меньшее количество потоков. Это реальные потоки, понятные операционной системе — сущности уровня ядра с собственным стеком и контекстом выполнения.

Один поток в Go может выполнять несколько горутин благодаря эффективному переключению между ними. Благодаря этому горутины гораздо менее затратны, чем потоки операционной системы — на практике можно создавать тысячи или даже миллионы горутин, в то время как создание такого же количества потоков исчерпает ресурсы системы из-за того, что они занимают больше памяти.

В каком-то смысле Go размывает границу между конкурентностью и параллелизмом. Конкурентность (concurrency) — это управление множеством задач так, чтобы они могли выполняться независимо друг от друга. Параллелизм же подразумевает одновременное выполнение множественных задач с задействованием множества разных ресурсов. Конкурентность делает упор на проектирование ПО с целью координации задач и может работать как с одним, так и с несколькими ядрами, а параллелизм использует оборудование, чтобы достичь истинного одновременного исполнения. Эти два принципа могут комбинироваться, когда система с конкурентностью использует для эффективности параллельные ресурсы.

Для запуска горутины достаточно ввести ключевое слово go и функцию:

go func() {
    fmt.Println("Canada")
}()

При этом будет порождена горутина, но среда выполнения Go сама решает, на каком потоке она будет выполняться, и потенциально может запускать в том же потоке другие горутины.

К сожалению, программа, состоящая лишь из одной такой горутины, может разочаровать:

package main

import (
    "fmt"
)

func main() {
    go func() {
        fmt.Println("Canada")
    }()
}

Проблема в том, что функция main может завершиться до того, как завершилась горутина. В Go горутины выполняются конкурентно (одновременно), а функция main (то есть основная горутина) не имеет автоматического механизма ожидания завершения других горутин. Если основная горутина завершается, то программа тоже завершается, и потенциально это может произойти до завершения других горутин. Чтобы горутина точно завершилась до завершения программы, проще всего будет синхронизовать основную горутину с порождённой при помощи механизма канала или WaitGroup.

Канал (channel) в Go — это встроенная конструкция, позволяющая горутинам (конкурентным функциям) общаться друг с другом и синхронизировать своё выполнение. Канал имеет тип и создаётся при помощи функции make:

ch := make(chan int) // канал, работающий с integer

Ключевое слово chan используется для объявления канала. Тип после chan (например, int) определяет, какой тип данных может передавать канал.

Для отправки значения в канал используется оператор <-.

ch <- 42 // отправляем в канал значение 42

Для получения значения из канала тоже используется оператор <-.

value := <-ch // получаем значение из канала и сохраняем его в 'value'

Чтобы сообщить, что других данных передаваться не будет, используется функция close: close(ch). Отправка в закрытый канал вызывает панику.

Показанный ниже код выведет «Canada»:

package main

import "fmt"

func main() {
    ch := make(chan string) // Создаём канал для строк

    go func() {
        ch <- "Canada" // Отправляем сообщение в канал
    }()

    msg := <-ch // Получаем сообщение в основной горутине
    fmt.Println(msg)
}

Код ниже показывает, как можно использовать каналы, чтобы дождаться завершения горутины:

package main

import (
    "fmt"
)

func main() {
    channel := make(chan bool) // Создаём канал для сигналирования о завершении

    go func() {
        fmt.Println("Canada")
        channel <- true // Сигнал того, что горутина завершила выполнение
    }()

    <-channel // Ожидаем, пока горутина сигнализирует о завершении
}

После завершения выполнения горутина отправляет в канал значение (true). Функция main устанавливает блокировку на <-done, ожидая получения из канала, благодаря чему она не выполняет выход, пока не завершена горутина.

По умолчанию у канала нет буфера: он может содержать не больше одного значения, поэтому если попробовать записать в него несколько значений, то он заблокируется, пока не будет считано хотя бы одно значение.

ch := make(chan int, 2)
ch <- 1 // Блокировка не происходит (в буфере есть место)
ch <- 2 // Происходит блокировка (буфер теперь полон)
ch <- 3 // Блокировка сохраняется, пока значение не будет получено

В языке Go можно передавать функции множество разных каналов в виде аргументов. Каналы — это значения первого класса в Go, то есть их можно передавать как параметры, получать из функций и хранить в переменных. При передаче функции множества каналов они просто включаются в список параметров функции с указанием их типов.

Рассмотрим пример доступа к двум URL:

package main

import (
    "fmt"
    "net/http"
    "time"
)

// Структура Response для хранения URL и результата его получения
type Response struct {
    url    string
    status string
    err    error
}

func fetchURL(url string, ch chan Response) {
    // Создаём HTTP-клиент с таймаутом
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Выполняем запрос HTTP GET
    resp, err := client.Get(url)
    if err != nil {
        ch <- Response{url, "", err}
        return
    }
    defer resp.Body.Close()

    ch <- Response{url, resp.Status, nil}
}

func main() {
    // Записываем время начала
    startTime := time.Now()
    // Создаём канал для ответов
    ch := make(chan Response)

    // URL для получения
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
    }

    // Запускаем горутины для каждого URL
    for _, url := range urls {
        go fetchURL(url, ch)
    }

    // Собираем ответы
    for i := 0; i < len(urls); i++ {
        resp := <-ch
        if resp.err != nil {
            fmt.Printf("Error fetching %s: %v\n", resp.url, resp.err)
        } else {
            fmt.Printf("Successfully fetched %s: %s\n", resp.url, resp.status)
        }
    }

    // Закрываем канал (необязательно, потому что программа на этом завершается)
    close(ch)

    // Вычисляем и выводим прошедшее время
    elapsed := time.Since(startTime)

    fmt.Printf("\nTotal time taken: %s\n", elapsed)
}

Эта программа определяет структуру Response для хранения URL, его статуса и ошибок. Она реализует функцию fetchURL, которая получает в качестве параметров URL и канал, использует HTTP-клиент с 10-секундным таймаутом, выполняет запрос GET к URL и отправляет результат через канал. Она использует defer, чтобы гарантировать закрытие тела ответа. В данном случае в канал можно выполнять запись или чтение из функции: чтобы в него можно было только записывать, мы можем объявить его при передаче как ch chan<- Response, а не ch chan Response. В функции main мы создаём канал для получения ответов, определяем два URL для приёма, запускаем горутину для каждого URL, собираем ответы из канала и выводим результаты.

При выполнении программы мы при помощи отдельных горутин получаем оба URL одновременно; программа использует каналы для передачи результатов основной горутине и выводит статус (например, «200 OK») или ошибки для каждого URL.

Мы можем переписать эту программу без горутин, чтобы она была проще:

package main

import (
    "fmt"
    "net/http"
    "time"
)

// Структура Response для хранения URL и результатов его получения
type Response struct {
    url    string
    status string
    err    error
}

func fetchURLSynchro(url string) Response {
    // Создаём HTTP-клиент с таймаутом
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Делаем запрос HTTP GET
    resp, err := client.Get(url)
    if err != nil {
        return Response{url, "", err}
    }
    defer resp.Body.Close()

    return Response{url, resp.Status, nil}
}

func main() {
    // URL для получения
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
    }
    startTime := time.Now()

    for i := 0; i < len(urls); i++ {
        resp := fetchURLSynchro(urls[i])
        if resp.err != nil {
            fmt.Printf("Error fetching %s: %v\n", resp.url, resp.err)
        } else {
            fmt.Printf("Successfully fetched %s: %s\n", resp.url, resp.status)
        }
    }
    elapsed := time.Since(startTime)
    fmt.Printf("\nTotal time taken: %s\n", elapsed)
}

Эти две программы выполняют одну и ту же задачу, но в одной применяется две горутины (в дополнение к основной горутине), а в другой — только основная горутина. Протестировав эти программы, можно выяснить, что вариант с двумя горутинами выполняется быстрее: операции доступа к сети обычно затратны и их легко можно распараллеливать. То есть две задачи могут выполняться на компьютере практически независимо друг от друга даже при одновременном исполнении. Может оказаться так, что параллельные запросы к двум URL при помощи HTTP-запросов занимают 250 мс, а для последовательных запросов требуется 400 мс.

Однако из этого не стоит делать однозначный вывод, что чем больше горутин, тем быстрее работает ПО. Часто это не так. Более того, дополнительные горутины могут вызвать использование дополнительных процессоров, что увеличивает энергопотребление вашего ПО. Добавление горутин усложняет ПО, его поддержку и отладку.

Строго говоря, для конкурентного выполнения двух сетевых запросов не требуется параллелизм (то есть несколько физических процессоров). Исполнение таких запросов не требует особо много времени обработки и связано с ожиданием ответа от сети. Следовательно, в данном случае использование горутин с большой вероятностью уместно. При разбиении на горутины задач, требующих большего количества вычислительных ресурсов, рост производительности может и не произойти.

Чтобы проиллюстрировать это, давайте рассмотрим случай суммирования всех значений в массиве. Мы возьмём два примера: сначала маленький массив (100 тысяч элементов), а потом большой массив с миллионами элементов. В обоих случаях мы можем использовать или простую функцию (с одной горутиной), или функцию, использующую множество горутин. Чтобы максимизировать параллелизм, мы сделаем количество горутин равным количеству обнаруженных в системе языком Go процессоров (runtime.NumCPU()).

package main

import (
    "fmt"
    "runtime"
    "testing"
)

// sequentialSum вычисляет сумму массива последовательно
func sequentialSum(numbers []int) int {
    sum := 0
    for _, n := range numbers {
        sum += n
    }
    return sum
}

// goroutineSumWithChannels вычисляет сумму при помощи горутин и каналов
func goroutineSumWithChannels(numbers []int) int {
    numGoroutines := runtime.NumCPU() // Используем количество ядер CPU
    chunkSize := (len(numbers) + numGoroutines - 1) / numGoroutines
    resultChan := make(chan int, numGoroutines) // Буферизованный канал для частичных сумм
    activeGoroutines := 0
    // Разделяем массив на блоки и обрабатываем их при помощи горутин
    for i := 0; i < numGoroutines; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if end > len(numbers) {
            end = len(numbers)
        }
        if start >= end {
            break
        }

        go func(slice []int) {
            partialSum := 0
            for _, n := range slice {
                partialSum += n
            }
            resultChan <- partialSum
        }(numbers[start:end])
        activeGoroutines++
    }

    // Собираем частичные суммы из канала
    total := 0
    for i := 0; i < activeGoroutines; i++ {
        total += <-resultChan
    }
    close(resultChan)

    return total
}

// Функции бенчмарков
func BenchmarkSequentialSum(b *testing.B) {
    numbers := make([]int, 100000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        sequentialSum(numbers)
    }
}

func BenchmarkGoroutineSumWithChannels(b *testing.B) {
    numbers := make([]int, 100000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        goroutineSumWithChannels(numbers)
    }
}

// Функции бенчмарков
func BenchmarkSequentialSumLarge(b *testing.B) {
    numbers := make([]int, 10000000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        sequentialSum(numbers)
    }
}

func BenchmarkGoroutineSumWithChannelsLarge(b *testing.B) {
    numbers := make([]int, 10000000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        goroutineSumWithChannels(numbers)
    }
}

func main() {
    fmt.Printf("Number of CPU cores: %d\n", runtime.NumCPU())

    res := testing.Benchmark(BenchmarkGoroutineSumWithChannels)
    fmt.Println("BenchmarkGoroutineSumWithChannels", res)
    ress := testing.Benchmark(BenchmarkSequentialSum)
    fmt.Println("BenchmarkSequentialSum", ress)

    resl := testing.Benchmark(BenchmarkGoroutineSumWithChannelsLarge)
    fmt.Println("BenchmarkGoroutineSumWithChannelsLarge", resl)
    ressl := testing.Benchmark(BenchmarkSequentialSumLarge)
    fmt.Println("BenchmarkSequentialSumLarge", ressl)
}

В системе с большим количеством процессоров мы можем получить следующий результат:

Количество ядер CPU: 128
BenchmarkGoroutineSumWithChannels     4048      258798 нс/оп.
BenchmarkSequentialSum    23756      50516 нс/оп.
BenchmarkGoroutineSumWithChannelsLarge      744    1414114 нс/оп.
BenchmarkSequentialSumLarge      237       5030224 нс/оп.

Мы видим, что при суммировании небольшого массива способ с 128 горутинами занимает в пять раз больше времени. Если он использует 128 процессоров, то он может оказаться в 128 * 5 = 640 раз менее эффективным! Можно сделать вывод, что если задача очень незатратная, например, суммирование тысяч integer, то не следует использовать больше одной горутины.

В случае, когда мы суммируем 10 миллионов integer, распараллеленная задача интереснее: она выполняется в 3,6 раза быстрее. Решение с одной горутиной, вероятно, более эффективно: один процессор тратит в 3,6 раз больше времени, чем одна сотня горутин.

Проблема с простым суммированием заключается в том, что самыми затратными оказываются операции доступа к памяти, а не вычисления. А что, если рассмотреть более затратную задачу? Давайте суммируем синус значений массива при помощи различного количества горутин (1, 2, …). В массиве будет находиться один миллион значений.

package main

import (
    "fmt"
    "math"
    "runtime"
    "testing"
)

func computeSineSum(numbers []int) float64 {
    sum := 0.0
    for _, n := range numbers {
        sum += math.Sin(float64(n))
    }
    return sum
}

// computeSineSumWithGoroutines вычисляет сумму квадратов при помощи указанного количества горутин
func computeSineSumWithGoroutines(numbers []int, numGoroutines int) float64 {
    chunkSize := (len(numbers) + numGoroutines - 1) / numGoroutines
    resultChan := make(chan float64, numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if end > len(numbers) {
            end = len(numbers)
        }
        if start >= end {
            break
        }

        go func(slice []int) {
            partialSum := 0.0
            for _, n := range slice {
                partialSum += math.Sin(float64(n))
            }
            resultChan <- partialSum
        }(numbers[start:end])
    }

    // Собираем результаты
    total := 0.0
    activeGoroutines := (len(numbers) + chunkSize - 1) / chunkSize
    for i := 0; i < activeGoroutines; i++ {
        total += <-resultChan
    }
    close(resultChan)
    return total
}

// Бенчмарки
func BenchmarkSequential(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000 // Ограничиваем числа по величине
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSum(numbers)
    }
}

func Benchmark1Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 1)
    }
}

func Benchmark2Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 2)
    }
}

func Benchmark4Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 4)
    }
}

func Benchmark8Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 8)
    }
}

func Benchmark16Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 16)
    }
}

func BenchmarkMaxGoroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, runtime.NumCPU())
    }
}

func main() {

    fmt.Printf("CPU cores: %d\n", runtime.NumCPU())
    res1 := testing.Benchmark(BenchmarkSequential)
    fmt.Println("BenchmarkSequential", res1)
    res11 := testing.Benchmark(Benchmark1Goroutines)
    fmt.Println("Benchmark1Goroutines", res11)
    res2 := testing.Benchmark(Benchmark2Goroutines)
    fmt.Println("Benchmark2Goroutines", res2)
    res4 := testing.Benchmark(Benchmark4Goroutines)
    fmt.Println("Benchmark4Goroutines", res4)
    res8 := testing.Benchmark(Benchmark8Goroutines)
    fmt.Println("Benchmark8Goroutines", res8)
    res16 := testing.Benchmark(Benchmark16Goroutines)
    fmt.Println("Benchmark16Goroutines", res16)
    resmax := testing.Benchmark(BenchmarkMaxGoroutines)
    fmt.Println("BenchmarkMaxGoroutines", resmax)
}

На мощной машине со множеством ядер мы можем получить следующие результаты:

Ядра CPU cores: 128
Benchmark1Goroutines      114     13701908 нс/оп.
Benchmark2Goroutines      134      8913817 нс/оп.
Benchmark4Goroutines      253      4648170 нс/оп.
Benchmark8Goroutines      472      2272842 нс/оп.
Benchmark16Goroutines      835     1227975 нс/оп.
BenchmarkMaxGoroutines      916    1189217 нс/оп.

Переход с одной горутины на две увеличивает скорость в 1,5 раза. Переход с одной горутины на 16 горутин увеличивает скорость в 11 раз. Увеличение количества горутин до более чем 16 не даёт дополнительного выигрыша. Такой паттерн сублинейного выигрыша с верхней границей достаточно типичен.

Тем не менее, горутины и каналы сами по себе могут быть на удивление эффективны. Давайте создадим цепочку каналов. У каждой горутины есть входной и выходной канал. Как только данные получаются во входной канал, они записываются во выходной канал. Мы соединим сотни горутин в цепочку входных и выходных каналов:

package main

import (
    "fmt"
    "time"
)

// Функция relay представляет каждую горутину в цепочке
func relay(input <-chan int, output chan<- int) {
    // Ожидаем значение из входного канала
    value := <-input
    // Отправляем значение в выходной канал
    output <- value
}

func main() {
    // Количество горутин в цепочке
    const chainLength = 10000

    // Создаём срез для хранения всех каналов
    channels := make([]chan int, chainLength+1)

    // Инициализируем все каналы
    for i := range channels {
        channels[i] = make(chan int)
    }

    // Начинаем отсчёт времени
    startTime := time.Now()

    // Создаём цепочку горутин
    for i := 0; i < chainLength; i++ {
        go relay(channels[i], channels[i+1])
    }

    // Отправляем первое значение в первый канала
    go func() {
        channels[0] <- 42
    }()

    // Ждём и получаем значение из последнего канала
    result := <-channels[chainLength]

    // Вычисляем затраченное время
    elapsed := time.Since(startTime)

    // Выводим результаты
    fmt.Printf("Value %d successfully passed through %d goroutines\n", result, chainLength)
    fmt.Printf("Time taken: %v\n", elapsed)
    fmt.Printf("Average time per hop: %v\n", elapsed/time.Duration(chainLength))
}

При выполнении этой программы можно получить следующий результат:

Значение 42 успешно передано через 10000 горутин
Потрачено времени: 13,987416 мс
Среднее время на переход: 1,398 мкс

По сути, таким образом можно обойти примерно миллион горутин в секунду.

▍ Группы ожидания


Ещё один распространённый способ управления множеством горутин заключается в использовании sync.WaitGroup. Прежде чем приводить пример, нам нужно разобраться с ключевым словом defer. В Go ключевое слово defer используется для того, чтобы запланировать вызов функции, которая должна выполняться непосредственно перед тем, как выполнит возврат окружающая её функция (содержащая конструкцию defer). Например, показанная ниже функция выведет Canada, а затем Mexico:

func() {
        defer fmt.Println("Mexico")
        fmt.Println("Canada")
    }

Давайте рассмотрим горутину, основанную на откладывании (defer) группы ожидания:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1) // Выполняем инкремент счётчика WaitGroup на 1

    go func() {
        defer wg.Done() // Выполняем декремент после того, как горутина завершит выполнение
        fmt.Println("Canada")
    }()

    wg.Wait() // Ждём, пока счётчик не достигнет нуля
}

Для одной горутины, как в нашем примере, решение с каналами проще, потому что требует меньшего количества строк и в нём не нужны дополнительные import. Однако если горутин много, то может быть проще решение с группами ожидания.

▍ Атомарные операции


Если нам нужно считывать данные из разных горутин, то в случае постоянства данных это не вызывает никаких проблем. Если никто не выполняет запись в данные, то и сложностей не возникнет.

К сожалению, нам часто нужно менять данные в процессе их считывания из разных горутин. Иногда для коммуникаций можно использовать каналы, но случается, что этого недостаточно.

Рассмотрим пример: возьмём массив из десяти integer, горутины будут случайным образом выполнять декремент одного элемента массива, а затем инкремент другого элемента. Изначально сумма всех элементов должна быть равна 1000 и оставаться равной 1000 при условии отсутствия багов. Можно реализовать это в коде следующим образом:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {

    // Инициализируем массив из 10 элементов, каждый из которых имеет значение 100
    arr := [10]int{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}
    var wg sync.WaitGroup

    // Функция для поведения горутин
    worker := func() {
        defer wg.Done()
        wg.Add(1)
        r := rand.New(rand.NewSource(time.Now().UnixNano()))

        // В качестве примера выполняем 200000000 итераций
        for i := 0; i < 200000000; i++ {
            // Берём первый случайный индекс
            idx1 := r.Intn(10)
            // Продолжаем, только если значение > 0
            if arr[idx1] > 0 {
                // Выполняем декремент первого элемента
                arr[idx1]--

                // Берём второй случайный индекс
                idx2 := r.Intn(10)
                // Выполняем декремент второго элемента
                arr[idx2]++

            }
        }
    }

    // Запускаем две горутины
    go worker()
    go worker()
    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("waiting...ok")

    fmt.Println("\nFinal array state:", arr)
    // Проверяем, что общая сумма по-прежнему равна 1000 (10 * 100)
    sum := 0
    for _, val := range arr {
        sum += val
    }
    fmt.Println("Total sum:", sum)
}

В этой программе есть ошибка: она содержит гонки данных, потому что мы записываем и читаем данные из разных горутин без синхронизации. Программа может выдать следующий результат:

Final array state: [3001 644 880 324 2319 2845 3664 160 232 1741]
Total sum: 15810

Обратите внимание, что сумма больше ожидаемой.

В Go можно избежать такого бага с гарантией атомарности, обеспечиваемой пакетом sync/atomic: он обеспечивает выполнение операций наподобие инкрементов неделимыми шагами, что предотвращает условия гонки. Например, функции atomic.AddInt32(&x, 1) и atomic.AddInt64(&x, 1) гарантируют, что операция инкремента (чтение-изменение-запись) выполнится атомарно. Это значит, что даже если два потока будут выполнять инкремент конкурентно, операции будут сделаны последовательными на аппаратном уровне, и никакие обновления не потеряются.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

func main() {

    // Инициализируем массив из 10 элементов, каждый из которых имеет значение 100
    arr := [10]int32{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}
    var wg sync.WaitGroup

    // Функция для поведения горутин
    worker := func() {
        defer wg.Done()
        wg.Add(1)
        r := rand.New(rand.NewSource(time.Now().UnixNano()))

        // В качестве примера выполняем 200000000 итераций
        for i := 0; i < 200000000; i++ {
            // Выбираем первый случайный индекс
            idx1 := r.Intn(10)
            // Продолжаем, только если значение > 0
            val := atomic.LoadInt32(&arr[idx1])
            if val > 0 {
                if atomic.CompareAndSwapInt32(&arr[idx1], val, val-1) {
                    // Выбираем второй случайный индекс
                    idx2 := r.Intn(10)
                    // Выполняем инкремент второго элемента
                    atomic.AddInt32(&arr[idx2], 1)
                }

            }
        }
    }

    // Запускаем две горутины
    go worker()
    go worker()
    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("waiting...ok")

    fmt.Println("\nFinal array state:", arr)
    // Проверяем, что общая сумма по-прежнему равна 1000 (10 * 100)
    sum := 0
    for _, val := range arr {
        sum += int(val)
    }
    fmt.Println("Total sum:", sum)
}

Выражение atomic.LoadInt32(&arr[idx1]) атомарно считывает значение по индексу массива idx1. Значение сохраняется в локальной переменной (val): для локальной переменной гонки данных невозможны. Затем мы используем операцию Compare-And-Swap (CAS): atomic.CompareAndSwapInt32(&arr[idx1], val, val-1). Она проверяет, по-прежнему ли arr[idx1] равно val (ранее загруженному значению) и если это так, присваивает arr[idx1] значение val-1. В случае успеха она возвращает true, и false, если значение после загрузки изменилось. Важно то, что она выполняется как единая атомарная операция. Далее мы используем atomic.AddInt32(&arr[idx2], 1) для атомарного прибавления 1 к arr[idx2].

При запуске этой новой программы сумма значений массива сохраняется. Программа безопасна.

▍ Мьютекс


Атомарные операции (например, atomic.AddInt32 или atomic.CompareAndSwapInt32) предназначены для единых неделимых операций с одной переменной. Когда мы имеем дело с более сложными структурами данных, их становится недостаточно.

В таких более сложных случаях мы используем мьютекс. Мьютекс (mutex, сокращённо от «mutual exclusion», то есть «взаимное исключение») — это примитив синхронизации, используемый в конкурентном программировании, чтобы не позволять различным потокам или процессам одновременно получать доступ к общему ресурсу или изменять его. Он гарантирует, что одновременно к критической части кода может получить доступ только один поток (или горутина), позволяя таким образом избежать условий гонки и поддерживая целостность данных. По сути, в конкретный момент времени может удерживаться только одна «блокировка» (lock).

Для примера создадим программу, в которой происходит транзакция денег между двумя счетами; нам нужно гарантировать, что снятие с одного счёта и депонирование на другой будут происходить без вмешательства других горутин. Для этого необходимо обеспечить защиту многоэтапной операции, на что неспособны атомарные операции.

package main

import (
    "fmt"
    "sync"
    "time"
)

type Bank struct {
    accounts map[string]int // Map ID счетов и балансов
    mutex    sync.Mutex    // Мьютекс для защиты всей операции транзакции
}

func NewBank() *Bank {
    return &Bank{
        accounts: map[string]int{
            "Alice": 1000,
            "Bob":   500,
        },
    }
}

func (b *Bank) Transfer(from, to string, amount int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Блокируем мьютекс, чтобы защитить всю транзакцию
    b.mutex.Lock()
    defer b.mutex.Unlock() // Обеспечиваем разблокировку даже в случае возникновения паники

    // Проверяем достаточность средств на счёте-источнике
    if b.accounts[from] >= amount {
        // Выполняем транзакцию: две связанные операции
        b.accounts[from] -= amount
        b.accounts[to] += amount
        fmt.Printf("Transferred %d from %s to %s. New balances: %s=%d, %s=%d\n",
            amount, from, to, from, b.accounts[from], to, b.accounts[to])
    } else {
        fmt.Printf("Failed transfer of %d from %s to %s: insufficient funds\n",
            amount, from, to)
    }
}

func (b *Bank) GetBalance(account string) int {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    return b.accounts[account]
}

func main() {
    bank := NewBank()
    var wg sync.WaitGroup

    // Запускаем несколько конкурентных транзакций
    wg.Add(4)
    go bank.Transfer("Alice", "Bob", 200, &wg)
    go bank.Transfer("Bob", "Alice", 100, &wg)
    go bank.Transfer("Alice", "Bob", 300, &wg)
    go bank.Transfer("Bob", "Alice", 50, &wg)

    wg.Wait()

    fmt.Printf("Final balances: Alice=%d, Bob=%d\n",
        bank.GetBalance("Alice"), bank.GetBalance("Bob"))
}

В общем случае, для получения и освобождения мьютекса требуются операции уровня системы, что добавляет оверхед в случае отсутствия ограничений. Это может замедлить программу по сравнению с не использующими блокировки альтернативами, например, с атомарными операциями.

В сложных случаях возможно возникновение взаимной блокировки (deadlock). Взаимная блокировка — это сбой конкурентности, при котором потоки попадают в ловушку цикличного ожидания ресурсов, не имея возможности продолжить из-за взаимных зависимостей. Мы можем модифицировать наш пример, чтобы добавить в него deadlock. Вместо глобального мьютекса создадим по мьютексу для каждого счёта. Если горутина получит счёт-источник, а затем счёт назначения, то возможен deadlock.

package main

import (
    "fmt"
    "sync"
    "time"
)

type Account struct {
    balance int
    mutex   sync.Mutex
}

type Bank struct {
    accounts map[string]*Account // Map ID счетов с объектами счетов с индивидуальными мьютексами
}

func NewBank() *Bank {
    return &Bank{
        accounts: map[string]*Account{
            "Alice": {balance: 1000},
            "Bob":   {balance: 500},
        },
    }
}

func (b *Bank) Transfer(from, to string, amount int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Получаем счета
    fromAccount := b.accounts[from]
    toAccount := b.accounts[to]

    // Сначала блокируем счёт-источник
    fromAccount.mutex.Lock()
    fmt.Printf("Locked %s for transfer of %d to %s\n", from, amount, to)

    // Симулируем работу, чтобы повысить вероятность deadlock (необязательно, но помогает для демонстрации)
    time.Sleep(100 * time.Millisecond)

    // Далее пытаемся блокировать счёт назначения
    toAccount.mutex.Lock()
    fmt.Printf("Locked %s for transfer of %d from %s\n", to, amount, from)

    // Выполняем транзакцию
    if fromAccount.balance >= amount {
        fromAccount.balance -= amount
        toAccount.balance += amount
        fmt.Printf("Transferred %d from %s to %s. New balances: %s=%d, %s=%d\n",
            amount, from, to, from, fromAccount.balance, to, toAccount.balance)
    } else {
        fmt.Printf("Failed transfer of %d from %s to %s: insufficient funds\n",
            amount, from, to)
    }

    // Разблокируем оба счёта
    toAccount.mutex.Unlock()
    fromAccount.mutex.Unlock()
}

func (b *Bank) GetBalance(account string) int {
    acc := b.accounts[account]
    acc.mutex.Lock()
    defer acc.mutex.Unlock()
    return acc.balance
}

func main() {
    bank := NewBank()
    var wg sync.WaitGroup

    // Запускаем две транзакции в обоих направлениях, чтобы создать deadlock
    wg.Add(2)
    go bank.Transfer("Alice", "Bob", 200, &wg) // Alice -> Bob
    go bank.Transfer("Bob", "Alice", 100, &wg) // Bob -> Alice

    wg.Wait() // Это никогда не завершится из-за deadlock

    fmt.Printf("Final balances: Alice=%d, Bob=%d\n",
        bank.GetBalance("Alice"), bank.GetBalance("Bob"))
}

Взаимная блокировка в этом коде происходит из-за того, что две горутины получают мьютексы в разном порядке, приводя к циклическому ожиданию. Одна из стратегий, позволяющих избежать таких взаимных блокировок deadlock — использование упорядоченных мьютексов. Например, если счета пронумерованы, то мы всегда сначала блокируем счёт с меньшим номером.

▍ Заключение


Конкурентность — мощный инструмент современной разработки ПО, позволяющий программам использовать несколько процессоров для повышения производительности. Однако она добавляет существенные сложности, которые нужно тщательно учитывать. Возможность возникновения гонок данных, при которых несинхронизированный доступ к общей памяти приводит к непредсказуемым результатам, подчёркивает необходимость применения надёжных механизмов синхронизации. Горутины и каналы Go — это изящный и легковесный подход к конкурентности, позволяющий разработчикам эффективно распараллеливать такие задачи, как сетевые запросы или обработка данных, избегая при этом оверхеда, возникающего при работе с традиционными потоками. Однако стоит учитывать, что выигрыш производительности от использования параллелизма не гарантирован — в простых задачах из-за излишнего оверхеда горутин может происходить замедление, а при вычислительно-затратных операциях можно добиться существенной выгоды, отдача от которой, впрочем, становится с увеличением количества горутин всё более ничтожным.

Инструменты синхронизации наподобие sync.WaitGroup, атомарных операций из sync/atomic и мьютексов (sync.Mutex) обеспечивают защиту от сбоев конкурентности. Атомарные операции отлично подходят для изменения отдельных переменных, обеспечивая потокобезопасность с минимальным оверхедом, а мьютексы защищают многоэтапные операции со сложными структурами данных. Однако с мьютексами связаны свои риски, например, взаимные блокировки, возникающие из-за циклических зависимостей и требующие тщательного ппроектирования. Выбор подходящей стратегии конкурентности зависит от природы и масштаба задачи, а также от требований к производительности. В конечном итоге, для эффективного конкурентного программирования на Go нужен баланс между использованием параллелизма для скорости и сохранением простоты, корректности и эффективности в условиях ограничения общих ресурсов.

Telegram-канал со скидками, розыгрышами призов и новостями IT 💻
Теги:
Хабы:
+31
Комментарии2

Публикации

Информация

Сайт
ruvds.com
Дата регистрации
Дата основания
Численность
11–30 человек
Местоположение
Россия
Представитель
ruvds