Когда у вас 50+ узлов Tarantool в кластере, ручное управление соединениями превращается в боль. Узлы падают, реплики становятся мастерами, новые инстансы добавляются — и все это нужно отслеживать в реальном времени. 

Рассказываем, как мы спроектировали go-discovery — библиотеку для автоматического обнаружения узлов кластера Tarantool 3.0.

Почему статического пула недостаточно

В Tarantool 3.0 конфигурация кластера хранится централизованно — в etcd или Tarantool Config Storage. Это удобно для оркестрации, но предполагает новую задачу для клиентских приложений — например, создает трудности, когда клиентскому приложению нужно определить, куда именно отправлять запросы и как реагировать на изменения.

Так, упрощенно типовая схема взаимодействия под капотом имеет примерно следующий вид:

То есть в подобной реализации контроль над кластерами был несколько усложнен, что являлось для нас точкой роста и точно требовало оптимизации. 

Исходя из этого, мы начали искать вариант решения, который позволит нам:

  • получать список узлов по критериям: конкретные роли, теги, режим RW/RO;

  • следить за изменениями: узел упал, реплика стала мастером, добавился новый инстанс;

  • управлять пулом соединений: автоматически добавлять/удалять соединения, балансировать нагрузку.

Существующий go-tarantool/pool решает только третью задачу — и то частично. Например, он умеет держать соединения и следить за их состоянием, но не знает ничего про топологию кластера, поскольку список адресов задается статически при создании пула. То есть готового решения у нас не было и требовались другие способы внедрить Autodiscovery, то есть возможность автоматического обнаружения и идентификации узлов.

Рассмотренные альтернативы

В поиске вариантов решения под наши задачи мы рассматривали несколько подходов:

  • разработка отдельного микросервиса;

  • расширение go-tarantool-коннектора;

  • написание отдельной библиотеки.

Отдельный микросервис

В контексте разработки отдельного микросервиса наша идея заключалась в том, чтобы поднять сервис, который мониторит etcd и отдает клиентам актуальный список узлов через API. Вариант достоин внимания, но имеет некоторые нюансы. Например, с внедрением отдельного сервиса появлятся дополнительная точка отказа и Latency на каждый запрос списка узлов. Более того, для деплоя и мониторинга нужна инфраструктура.

Поэтому от этого варианта временно решили отказаться.

Примечание: В будущем такой сервис может появиться на базе самого Tarantool, но это отдельная история.

Интеграция в go-tarantool

Также мы рассматривали возможность расширения функционала существующего go-tarantool-коннектора. На первый взгляд, это вполне рациональное решение, которое к тому же потенциально можно реализовать минимальными усилиями. Но и здесь есть неочевидные нюансы. Так:

  • придется тащить etcd-клиент во все проекты, даже если им не нужен Autodiscovery;

  • для простоты обновления отдельных компонентов хочется релизить go-discovery отдельно от коннектора.

Поэтому данный вариант нам также не подошел.

Отдельная библиотека

Реализовать Autodiscovery можно и в виде отдельной библиотеки. Причем после ревью стало очевидно, что у такого варианта есть много преимуществ. Например:

  • обеспечивается разделение ответственности;

  • можно получать топологию кластера без функционала создания пула соединений;

  • API можно портировать на другие языки, что позволит разработчикам лучше ориентироваться в экосистеме библиотек и языков;

  • можно выстраивать независимые циклы разработки.

Исходя из этого, именно этот вариант реализации мы и выбрали. 

Архитектура go-discovery-библиотеки: уровни абстракции

При разработке мы решили разделить библиотеку go-discovery на множество пакетов:

  • Dial — вспомогательные типы для создания соединений;

  • Discoverer — базовый функционал получения списка узлов;

  • Filter — набор фильтров для списка узлов;

  • Observer — позволяет контролировать обновления списка узлов по событиям;

  • Subscriber — совмещает типы Discoverer, Observer и Filter для создания потока обновлений топологии.

Благодаря такой реализации можно использовать только базовые пакеты, например, если нужен просто список узлов, или подключить pool для полноценного управления соединениями.

Основные возможности go-discovery-библиотеки

