Pull to refresh
2665.96
RUVDS.com
VDS/VPS-хостинг. Скидка 15% по коду HABR15

Go: жарим общие данные. Атомно, быстро и без мьютексов

Level of difficultyMedium
Reading time8 min
Views7.6K


Как правило, в Go для безопасного доступа к общим данным используются мьютексы. Да, каналы тоже можно приспособить для изменения общих данных, так как они потокобезопасны, но это усложняет и замедляет логику.

Но в этой статье мы поговорим о другом. Современные процессоры имеют поддержку атомарных операций, что позволяет на основе них организовывать работу с общими данными до нескольких раз быстрее, чем с помощью общепринятых вариантов. Так как мьютексы реализованы на основе ОС, каналы сделаны на основе внутреннего кода Go с использованием тех же мьютексов из ОС под капотом, а атомарные операции делает сам процессор аппаратно за существенно меньшее количество тактов.

Пример кода, который при многопоточности неожиданно начинает глючить


Начнём с основ. Вначале будет код, который параллельно к переменной прибавляет и убавляет по миллиону единиц. В типе A лежит счётчик, тип B добавляет 1 (метод B.II), тип С убавляет 1 (метод C.DD). И всё это параллельно. Казалось бы, тут не нужны никакие примитивы синхронизации, так как общее число прибавлений и убавлений равно миллиону, а поэтому результат будет равен нулю.

package main

import (
	"fmt"
	"sync"
)

type A struct {
	i int
}

func (a *A) Inc(delta int) {
	a.i += delta
}

