Pull to refresh
103.18
IBS
IBS – технологический партнер лидеров экономики

Обновить данные в ClickHouse без UPDATE: кейс IBS

Reading time16 min
Views5.6K

Привет, Хабр! Меня зовут Антон, я – старший разработчик в отделе разработки баз данных в IBS. В этой статье я расскажу о том, как нашей командой была решена задача по сохранению в ClickHouse большого количества данных, генерируемых веб-приложением, с последующим получением сохранённых данных в агрегированном виде.

Решение задачи, описанной выше, было бы простым и вряд ли заслуживающим отдельной статьи на Хабре. Но наш случай представлял собой ряд нюансов: здесь есть технические дубли записей и бизнес-дубли (обновления), есть агрегированные данные и необходимость обновления агрегированных данных. А это уже пример не совсем типичного использования ClickHouse, которым мы и хотим поделиться.

Исходные данные

Над задачей наша команда – я и два бэкендера – начали работать при создании приложения для крупного госзаказчика. Этот продукт представлял собой сложную структуру, регулирующую кучу процессов. Мониторинг для этих процессов реализуется несколькими разными системами. Ниже я расскажу, как мы оптимизировали созданную ещё до нашего вмешательства систему, отвечающую за сохранение результатов обработки документов.

По каждому обрабатываемому документу формируется ряд параметров (метрик), которые нужно сохранять для дальнейшего анализа. Ранее обработанные документы могут поступать на повторную обработку. Результат этой обработки может отличаться от результата предыдущей, что обуславливает необходимость обновления ранее сохранённых и уже агрегированных метрик. Агрегированные данные в различных разрезах отображаются на одной из форм веб-приложения, поэтому все запросы к агрегированным данным должны выполняться очень быстро, в идеале за несколько секунд.

Итак, сформулируем строго требования к системе

  1. Система должна сохранять метрики, рассчитывать на их основе агрегированные данные и предоставлять возможность быстро получать агрегированные данные в различных разрезах.

  2. Система должна игнорировать возможное поступление дублей метрик.

  3. Система должна обеспечивать возможность изменения ранее сохранённых метрик и соответствующее изменение агрегированных данных.

  4. Система должна быть достаточно производительной, чтобы обеспечивать возможность обработки нескольких десятков миллионов поступающих метрик в сутки.

  5. Запрос на получение агрегированных данных в различных разрезах должен выполняться очень быстро (за несколько секунд).

Изначально система была на основе Elasticsearch. Однако со временем количество метрик росло, и движок перестал удовлетворять по скорости выдачи агрегированных данных. Это неудивительно, т.к. при каждом запросе ES вынужден был заново рассчитывать агрегаты по всему объёму данных, чтобы учесть их изменения. Мы решили поэкспериментировать с решением этой задачи на ClickHouse. Во-первых, потому что он хорошо справляется с расчётом агрегатов на больших объёмах данных. Во-вторых, он уже используется в решении некоторых задач в нашем приложении. И всё бы хорошо, если бы не п.3 из списка наших требований к системе – необходимость обновления сохранённых записей. Ведь как сказано в официальной документации, операция UPDATE в ClickHouse не предназначена для частого выполнения, потому что она тяжёлая. Поэтому пришлось подумать над решением, которое бы использовало сильные стороны ClickHouse и в то же время обходило эту его «слабую сторону».

Такое решение нам удалось найти. То, что у нас получилось, можно называть «инкрементальным логическим обновлением». «Логическим», потому что, конечно, никаких UPDATE-ов из-за указанных особенностей ClickHouse мы не делаем. Вместо этого производится логическое аннулирование устаревших данных с записью обновлённых. А «инкрементальным», потому что рассчитываем агрегаты не по всему набору накопленных данных, а только по вновь поступившим данным. Агрегаты при этом тоже обновляются без операции UPDATE. Далее будет подробнее описано то, каким образом это было реализовано.

Теперь перейдём к реализации решения

Схема решения

Для решения задачи нам понадобились 4 таблицы и 4 материализованных представления (МП). МП вместе образуют что-то типа конвейера, в который поступают «грязные» (в плане наличия дублей) неагрегированные данные, а выходят – очищенные от дублей, актуальные данные в агрегированном виде. Схематично это представлено на Рис. 1

