Pull to refresh
387.26
Ozon Tech
Стремимся делать лучший e-commerce в России

База по шардированию базы

Reading time 10 min
Views 22K

Возможность горизонтального масштабирования это одно из важнейших нефункциональных требований индустрии в последнее время. Рост бизнеса со стороны IT выглядит чаще всего как рост нагрузки и цены отказа системы. Нам всем хочется создавать такие приложения, которые будут одинаково быстро и стабильно работать как с сотней, так и с сотней тысяч клиентов. Для этого необходимо еще на стадии проектирования закладывать потенциал для масштабирования, одним из способов которого является шардирование.

Мы на пальцах рассмотрим что такое шардирование, как оно помогает в масштабировании и даже рассмотрим тот самый этап «роста».

О чём речь?

Шардинг (или шардирование) — это разделение хранилища на несколько независимых частей, шардов (от англ. shard — осколок). Не путайте шардирование с репликацией, в случае которой выделенные экземпляры базы данных являются не составными частями общего хранилища, а копиями друг друга. 

Шардирование помогает оптимизировать хранение данных приложения за счёт их распределения между инсталляциями БД (которые находятся на разных железках), что улучшает отзывчивость сервиса, так как размер данных в целом на каждом инстансе станет меньше. 

Шардирование — это разновидность партиционирования (от англ. partition — деление, раздел). Отличие в том, что партиционирование подразумевает разделение данных внутри одной БД, а шардирование распределяет их по разным экземплярам БД. 

Способы шардирования

Осуществить шардирование можно несколькими способами:

  1. Средствами БД. Некоторые базы — MongoDB, Elasticsearch, ClickHouse и другие — умеют самостоятельно распределять данные между своими экземплярами, для этого достаточно настроить конфигурацию. На мой взгляд, это лучший вариант.

  2. Надстройками к БД. Самый спорный способ — применение надстроек, которые выполняют шардирование, например Vitess или Citus, поскольку при этом есть риск потери данных и производительности.

  3. Клиентскими средствами. В этом случае экземпляры БД даже не подозревают о существовании друг друга, шардированием управляет стороннее приложение — со всеми вытекающими рисками.

Методы работы в этих способах схожи: мы выбираем ключ для распределения данных (это может быть идентификатор, временная метка или хеш записи) и в соответствии с ним записываем информацию в нужный шард. Как правило, ключи стараются выбирать так, чтобы данные были равномерно распределены по шардам. Сделать это не сложно — достаточно ориентироваться на текущее содержимое БД. 

Важно учитывать для чего вы делаете шардирование. В случае если требуется распределить нагрузку на запись, необходимо подобрать такой ключ, который обеспечит равномерное распределение запросов между инстансами. Нельзя забывать и о «горячих» данных, запросы к которым происходят чаще, из-за чего нагрузка на шарды оказывается неравномерной. Для этого можно добавить в приложение метрику, показывающую, сколько раз в какой шард будут попадать данные по конкретному ключу. 

Пример шардирования

Давайте в качестве примера сделаем клиентское шардирование горячо любимой в Ozon PostgreSQL. Приложение будет на Go, а мигрировать будем с помощью Goose. Для начала нам надо добавить сами шарды, то есть развернуть еще одну инсталляцию БД. Отвлекаться на детальный разбор того, как правильно раскатывать PostgreSQL, мы не будем.

Добавим в наш Storage маппинг шардов: 

// Обозначим количество шардов.
const bucketQuantity = 2

const (
    Shard1 ShardNum = iota
    Shard2
)
// Для лучшей семантики.
type ShardNum int
type shardMap map[ShardNum]*sqlx.DB

type Storage struct {
    shardMap shardMap
}

Напишем конструктор для Storage который возьмёт на себя все задачи по инициализации соединений с БД.

func initShardMap(ctx context.Context, dsns map[ShardNum]string) shardMap {
    m := make(shardMap, len(dsns))
    for sh, dsn := range dsns {
   	 m[sh] = discoveryShard(ctx, dsn)
    }

    return m
}

func discoveryShard(ctx context.Context, dsn string) *sqlx.DB {
    db, err := sqlx.ConnectContext(ctx, "postgres", dsn)
    if err != nil {
   	 panic(err)
    }

    return db
}

func NewStorage(ctx context.Context, dsns map[ShardNum]string) *Storage {
    return &Storage{
   	 shardMap: initShardMap(ctx, dsns),
    }
}

Переходим к работе с данными. Реализуем методы для их записи в шарды и чтения оттуда. Начинается всё с определения того, в какой шард идти. 

При условии равномерности распределения наших ID (представим, что это действительно так) нам хватит классического остатка от деления. Выглядеть это будет примерно так:

func (s *Storage) shardByItemID(itemID int64) ShardNum {
   return ShardNum(itemID % bucketQuantity)
}

У нас есть вот такой незаурядный  метод чтения из БД. Тут стоит обратить внимание на то, что мы выполняем запрос на инстансе БД из нашего маппинга, а получаем инстанс (*sqlx.DB) по идентификатору шарда из сигнатуры.

func (s *Storage) getItemsByID(ctx context.Context, shard *sqlx.DB, itemsIDs []int64) ([]models.Item, error) {
   items := make([]models.Item, 0)
 
   query, args, err := sq.
       Select(itemsTableFields...).
       From(itemsTable).
       Where(sq.Eq{itemIDField: itemsIDs}).
       PlaceholderFormat(sq.Dollar).
       ToSql()
   if err != nil {
       err = errors.Wrap(err, "[create query]")
       return items, err
   }
 
   err = shard.SelectContext(ctx, &items, query, args...)
   return items, err
}

Сам идентификатор шарда мы получаем чуть выше, когда распределяем наши ItemIDs по кубышкам. Само распределение выглядит вот так:

func (s *Storage) sortItemsIDsByShard(itemIDs ...int64) map[ShardNum][]int64 {
   shardToItems := make(map[ShardNum][]int64)
 
   for _, id := range itemIDs {
       shardID := s.shardByItemID(id)
       if _, ok := shardToItems[shardID]; !ok {
           shardToItems[shardID] = make([]int64, 0)
       }
 
       shardToItems[shardID] = append(shardToItems[shardID], id)
   }
 
   return shardToItems
}

Ну и инфраструктурная обёрточка — чтобы запросы выполнялись параллельно. Вот так будет выглядеть публичный метод получения Item. Кажется, что он довольно большой, но в действительности  большую часть метода съедают раскручивания каналов.

func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
   shardToItems := s.sortItemsIDsByShard(itemIDs...)
 
   respChan := make(chan []models.Item, len(shardToItems))
   errChan := make(chan error, len(shardToItems))
   wg := &sync.WaitGroup{}
 
   for shardID, ids := range shardToItems {
       wg.Add(1)
       shard := s.shardMap[shardID]
       go s.asyncGetItemsByID(ctx, shard, ids, wg, respChan, errChan)
   }
 
   wg.Wait()
   close(respChan)
   close(errChan)
 
   result := make([]models.Item, 0)
   for items := range respChan {
       result = append(result, items...)
   }
 
   errs := make([]error, 0, len(errChan))
   for e := range errChan {
       errs = append(errs, e)
   }
   err := multierr.Combine(errs...)
 
   return result, err
}

Для того чтобы не терять смысл getItemsByID за нагромождением каналов и Wait-групп, мы просто обернем всё это в asyncGetItemsByID:

unc (s *Storage) asyncGetItemsByID(
   ctx context.Context,
   shard *sqlx.DB,
   itemsIDs []int64,
   wg *sync.WaitGroup,
   resp chan<- []models.Item,
   errs chan<- error,
) {
   defer wg.Done()
   items, err := s.getItemsByID(ctx, shard, itemsIDs)
   if err != nil {
       errs <- errors.Wrapf(err, "[getItemsByID] can't select from shard %d", shard)
   }
 
   resp <- items
}
Всё то же самое мы проделываем для записи данных в шарды:
func (s *Storage) AddItems(ctx context.Context, items ...models.Item) error {
   itemsByShardMap := s.itemsByShard(items...)
   errChan := make(chan error, len(itemsByShardMap))
   wg := &sync.WaitGroup{}
 
   for shardID, items := range itemsByShardMap {
       wg.Add(1)
       shard := s.shardMap[shardID]
       go s.asyncAddItems(ctx, errChan, wg, shard, items...)
   }
 
   wg.Wait()
   close(errChan)
 
   errs := make([]error, 0, len(errChan))
   for e := range errChan {
       errs = append(errs, e)
   }
 
   return multierr.Combine(errs...)
}
 
func (s *Storage) itemsByShard(items ...models.Item) map[ShardNum][]models.Item {
   itemsByShard := make(map[ShardNum][]models.Item)
 
   for _, item := range items {
       shardID := s.shardByItemID(item.ID)
       if _, ok := itemsByShard[shardID]; !ok {
           itemsByShard[shardID] = make([]models.Item, 0)
       }
 
       itemsByShard[shardID] = append(itemsByShard[shardID], item)
   }
 
   return itemsByShard
}
 