О каждом узле у нас есть вся необходимая информация:

type Mode int

const (
    ModeAll Mode = iota  // любой режим
    ModeRW               // можно писать и читать
    ModeRO               // только чтение
)

type Instance struct {
    Group      string   // имя группы
    Replicaset string   // имя репликасета
    Name       string   // имя узла
    Mode       Mode     // RW или RO
    URI        []string // адреса дл�� подключения
    Roles      []string // список ролей
    RoleTags   []string // теги из roles_cfg.tags
    AppTags    []string // теги из app.cfg.tags
}

Структура намеренно простая, поскольку мы не тащим сюда всю конфигурацию Tarantool — только то, что нужно для маршрутизации запросов. Вместе с тем этого достаточно, чтобы эффективно решать задачи в контексте наших сценариев.

Получение списка узлов: паттерн Discoverer

Библиотека читает конфигурацию кластера из централизованного хранилища и возвращает актуальный список узлов с их характеристиками: имя, группа, репликасет, роли, теги, режим работы (RW/RO), адреса для подключения.

Базовый интерфейс:

type Discoverer interface {
    Discovery(ctx context.Context) ([]Instance, error)
}

Одна реализация — один источник данных. Так, например, для etcd это выглядит таким образом:

type Etcd struct {
    client *clientv3.Client
    prefix string
}

func NewEtcd(client clientv3.Client, prefix string) Etcd {
    return &Etcd{client: client, prefix: prefix}
}

func (d *Etcd) Discovery(ctx context.Context) ([]Instance, error) {
    // Читаем конфигурацию из etcd по префиксу,
    // парсим YAML, возвращаем список Instance
}

Примечательно, что клиент сам создает и конфигурирует подключение к etcd — мы не навязываем свои настройки.

Кроме того, пакет содержит discoverer.Tarantool, который позволяет получать информацию из Tarantool Config Storage.

Фильтрация: композиция вместо наследования

Зачастую для работы нужны не все узлы кластера, а только их подмножество по определенным критериям. Поэтому в библиотеке мы предусмотрели возможность использования фильтров, что��ы комбинировать узлы по разным признакам. Например:

  • по группам — выбрать узлы из конкретных групп;

  • по репликасетам — ограничить выборку определенными репликасетами;

  • по именам — указать конкретные узлы;

  • по режиму RW/RO — только мастера или только реплики;

  • по ролям — узлы с определенными ролями (например, storage, api, router);

  • по тегам ролей — фильтрация по roles_cfg.tags;

  • по тегам приложения — фильтрация по app.cfg.tags.

Интерфейс фильтра довольно прост:

type Filter interface {
    Filter(instance Instance) bool
}

При этом мы предоставляем готовые реализации, которые покрывают типичные случаи:

type Groups struct {
    Groups []string
}

type Replicasets struct {
    Replicasets []string
}

type Names struct {
    Names []string
}

type Mode struct {
    Mode []Mode
}

type Roles struct {
    Roles []string
}

type RoleTags struct {
    RoleTags []string
}

type AppTags struct {
    AppTags []string
}

Более того, фильтры комбинируются через тип Filter в пакете Discoverer. Например:

type Filter struct {
    discoverer Discoverer
    filters    []Filter
}

func NewFilter(discoverer Discoverer, filters ...Filter) *Filter {
    return &Filter{discoverer: discoverer, filters: filters}
}

func (d *Filter) Discovery(ctx context.Context) ([]Instance, error) {
    instances, err := d.discoverer.Discovery(ctx)
    if err != nil {
        return nil, err
    }

    var result []Instance
    for _, inst := range instances {
        if d.matchAll(inst) {
            result = append(result, inst)
        }
    }
    return result, nil
}

Пример использования:

etcdcli, _ := clientv3.New(clientv3.Config{
    Endpoints: []string{"http://127.0.0.1:2379"},
})
defer etcdcli.Close()

// Получаем только RW-узлы с ролями vshard или config
di := discoverer.NewFilter(
    discoverer.NewEtcd(etcdcli, "/tarantool/config"),
    filter.Roles{Roles: []string{"vshard", "config"}},
    filter.Mode{Mode: []Mode{ModeRW}},
)
instances, err := d.Discovery(ctx)