Рис.1. Схема решения. Набор таблиц и МП для последовательного преобразования данных.
Рис.1. Схема решения. Набор таблиц и МП для последовательного преобразования данных.

Материализованное представление в ClickHouse

Сделаем небольшое отступление и напомним, как работает МП в ClickHouse. Как известно, здесь есть 2 вида МП: первые хранят данные; вторые, созданные с использованием конструкции TO [db]. [destination_table], не хранят данные и являются, по сути, after-insert триггером (ниже приведена его упрощённая структура).

CREATE MATERIALIZED VIEW [db.]view [ON CLUSTER]
TO [db.] [destination_table]
AS SELECT ... FROM [db.] [source_table]

Это значит, что, когда в таблицу source_table попадают данные, срабатывает МП, выполняет какие-либо манипуляции с помощью запроса SELECT над этими данными и записывает их в таблицу destination_table. Будем называть таблицу source_table «таблицей-источником», а таблицу destination_table – «таблицей-приёмником». Важно отметить, что такое МП видит не весь набор данных в таблице-источнике, а только что поступившие. Все другие таблицы, которые фигурируют в запросе SELECT, но не в конструкции FROM, не являются в этом смысле таблицами-источниками, что позволяет МП видеть в них весь набор данных.

Например, в таблицах в секциях JOIN МП видит весь набор данных. Как мы увидим в дальнейшем, это позволяет нам элегантно решить задачу.

Теперь рассмотрим детально решение

Упрощённо будем считать, что входная структура данных имеет следующий вид:

{
  RecId  String,     --Однозначно характеризует запись
  Time   DateTime64, --Время создания записи
  Sign   int8        --Признак записи
}

В действительности полей во входных данных гораздо больше. То, что здесь представлено одним полем RecId, является, на самом деле, набором из 8-и отдельных полей, но в рамках рассматриваемой задачи это несущественно, поэтому будем использовать RecId для простоты.

Наполнение данными

Изначально данные поступают в таблицу from_kafka_metrics из Kafka. Для этого мы использовали имеющуюся в ClickHouse функциональность для взаимодействия с Kafka, а именно создали указанную таблицу с движком ENGINE=Kafka. Далее МП, созданное над этой таблицей, перекладывало записи для постоянного хранения в таблицу metrics_dirty (рис. 2).

Рис. 2. Схема поступления данных из Kafka в таблицу metrics_dirty.
Рис. 2. Схема поступления данных из Kafka в таблицу metrics_dirty.

Такой перенос данных был нужен, потому что, как известно, в таблице с движком Kafka записи будут исчезать сразу после первого выполненного запроса SELECT, что нас не устраивало. Таким образом, в таблице metrics_dirty хранятся постоянно поступающие данные о метриках. Мы не будем рассматривать устройство таблицы from_kafka_metrics и МП from_kafka_metrics_mv, т.к. для этой статьи они не представляют интереса; перейдём сразу к таблице metrics_dirty.

Скрипт создания таблицы metrics_dirty выглядит следующим образом:

create table if not exists metrics_dirty on cluster some_cluster
(
  RecId   String,
  Time    DateTime64(6) default toDateTime64(now64(), 6),
  Sign    Int8 default 1   
)
ENGINE = ReplicatedMergeTree()
ORDER BY tuple();

Здесь RecId – это идентификатор, который позволяет различать записи. Time – время метрики. Поле Sign (признак записи) – специальное техническое целочисленное поле, о назначении которого будет сказано ниже. Ещё раз отмечу, что в этой таблице могут содержаться дубли метрик, т.е. записи с одинаковыми значениями полей RecId и Sign. Time у таких записей может быть как одинаковым, так и отличающимся; в любом случае такие записи для нас являются дублями.

Как видите, эта таблица, как и все другие, которые будут описаны ниже, является реплицированной. Кроме того, есть ещё одно важное обстоятельство, а именно над каждой из таблиц были созданы распределённые таблицы (ENGINE = Distributed), и все операции вставки и выборки производились именно с распределёнными таблицами. В этой статье я их не буду описывать, чтобы не плодить много кода, мне важно показать принцип. Но нужно учитывать, что в коде всех МП имеются в виду распределённые таблицы. Из этого следует ещё одно важное обстоятельство, о котором я скажу чуть позже. А пока продолжим рассматривать суть решения.

