Привет, Хабр! Сегодня снова с вами AliExpress Order Management System. В прошлый раз мы рассказывали о внутреннем устройстве нашего решения по in-app шардированию баз. Сегодня же поговорим о том, как мы увеличивали количество шардов без длительного даунтайма. Спойлер: в конце - самое интересное ;)
Глоссарий
Шардинг – разделение единого набора данных на разные фрагменты для повышения надежности и производительности.
Шардированная база данных – база данных, данные в которой хранятся на множестве физических серверов.
Ключ шардирования – это одно или несколько свойств, от которых зависит логический адрес данных. Ключ шардирования подаётся на вход хеш-функции, которая на выходе выдаст значение бакета. В нашем случае ключом шардирования является идентификатор пользователя.
Бакет– другое название для логического адреса данных. В нашей системе фиксированное значение бакетов — 65536.
Исходные данные
Для начала стоит сказать несколько слов о нашей системе – система управления заказами является связующим звеном между множеством доменов AliExpress, хранит информацию о миллиардах заказов. Данные хранятся в шардированной базе PostgreSQL, 64 шарда с данными по 500 ГБ, заполненность на 60% на момент начала. Плюс шард с конфигурацией кластера (config) и шард с нешардированными данными (solid).
Хьюстон, у нас проблема!
Причина необходимости расширения - в локальной базе хранилась история по заказам, начиная с конца 2020 года, а у пользователей история заказов начинается с 2009 года. Чтобы у пользователей появилась полная история заказов и у других доменов появился унифицированный доступ к ним, мы решили загрузить исторические заказы в локальную систему. Несмотря на 60% заполненность баз, мы не могли загрузить исторические заказы в систему, потому что их объем превышал имеющиеся лимиты.
История по заказам превышала текущий набор данных в полтора раза и на шардах не было столько свободного места.
Что делать?
1) Загрузить исторические заказы в текущие шарды. Это невозможно, поскольку на одну базу установлен лимит в 500 ГБ от инфры, связанный с возможностью оперативно решать проблемы с таким объемом данных.
2) Положить данные в новые шарды по соседству. Тоже нельзя, поскольку это поломает схему шардирования.
3) Добавить новые шарды и решардировать базу сервиса, после чего залить историю. Мы остановились на данном варианте.
Есть идея: решардинг!
Решардинг - это перераспределение данных между шардами, необходимое для сохранения консистентной схемы шардирования из-за изменения конфигурации шардов. В нашем случае решардинг связан с добавлением новых шардов. В результате часть данных с существующих шаров будет перенесена на новые шарды.
Требования к решардингу
Основные требования, которые мы ставили себе перед началом процесса решардинга:
Сохранение консистентности данных. Мы не могли потерять часть заказов пользователей или информацию об их изменении;
Сохранение работоспособности приложения. Сервис с заказами пользователей является ядром системы, от его работоспособности зависит работоспособность множества других сервисов, поэтому важно было выполнить процесс решардинга незаметно для других сервисов;
Минимальный даунтайм. Процесс решардинга должен быть быстрым. Чем длиннее даунтайм, тем дольше пользователи не делают покупки, а следовательно компания потеряет деньги;
Возможность отката к начальной конфигурации кластера. Так как это первый опыт решардинга, реализация должна поддерживать аварийный откат к начальной конфигурации в случае возникновения проблем.
Что в итоге?
Опираясь на эти требования, мы рассматривали ряд возможных решений.
Первый, и самый очевидный вариант – логическая репликация БД силами команды DBA. Каждая реплика создается как полная копия старого шарда. Обновляется конфигурация кластера и зачищаются данные, которые не принадлежат шардам, с учётом новой конфигурации. Но этот вариант не оптимален. При копировании данных из старого шарда в новый будет длительный даунтайм, и кластер нельзя будет вернуть к начальной конфигурации, потому что новые данные уже будут записываться на новые шарды.
Другой вариант, это программный перенос данных на новые шарды, запись на шард с учётом новой конфигурации и двойное чтение — для случаев, когда часть данных ещё не перенесена на новые шарды. Но при такой реализации ломается механизм батчевых запросов с лимитом (когда часть данных ещё не переехала на новый шард). В этом случае появляются проблемы с записью на новый шард, когда данные по ключу шардирования ещё не перенесены на новый шард и нельзя откатиться к предыдущей конфигурации.
В итоге было принято решение использовать программный перенос данных на новые шарды, двойную запись данных на старые и новые шарды, которая будет работать до момента полного переключения конфигурации на новые шарды, и audit log для поддержания консистентности данных.
Конфигурация для каждого дата шарда содержит id – уникальный идентификатор шарда, массив бакетов, которые к нему привязаны, и строку подключения.
Конфигурация дата шардов
Для целей решардинга были добавлены поля:
AllowedDataShardOperations, отображающее разрешённые операции на шарде. Это поле имеет следующий диапазон значений:
None – операции на шарде не выполняются,
Read – разрешены операции только на чтение,
Write – операции записи;
ReadWrite – разрешены все операции.
PrimaryDataShardId, которое связывает производные конфигурации для массива бакетов переносимых с одного старого шарда на новый.
Перед началом процесса решардинга у всех существующих шардов были разрешены операции на чтение и запись. Для каждого существующего шарда был добавлен новый шард, на который переносилась часть данных привязанных к первой половине диапазона бакетов, для новых шардов была разрешена только операция на запись.
Резолв подключения к реплицированной базе
Для двойной записи мы переопределили класс DbConnection, чтобы поддержать подключения к праймари и реплика базам.
Под праймари базой понимаем базу с мастер данными системы в текущий момент, под реплика базой - базу с копией данных с праймари шарда. Перед началом процесса решардинга все старые базы будут выступать в роли праймари, т. к. в новые базы еще не перенесены данные. По окончании процесса переключения уже новые базы будут выступать в роли праймари.
public class ReplicatedDbConnection<TDbConnection> : IDisposable, IAsyncDisposable
where TDbConnection : DbConnection
{
private readonly bool _isExecuteAllowed; // отображает возможность выполнения write запросов на текущем подключении
public TDbConnection PrimaryConnection { get; } // подключение к праймаре базе, публичное, для возможности выполнения read операций
internal TDbConnection ReplicaConnection { get; } // подключение к реплика базе
public bool IsReplicationEnabled => ReplicaConnection != null; // отображает наличие реплики
internal async Task<int> ExecuteAsync(
string sql,
ReplicatedDbTransaction transaction,
object param = null,
int? commandTimeout = null,
CommandType? commandType = null,
CancellationToken token = default) // метод для выполнения запросов
internal async Task<IEnumerable<T>> ExecuteWithReturningAsync<T>(
ReplicatedDbTransaction transaction,
string sql,
object param = null,
int? commandTimeout = null,
CommandType? commandType = null,
CancellationToken token = default) // метод для выполнения запросов с возвращаемыми данными
}
Каждый инстанс сервиса хранит в кеше конфигурацию шардов в виде InteralTree, в роли интервалов выступает массив бакетов.
При выполнении запроса на шардированной БД в параметрах передается ключ шардирования, этот ключ кешируется и определяется номер бакета, к которому привязаны данные. По бакету из кеша получаем конфигурацию. Для бакета доступно максимум 2 конфигурации – одна для праймари, а вторая для реплика шарда.
В праймари подключении будет указан шард с конфигурацией либо с разрешенными операциями ReadWrite, либо с разрешенной операцией Read (в readonly режиме конфигурация шарда будет, когда бакеты будут переноситься с праймари на реплику).
В реплика подключении будет или шард в состоянии Write, или в состоянии None (если праймари шард будет в состоянии Read). Также реплики может не быть совсем, если подключение не реплицированное (в случае когда бакет не будет переноситься на новый шард).
Операции на реплицированной базе
Чтение данных производится с праймари базы, для записи логика будет посложнее.
Операция записи выполняется и на праймари, и на реплике шардах. Для поддержания консистентности данных между праймари и репликой все операции записи выполняются в реплицированной транзакции и логируются с помощью механизма audit log для поддержания консистентности данных между двумя базами.
Для каждой операции записи передается контекст транзакции и массив значений для audit log, привязанных к данным, изменяемым в текущем запросе.
protected virtual async Task<int> ExecuteAsync(
ReplicatedTransactionContextBase<TDbConnection> transactionContext,
AuditLogValue[] auditLogValues,
string sql,
object param = null,
int? commandTimeout = null,
CommandType? commandType = null,
CancellationToken token = default)
{
var res = await transactionContext.ReplicatedDbConnection.ExecuteAsync(
sql,
transactionContext.ReplicatedDbTransaction,
param,
commandTimeout,
commandType,
token);
if (transactionContext.ReplicatedDbConnection.IsReplicationEnabled)
{
transactionContext.AddAuditLogValues(auditLogValues);
}
return res;
}
Значение AuditLogValue используется для логирования изменяемых данных в текущей транзакции. Класс содержит:
EntityId – идентификатор записи, которую записывают или изменяют в операции,
ShardKeyValue – ключ шардирования для записи,
IsReplicationFailed – флаг, который отображает – удалось записать данные в реплику или нет,
LeaseCount – счётчик, отображающий сколько операций вносят изменения в данные, привязанные к EntityId,
UpdatedAt – последнее обновление информации о процессах, которые вносят изменения для текущего EntityId,
ContextJson – JSON для дополнительной информации.
Реплицированный контекст используется в одной транзакции и выглядит следующим образом:
public abstract class ReplicatedTransactionContextBase<TDbConnection> : IDisposable
where TDbConnection : DbConnection
{
protected internal HashSet<AuditLogValue> AuditLogValues { get; } = new(); - сет со значениями для audit log, которые менялись в текущей транзакции
public ReplicatedDbTransaction ReplicatedDbTransaction { get; init; }
public ReplicatedDbConnection<TDbConnection> ReplicatedDbConnection { get; init; }
protected abstract Task BeforeCommitTransaction(); // метод, который выполняется перед коммитом транзакции
protected abstract Task OnSuccessCommitTransaction(); // метод, который выполняется после успешного коммита транзакции
protected abstract Task OnFailCommitTransaction(); // метод, который выполняется при неудачном коммите транзакции
// метод коммитит транзакцию и диспозит подключение к БД
public async Task CommitTransactionAsync()
{
if (ReplicatedDbConnection.IsReplicationEnabled)
{
await BeforeCommitTransaction();
}
try
{
await ReplicatedDbTransaction.CommitAsync();
}
catch (Exception)
{
if (ReplicatedDbConnection.IsReplicationEnabled)
{
await OnFailCommitTransaction();
}
return;
}
if (ReplicatedDbConnection.IsReplicationEnabled)
{
await OnSuccessCommitTransaction();
}
}
Класс абстрактный, чтобы была возможность переопределить логику AuditLog. При решардинге использовали реализацию ReplicatedTransactionContextWithDbAuditLog, которая записывает значение для AuditLog в праймари базу в той же транзакции.
При записи в реплицированную базу сначала происходит запись в праймари базу, затем в реплику. Если в реплику данные записать не удалось, флагу IsReplicationFailed из AuditLogValue присваивается значение true. После записи в базы добавляем значения AuditLogValues, переданные на вход метода в хешсет контекста.
Обработка значений для аудит лога происходит в коммите реплицированной транзакции. Логика абстрактных методов описана для реализации ReplicatedTransactionContextWithDbAuditLog.
Если транзакция не реплицированная (при резолве коннекта нет реплики для ключей шардирования), коммитится праймари транзакция и на этом выполнение метода завершается.
В случае с реплицированной транзакцией:
Вызываем метод BeforeCommitTransaction, в котором значения AuditLogValues записываются в праймари базу в той же транзакции.
Коммитим праймари транзакцию, если коммит выполнить не удалось, то вызывается метод OnFailCommitTransaction, в котором чистим хешсет AuditLogValues.
В случае успешного коммита праймари транзакции пробуем закоммитить транзакцию в реплику. Если не удалось, проставляем флагу IsReplicaFail в классе транзакции значение true.
Вызываем метод OnSuccessCommitTransaction, в котором будут обновляться AuditLogValues в праймари бд.
Метод OnSuccessCommitTransaction выполняется в новой транзакции с блокировкой for update.
Если флаг IsReplicationFailed = true, то значения AuditLogValues остаются в базе, чтобы далее повторно перенести данные, привязанные к EntityId из праймари в реплику. По этому декрементим значение LeaseCount для этих AuditLogValues, чтобы сервис, занимающийся синхронизацией данных, видел, что эти значения не заблокированы операциями записи.
Если коммит в реплику успешен (IsReplicationFailed = false), читаем значения AuditLogValues для текущей транзакции из праймари базы и делим на две группы. В первую группу попадают значения, закоммиченные в реплике и не заблокированные другими операциями (LeaseCount = 1), во вторую группу – значения, которые не удалось закоммитить в реплику или значения, заблокированные другими операциями на запись. AuditLogValues из первой группы удаляем из БД, а для значений из второй группы декрементим LeaseCount.
Перенос данных
Для переноса данных с исходных шардов создан инструмент, состоящий из двух частей:
Сервиса, который хранит в себе список задач на перенос данных между шардами, информацию для доступа к шардам напрямую, список таблиц кластера для переноса,
Демона, который опрашивает сервис, получает задачи, резервирует задачи под себя и вызывает методы сервиса для переноса данных в автоматическом режиме.
Чтобы оперировать бакетами при переносе данных, нужно получить связи бакетов с заказами. На solid шарде уже была таблица со связями идентификаторов пользователей и заказов, эту таблицу дополнили полем bucket.
Все операции идут через чистый SQL, поэтому для корректной обработки вставок в целевой шард необходимо дополнительно собрать служебную информацию с таблиц в базе.
Следующий sql запрос собирает все первичные ключи и относящиеся к ним колонки. Это нужно для составления запросов на вставку с ON CONFLICT ON CONSTRAINT при переносе данных для случаев, когда на целевом шарде уже есть данные по заказу благодаря двойной записи.
select tc.constraint_name, tc.table_name, ccu.column_name
from information_schema.table_constraints tc
join information_schema.constraint_column_usage ccu on tc.table_name = ccu.table_name
where tc.table_schema = 'public'
and tc.table_name = ANY (@Tables)
and tc.constraint_type = 'PRIMARY KEY'
Значения первичных ключей запрашиваются один раз при регистрации кластера с шардами и сохраняются в сервисе.
Алгоритм переноса состоит из трех шагов:
Сбор идентификаторов и ключей шардирования сущностей для переноса
Перенос
Проверка после переноса
Первым делом собираем данные с таблицы со связями заказов и бакетов по переносимому бакету. Это нужно для детального мониторинга процесса переноса – так проще отследить, какие заказы успешно перенеслись, а при переносе каких возникли проблемы.
Во втором шаге сначала необходимо получить информацию о типах колонок из сервисных таблиц в базе, выполнив следующий sql для каждой переносимой таблицы:
select column_name, data_type FROM information_schema.columns WHERE table_name = @Table
Информация о типах колонок нужна для приведения типов при вставке данных на новый шард. В отличии от первичных ключей данные о колонках в таблице кешируются только на одну итерацию алгоритма. Так сделано, чтобы автоматически подхватывать изменения схемы БД в случае раскатывания миграций во время переноса.
По списку заказов из бакета переносим данные из указанных таблиц с источника на целевой шард. Весь набор данных по одному заказу переносится в одной транзакции.
Для проверки успеха переноса сверяем количество записей в таблицах по каждому заказу между источником и целевым шардом.
Обработка ошибок
При проблемах на этапе сбора данных перезапускаем сбор.
При ошибках переноса или проверки сущности повторяем перенос и проверку, обрабатывая только не прошедшие проверку и не перенесенные заказы. Основная часть перезапусков - программная, в алгоритм была заложена финальная проверка после переноса всего бакета, которая разово автоматически перезапускала процесс при наличии проблем. Если же одного перезапуска не хватало, то система оставляла такой бакет для ручного рестарта.
Балансировка
Для балансировки нагрузки на шарды было предусмотрено лимитирование количества параллельно обрабатываемых задач на каждый шард на уровне получения задач демоном от сервиса.
Audit Log
Обработка лога ведется аналогично переносу данных, только источником ключей для переноса является таблица audit_log на конкретном шарде.
После того как все задачи переноса данных по шарду завершены и audit log мониторится демоном, можно приступать к переключению бакетов.
Переключение бакетов
Перед началом переключения бакетов с праймари базы на реплику надо было решить ряд вопросов.
Во-первых, для ряда сущностей id в БД выделялся из последовательности при вставке в базу. Из-за этого при двойной записи на праймари и реплике id для этих объектов не будут совпадать. Для решения этой проблемы стали получать id из последовательности в сервисе и передавать его на вставку в БД. Кроме того, сдвинули текущее значение в последовательностях на реплике, чтобы не пересечься с уже существующими значениями на праймари БД. Для этого в последовательностях на реплике увеличили текущее значение на порядок.
Во-вторых, у нас много инстансов одного сервиса, и у каждого инстанса свой кеш подключений к базам. Централизованно сбросить кеш можно было только путем редеплоя сервиса, но процесс редеплоя длится до 5 минут.
Поэтому для централизованной инвалидации кеша воспользовались ETCD Watch API.
Watch API предоставляет интерфейс для асинхронного мониторинга изменений ключей. Добавили бекграунд сервис, который позволял отслеживать изменения ключа в ETCD, при изменении ключа конфигурация подключения инвалидировалась, и инстанс сервиса логировал это событие в солид базу.
В платформе данный функционал реализован с помощью бекграунд сервиса. В каждом инстансе сервиса при изменении ключа etcd Watch API вызывает метод, который сбрасывает кеш конфигурации шардов и пишет в БД сервис, на котором сбросили конфигурацию.
public class InvalidateDbCacheEtcdWatchService : EtcdWatchBackgroundServiceBase<InvalidateDbCacheEtcdWatchConfig>
{
...
protected override void WatchAction(WatchEvent[] watchEvents)
{
using var scope = _serviceProvider.CreateScope();
var podsWithUpdatedCacheService = scope.ServiceProvider.GetRequiredService<IPodsWithUpdatedCacheService>();
foreach(var watchEvent in watchEvents)
{
_shardConfigurationCache.Invalidate();
var podName = Environment.GetEnvironmentVariable("HOSTNAME");
podsWithUpdatedCacheService.Insert(podName, CancellationToken.None).GetAwaiter().GetResult();
}
}
}
После того как все данные перенесены из праймари в реплика базу, можем начинать процесс переключения.
Переключение производится для каждой связки праймари реплика независимо от остальных баз, также можно переключать любое количество бакетов за один шаг, начиная от 1 до всех бакетов, которые должны быть перенесены на реплику.
Перед началом переключения у всех бакетов, привязанных к праймари базе, разрешены операции ReadWrite, у бакетов реплики - Write (состояние 1).
Первым шагом часть бакетов (например, 10) переводятся в readonly режим. По id конфигурации праймари шарда получаем конфигурации, которые привязаны к нему (или Id или PrimaryDataShardId равны текущему id праймари шарда). Далее валидируем состояние кластера, проверяем что нет бакетов в readonly состоянии и ещё остались непереключенные бакеты. По конфигурации реплики рассчитываем количество бакетов, которое будем переключать, на случай если непереключённых бакетов меньше, чем запросили на переключение.
Конфигурации для праймари в ReadWrite и реплики в Write разделяем на 2 конфигурации. У праймари останется конфигурация в ReadWrite для 10-1023 бакетов и добавится конфигурация в Read для 0-9 бакетов. У реплики останется конфигурация Write для 10-511 бакетов и добавится конфигурация в None для 0-9 бакетов (состояние 2).
В этом состоянии данные, которые будут принадлежать бакетам с 10 по 1023, будут обрабатываться без изменений, но данные, которые принадлежат бакетам 0-9, будут доступны только на чтение с праймари.
Далее сбрасываем кеш конфигураций на всех инстансах сервиса, чтобы они получили обновленную конфигурацию и обрабатываем значения AuditlogValues для праймари базы, которую переключаем.
На момент обработки AuditlogValues их обычно нет, но пару раз наблюдали, что в момент переключения конфигурации было пару записей (меньше 10). Это связано с тем, что в основном неудачная запись в реплику происходит или в момент редеплоя сервиса или в момент переключения конфигурации, кроме того постоянно работает процесс синхронизации данных, которые не удалось записать на реплику.
Следующим шагом переводим бакеты из readonly режима на ReadWrite с реплики. Перед началом переключения валидируется наличие бакетов в readonly режиме и затем переключаем на ReadWrite с реплики. В результате этого шага на праймари 0-9 бакеты будут в Write, 10-1023 - ReadWrite, на реплике 0-9 - ReadWrite6, 10-511 - Write (состояние 3). При такй конфигурации для бакетов 0-9 уже реплика будет выступать в роли мастер данных.
Опять сбрасываем кеш конфигураций.
Для оставшихся бакетов алгоритм будет аналогичный, за исключением того, что в readonly режиме для праймари и реплики будет по 3 конфигурации (состояние 4).
Один цикл алгоритма проходит за пару секунд и не зависит от количества бакетов, которое было указано для переключения. Также на любом этапе переключения, есть возможность откатиться к начальной конфигурации.
В результате решардинга мы разделили 64 шарда на 128 и суммарно перенесли 9.5 Тб данных между шардами без даунтаймов и потери работоспособности системы для покупателей и внутренних сервисов.. Новая конфигурация шардированной базы позволила безболезненно перенести 28 Тб данных (по 225 Гб на каждый шард) из китайской инфраструктуры в локальные базы, тем самым предоставить покупателям полную историю по их заказам.