Как стать автором
Обновить
588.3
OTUS
Цифровые навыки от ведущих экспертов

Кастомные memory pools в Golang

Уровень сложностиПростой
Время на прочтение7 мин
Количество просмотров2.2K

Привет, Хабр!

Сегодня рассмотрим, почему стандартный sync.Pool в Go не всегда спасает, когда речь идёт о высоконагруженных сервисах, и как кастомные memory pools позволяют взять под контроль работу с памятью.

Что не так с sync.Pool?

sync.Pool отлично подходит для переиспользования объектов в простых сценариях. Он работает хорошо, когда нагрузка стабильна и объекты используются короткое время. Но как только запускаем систему с жесткими требованиями к задержкам, возникают две проблемки:

  1. Непредсказуемость очистки:
    Сборщик мусора может в любой момент очистить объекты, находящиеся в sync.Pool. В самый неподходящий момент можно получить пустой пул, хотя ожидали, что объекты будут готовы к использованию.

  2. Отсутствие контроля над жизненным циклом объектов:
    Стандартный sync.Pool не позволяет задавать точную политику возврата объектов, обнуления их состояния или ограничения на количество одновременно существующих буферов. В итоге система вынуждена полагаться на внутренние механизмы GC, что может приводить к дополнительным задержкам и непредвиденным аллокациям.

Собственный memory pool позволяет полностью контролировать момент создания и возврата объектов, устанавливать точные лимиты на количество буферов, а также реализовывать собственные политики очистки и сброса состояния.

Архитектура кастомного memory poo

Основная мысль проста: используем каналы для хранения готовых к использованию объектов и атомарные операции для отслеживания общего количества аллокаций.

В реализации есть два момента:

Структура Object: каждый объект содержит уникальный ID для отладки, фиксированный буфер Data заранее определённого размера (чтобы избежать неожиданных аллокаций) и временную метку CreatedAt, которая помогает отслеживать «возраст» объекта для своевременного обновления или очистки.

Структура MemoryPool: основная структура, реализующая пул памяти, включает канал pool для хранения свободных объектов (с емкостью, равной максимально допустимому числу объектов), параметр maxSize для ограничения количества создаваемых объектов, атомарно отслеживаемое currentAlloc, размер буфера objectSize для каждого объекта и логгер logger для детального мониторинга создания, использования и возврата объектов в пул.

Конструктор

В функции NewMemoryPool создаём канал с заданной емкостью и сразу проводим предаллокацию — заполняем пул заранее половиной максимально допустимого числа объектов. Почему половина? Потому что это позволяет снизить задержки на первых запросах, когда объект уже готов к использованию. Если объект нужен — он есть, а если нет, можно создавать новые до достижения лимита.

func NewMemoryPool(maxSize, objectSize int, logger *log.Logger) *MemoryPool {
	mp := &MemoryPool{
		pool:       make(chan *Object, maxSize),
		maxSize:    int32(maxSize),
		objectSize: objectSize,
		logger:     logger,
	}

	initialAlloc := maxSize / 2
	for i := 0; i < initialAlloc; i++ {
		mp.pool <- &Object{
			ID:        i,
			Data:      make([]byte, objectSize),
			CreatedAt: time.Now(),
		}
		atomic.AddInt32(&mp.currentAlloc, 1)
	}
	logger.Printf("MemoryPool инициализирован: %d объектов из %d", initialAlloc, maxSize)
	return mp
}

Здесь каждый объект создаётся заранее, и счетчик текущих аллокаций обновляется атомарно.

Метод Get:

Метод Get отвечает за получение объекта из пула. Его логика такова:

  • Сначала пытаемся забрать объект из канала. Если там что‑то есть, отлично — объект сразу возвращается.

  • Если канал пуст, проверяем, достигли ли мы лимита аллокаций. Если нет, создаём новый объект и возвращаем его.

  • Если лимит достигнут, ждём, пока какой‑нибудь объект освободится, используя контекст (context) для задания таймаута. Это гарантирует, что запрос не зависнет бесконечно.