Важно, что в пакете Discoverer есть еще один полезный декоратор — Connectable. Он возвращает только те узлы, к которым реально можно подключиться прямо сейчас:

type Connectable struct {
    factory    DialerFactory
    discoverer Discoverer
}

func (d *Connectable) Discovery(ctx context.Context) ([]Instance, error) {
    instances, err := d.discoverer.Discovery(ctx)
    if err != nil {
        return nil, err
    }

    var connectable []Instance
    for _, inst := range instances {
        if d.canConnect(ctx, inst) {
            connectable = append(connectable, inst)
        }
    }
    return connectable, nil
}

Подписка на изменения: Observer Pattern

Однократный запрос списка узлов — это хорошо, но недостаточно. Конфигурация меняется, и мы хотим узнавать об этом. Именно поэтому в библиотеке мы реализовали несколько режимов отслеживания:

  • подписку на изменения — мгновенное уведомление при изменении конфигурации;

  • периодический опрос — Fallback-вариант с настраиваемым интервалом.

Сами события имеют следующий вид:

// Event is a base event for an instance configuration update.
type Event struct {
    // Type of the current event.
    Type EventType
    // Old refers to a previous configuration of the instance. The value
    // present for EventTypeUpdate and EventTypeRemove.
    Old Instance
    // New refers to a new configuration of the instance. This value is
    // present for EventTypeUpdate and EventTypeAdd.
    New Instance
}

// EventType defines an enumeration of available event types.
type EventType int

const (
    // EventTypeAdd defines an instance configuration add event.
    EventTypeAdd EventType = iota // add
    // EventTypeUpdate defines an instance configuration update event.
    EventTypeUpdate // update
    // EventTypeRemove defines an instance configuration remove event.
    EventTypeRemove // remove
)

При этом библиотека позволяет отслеживать три типа событий:

  • удаление узлов;

  • обновление узлов;

  • добавление узлов.

Интерфейс Observer имеет следующий вид:

type Observer interface {
    Observe(events []Event, err error)
}

Клиент реализует этот интерфейс и получает батчи событий. Если пришла ошибка, подписка завершен��.

Scheduler: когда проверять обновления

В пакете Scheduler предусмотрено два подхода:

  • периодический опрос;

  • Watch на etcd.

Периодический опрос реализуется через следующую структуру:

type Periodic struct {
    period time.Duration
}