Устранение дублей

Для устранения дублей была создана таблица metrics_uniq, в которую данные поступали из metrics_dirty. Для этого было создано МП metrics_uniq_mv, в котором источником является таблица metrics_dirty, а приёмником – таблица metrics_uniq. Но поскольку из таблицы-источника МП видит лишь тот кусок данных, который привёл к его срабатыванию, наличие дублей поступивших записей мы проверяем по таблице-приёмнику, используя JOIN, т.к. в таблице из секции JOIN МП видит весь набор записей. Если только что пришедших данных (с таким же RecId) ещё нет в metrics_uniq, запрос SELECT их выберет из поступившего куска и не выберет, если записи с таким RecId уже имеются. Таким образом, в таблице metrics_uniq накапливаются только уникальные по набору полей [RecId, Sign] данные. Т.е., как вы поняли, в таблице может быть несколько записей с одинаковым RecId, но разным признаком Sign (1 и -1).

create table if not exists metrics_uniq on cluster some_cluster
(
    RecId   String,
    Time    DateTime64(6),
    Sign    Int8 default 1
)
ENGINE = ReplicatedMergeTree()
ORDER BY (Sign, RecId);

МП для наполнения уникальными данными таблицы metrics_uniq выглядит следующим образом:

create materialized view metrics_uniq_mv on cluster some_cluster
to metrics_uniq
as
select RecId,
       Time,
       Sign             
  from (select metrics_dirty.RecId,
               metrics_dirty.Time,
               metrics_dirty.Sign,               
               row_number() over(partition by metrics_dirty.RecId,
                                              metrics_dirty.Sign
                                     order by metrics_dirty.Time desc) as rn
          from metrics_dirty –-Таблица-источник с грязными данными
          left join metrics_uniq –-Таблица-приёмник с уникальными записями
            on metrics_uniq.RecId = metrics_dirty.RecId
           and metrics_uniq.Sign = metrics_dirty.Sign
         where coalesce(metrics_uniq.RecId, '') = '' –-Добавляем, если ещё нет в приёмнике
        )
 where rn = 1; --Выбираем 1 запись для исключения дублей

Как видите, при записи нового куска данных в таблицу с уникальными данными мы проверяем наличие таких данных в ней самой, хотя вставка данных осуществлялась в таблицу metrics_dirty. По сути, это и является корнем всего нашего построения.

Также в этом МП учтено, что дубли могут прийти в одном куске. Чтобы отсеять их, используем конструкцию row_number() и условие rn=1. И тут вы можете спросить: «Зачем проверять наличие дублей в поступившем куске данных, если таблица, в которую они были вставлены, реплицирована? Ведь как сказано в документации, дедупликация для таблиц семейства ReplicatedMergeTree включена по умолчанию». Дело в том, что дедупликация в ClickHouse техническая, т.е. она исключает добавление записей со ВСЕМИ одинаковыми полями, тогда как для нас дублями являются даже такие записи, у которых отличается значение в поле Time.

Может показаться странным, что для борьбы с дублями в таблице metrics_uniq мы использовали описанный подход, а не имеющиеся в ClickHouse возможности, в частности движок ReplacingMergeTree. Указанный движок в фоновом режиме автоматически удаляет дубли по нужному набору полей без лишней логики в МП. Но это бы не помешало МП вставлять в таблицу дубли и вызывать срабатыванием МП, вносящее дубли далее по конвейеру в таблицу агрегированных данных. Т.е. мы бы добились уникальности данных в этой таблице, но не защиту агрегированных данных от поступления дублей. Таким образом, поскольку мы самостоятельно написали логику для защиты от дублей, решено было использовать “обычный” движок MergeTree, чтобы иметь возможность отслеживать появление дублей в metrics_uniq.

Отмечу, что приведённое МП имеет упрощённый вид по сравнению с оригиналом, т.к. при нагрузочном тестировании указанный запрос на больших объёмах данных выполнялся недопустимо долго. Причина этого в том, что ClickHouse, к сожалению, не использует индексы таблиц в секции JOIN. Эту проблему удалось решить добавлением секции WITH перед запросом. Не буду подробно останавливаться на этом в статье, скажу только, что в конструкции WITH предварительно формировался набор данных из таблицы-источника, и далее по этому набору производилась выборка данных для подзапроса в секции JOIN. Т.е. в секции JOIN уже была не таблица, а подзапрос к этой таблице с конструкцией WHERE и выборкой данных таких же, как в конструкции WITH. После этого запрос и МП стали работать быстро, поэтому все МП, в которых был JOIN, мы переписали в такой же форме.

Логическое обновление

Суть этого метода заключается в том, что нужно каким-то образом помечать обновляемые данные неактуальными и заменять их актуальными. Для этого была создана ещё одна таблица – metrics_logic_upd, чтобы обновлять в ней записи не физически, а логически. Т.е. для того, чтобы исправить уже добавленную условную запись А1, приложение сначала отправляет запись А1’ (A1 штрих), как бы аннулируя первоначальное значение, а затем отправляет запись А2, которое логически заменяет А1. Отмечу, что приложению известно, что оно отправляет изменённые данные, поэтому перед тем, как отправить их, оно отправляет записи для логического аннулирования старых. Несмотря на это, данная реализация прощает и нарушенный порядок, т.е. даже если порядок аннулирующих и обновлённых записей изменится – это не приведёт к нарушению логики.

Для реализации логического обновления нам и потребовалось дополнительное поле Sign, которое было добавлено в атрибуты записи. Для записей А1 – значение Sign=1, а для А1’ – значение Sign=-1; RecId в записях А1 и А2 одинаковые, а Time может быть любым, оно для нас несущественно. Таким образом, когда в таблице оказываются записи А1 и А1’, сумма Sign для этих записей становится равной 0, что является признаком логического аннулирования записи A1. В результате в агрегированной таблице будет вносить вклад в итоговый результат только запись А2.

Так выглядит скрипт создания таблицы metrics_logic_upd:

create table metrics_logic_upd on cluster some_cluster
(
     RecId       String,
     Time        DateTime64(6),
     Total       Int8,
     CreateTime  DateTime64(6) default toDateTime64(now64(), 6)
)
ENGINE = ReplicatedMergeTree()
ORDER BY (CreateTime, RecId);

Здесь в поле Total хранится сумма значений в поле Sign записей A1 и A1’, если такие имеются. Т.е. возможными значениями для этого поля являются 1 для неаннулированных записей (одна запись с Sign=1) и 0 – для аннулированных (по одной записи с Sign=1 и Sign=-1).

Так выглядит скрипт создания МП для выполнения «логического» обновления.

create materialized view metrics_logic_upd_mv on cluster some_cluster 
to metrics_logic_upd
as         
select q.RecId,
       q.Time,
       q.Total,
       toDateTime64(now64(), 6) as CreateTime
  from (select uniq.RecId,
               uniq.Time,
               uniq.Sign + coalesce(logic_upd.Total, 0) as Total --Складываем признаки для записей с одинаковым первичным ключом для учёта логического аннулирования
          from (select metrics_uniq.RecId,
                       metrics_uniq.Time,
                       metrics_uniq.Sign
                  from metrics_uniq
                ) uniq
          left join (select metrics_logic_upd.RecId,
                            metrics_logic_upd.Total
                       from metrics_logic_upd
                    ) logic_upd
            on logic_upd.RecId = uniq.RecId                    
        ) q;

В конструкции FROM указана таблица-источник metrics_uniq, при вставке записей в которую МП срабатывает; в секции LEFT JOIN указана та же таблица, что и в секции TO – это позволяет «обновить» записи в metrics_logic_upd, используя уже имеющиеся в ней записи.

Как видно из кода, старые записи в ней никуда не деваются, просто появляются новые записи с тем же RecId, но большей датой CreateTime. Таким образом, выбирая записи с максимальным значением CreateTime для каждого RecId, мы получаем актуальные на текущий момент данные.              

Как видно из представленного кода МП, оно очень простое, к тому же, поскольку в МП приходит только небольшой вновь добавляемый кусок данных, запрос в этом МП является достаточно легковесным. Может возникнуть вопрос, почему в секции LEFT JOIN выбираются все записи по RecId, а не последние имеющиеся с максимальным CreateTime? Это делается потому, что других записей с таким же RecId в ней быть не может, так как на предыдущем шаге мы уже избавились от дублей.