func (mp *MemoryPool) Get(ctx context.Context) (*Object, error) {
	select {
	case obj := <-mp.pool:
		mp.logger.Printf("Объект ID %d получен из пула", obj.ID)
		return obj, nil
	default:
		if atomic.LoadInt32(&mp.currentAlloc) < mp.maxSize {
			newID := int(atomic.AddInt32(&mp.currentAlloc, 1))
			obj := &Object{
				ID:        newID,
				Data:      make([]byte, mp.objectSize),
				CreatedAt: time.Now(),
			}
			mp.logger.Printf("Создан новый объект ID %d", obj.ID)
			return obj, nil
		}
		select {
		case obj := <-mp.pool:
			mp.logger.Printf("Объект ID %d получен после ожидания", obj.ID)
			return obj, nil
		case <-ctx.Done():
			mp.logger.Println("Таймаут ожидания объекта в MemoryPool")
			return nil, errors.New("таймаут ожидания объекта")
		}
	}
}

Если используем select с контекстом, чтобы не ждать бесконечно.

Метод Put

После использования объекта его необходимо вернуть в пул. Но прежде чем это сделать, очищаем буфер объекта — гарантируем, что следующий пользователь получит «чистый» ресурс без остаточных данных. Если объект «старый» (например, создан более минуты назад), можно логировать этот факт для анализа работы пула. Если же канал переполнен, объект просто не сохраняется, и сборщик мусора его заберёт.

func (mp *MemoryPool) Put(obj *Object) {
	for i := range obj.Data {
		obj.Data[i] = 0
	}
	if time.Since(obj.CreatedAt) > time.Minute {
		mp.logger.Printf("Объект ID %d устарел, перерабатываем...", obj.ID)
	}
	select {
	case mp.pool <- obj:
		mp.logger.Printf("Объект ID %d возвращён в пул", obj.ID)
	default:
		mp.logger.Printf("Пул переполнен, объект ID %d отброшен", obj.ID)
	}
}

Таким образом каждый объект возвращается в рабочее состояние, и пул не выходит за рамки установленных лимитов.

Научиться использовать Go для разработки высокопроизводительных приложений и интеграций можно на онлайн-курсе.

Теперь перейдем к примерам применения.

Примеры применения

Обработка изображений

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"example.com/mempool"
)

var (
	imagePool *mempool.MemoryPool
	logger    = log.Default()
)

func init() {
	// Инициализируем пул: максимум 250 объектов, каждый буфер — 16КБ (под обработку изображения)
	imagePool = mempool.NewMemoryPool(250, 16*1024, logger)
}

func imageProcessHandler(w http.ResponseWriter, r *http.Request) {
	// Устанавливаем таймаут ожидания буфера в 60 мс, чтобы не зависать
	ctx, cancel := context.WithTimeout(r.Context(), 60*time.Millisecond)
	defer cancel()

	// Берём объект из пула
	buf, err := imagePool.Get(ctx)
	if err != nil {
		http.Error(w, "Сервис перегружен, повторите попытку позже", http.StatusServiceUnavailable)
		return
	}
	defer imagePool.Put(buf)

	// Симулируем обработку изображения: наложение фильтра, ресайз и прочую магию
	time.Sleep(20 * time.Millisecond)

	// Допустим, что в результате обработки мы записываем некий индекс буфера в ответ
	fmt.Fprintf(w, "Изображение обработано, буфер ID: %d", buf.ID)
}