func (s *Storage) asyncAddItems(
   ctx context.Context,
   errChan chan<- error, wg *sync.WaitGroup,
   shard *sqlx.DB,
   items ...models.Item) {
   defer wg.Done()
   err := s.addItems(ctx, shard, items...)
   errChan <- errors.Wrapf(err, "[asyncAddItems] can't insert to shard")
}
 
func (s *Storage) addItems(ctx context.Context, shard *sqlx.DB, items ...models.Item) error {
   q := sq.
       Insert(itemsTable).
       Columns(itemsTableFields...).
       PlaceholderFormat(sq.Dollar)
 
   for _, item := range items {
       q = q.Values(item.ID, item.CreatedAt)
   }
 
   query, args, err := q.ToSql()
   if err != nil {
       return errors.Wrap(err, "[create query]")
   }
 
   _, err = shard.DB.ExecContext(ctx, query, args...)
   return err
}

Ну и скриптик для миграции всего этого дела:

#!/usr/bin/env bash
export MIGRATION_DIR=./migrations/
 
if [ "${STAGE}" = "production" ]; then
   if [ "$1" = "--dryrun" ]; then
       goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
       goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" status
   else
       goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
       goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" up
   fi
 
elif [ "${STAGE}" = "staging" ]; then
   if [ "$1" = "--dryrun" ]; then
       goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
   else
       goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
   fi
elif [ "${STAGE}" = "development" ]; then
   exit 0
fi

Очень удобно шардировать в приложении, где еще нет данных, а следовательно нет необходимости их перетаскивать. Но что делать, если мы шардим рабочее приложение? Тут, как говорится у нас на Руси, case-by-case.

  • Приложение можно остановить? Прекрасно! Останавливаем разбор очередей, отключаем обработку запросов или вовсе останавливаем приложение, делаем резервную копию базы, если ещё не сделали, перетаскиваем данные в соответствии с выбранным ключом и снова вводим приложение в эксплуатацию, предварительно проведя регрессионное тестирование.

  • Приложение должно отвечать на запросы? Тогда делаем временную надстройку внутри репозитория. Пишем данные всегда в новые шарды. Читаем сначала из нового шарда; если там нет нужных данных, то обращаемся к старым шардам. Если данные оказываются в старом шарде, то при желании можно их переносить в новый — и в конце концов данные перераспределятся между шардами. Только не забудьте добавить метрики, чтобы не пропустить этот знаменательный момент. И обязательно проверьте, все ли данные переехали или только те, к которым идут запросы. Да, и на первых порах это не то чтобы положительно скажется на отзывчивости приложения, будьте к этому готовы. 

  • Если приложение пишет данные в БД только по событиям из условной kafka, а синхронные запросы (REST/GRPC) только читающие (классическая ситуация для event sourcing), то мы отключаем чтение из kafka, выкатываем в prod инстанс версии приложения, которое уже живет с новой схемой шардов, но синхронные запросы шлем только на инстанс приложения предыдущей версии (оно же canary-deploy). Далее джоба внутри приложения последовательно читает данные по старому маппингу, и пишет в новый, после переноса можно сразу же и удалять данные в старой схеме. 

Можно просто скопировать данные в другой шард, а потом просто удалить их из источника, но на практике я такого не встречал. 

Для полноты картины разберём вариант решардинга в условиях, когда нам не хотелось бы останавливать сервис. Писать данные будем только в новый маппинг шардов, а вот читать их будем сразу из старого и нового. 

Начинается всё с создания дублирующей схемы шардов внутри Storage. Внесём изменения в константы:

const legacyBucketQuantity = 2
const bucketQuantity = 3
 
const (
   Shard1 ShardNum = iota
   Shard2
   Shard3
)

Заведём внутри Storage shardMapLegacy, который содержит дорешардинговый маппинг:

type Storage struct {
    shardMapLegacy shardMap
    shardMap   	shardMap
}

Ну и инициализация. В конструктор Storage теперь будем передавать также две схемы:

func NewStorage(ctx context.Context, dsns map[ShardNum]string, dsnsLegacy map[ShardNum]string) *Storage {
   return &Storage{
       shardMap:       initShardMap(ctx, dsns),
       shardMapLegacy: initShardMap(ctx, dsnsLegacy),
   }
}

Заводим метод для получения shardID, чтобы после переноса данных его удалить:

func (s *Storage) legacyShardByItemID(itemID int64) ShardNum {
    return ShardNum(itemID % legacyBucketQuantity)
}

Ну и ещё чуть-чуть дублирования кода. Речь о практически полной копии sortItemsIDsByShard; разница лишь в том, что для получения идентификатора шарда мы используем ранее модифицированную функцию. 