Теперь остался последний шаг – обновить пришедшие данные в агрегированной таблице. Для этой таблицы в качестве движка мы выбрали ENGINE = ReplicatedVersionedCollapsingMergeTree, потому что он в фоновом режиме удаляет предыдущие рассчитанные данные и уменьшает количество “мусора” в таблице, что приятно. О принципе работы с этим движком подробно описано в документации, я на этом останавливаться не буду, отмечу только, что при запросе данных из этой таблицы нужно учитывать наличие неактуальных данных, т.к. их удаление происходит в непредсказуемый момент времени.

create table metrics_agg on cluster some_cluster
(
    PeriodStart   DateTime64(6),  --Начало 15-и минутного интервала агрегации
    PeriodEnd     DateTime64(6),  --Конец 15-и минутного интервала агрегации
    `Count`       UInt64,         --Всего записей за интервал
    CreateTime    DateTime64(6),  --Время создания записи
    Sign          Int8,           --Тип строки: 1—строка состояния, -1—строка отмены состояния
    Version       UInt8           --Версия метрики
)
ENGINE = ReplicatedVersionedCollapsingMergeTree (Sign, Version)
ORDER BY (PeriodStart);

Так выглядит скрипт МП для наполнения/обновления данных в финальной, агрегированной таблице.

create materialized view metrics_agg_mv on cluster some_cluster
to metrics_agg
as
select q.PeriodStart,
       q.PeriodEnd,
       q.`Count`,
       toDateTime64(now64(), 6) as CreateTime,
       q.Sign,
       q.Version       
  from (select new_batch.IncTotal + coalesce(last_agg.`Count`, 0) as `Count`,
               new_batch.PeriodStart as PeriodStart,
               new_batch.PeriodEnd as PeriodEnd,
               coalesce(last_agg.Sign, 1) as Sign, --если записи ещё не было, будет признак 1
               coalesce(last_agg.Version, 1) as Version --если записи ещё не было, будет версия 1               
          from (select sum( Inc ) as IncTotal,                         
                       PeriodStart,
                       PeriodEnd
                  from (select RecId,
                               toDateTime64(toStartOfInterval( Time, interval 15 minute ), 6) as PeriodStart, --Начало 15-и минутки
                               toDateTime64(toStartOfInterval( Time, interval 15 minute ) + interval 15 minute, 6) as PeriodEnd, --Конец 15-и минутки
                               case 
                                   when Total = 1 then --Total = 1 (приход новой)
                                       1
                                   else --Total = 0 (пришло аннулирование)
                                       -1
                               end as Inc, --Считаем инкрмент для каждой записи
                               row_number() over(partition by RecId 
                                                     order by Time desc) as rn                               
                          from metrics_logic_upd
                         where Total in (0, 1)                         
                        )
                 where rn = 1
                 group by PeriodStart,
                          PeriodEnd
                ) new_batch
                    
          --Обращаемся к агрегированной таблице для получения текущих значений
          left join (select t.PeriodStart,
                            t.PeriodEnd,
                            t.`Count`,
                            sgn.Sign,
                            case
                              when sgn.Sign = -1 then t.Version --предыдущее значение версии для отменяемой
                              when sgn.Sign = 1 then t.Version + 1 --новое значение версии для новой
                            end as Version                            
                       from (select PeriodStart,
                                    PeriodEnd,
                                    `Count`,
                                    row_number() over(partition by PeriodStart, PeriodEnd                                         
                                                          order by CreateTime desc) as rn
                               from metrics_agg
                            ) t                      
	                  --Размножение строк на 2 для версионирования:
                      --  1-я для отмены предыдущего значения, точная копия с Sign = -1
                      --  2-я - новая с Sign = 1
                      cross join (select case 
                                           when number = 0 then -1 --для отмены старой записи
                                           when number = 1 then 1 --для создания новой записи
                                         end as Sign
                                    from numbers(2)
                                  ) sgn                                                
                      where rn = 1
                    ) last_agg          
            on last_agg.PeriodStart = new_batch.PeriodStart 
           and last_agg.PeriodEnd = new_batch.PeriodEnd                  
       ) q;

Его строение аналогично предыдущему за исключением того, что в этом случае в секции LEFT JOIN используются записи с максимальным CreateTime для набора за каждый период, т.к. данных по периодам может быть много, а нам для обновления всегда нужны последние имеющиеся.

Для получения актуальных агрегированных данных из таблицы XXX нужно использовать запрос такого вида:

select q.PeriodStart,
       q.PeriodEnd,
       q.`Count`
  from (select t.PeriodStart,
               t.PeriodEnd,
               t.`Count`
               row_number() over(partition by t.PeriodStart,
                                              t.PeriodEnd
                                     order by t.Version desc) as rn
          from metrics_agg t
         where t.Sign = 1
       ) q
 where q.rn = 1 --Выбираем актуальные данные

Здесь мы выбираем записи состояния с Sign = 1 (неотменённые) с максимальным значением Version для каждой метрики.

А теперь ВАЖНОЕ замечание. Как вы могли заметить в коде всех МП, в них один общий принцип – поиск записанных данных в таблицы перед записью новых данных. Но, поскольку, как известно, ClickHouse не транзакционный, а Eventually Consistent, то, что вы в него записали, когда-нибудь обязательно можно будет увидеть в запросе SELECT, но не факт, что сразу после записи. На самом деле это является слабым местом всего решения, и нужно обязательно его учитывать, если что-то похожее вы решите делать у себя. В нашем случае это актуально в основном при повторной загрузке документов, при этом между повторами проходит значительное время, за которое данные успевают распределиться. Мы за время наших экспериментов не столкнулись с проблемами из-за этого, а на будущее имеем в виду возможность синхронной вставки. Для этого в ClickHouse есть:

  1. команда FLUSH DISTRIBUTED

  2. настройка insert_distributed_sync

  3. настройка fsync_after_insert

Пока ни с чем из этого мы не экспериментировали. Если у вас есть опыт использования чего-то из этого, интересно будет его узнать.

Посмотрим теперь, как работает получившийся механизм

Для теста вставим одну запись в таблицу metrics_dirty:

insert into metrics_dirty
values('A1', toDateTime64(now64(), 6), 1);

после чего в наших таблицах появятся такие записи

в metrics_dirty и metrics_uniq:

RecId|Time                   |Sign|
-----+-----------------------+----+
A1   |2022-10-19 06:05:41:319|   1|

в metrics_logic_upd:

RecId|Time                   |Total|CreateTime             |
-----+-----------------------+-----+-----------------------+
A1   |2022-10-19 06:05:41:319|    1|2022-10-19 06:05:41:329|

в metrics_agg:

PeriodStart          |PeriodEnd            |Count|CreateTime             |Sign|Version|
---------------------+---------------------+-----+-----------------------+----+-------+
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    1|2022-10-19 06:05:41:336|   1|      1|

Проверим, как работает защита от дублей; выполним эту вставку ещё раз и посмотрим, какие записи окажутся в таблицах

в metrics_dirty:

RecId|Time                   |Sign|
-----+-----------------------+----+
A1   |2022-10-19 06:05:41:319|   1|
A1   |2022-10-19 06:08:42:103|   1|

в metrics_uniq:

RecId|Time                   |Sign|
-----+-----------------------+----+
A1   |2022-10-19 06:05:41:319|   1|

Как видно, в таблице уникальных записей дубль не появился, как и в таблицах metrics_logic_upd и metrics_agg.

Теперь проверим логическое обновление: добавим сначала аннулирующую запись, для этого просто изменим в первоначальном запросе значение Sign на -1

insert into metrics_dirty
values('A1', toDateTime64(now64(), 6), -1);

После этого получим такие записи

в metrics_dirty:

RecId|Time                   |Sign|
-----+-----------------------+----+
A1   |2022-10-19 06:05:41:319|   1|
A1   |2022-10-19 06:08:42:103|   1|
A1   |2022-10-19 06:13:47:409|  -1|

в metrics_uniq:

RecId|Time                   |Sign|
-----+-----------------------+----+
A1   |2022-10-19 06:05:41:319|   1|
A1   |2022-10-19 06:13:47:409|  -1|	