func (a *A) Get() int {
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C

var wg sync.WaitGroup

func main() {
	var nCount = 1000000
        a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Однако если мы запустим этот код, то получим что угодно, но только не ноль. Пробуем!

Объяснить это можно только тем, что процессы параллельно лезут к одним и тем же ячейкам памяти, их действия накладываются друг на друга (хотя чему тут накладываться? инкремент/декремент — одна из самых примитивных операций), но компилятор не догадывается их использовать, а использует несколько регистров, вступают в действие кеши L1, L2 и L3, и на выходе вместо нуля белиберда.

Классика: мьютексы решают вопрос


Хорошо, теперь перепишем этот код классически, на базе мьютексов:

package main

import (
	"fmt"
	"sync"
)

type A struct {
	i int
	m sync.Mutex
}

func (a *A) Inc(delta int) {
	a.m.Lock()
	defer a.m.Unlock()

	a.i += delta
}

func (a *A) Get() int {
	a.m.Lock()
	defer a.m.Unlock()
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C
var wg sync.WaitGroup

func main() {
	var nCount = 1000000
	a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Мьютекс ожидаемо делает своё дело, и на выходе мы получаем ноль. Проверить самому. Кстати, мьютекс можно разместить в любом объекте, не только в защищаемом, и он сделает своё благородное дело. Только из соображений понятности и поддержки кода мьютексы обычно размещают в защищаемых объектах.

На сцену выходят примы: атомарные операции


Теперь же решим эту задачу неблокирующим, быстрым и красивым способом. Атомарным.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type A struct {
	i int64
}

func (a *A) Inc(delta int64) {
	atomic.AddInt64(&a.i, delta)
}

func (a *A) Get() int64 {
	return atomic.LoadInt64(&a.i)
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C

var wg sync.WaitGroup

func main() {
	var nCount = 1000000
	a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Во дела! Работает! Мы тоже получили на выходе ноль после параллельной работы увеличивающей и уменьшающей процедуры.

Достаточно было только использовать специфическую атомарную функцию atomic.AddInt64. Атомарное сложение — очень полезная функция, так как неблокирующим способом мы можем корректно складывать некие результаты из разных потоков практически без затрат времени на синхронизацию. Также я использовал atomic.LoadInt64, но в данном примере она необязательна.

Специфика атомарных операций в том, что они работают только с узким набором данных — это 32-х, 64-х битные данные и указатели.

▍ Сравнение скорости мьютексов и атомарных операций


Для того чтобы сравнить скорость действия атомарных операций и мьютексов я увеличил число операций до 100 миллионов.

Итак, атомарные операции: 2.12 секунды, мьютексы 5.85 секунды.
Как и следовало ожидать, атомарные операции существенно быстрее, примерно в 3 раза.

Частый случай использования атомарных операций, когда несколько горутин что-то делают с общим аккумулятором. Например, суммы убытков и прибылей из разных источников складываются и отображаются онлайн. А вот суммирование массива несколькими горутинами через атомарное сложение не ускоришь. Так как оно существенно дороже по процессорному времени, чем операция обычного сложения из-за сбросов кешей процессора.

С точки зрения процессора (и всех его многочисленных ядер) есть мир до атомарной операции и после. На основе этого принципа из атомарных операций можно делать самопальные конструкции для синхронизации многопоточной работы. И довольно быстрые.

Важный момент: скорость атомарных операций в виртуальной среде может быть значительно ниже из-за того, что они могут эмулироваться. Похоже, что так происходит в WSL.

Самодельный мьютекс на основе CompareAndSwap


Для того, чтобы создавать самодельные мьютексы, можно использовать специальную атомарную фунцию
CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

С ней всё ещё проще. Сделаем самодельный мьютекс.

type MyMutex struct {
    i int64
}

func (m *MyMutex) get() int64 {
    return atomic.LoadInt64(&m.i)
}

func (m *MyMutex) set(val int64) {
    atomic.StoreInt64(&m.i, val)
}

func (m *MyMutex) Lock() {
    for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
    }
    return
}

func (m *MyMutex) Unlock() {
    if m.get() == 0 {
      panic(fmt.Errorf("Double unlocking"))
    }

    m.set(0)
}

Этот подход известен в системном программировании как Compare and swap. Также для этой цели можно использовать подход Test and set, но пакет sync/atomic не предоставляет функций для этого подхода. «Compare and swap» в википедии считают более хорошим подходом. Дополнительная информация. При тестировании этот самодельный мьютекс показывает на 40% худшие результаты, чем оригинальный. Из-за нескольких причин:
  1. Есть цикл (ниже), на который тратится много процессорного времени.
    for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
    }
    
    Если бы могли в него вставить, что-то типа ассемблерных инструкций NO_OPERATION или какой-то микрозадержки, то стал бы лучше работать. Стандартная функция time.Sleep() плохо работает для этой цели. При выставлении 1 наносекунды там может быть в тысячи раз большая реальная задержка.
  2. Планировщик ОС, при использовании системного мьютекса, выделяет поменьше квантов времени тому потоку, который просто ждёт захвата мьютекса.
  3. Библиотечный мьютекс, наверняка, написан на более низком уровне.
  4. Обвязка для самодельного мьютекса, скорее всего, менее эффективно компилируется в ассемблерные инструкции.

Безопасная многопоточная работа с общими данными без каналов и мьютексов


Хорошо, а если нам нужны какие-то более сложные операции и структуры данных? Можно ли тут получить пользу от атомарных операций?

Да, можно! Перейдём к самому интересному. Создадим на основе атомарных операций самодельную потокобезопасную конструкцию, которая будет контролировать состояние конечного автомата. И сделаем это по другому, чем ранее в статье.

▍ Конечный автомат, обслуживающий несколько потоков на атомарных операциях


Некий автомат имеет 3 состояния: открыт, занят уменьшением, занят увеличением. Это условно. Состояний может быть больше. Они могут быть называться как угодно. Суть в том, что есть общие данные любого вида и мы организуем к ним бесконфликтный доступ разных методов в зависимости от состояния объекта. Без использования мьютексов и каналов.

Например, потому что мы знаем, что атомарные операции могут быть сильно быстрее, или просто любим ненормальное программирование.

Итак, представляю код. В нём (для демонстрации потенциальных вариантов реализации) я обошёлся без функции CompareAndSwap. Хотя с ней было бы чуть проще.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	Open int64 = iota
	Increasing
	Decreasing
)

type A struct {
	state int64
	i     int64
}

func (a *A) SetState(state int64) {
	atomic.StoreInt64(&a.state, state)
}

func (a *A) GetState() int64 {
	return atomic.LoadInt64(&a.state)
}

func (a *A) AddState(delta int64) int64 {
	return atomic.AddInt64(&a.state, delta)
}
// По хорошему тут должно быть 2 процедуры: для состояния Increasing и для Decreasing,
// но не хотелось плодить почти одинаковый код
func (a *A) Inc(delta int64) {
	if delta > 0 {
		a.SetState(Increasing)
	} else {
		a.SetState(Decreasing)
	}
	a.i += delta
	a.SetState(Open)
}

func (a *A) Init() {
	a.SetState(Open)
}

func (a *A) Get() int64 {
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
	i_begin:
		for a.GetState() != Open {
		}
		if a.AddState(Increasing) == Increasing {
			a.Inc(1)
		} else {
			goto i_begin
		}

	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
	d_begin:
		for a.GetState() != Open {
		}
		if a.AddState(Decreasing) == Decreasing {
			a.Inc(-1)
		} else {
			goto d_begin
		}
	}
	wg.Done()
}

var a A
var b B
var c C
var wg sync.WaitGroup

func main() {
	a.Init()
	var nCount = 1000000

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Запустить в Go Playground. Если проблем с многопоточностью нет, то код должен вывести ноль как результат. И выводит.

Обратите внимание, что в этом коде операция инкремента теперь обычная, а не атомарная, но тем не менее всё корректно считается.

Логика: когда объект A объявляет о том, что он открыт, то объекты B и C пытаются переключить его состояние в Increasing или Decreasing с помощью добавления к нулю (это состояние Open) значений Increasing или Decreasing. Тот, кто первый добавил, получает, например, Increasing. Второй уже получит значение Increasing + Decreasing, которое не даст ему начать делать свою работу и переведёт в состояние ожидания, пока состояние не станет Open. Мы можем использовать сколько угодно состояний, главное, чтобы сумма номеров любых остальных не равнялась никакому номеру другого состояния.

Вот кусочек с главным хаком, который делает нашу работу потокобезопасной:

for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
   a.Inc(-1)
}

Например, можно использовать степени двойки: 1, 2, 4, 8, 16, 32. Сумма любого количества состояний не будет равна никакому другому номеру состояния. Каждая степень двойки представлена бинарно отдельным битом, который не пересекается с другими битами. Каждое состояние будет представлено машинным словом со всего одной битовой единицей, только в разном месте. И поэтому при сложении состояний мы будем получать результат с несколькими единицами.

Наш переключатель реагирует только на чистые состояния, в которых только по одному включённому биту. И поэтому каждая горутина легко отличает ситуации, когда ей удалось занять очередь на исполнение, и начинает исполнять свою работу.

Для 100 миллионов циклов этот код выполнялся 14 секунд, аналогичный код с тремя состояними на мьютексах выполнялся 25.3 секунды. Код на мьютексах немного сложнее, чем мог бы быть, но только для того, чтобы имитировать объект с 3 состояниями и с минимально блокирующим доступом к нему. Наверное, более правильно его переписать на каналах, но тогда он будет выполняться ещё дольше, так как под капотом каналов всё те же мьютексы.

Сразу скажу, что в Go Playground не получится потестить мои скрипты со 100М выполнений, и функция времени там работает некорректно. Для того, чтобы воспроизвести мои эксперименты вам придётся компилировать самому.

Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!

Вывод


Атомарные функции в Go дают возможность организовать как блокирующие, так и в особенности неблокирующие алгоритмы с механизмами синхронизации, которые иногда работают в 2-3 раза быстрее, чем мьютексы и каналы. В некоторых ситуациях низкоуровневого программирования это может дать существенную пользу, хотя и требует экстремальной внимательности во время разработки.

Во всех остальных ситуациях разработчики Go рекомендуют использовать каналы и мьютексы, как более понятные и наглядные в применении.

Нативные мьютексы от ОС имеют более тесную связь с планировшиком задач, поэтому нет смысла переписывать именно мьютексы на атомарных операциях, но в других задачах, в особенности в неблокирующих алгоритмах, стоит рассмотреть атомарные операции для ускорения алгоритмов.

Более детально посмотреть на атомарные функции вы можете в документации Go.

Ранее на Хабре на эту тему: Атомарные и неатомарные операции, Atomic operation.

Telegram-канал со скидками, розыгрышами призов и новостями IT 💻
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+47
Comments29

Articles

Information

Website
ruvds.com
Registered
Founded
Employees
11–30 employees
Location
Россия
Representative
ruvds