Привет всем гоферам! В данной статье я хочу разобрать как можно воспользоваться модулем sync/atomic
для типа float64.
Постановка задачи
У нас есть канал из которого мы можем читать результаты выполнения задач. Результат задачи содержит флаг показывающий была ли ошибка при выполнении задачи и результат выполнения этой задачи (тип float64). Нужно найти сумму всех результатов и количество ошибок.
Реализация с использованием sync.Mutex
Задачу можно решить с использованием sync.Mutex
:
package main
import (
"fmt"
"math"
"sync"
"sync/atomic"
)
const countWorkers = 1000
const countTasks = 10000000
type Result struct {
value float64
hasError bool
}
func MakeTasks(count int) <-chan Result {
ch := make(chan Result)
go func() {
for i := 0; i < count; i++ {
ch <- Result{
value: float64(i) * 2.42,
hasError: (i % 10) == 0,
}
}
close(ch)
}()
return ch
}
func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var errMu sync.Mutex
var mu sync.Mutex
var countErrors int64
var result float64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
errMu.Lock()
countErrors++
errMu.Unlock()
} else {
mu.Lock()
result += item.value
mu.Unlock()
}
}
wg.Done()
}()
}
wg.Wait()
return result, countErrors
}
func main() {
ch := MakeTasks(countTasks)
fmt.Println(ProcessUsingMutex(ch, countWorkers))
}
Реализация с использованием sync/atomic
Но использование sync.Mutex
может замедлять нашу программу. Поэтому можно переписать решение с использованием sync/atomic
. Для количества ошибок это сделать довольно просто. Вместо использования sync.Mutex
используем atomic.AddInt64
, то есть:
errMu.Lock()
countErrors++
errMu.Unlock()
заменяем на
atomic.AddInt64(&countErrors, 1)
Но для float64 нет функции AddFloat64
. Но её можно реализовать. Для того, что бы лучше понять как это можно сделать напишем функцию AddInt64
с использованием CompareAndSwapInt64
func AddInt64(addr *int64, delta int64) (new int64) {
for {
v := *addr
nxt := v+delta
if atomic.CompareAndSwapInt64(addr, v, nxt) {
return nxt
}
}
}
Но для float64
нет и CompareAndSwap
, но мы можем сконвертировать значение float64
в uint64
используя math.Float64bits
и оперировать уже uint64
.
Реализация AddFloat64
может быть такой:
func AddFloat64(addr *uint64, delta float64) uint64 {
for {
cur := atomic.LoadUint64(addr)
curVal := math.Float64frombits(cur)
nxtVal := curVal + delta
nxt := math.Float64bits(nxtVal)
if atomic.CompareAndSwapUint64(addr, cur, nxt) {
return nxt
}
}
}
После того, как мы реализовали AddFloat64
можно полностью переписать нашу функцию, без использования мютексов:
func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var countErrors int64
var total uint64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
atomic.AddInt64(&countErrors, 1)
} else {
AddFloat64(&total, item.value)
}
}
wg.Done()
}()
}
wg.Wait()
return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors)
}
Для сравнения mutex и atomic я написал небольшой бенчмарк
package main
import (
"testing"
)
func BenchmarkProcessUsingAtomic(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingAtomic(ch, countWorkers)
}
}
func BenchmarkProcessUsingMutex(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingMutex(ch, countWorkers)
}
}
Запуск данного benchmark показывает что atomic на 5-7% быстрее.
BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS
Заключение
Модуль sync/atomic
может быть полезен, если его использовать вместо мютексов так как атомики работают немного быстрее мютексов. Если хочется использовать атомики для типов, которые не поддерживаются в sync/atomic
можно попробовать использовать стандартные функции для реализации нужного функционала, как я описал в данной статье или воспользоваться структурой Value
из модуля sync/atomic
.