func (s *Periodic) Wait(ctx context.Context) error {
    select {
    case <-time.After(s.period):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

Структура Watch на etcd:

type EtcdWatch struct {
    client *clientv3.Client
    prefix string
}

func (s *EtcdWatch) Wait(ctx context.Context) error {
    watchChan := s.client.Watch(ctx, s.prefix, clientv3.WithPrefix())
    select {
    case <-watchChan:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

Здесь важно отметить, что Watch эффективнее, поскольку мы не тратим ресурсы на Polling. Вместе с тем периодический опрос может пригодиться как Fallback.

Собираем вместе

// Пример реализации Observer на стороне клиента
type myObserver struct {
    instances map[string]Instance
    mu sync.Mutex
}

func (o *myObserver) Observe(events []Event, err error) {
    if err != nil {
        log.Printf("subscription error: %v", err)
        return
    }

    o.mu.Lock()
    defer o.mu.Unlock()

    for _, event := range events {
        switch event.Type {
        case EventTypeAdd:
            log.Printf("instance added: %s", event.New.Name)
            o.instances[event.New.Name] = event.New
        case EventTypeRemove:
            log.Printf("instance removed: %s", event.Old.Name)
            delete(o.instances, event.Old.Name)
        case EventTypeUpdate:
            log.Printf("instance updated: %s", event.New.Name)
            o.instances[event.New.Name] = event.New
        }
    }
}

// Настройка подписки
scheduler := scheduler.NewEtcdWatch(etcdcli, "/tarantool/config")
defer scheduler.Stop()

subscriber := subscriber.NewConnectable(
    subscriber.NewFilter(
    subscriber.NewSchedule(scheduler, discoverer),
        AppTagsFilter{AppTags: []string{"my-app"}},
    ),
)

observer := &myObserver{instances: make(map[string]Instance)}
subscriber.Subscribe(observer)
defer subscriber.Unsubscribe(observer)

Pool соединений с Autodiscovery

Теперь самое интересное — пул, который сам управляет соединениями на основе событий от Subscriber. Например, Pool:

  • автоматически добавляет соединения к новым узлам;

  • удаляет соединения к недоступным узлам;

  • балансирует нагрузку между доступными узлами.

Балансировка

Балансировщик имеет следующий интерфейс:

type Balancer interface {
    Add(instance Instance)
    Remove(name string)
    Next(mode Mode) (string, bool)
}

В библиотеке доступны две стратегии балансировки:

  • Round Robin — запросы распределяются по кругу между всеми узлами;

  • Priority — запросы идут на узлы с наивысшим приоритетом (например, предпочитать свой дата-центр), остальные используются как резерв.

Структура RoundRobinBalancer следующая:

type RoundRobinBalancer struct {
    instances []Instance
    current   uint64
    mu        sync.RWMutex
}

func (b *RoundRobinBalancer) Next(mode Mode) (string, bool) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    candidates := b.filterByMode(mode)
    if len(candidates) == 0 {
        return "", false
    }

    idx := atomic.AddUint64(&b.current, 1) % uint64(len(candidates))
    return candidates[idx].Name, true
}

Структура PriorityBalancer:

type PriorityBalancer struct {

    priorityFunc func(Instance) int
    instances    []Instance
    mu           sync.RWMutex
}

func NewPriorityBalancer(priorityFunc func(Instance) int) *PriorityBalancer {
    return &PriorityBalancer{priorityFunc: priorityFunc}
}

func (b *PriorityBalancer) Next(mode Mode) (string, bool) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    // Выбираем узлы с максимальным приоритетом,
    // среди них — round-robin
}

Причем типичный кейс, как уже упоминали раньше, — предпочитать узлы из своего ЦОДа:

balancer := NewPriorityBalancer(func(inst Instance) int {
    switch inst.Group {
    case "datacenter-msk":
        return 20  // высший приоритет
    case "datacenter-spb":
        return 10
    default:
        return 0
    }
})

Так, в примере выше все запросы пойдут в datacenter-msk. Если там все узлы недоступны — в datacenter-spb.

Pool

В итоге высокоуровневая реализация пула соединений к Tarantool с поддержкой балансировки нагрузки, обработки событий и настроек подключения имеет следующую структуру:

type Pool struct {
    pool       *tarantoolpool.Pool  // базовый пул из go-tarantool
    subscriber Subscriber
    balancer   Balancer
    factory    DialerFactory
    opts       tarantool.Opts
}

func NewPool(subscriber Subscriber, balancer Balancer,
    factory DialerFactory, opts tarantool.Opts) (*Pool, error) {
    // Подписываемся на события, инициализируем пул
}

func (p Pool) Do(request tarantool.Request, mode Mode) tarantool.Future {
    name, ok := p.balancer.Next(mode)
    if !ok {
        // Нет узлов с нужным режимом
    }
    return p.pool.Do(request, name)
}

// Observe реализует интерфейс Observer.
func (p *Pool) Observe(events []discovery.Event, err error) {

}

Примечание: Отдельно стоит отметить, что метод Do здесь позволяет выполнять запрос на нужном экземпляре (мастер, реплика или неважно).

Полный пример

В итоге полное решение для автоматического обнаружения и динамического обновления конфигурации пула соединений к Tarantool, управляемого посредством etcd, сводится к следующей реализации:

// Подключаемся к etcd.
etcd, err := clientv3.New(clientv3.Config{
    Endpoints: []string{"http://etcd-1:2379", "http://etcd-2:2379"},
})
if err != nil {
    fmt.Println("Unable to start etcd client:", err)
    return
}
defer etcd.Close()

// Конфигурируем пул подключений.
p, err := pool.NewPool(
    dial.NewNetDialerFactory("testuser", "testpass", tarantool.Opts{
        Timeout: 5 * time.Second,
    }),
    pool.NewRoundRobinBalancer(),
)
if err != nil {
    fmt.Println("Unable to create a pool:", err)
    return
}

// Создаем Scheduler, который будет следить за изменениями в префиксе.
sched := scheduler.NewEtcdWatch(etcd, "/prefix")
defer sched.Stop()

// Создаем Discoverer, который будет получать конфигурацию кластера.
disc := discoverer.NewFilter(
    // Получаем конфигурацию из префикса.
    discoverer.NewEtcd(etcd, "/prefix"),
    // Выбираем экземпляры только из группы foo.
    filter.GroupOneOf{Groups: []string{"foo"}},
)

// Создаем Subscriber, который будет обновлять конфигурацию в пуле
// соединений.
sub := subscriber.NewSchedule(sched, disc)

// Подписываем pool соединений на обновления.
err := sub.Subscribe(context.Background(), p)
if err != nil {
    fmt.Println("Failed to subscribe:", err)
    return
}
defer sub.Unsubscribe(p)

for {
    // Создаем любой запрос..
    request := tarantool.NewEvalRequest("return box.cfg.listen")
    // Отправляем запрос в пул соединений.
    result, err := p.Do(request, discovery.ModeAny).Get()
    if errors.Is(err, pool.ErrNoConnectedInstances) {
        // Пул соединений еще не обнаружил экземпляр для выполнения
        // запроса. Повторяем запрос.
        continue
    } else if err != nil {
        // Другая ошибка.
        fmt.Println("Unexpected error: %w", err)
    }

    // Обрабатываем result.
    break
}

В результате мы имеем реализацию, где etcd служит для централизованного хранения и отслеживания изменений конфигурации кластера Tarantool, а пул подключений управляет нагрузкой и автоматически перенаправляет запросы на доступные узлы. Это позволяет повысить отказоустойчивость и эффективность работы приложения, быстро реагируя на изменения инфраструктуры и поддерживая стабильную обработку запросов независимо от текущих условий доступности компонентов.

Ограничения текущей реализации библиотеки

Несмотря на функциональность нашей go-discovery-библиотеки, в ее текущей версии пока остаются определенные ограничения. Остановимся на некоторых из них. При этом сразу нужно оговориться, что над всеми упомянутыми ниже аспектами мы уже работаем, чтобы расширить возможности библиотеки.

  • Только централизованное хранилище. На данный момент поддерживается загрузка конфигурации из etcd и Tarantool Config Storage, а загрузка из локального файла конфигурации — нет.

  • Режим Failover. При off или manual Failover режим узла (RW/RO) хранится в etcd, и мы можем фильтровать заранее. Но при election или supervised режим определяется динамически. В результате приходится при каждом запросе проверять box.info.ro на узле. Это добавляет Latency, но других вариантов нет.

  • Теги узлов. Сейчас в Tarantool нельзя задать теги для отдельного узла — только для роли (roles_cfg.tags) или приложения (app.cfg.tags). Полноценные теги на уровне инстанса появятся позже, и тогда мы добавим TagsFilter.

  • Бенчмарки. Бенчмарков пока нет, но библиотека в активной разработке. При этом архитектура подразумевает, что узкое место — это сеть (запросы к etcd и Tarantool), а не сама библиотека.

Итоги и планы

Созданная нами go-discovery-библиотека уже доступна на GitHub под лицензией BSD-2. Она достойно зарекомендовала себя в условиях реальных проектов и значительно упростила работу с любыми кластерами Tarantool. При этом важно, что наша реализация — не «костыль», а эффективное решение, которое легко кастомизируется под нужные сценарии: например, можно гибко комбинировать фильтры и декораторы, легко добавлять новые источники данных. 

Но работа над go-discovery-библиотекой на этом не заканчивается. Так, в перспективе мы планируем добавить:

  • прямое получение топологии от Tarantool — когда узлы научатся отдавать информацию о соседях;

  • теги для отдельных узлов — сейчас поддерживаются только теги ролей и приложений;

  • метрики и трейсинг — интеграцию с Prometheus и OpenTelemetry.

Попробуйте go-discovery и расскажите, что думаете. Если есть вопросы по архитектуре или конкретным решениям, пишите в комментариях.