в metrics_logic_upd:

RecId|Time                   |Total|CreateTime             |
-----+-----------------------+-----+-----------------------+
A1   |2022-10-19 06:05:41:319|    1|2022-10-19 06:05:41:329|
A1   |2022-10-19 06:13:47:409|    0|2022-10-19 06:13:47:418|

в metrics_agg:

PeriodStart          |PeriodEnd            |Count|CreateTime             |Sign|Version|
---------------------+---------------------+-----+-----------------------+----+-------+
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    1|2022-10-19 06:05:41:336|   1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|  -1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|   1|      2|

Как видно, в таблице metrics_dirty копятся все приходящие записи; в metrics_uniq добавилась ещё одна запись, которая является логическим аннулированием первой; далее в metrics_logic_upd видим, что последняя имеющаяся запись для A1 обладает значением Total=0, что является признаком её аннулирования. Аналогично в metrics_agg последняя запись (с наибольшим VERSION и Sign=1) имеет Count=0, т.е. она не будет вносить вклад при суммировании по Count, как и должно быть для аннулированных записей.

Теперь добавим исправляющую запись для A1, назовём её A2

insert into metrics_dirty
values('A2', toDateTime64(now64(), 6), 1);

Пропустим таблицы metrics_dirty и metrics_uniq по причине очевидности того, что в них будет содержаться. В metrics_logic_upd будут такие записи:

RecId|Time                   |Total|CreateTime             |
-----+-----------------------+-----+-----------------------+
A1   |2022-10-19 06:05:41:319|    1|2022-10-19 06:05:41:329|
A1   |2022-10-19 06:13:47:409|    0|2022-10-19 06:13:47:418|
A2   |2022-10-19 06:23:47:763|    1|2022-10-19 06:23:47:784|

в metrics_agg:

PeriodStart          |PeriodEnd            |Count| CreateTime            |Sign|Version|
---------------------+---------------------+-----+-----------------------+----+-------+
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    1|2022-10-19 06:05:41:336|   1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|  -1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|   1|      2|
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    1|2022-10-19 06:23:47:795|  -1|      2|	
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    1|2022-10-19 06:23:47:795|   1|      3|	

Теперь ещё раз посмотрим, как будут суммироваться данные в агрегированной таблице, добавим новую запись

insert into metrics_dirty
values('B1', toDateTime64(now64(), 6), 1);

в metrics_agg:

PeriodStart          |PeriodEnd            |Count| CreateTime            |Sign|Version|
---------------------+---------------------+-----+-----------------------+----+-------+
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    1|2022-10-19 06:05:41:336|   1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|  -1|      1|
2022-10-19 06:00:00:0|2022-10-19 06:15:00:0|    0|2022-10-19 06:13:47:426|   1|      2|
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    1|2022-10-19 06:23:47:795|  -1|      2|	
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    1|2022-10-19 06:23:47:795|   1|      3|
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    2|2022-10-19 06:29:35:505|  -1|      3|
2022-10-19 06:15:00:0|2022-10-19 06:30:00:0|    2|2022-10-19 06:29:35:505|   1|      4|

Как видно, последняя по Version запись имеет Count=2, т.е. записи, как и ожидалось, суммировались.

Заключение

Подводя итог вышеизложенному, остаётся добавить только, что метод несмотря на наличие 4-х МП показывает хорошие результаты по загрузке данных. Даже после того, как был загружен 1 миллиард записей, загрузка куска в 5 миллионов записей укладывалась в 5 минут, что при нашей скорости поступления данных (до нескольких десятков миллионов записей в сутки) более чем достаточно.

Из минусов этого способа нужно отметить, что он будет становиться всё более медленным при увеличении количества данных. Отработать этот момент можно, например, выделив из массива данных те, что со временем уже не будут изменяться, и исключив их из поиска. Однако, скорее всего, есть и альтернативные варианты решения проблемы – было бы очень интересно услышать мнение профессионалов ClickHouse :)

Tags:
Hubs:
Total votes 12: ↑12 and ↓0+12
Comments11

Articles

Information

Website
www.ibs.ru
Registered
Founded
1992
Employees
1,001–5,000 employees
Location
Россия