func main() {
	http.HandleFunc("/process-image", imageProcessHandler)
	logger.Println("Сервер обработки изображений запущен на :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Используем кастомный memory pool для обработки изображений. Заранее создаём пул с 250 объектами, каждый из которых имеет буфер фиксированного размера (16КБ). Это позволяет сразу же по запросу получать готовый к использованию объект.

Обработка сообщений в очереди с высокой пропускной способностью

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"example.com/mempool"
)

var (
	messagePool *mempool.MemoryPool
	msgLogger   = log.Default()
)

func init() {
	// Инициализируем пул для сообщений: максимум 1000 объектов, размер буфера — 2КБ
	messagePool = mempool.NewMemoryPool(1000, 2*1024, msgLogger)
}

func processMessage(id int, wg *sync.WaitGroup) {
	defer wg.Done()

	// Контекст с коротким таймаутом, чтобы не блокировать обработку
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	// Получаем буфер для обработки сообщения
	obj, err := messagePool.Get(ctx)
	if err != nil {
		msgLogger.Printf("Сообщение %d: не удалось получить буфер: %v", id, err)
		return
	}
	defer messagePool.Put(obj)

	// Симуляция обработки сообщения: заполняем буфер данными и имитируем задержку
	for i := range obj.Data {
		obj.Data[i] = byte(id % 256)
	}
	time.Sleep(10 * time.Millisecond)
	msgLogger.Printf("Сообщение %d обработано с использованием объекта ID %d", id, obj.ID)
}

func main() {
	var wg sync.WaitGroup
	// Запускаем 1500 параллельных обработок сообщений
	for i := 0; i < 1500; i++ {
		wg.Add(1)
		go processMessage(i, &wg)
	}
	wg.Wait()
	fmt.Println("Все сообщения обработаны")
}

Каждое сообщение обрабатывается в отдельной горутине, где с помощью контекста с таймаутом получаем буфер из memory pool. Если буфер не успевает освободиться, запрос завершается с ошибкой, что позволяет избежать зависания системы. В данном примере размер буфера установлен в 2КБ, что вполне достаточно для обработки небольших сообщений, а также установлен лимит пула в 1000 объектов.

Кэширование HTTP-ответов для ускорения отдачи контента

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"example.com/mempool"
)

var (
	cachePool   *mempool.MemoryPool
	cacheLogger = log.Default()
)

func init() {
	// Инициализируем пул для кэширования: максимум 200 объектов, размер буфера — 32КБ
	cachePool = mempool.NewMemoryPool(200, 32*1024, cacheLogger)
}

func cacheHandler(w http.ResponseWriter, r *http.Request) {
	// Контекст с таймаутом для получения буфера в 40 мс
	ctx, cancel := context.WithTimeout(r.Context(), 40*time.Millisecond)
	defer cancel()

	// Получаем буфер для формирования кэшированного ответа
	buf, err := cachePool.Get(ctx)
	if err != nil {
		http.Error(w, "Не удалось получить буфер для кэширования", http.StatusServiceUnavailable)
		return
	}
	defer cachePool.Put(buf)

	// Симулируем сбор данных из нескольких источников
	time.Sleep(8 * time.Millisecond)
	response := "Кэшированный ответ: Hello, Go-хабы!"
	n := copy(buf.Data, []byte(response))
	cacheLogger.Printf("Сформирован кэшированный ответ: %d байт", n)
	fmt.Fprintf(w, "%s", string(buf.Data[:n]))
}

func main() {
	http.HandleFunc("/cache", cacheHandler)
	cacheLogger.Println("Кэш-сервер запущен на :8081")
	log.Fatal(http.ListenAndServe(":8081", nil))
}

Здесь пул инициализируется с 200 объектами, каждый размером 32КБ, что позволяет обрабатывать любые объемные ответы. При получении запроса мы с помощью контекста с таймаутом в 40 мс получаем буфер из пула. Если буфер не успевает освободиться, сервер возвращает ошибку, что предотвращает зависание при высокой нагрузке.


Пишите комментарии и делитесь своими кейсами применения кастомных memory pool.

А напоследок напомню про открытые уроки по Go, которые пройдут в марте в Otus:

  • 11 марта: Дженерики в GO.
    На вебинаре вы увидите дженерики в Go с разных сторон; рассмотрите синтаксис, внутренние механизмы и практические подходы. Записаться

  • 18 марта: Работа с gRPC.
    После занятия вы сможете писать обратно совместимые Protobuf схемы и gRPC сервисы, научитесь взаимодействию через сетевые протоколы. Записаться

Теги:
Хабы:
Всего голосов 8: ↑2 и ↓6-2
Комментарии4

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS