Все началось с просьбы сделать отчеты в реальном времени. На первый взгляд задача выглядела простой, но довольно быстро выяснилось, что существующая архитектура для этого не подходит.
Проект был разбит на множество микросервисов, каждый из которых хранил данные в собственной PostgreSQL-базе. Чтобы строить сквозные отчеты, информацию нужно было где-то объединять.
На тот момент аналитика уже работала через ETL: раз в сутки Airflow восстанавливал общую PostgreSQL из ежедневных бекапов, а Redash выполнял запросы уже к ней. Решение было надежным и не требовало нагрузки на production, но для real-time оно не годилось — в лучшем случае отчеты показывали состояние системы на начало дня.
Какую проблему нужно было решить
Существующая схема была построена на классическом пакетном ETL. Раз в сутки Airflow скачивал резервные копии всех PostgreSQL-баз из S3, восстанавливал их и последовательно обновлял единую базу, к которой обращался Redash.
Пока объем данных был небольшим, такой подход не доставлял особых проблем. Но со временем начали проявляться его ограничения.
Во-первых, все отчеты всегда отставали минимум на сутки. Если утром в системе происходили изменения, увидеть их можно было только после следующего запуска ETL. Для финансовых показателей и операционной аналитики такой задержки было уже слишком много.
Во-вторых, сам процесс синхронизации оказался достаточно тяжелым. Во время восстановления нескольких дампов одновременно заметно возрастала нагрузка на дисковую подсистему, а передача резервных копий занимала существенную часть пропускной способности сети.
И наконец, мы теряли историю изменений. Если одна и та же запись в течение дня несколько раз обновлялась, в аналитическую базу попадало только ее последнее состояние на момент создания резервной копии. Все промежуточные изменения просто исчезали, а вместе с ними и возможность анализировать последовательность событий.

В какой-то момент стало понятно, что увеличивать частоту ETL бессмысленно. Хотелось получить не просто более свежие данные, а механизм, который будет передавать изменения практически сразу после их появления и при этом не создавать дополнительную нагрузку на рабочие базы.
Какие варианты рассматривали
Прежде чем собирать отдельный контур аналитики, хотелось понять, можно ли обойтись более простыми решениями. Некоторые варианты отпали практически сразу, некоторые даже удалось проверить на тестовом стенде.
Прямые запросы к рабочим PostgreSQL
Первая идея выглядела максимально простой: подключить Redash напрямую к базам микросервисов и строить отчеты без каких-либо промежуточных хранилищ.
На небольших объемах это действительно работает. BI-система отправляет запросы в PostgreSQL, получает данные и сразу строит графики.
Но в нашем случае такой подход быстро показал свои ограничения.
Каждый отчет объединял данные сразу из нескольких сервисов и содержал тяжелые JOIN, GROUP BY и агрегатные функции. Для OLTP-базы такие запросы — не самая приятная нагрузка. Пока пользователь смотрит дашборд, PostgreSQL вынужден сканировать большие объемы данных, тратя ресурсы, которые в этот момент нужны основному приложению.
Кроме производительности есть и менее очевидная проблема. Длительные транзакции на чтение мешают работе VACUUM: старые версии строк не могут быть удалены, таблицы постепенно разрастаются, а вместе с ними увеличивается размер индексов и время выполнения обычных запросов.
Проще говоря, чем активнее используется аналитика, тем сильнее она начинает влиять на production
PostgreSQL Foreign Data Wrapper (FDW)
Следующей идеей было вообще отказаться от копирования данных и использовать встроенные возможности PostgreSQL. Для этого отлично подходит postgres_fdw, который позволяет подключить удаленную базу как обычную таблицу и выполнять запросы так, будто все данные находятся в одном месте.
Настройка выглядит достаточно просто:
CREATE EXTENSION postgres_fdw; CREATE SERVER msg_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS ( host 'msg-db-prod.internal', dbname 'msg_production', port '5432' ); CREATE USER MAPPING FOR current_user SERVER msg_server OPTIONS ( user 'replicator', password 'secure_pass' ); CREATE FOREIGN TABLE remote_user_messages ( id bigint, user_id bigint, message_text text, created_at timestamp ) SERVER msg_server OPTIONS ( schema_name 'public', table_name 'messages' );
На небольших запросах все работало именно так, как ожидаешь. Можно написать обычный SELECT, сделать JOIN с локальными таблицами и получить результат без каких-либо ETL-процессов.
Проблемы появились, когда мы начали прогонять реальные отчеты.
Оказалось, что при сложных JOIN и агрегациях оптимизатор не всегда может эффективно "протолкнуть" фильтрацию на удаленную сторону. В результате аналитическая база сначала получала большой объем данных по сети и только потом выполняла часть операций локально.
Если таких запросов несколько, задержки начинают накапливаться очень быстро. Время выполнения отчетов становится сильно зависимым от сети и состояния удаленных PostgreSQL, а любая проблема с одним сервисом сразу отражается на аналитике.
В итоге FDW оказался удобным инструментом для небольших выборок и административных задач, но использовать его как основу для построения real-time аналитики на нескольких микросервисах мы не стали.

Нативная логическая репликация PostgreSQL
Еще одним очевидным кандидатом была встроенная логическая репликация PostgreSQL. Механизм достаточно простой: на стороне источника создается PUBLICATION, на стороне получателя — SUBSCRIPTION, после чего изменения начинают передаваться автоматически.
На первый взгляд это выглядело именно тем, что нужно. Каждый микросервис мог публиковать изменения своих таблиц, а общая PostgreSQL-база постепенно собирала бы актуальное состояние всех данных.
Но после оценки архитектуры появились вопросы.
Во-первых, источников у нас было больше десяти, и каждый развивался независимо. Для каждого нужно создать публикации, подписки, настроить права доступа и следить за их состоянием. Пока сервисов немного, это не вызывает проблем, но со временем такая схема превращается в отдельную инфраструктуру, которую тоже нужно поддерживать.
Во-вторых, логическая репликация чувствительна к изменениям схемы. Добавление новых колонок, изменение типов данных или другие DDL-операции требуют дополнительного внимания и могут привести к остановке репликации. А когда команды регулярно выкатывают новые версии сервисов, вероятность подобных ситуаций становится вполне реальной.
И наконец, даже если решить все организационные вопросы, приемником все равно остается PostgreSQL. Он отлично справляется с транзакционной нагрузкой, но использовать его как универсальное аналитическое хранилище для сложных JOIN, агрегаций и обработки десятков миллионов строк — не самая сильная его сторона.
Поэтому от идеи Postgres-to-Postgres мы отказались еще на этапе проектирования и начали смотреть в сторону CDC и колоночных СУБД.

CDC через Debezium и Kafka с записью обратно в PostgreSQL
После этого мы решили зайти с другой стороны и отказаться от периодического ETL. Вместо этого хотелось передавать только изменения данных.
Схема выглядела вполне логично. Debezium читает WAL PostgreSQL и публикует события в Kafka, а JDBC Sink Connector забирает их из топиков и записывает в общую аналитическую PostgreSQL.
Получался полноценный поток изменений без ежедневных бэкапов и без постоянных запросов к рабочим базам.
На тот момент именно этот вариант казался основным кандидатом.
Но после нескольких нагрузочных тестов стало понятно, что мы просто перенесли узкое место в другое место системы.
Каждое изменение проходило длинную цепочку: запись в WAL → сериализация в JSON → Kafka → десериализация → JDBC Connector → INSERT или UPDATE в PostgreSQL. При небольшом потоке это практически незаметно, но с увеличением количества событий начинали расходоваться ресурсы на преобразование данных, а приемник снова упирался в ограничения строковой СУБД.
Самая большая проблема была даже не в Kafka и не в Debezium. Они спокойно справлялись с нагрузкой. Ограничением оказался PostgreSQL, который должен был постоянно принимать поток изменений, обновлять индексы и при этом оставаться аналитическим хранилищем для сложных запросов.
В итоге стало понятно, что сама идея CDC нам подходит, но в качестве конечного хранилища нужна база, изначально рассчитанная на большие объемы аналитических данных.
Именно в этот момент мы начали смотреть в сторону ClickHouse.

На чем в итоге остановились
После нескольких итераций стало понятно, что проблема была не в способе доставки данных, а в конечном хранилище.
Идея с CDC нам понравилась сразу: Debezium читает WAL PostgreSQL и никак не влияет на выполнение запросов или запись в рабочую базу. Все изменения публикуются в Kafka, а дальше их можно отправить в любое количество потребителей.
Оставалось выбрать систему, которая сможет без проблем принимать постоянный поток событий и быстро выполнять аналитические запросы.
Этой системой стал ClickHouse.
В итоговой схеме Debezium асинхронно читает журналы транзакций PostgreSQL и отправляет изменения в Kafka. Затем Altinity ClickHouse Sink Connector забирает сообщения из топиков пакетами и записывает их в ClickHouse.

Такой подход решил сразу несколько задач. Рабочие PostgreSQL больше не участвуют в выполнении аналитических запросов, Kafka выступает буфером между источником и потребителями, а ClickHouse получает данные в том формате, для которого он и создавался — большими последовательными пачками.
На практике разница оказалась заметной. Данные занимают значительно меньше места за счет колоночного хранения и встроенного сжатия, а запросы с агрегациями по десяткам миллионов записей выполняются за время, которое раньше было сложно получить даже на существенно меньших объемах в PostgreSQL.
Спасибо, что дочитали. Если будут вопросы или замечания по архитектуре — с удовольствием обсудим в комментариях. Во второй части перейдем к настройке всего конвейера и посмотрим, как эта схема работает на практике.
