Когда мы работаем с реплицированными системами, вопрос стратегии переключения между репликами, а тем более ее реализация — это довольно значительная головная боль. Если вашей системе необходимо работать с отказами штатно, то наш опыт может подсказать пару новых идей, как можно сделать отказы контролируемыми.
Я Владислав Доронин — Go-разработчик в команде S3 облачной платформы Cloud.ru Evolution. Хочу рассказать про подход к управлению отказами реплик, который мы кристаллизовали опытом выхода из строя разных частей системы. Практика показала, что массовые и не очень отказы приводят к взлету задержки ответов и увеличению количества client-side повторов, которые тоже висят. Пускай на уровне записи из-за требований репликации и гарантии мы много поделать с ситуацией не можем (хотя и там не все безнадежно), то вот чтение гораздо более гибкое. У нас получилось сделать retry на чтении красивыми, об этом сегодня и поговорим.

Необходимо немного рассказать про то, как устроено наше хранилище. Прежде всего, наш S3 самописный, от ceph только тесты на совместимость. Хранилище — это набор связанных сервисов, представляющих собой модульный монолит. В качестве бэкенда хранения мы используем SDS на магнитных дисках. Про архитектуру нашего S3 более подробно можно почитать в статьях Как мы сделали собственное S3-совместимое объектное хранилище и Как мы сделали собственный Software-Defined Storage для публичного облака Cloud.ru Evolution. В данный момент нас интересует только часть, ответственная за транспортировку данных.
Во всех storage class, кроме SingleAZ, сохраняемые данные реплицируются. Как много реплик необходимо, определяется настройками storage class. Данные записываются кусками максимум по 64 МБ, каждый такой кусок или chunk записывается на свой набор реплик.
Наборы реплик мы называем VolumeSet, которые имеют следующую структуру:
type VolumeSet struct {
ID VolumeSetID
Writable bool // достаточно ли на всех репликах места
Replicas []VolumeSetReplica
StorageClass StorageClassName
}
type VolumeSetReplica struct {
Cluster SDSClusterID
Volume SDSVolumeID // идентификатор volume в конкретном кластере
PrimaryOS OSAddr // OS - прокси-сервис для работы с volume
State VolumeSetReplicaState
}Эти наборы создаются в фоне по мере необходимости для каждого storage class. Реплики создаются обязательно в разных DC, чтобы исключить ситуацию с отказом одного DC и следующей за этим полной недоступностью данных. При записи chunk мы выбираем нужный VolumeSet и пытаемся записать данные на все реплики. Если хотя бы одна запись оказалась неуспешной, вся операция прерывается и мы пробуем другой volset. Поэтому в случае, когда мы читаем chunk и какая-то реплика оказалась проблемной, мы можем рассчитывать на то, что необходимые данные есть в полном объеме на других репликах.
А теперь поговорим о проблемах, с которыми мы сталкивались.
Retry-штормы
Когда приходит запрос на чтение, мы должны последовательно вытащить все chunk объекта и записать их в клиентское соединение. Но вот что-то пошло не так, и реплика нам не отвечает... Изначально мы исходили из предположения, что это проблемы с конкретной нодой OS (прокси-сервис до SDS-volume), поэтому прежде чем переключать реплику, мы пробовали так называемые failover. Мы ходили к SDS-кластеру реплики, но используя другие прокси, и так несколько раз, и только потом уже другая реплика.
Как это выглядело:

На бумаге такая логика казалась оправданной, ведь отказ ноды более вероятен, чем отказ кластера, правда? Но практика показала, что нет. Если есть проблема, она чаще всего системная, поэтому пробовать «соседей» не очень выгодно, ведь каждая попытка — это время, которое добавляется к latency запроса.
Допустим, у volset 3 реплики. В таком случае мы не гарантируем, что будет задействовано 3 DC, гарантируется только 2 отдельных, так что 2 реплики могут быть в одном DC. Если мы потеряем с ним связь, такой подход может привести к дереву retry, где мы сначала попробовали все failover в одном DC, потом во втором и только после этого пошли в живой — это слишком долго.
Чтобы снизить последствия недоступности нод-хранилища, мы опрашиваем все серверы на предмет их работоспособности. Но опрос периодический, а это приводит к отставанию актуального статуса в част��х системы, которые в этой информации заинтересованы, поэтому на короткие промежутки времени такие отказы имеют разрушительное влияние. Запросы висят и едят память обрабатывающих их процессов, это снижает пропускную способность, новые запросы тормозят — все копится как снежный ком. Нужно было как-то это исправлять.
Нам помог пессимистичный подход к вопросу: мы решили, что если не смогли достучаться до ноды, то что-то мутное происходит во всем DC, поэтому сразу пробуем следующий и только после второй попытки начинаем пробовать failover — если отказали DC и 1 нода в другом DC, то мы все еще можем достать данные. Если померли два DC, то тут уже пиши пропало. Помимо этого мы решили, что отставание статуса связности с сервисами — это тоже беда, так что мы решили собрать кеш состояний в памяти, который собирается как обратная связь от попыток работы с ними.
Это решает две проблемы:
сильно нивелирует побочные эффекты от отставания, так как нам не нужно ждать новый тик обновления, кеш обновляется сразу, как был получен ответ от сервера: обрыв соединения, успешный ответ, отмена соединения по таймауту и тд.,
новые запросы могут работать с обратной связью от уже отработавших, позволяя в реальном времени менять приоритет походов к тем или иным дискам.
Получившийся у нас кеш имеет следующую структуру:
type Tracker struct {
cfg Config
clock clockwork.Clock
failoverOSes *containers.ExpirableLRU[failoverKey, tp.OSAddr]
mu struct {
sync.RWMutex
unavailableByErrors map[tp.OSAddr]errorCounter // обратная связь о попытках доступа к данным
// immutable: словари ниже заменяются полностью, никогда не обновляются, соде��жат обработанные результаты пингов статуса
dcByAddr map[tp.OSAddr]tp.DC
availableByPingPerSDS map[tp.SDSClusterID][]tp.OSAddr
unavailableByPing map[tp.OSAddr]struct{}
}
}
func (t *Tracker) OnSuccess(addr tp.OSAddr){...}
func (t *Tracker) OnError(addr tp.OSAddr) {...}
func (t *Tracker) IsAvailable(addr tp.OSAddr) bool {...}
func (t *Tracker) GetFailoverOS(cid tp.SDSClusterID, vid tp.SDSVolumeID) (tp.OSAddr, bool) {...}
...Этот кеш позволяет менять приоритеты реплик, пропускать их, если у них слишком много ошибок, или банить целый DC в случае обнаружения сломанной сетевой связности. Поскольку мы хотели внедрить пессимистичную логику переключения между репликами и применить к ней обратную связь (все-таки мы работаем в условиях множества запросов и результат их работы может помочь обработать следующие запросы лучше), а также попробовать итераторы, мы выделили всю логику работы с retry на чтении (и записи, но об этом в другой раз) в отдельный модуль ostracker, из которого нам интересен только один метод:
func GetReadReplicas(vs tp.VolumeSet, tracker *Tracker) (ReadReplicaIterator, GetRequestDoneCallback) {
onRequestDone := func(rr ReadReplica, err error) {
switch osErrorToResultType(err) {
case success:
tracker.OnSuccess(rr.OS)
...
case unavailable:
tracker.OnError(rr.OS)
case failure:
}
}
unfiltered := getUnfilteredReadReplicas(vs, tracker)
return filterUnavailableReadReplicas(unfiltered, tracker), onRequestDone
}И все для того, чтобы получить доступ к логике переключения реплик (которая все еще включает построение и использование списка failover) с помощью простого for replica := range replicas. Причем итераторы тут позволили скрывать логику послойно, добавляя разные wrapper к оригинальному, где-то фильтруя, где-то обрабатывая переключения, но сохраняя при этом простоту использования. Выделение логики переключения реплик позволило нам неинвазивно работать над стратегией переключения.
Большой размер chunk
Как я уже говорил, данные хранятся в нарезанном виде чанками по 64 MB (для объектов меньше chunk всего один). Проблема становится видна, когда объект, который пытаются прочитать, довольно большой. Возьмем для примера happy-сценарий: chunk валяется на магнитном диске с пропускной способностью в районе 100 Мбайт/с, над ним висит прокся в виде SDS, которая и управляет диском, а за свою работу требует плату в виде пропускной способности, которая падает до 50 Мбайт/с в лучшем случае. Сетевая шина широкая и в happy-сценарии не нагружена, так что первый chunk объекта мы прочитаем за 1,28 c. Читаем мы с os с помощью grpc-вызова, так что пока он не закончится, написать в клиентское соединение мы ничего не можем, мы заблокированы.
В случае, если у объекта больше, чем один chunk (все-таки через PutObject можно положить 5 ГБ, а через MultipartUpload аж 5 TБ), то получение каждого чанка будет виснуть, это не есть хорошо, поэтому мы сделали prefetch. Первый чанк все еще заставлял запрос висеть без данных, но вот с последующими уже было проще, так как они подгружались в память параллельно первому, и так вплоть до конца файла у нас всегда подгружено N (конфигурируется) дополнительных чанков, чтобы запрос не залипал периодически. Читать весь объект с дисков в память не очень целесообразно, потому что соединение с клиентом обычно не очень широкое, и мы можем просто себе всю память забить объектом, который неясно, когда будут готовы прочесть, так что только небольшой N для сглаживания задержки.
Как можно понять, в случае вала запросов на чтение, память мы будем грести лопатой, все-таки N чанков по 64 MБ, так еще и множество параллельных запросов. Прибавим к этому еще и тот факт, что данные на дисках могут быть преобразованы перед сохранением (например, скомпрессированы для холодных storage class или зашифрованы по пользовательскому запросу) и получим, что декодирование потребует дополнительной памяти к уже немалому потреблению. Сами по себе аллокации — это недешево, но в Go есть еще и дополнительная стоимость в виде запусков GC, которые останавливают все запросы на ноде, пока прибираются. Поэтому мы решили пулировать память на чтении чанков, это слегка скрасило ситуацию, поскольку позволило переиспользовать память и снизить количество освобождаемой GC памяти.
Но изначальную проблему это все еще не решает: grpc-вызов блокируется на очень длительное время. Кроме того, при большой нагрузке пропускная способность дисков падает из-за большого количество случайных чтений. Ждать в лучшем случае 1,28 секунды —слишком дорого. Снижать размер чанка мы не хотим, так как это создаст ненужное дополнительное IO в другой части системы, но хотим читать за раз меньше. Понятно, что мы не оставили это без тайм-аута, но и он не может быть равным лучшему времени, а скорее в пару-тройку раз более щадящим, чтобы не отваливать запросы при малейших подозрениях на проблемы. Кроме того, для разных размеров читаемых данных тайм-ауты должны быть разными, мы не хотим ждать ошибки для одного байта столько же, сколько ждем для десятков мегабайт. Поэтому очевидным решением стало заменить grpc-вызов созданием grpc-стрима.
Выглядит это примерно так:
func (sr *StreamReader) Read(ctx context.Context, p []byte) (int, error) {
...
if err := sr.receiveChunk(); err != nil {
return read, err
}
...
n := copy(p[read:], sr.receivedChunk.EncodedData[sr.chunkRead:])
read += n
...
return read, nil
}
func (sr *StreamReader) receiveChunk() error {
...
resp, err := sr.r.Recv()
if sr.receivedChunk == nil {
sr.receivedChunk = resp
}
// some err handling
}Как можно увидеть, появился тип StreamReader, который напоминает по сигнатуре интерфейс io.Reader, только поддерживающий контекст. Мы написали таких оберток, поскольку отмена контекста очень важна для нас и терять его нельзя.
Что нам дает такая замена? Во-первых, Read за раз вытаскивает максимум 512 КБ (и не ходит никуда, пока их полностью не прочитают), что происходит значительно быстрее, чем все 64 МБ. Во-вторых, мы не заблокированы на протяжении всего чтения, а только на кусках по 512 КБ, где в промежутках можем оценивать, с какой скоростью получается читать. Да и ошибка чтения перестает быть такой проблемой. Все-таки если читать разом все 64 МБ и потерять последние байты в случае резкого обрыва соединения, то выкинуть придется все 64 МБ и начинать заново, поскольку нашей логике буффер grpc-библиотеки недоступен, мы не можем взять оттуда полезное нам. Теперь же мы не только увидим обрыв раньше (ведь тайм-аут тоже удается снизить), но и прочитанные данные сохраним и в новой реплике сможем читать с offset и меньшее количество данных.
Чтобы воспользоваться всеми этими возможностями, нужна была отдельная логика, которая и offset посчитает, и медленное чтение обнаружит, и переключится правильно. Поэтому мы придумали себе мультиплексор.
Задача перед ним стоит простая: нужно предоставить Reader, который будет отдавать байты объекта как единый поток, но внутри переключаться между репликами, если с репликой что-то случилось или если чтение слишком медленное. Также при переключении мультиплексор должен брать во внимание то количество байт, которые уже были прочитаны, и стартовать чтение со следующих реплик уже со смещением. Тут вырисовываются две сущности:
merge-буффер, который принимает данные с разных реплик (с разным offset) и клеит их в единый поток;
логика переключения, которая срабатывает при возникновении ошибки или если чтение слишком медленное. Мы решили, что переключение на медленном чтении не должно останавливать чтение с медленной реплики, просто будет дополнительный поток на случай, если со следующими репликами тоже что-то будет не так.
Как работает merge-буффер:
type mergeBuffer struct {
mu struct {
sync.Mutex
Buf *containers.RingBuffer
Merger *merger // содержит логику подсчета количества записанных байт
WakeupWritersCh chan struct{}
Err error
}
wakeupReaderCh chan struct{}
}
func (mb *mergeBuffer) Write(ctx context.Context, dataOffset int, data []byte) (int, error) {
for {
...
xsync.WithLocked(&mb.mu, func() {
...
bufLen := mb.mu.Buf.Len()
n = mb.mu.Merger.Merge(dataOffset, data)
if mb.mu.Buf.Len() != bufLen {
mb.wakeupReader() // будим заблокированных читателей, если появились новые данные
}
wakeupWritersCh = mb.mu.WakeupWritersCh
})
...
select {
case <-ctx.Done():
return merged, ctx.Err()
case <-wakeupWritersCh: // блокируем запись до тех пор, пока из буффера не прочитают данные
}
}
}
func (mb *mergeBuffer) Read(ctx context.Context, p []byte) (int, error) {
for {
...
xsync.WithLocked(&mb.mu, func() {
n, _ = mb.mu.Buf.Read(p)
if n == 0 {
err = mb.mu.Err
return
}
mb.wakeupWriters() // будим писателей, если смогли прочитать ненулевое количество байт
})
...
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-mb.wakeupReaderCh:
}
}
}
... дальше логика merger с подсчетом количества новых байтЗдесь важно обратить внимание на то, что пустой буффер заблокирует чтение. А после того, как новые данные будут записаны, буффер заблокирует запись, пока из него не будут данные прочитаны. Это такая точка синхронизации потоков, которая позволяет имитировать единый поток, составленный из множества источников.
Переключение же между репликами происходит следующим образом:
...
func (r *Reader) run(ctx context.Context) {
startNextReplicaCh := make(chan struct{}, r.cfg.InitialReplicaCount)
var wg sync.WaitGroup
defer func() {
// закрываем и освобождаем все, что нужно
}()
for startFn := range r.replicas { // это как раз итератор реплик от трекера
select {
case <-ctx.Done():
return
case startNextReplicaCh <- struct{}{}:
}
wg.Add(1)
go func() {
defer wg.Done()
r.runReplicaRead(ctx, startFn, startNextReplicaCh) // каждая реплика запускается в отдельной горутине, если что-то пошло не так, придет сигнал в startNextReplicaCh, который запустит новую реплику
}()
}
}
// вот так, например, работает обработка медленного чтения
// мы учитываем только то время, которое провели внутри этого вызова Read, чтобы исключить ситуацию с тем, что
// Merge-буффер заблокирован (куда пишется результат этого Read), а также медленное чтение с клиентской стороны,
// мы не хотим переключаться между репликами, если соединение с клиентом недостаточно широкое
func (r *timedSegmentReader) Read(ctx context.Context, p []byte) (int, error) {
if r.closed {
return 0, io.EOF
}
now := r.clock.Now()
timeout := r.targetSegmentTimeout - r.currentSegmentElapsed // считаем время только внутри этого Read
timer := r.clock.AfterFunc(timeout, r.doAfterElapsed) // внутри запись в startNextReplicaCh
defer timer.Stop()
n, err := r.source.Read(ctx, p)
r.currentSegmentElapsed += r.clock.Since(now)
r.currentSegmentRead += n
if r.currentSegmentRead >= r.targetSegmentSize {
r.currentSegmentRead %= r.targetSegmentSize
r.currentSegmentElapsed = 0
}
return n, err
}Что нам все это дает? Возможность собрать вот такую схему:

В таком дизайне каждый важный аспект схемы чтения поддается конфигурации благодаря предусмотренным параметрам конфига и удобно модифицируется. Prefetch мы убрали, теперь только один chunk находится в активном чтении. Но ничего не мешает увеличить количество одновременных копий такой схемы, поскольку она потребляет не более чем 512 KБ + небольшой оверхед декодирования (около 2 КБ) — это позволяет на одной ноде разрешать больше одновременных операций чтения.
Есть еще одна вещь, о которой хочу упомянуть. У API GetObject есть возможность указывать range байт, которые хочется получить, то есть читать могут и неполные объекты. Поэтому изначальный дизайн хранилища включал в себя возможность читать с volume любые участки объекта, после чего появилось сжатие перед записью и читать случайный байт из объекта стало сложнее. Мы решили эту задачу, разбивая каждый chunk на блоки и сжимая их по отдельности. В процессе мы формируем BlockIndex-структуру получившегося объекта с упорядоченным списком смещений каждого блока. Это позволяет при чтении случайного диапазона байт не вытаскивать и декодировать весь chunk, а только блоки, в которые входит запрошенный диапазон. То же касается и шифрования: оно работает также с блоками, что позволяет не менять подход к чтению с дисков, а только адаптировать декодер на работу с неполным набором блоков.
Что получилось по цифрам
Пойдем по двум последним версиям. Для референса я буду давать показатели на здоровом кластере и кластере с отключенной одной зоной доступности в рамках одной версии. Кластер тестовый на VM, в одном бакете несколько сотен объектов по 160 МБ, загруженных мультипартом по 5 МБ. Нагрузка снимается warp, отказ DC симулировался периодическим отключением сервисов-прокси до SDS.
Версия без стриминга
Здоровый кластер:
Report: GET (2124 reqs). Ran Duration: 10m2s
* Objects per request: 1. Size: 167772160 bytes. Concurrency: 20. Hosts: 8.
* Average: 563.05 MiB/s, 3.52 obj/s (602s)
* Reqs: Avg: 5688.7ms, 50%: 5602.7ms, 90%: 7515.1ms, 99%: 8505.4ms, Fastest: 2760.0ms, Slowest: 11358.8ms, StdDev: 1190.0ms
* TTFB: Avg: 850ms, Best: 186ms, 25th: 542ms, Median: 780ms, 75th: 1.107s, 90th: 1.53s, 99th: 2.051s, Worst: 6.097s StdDev: 436ms
Throughput, split into 602 x 1s:
* Fastest: 622.6MiB/s, 3.89 obj/s
* 50% Median: 564.3MiB/s, 3.53 obj/s
* Slowest: 311.8MiB/s, 1.95 obj/sКластер с отключенным DC:
Report: GET (1428 reqs). Ran Duration: 10m3s
* Objects per request: 1. Size: 167772160 bytes. Concurrency: 20. Hosts: 8.
* Average: 377.92 MiB/s, 2.36 obj/s (603s)
* Reqs: Avg: 8476.1ms, 50%: 8562.4ms, 90%: 10726.7ms, 99%: 11475.1ms, Fastest: 3886.0ms, Slowest: 14984.0ms, StdDev: 1566.3ms
* TTFB: Avg: 1.248s, Best: 159ms, 25th: 673ms, Median: 1.234s, 75th: 1.866s, 90th: 2.432s, 99th: 2.775s, Worst: 4.984s StdDev: 756ms
Throughput, split into 603 x 1s:
* Fastest: 444.9MiB/s, 2.78 obj/s
* 50% Median: 377.4MiB/s, 2.36 obj/s
* Slowest: 236.3MiB/s, 1.48 obj/sПадение средней пропускной способности — 21%.
Актуальная версия — стриминг и пессимистичное переключение реплик
Здоровый кластер:
Report: GET (1290 reqs). Ran Duration: 10m13s
* Objects per request: 1. Size: 167772160 bytes. Concurrency: 20. Hosts: 8.
* Average: 335.85 MiB/s, 2.10 obj/s (613s)
* Reqs: Avg: 9588.8ms, 50%: 8322.5ms, 90%: 20149.4ms, 99%: 26422.6ms, Fastest: 2954.7ms, Slowest: 41927.1ms, StdDev: 5488.4ms
* TTFB: Avg: 230ms, Best: 18ms, 25th: 91ms, Median: 120ms, 75th: 210ms, 90th: 769ms, 99th: 1.51s, Worst: 4.981s StdDev: 346ms
Throughput, split into 613 x 1s:
* Fastest: 414.6MiB/s, 2.59 obj/s
* 50% Median: 340.3MiB/s, 2.13 obj/s
* Slowest: 12.2MiB/s, 0.08 obj/sКластер с отключенным DC:
Report: GET (1219 reqs). Ran Duration: 10m6s
* Objects per request: 1. Size: 167772160 bytes. Concurrency: 20. Hosts: 8.
* Average: 320.92 MiB/s, 2.01 obj/s (606s)
* Reqs: Avg: 9937.3ms, 50%: 10044.8ms, 90%: 12463.7ms, 99%: 13282.9ms, Fastest: 4711.2ms, Slowest: 16215.3ms, StdDev: 1754.6ms
* TTFB: Avg: 94ms, Best: 19ms, 25th: 59ms, Median: 88ms, 75th: 128ms, 90th: 179ms, 99th: 240ms, Worst: 550ms StdDev: 55ms
Throughput, split into 606 x 1s:
* Fastest: 374.4MiB/s, 2.34 obj/s
* 50% Median: 323.0MiB/s, 2.02 obj/s
* Slowest: 61.2MiB/s, 0.38 obj/sПадение средней пропускной способности — 4,5%.
В этих цифрах стоит обратить внимание на два важных аспекта:
это виртуальный кластер, где на каждой ноде развернуты все сервисы хранилища, поэтому его работа может быть весьма нестабильной, так как сервисы мешают друг другу;
стриминг обладает существенным минусом: из-за снижения размера читаемого за заход объема данных растет случайность IO, которая не очень нравится винтам, плюс был отключен prefetch, так что референсное значение для новой версии ниже аналогичного для предыдущих.
Выводы
Пессимизм в условиях сложных систем может приносить больше пользы, чем вреда. Это в свою очередь помогает работать с проблемой изолировано, не приводя к каскадным отказам в результате retry-штормов. Эвристика в выборе следующей попытки, даже минимальная без повторов «авось теперь сработает» позволяет снижать влияние локальных отказов на работоспособность системы в целом.
Касательно же работы с потоками данных через поточные API заставляет разграничивать слои обработки и доставки через обертки. Это позволяет не размывать логику, а выделять хорошо изменяемые изолированные модули. Такой подход хорошо конфигурируется и позволяет экономить на затрачиваемых ресурсах.
В свою очередь снижение размера читаемого за заход на диск объема данных сильно увеличивает случайность IO-операций, что в случае с винтами приводит к снижению пропускной способности. Она становится тем незаметнее, чем больше кластер. В то же время этот подход позволил отключить prefetch и снизить тем самым расход оперативной памяти, а вместе с тем и нагрузку на внутреннюю сетевую инфраструктуру за счет снижения объемов параллельно считываемых данных, что хорошо сказывается на стабильности.
