Привет всем гоферам! В данной статье я хочу разобрать как можно воспользоваться модулем sync/atomic для типа float64.
Постановка задачи
У нас есть канал из которого мы можем читать результаты выполнения задач. Результат задачи содержит флаг показывающий была ли ошибка при выполнении задачи и результат выполнения этой задачи (тип float64). Нужно найти сумму всех результатов и количество ошибок.
Реализация с использованием sync.Mutex
Задачу можно решить с использованием sync.Mutex:
package main import ( "fmt" "math" "sync" "sync/atomic" ) const countWorkers = 1000 const countTasks = 10000000 type Result struct { value float64 hasError bool } func MakeTasks(count int) <-chan Result { ch := make(chan Result) go func() { for i := 0; i < count; i++ { ch <- Result{ value: float64(i) * 2.42, hasError: (i % 10) == 0, } } close(ch) }() return ch } func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) { var wg sync.WaitGroup var errMu sync.Mutex var mu sync.Mutex var countErrors int64 var result float64 for i := 0; i < countWorkers; i++ { wg.Add(1) go func() { for item := range ch { if item.hasError { errMu.Lock() countErrors++ errMu.Unlock() } else { mu.Lock() result += item.value mu.Unlock() } } wg.Done() }() } wg.Wait() return result, countErrors } func main() { ch := MakeTasks(countTasks) fmt.Println(ProcessUsingMutex(ch, countWorkers)) }
Реализация с использованием sync/atomic
Но использование sync.Mutex может замедлять нашу программу. Поэтому можно переписать решение с использованием sync/atomic. Для количества ошибок это сделать довольно просто. Вместо использования sync.Mutex используем atomic.AddInt64, то есть:
errMu.Lock() countErrors++ errMu.Unlock()
заменяем на
atomic.AddInt64(&countErrors, 1)
Но для float64 нет функции AddFloat64. Но её можно реализовать. Для того, что бы лучше понять как это можно сделать напишем функцию AddInt64 с использованием CompareAndSwapInt64
func AddInt64(addr *int64, delta int64) (new int64) { for { v := *addr nxt := v+delta if atomic.CompareAndSwapInt64(addr, v, nxt) { return nxt } } }
Но для float64 нет и CompareAndSwap, но мы можем сконвертировать значение float64 в uint64 используя math.Float64bits и оперировать уже uint64.
Реализация AddFloat64 может быть такой:
func AddFloat64(addr *uint64, delta float64) uint64 { for { cur := atomic.LoadUint64(addr) curVal := math.Float64frombits(cur) nxtVal := curVal + delta nxt := math.Float64bits(nxtVal) if atomic.CompareAndSwapUint64(addr, cur, nxt) { return nxt } } }
После того, как мы реализовали AddFloat64 можно полностью переписать нашу функцию, без использования мютексов:
func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) { var wg sync.WaitGroup var countErrors int64 var total uint64 for i := 0; i < countWorkers; i++ { wg.Add(1) go func() { for item := range ch { if item.hasError { atomic.AddInt64(&countErrors, 1) } else { AddFloat64(&total, item.value) } } wg.Done() }() } wg.Wait() return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors) }
Для сравнения mutex и atomic я написал небольшой бенчмарк
package main import ( "testing" ) func BenchmarkProcessUsingAtomic(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() ch := MakeTasks(countTasks) b.StartTimer() ProcessUsingAtomic(ch, countWorkers) } } func BenchmarkProcessUsingMutex(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() ch := MakeTasks(countTasks) b.StartTimer() ProcessUsingMutex(ch, countWorkers) } }
Запуск данного benchmark показывает что atomic на 5-7% быстрее.
BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS
Заключение
Модуль sync/atomic может быть полезен, если его использовать вместо мютексов так как атомики работают немного быстрее мютексов. Если хочется использовать атомики для типов, которые не поддерживаются в sync/atomic можно попробовать использовать стандартные функции для реализации нужного функционала, как я описал в данной статье или воспользоваться структурой Value из модуля sync/atomic.