func (s *Storage) sortItemsIDsByLegacyShard(itemIDs ...int64) map[ShardNum][]int64 {
    shardToItems := make(map[ShardNum][]int64)

    for _, id := range itemIDs {
   	 shardID := s.legacyShardByItemID(id)
   	 if _, ok := shardToItems[shardID]; !ok {
   		 shardToItems[shardID] = make([]int64, 0)
   	 }

   	 shardToItems[shardID] = append(shardToItems[shardID], id)
    }

    return shardToItems
}

Метод добавления Items в изменении не нуждается, так как мы условились, что данные пишем всегда в свежие шарды, а вот GetItems надо подправить. Теперь он будет конкурентно выполнять запрос сразу в две схемы, а полученные данных мы будем склеивать, отдавая предпочтение данным с актуального маппинга шардов.

func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
   wg := &sync.WaitGroup{}
 
   resultLegacy := make([]models.Item, 0)
   resultActual := make([]models.Item, 0)
 
   var err error
 
   wg.Add(1)
   go func() {
       defer wg.Done()
 
       res, e := s.getItems(ctx, itemIDs...)
       err = multierr.Append(err, e)
       resultActual = res
   }()
 
   wg.Add(1)
   go func() {
       defer wg.Done()
 
       res, e := s.getItemsFromLegacyShardMap(ctx, itemIDs...)
       err = multierr.Append(err, e)
       resultLegacy = res
   }()
 
   wg.Wait()
   result := mergeItems(resultActual, resultLegacy)
 
   return result, err
}

Склейка результата выглядит так: 

func mergeItems(items, legacyItems []models.Item) []models.Item {
   itemsMap := make(map[models.Item]struct{})
 
   for _, item := range legacyItems {
       itemsMap[item] = struct{}{}
   }
 
   for _, item := range items {
       itemsMap[item] = struct{}{}
   }
 
   mergedItems := make([]models.Item, 0, len(itemsMap))
   for item, _ := range itemsMap {
       mergedItems = append(mergedItems, item)
   }
 
   return mergedItems
}

Опционально можно добавить метрику для отслеживания частоты запросов по старому маппингу, которая будет сигнализировать нам о том, что данные перетащились и можно отключать дублирующий легаси-флоу. 

При таком подходе остаётся только один вопрос: что делать с «мёртвыми» данными,  которые лежат не в своих шардах после решардинга? 

Вариант в лоб: предположим, что мы зарешардились с двух до четырёх шардов, идём — и на каждом легаси-шарде выполняем запрос на удаление записей, где полученный для ID ключ шардирования не соответствует текущему шарду:

DELETE FROM items
WHERE ctid IN (
   SELECT ctid
   FROM items
   WHERE id % 4 NOT IN (2, 2-4)
   );

З.Ы. Вариант SQL-запроса предполагает, что мы храним гошный UInt64 в постгревом BigInt. В этом случае положительные гошные числа могут превратиться в отрицательные постгревые, поэтому делаем NOT IN для ренджа.

Иногда встречаются системы, где данные имеют свойства ‎‎‎‎«протухать». В таких системах самое логичное оставить данные после решардинга, и дождаться пока они «протухнут».

И пара слов об упячках, с которыми я сталкивался, — о партиционировании внутри одной БД и шардинге целыми партициями. Поначалу кажется, что это логично и даже элегантно. Ведь для решардинга достаточно просто перетащить целую партицию с одного шарда в другой. И это ФАТАЛЬНАЯ ОШИБКА. Со временем вы устанете от трёхэтажного мата негодования, вызванного пятиэтажными пакетными запросами, из-за которых горячие данные не будут нормально попадать в кеш. Такой способ работает лишь в том случае, если партиционирование выполняется по дате, но запросы, как правило, обращаются к свежим или старым данным, как, например, во многих OLAP-системах. В остальных случаях перспективнее держать данные в рамках одной партиции, а решардить их путём постепенного переноса, если, конечно, БД не предусматривает своих вариантов решения проблемы решардинга.

Вместо вывода

Рано или поздно вам придётся заняться решардингом. А потом ещё раз, и ещё, и ещё, особенно, если бизнес будет расти и данных будет становиться всё больше. Поэтому приберегите инструменты, которые вам помогли однажды, и ничего страшного, если запускать вы их будете раз в полгода. 

Клиентское шардирование — надо. 

Ключ шардирования выбираем с умом, предварительно медитируем над метриками, чтобы чётко видеть картину того, как данные пишутся, запрашиваются и хранятся.

Не так страшно шардирование, как решардинг. Важно заранее подумать о том, как вы будете решать вопросы консистентности при решардинге.

Tags:
Hubs:
+25
Comments 19
Comments Comments 19

Articles

Information

Website
ozon.tech
Registered
Founded
Employees
1,001–5,000 employees
Location
Россия