Подробный технический разбор: как создать масштабируемое гибридное SaaS-хранилище для данных из области анализа безопасности.

В области SaaS-безопасности наиболее уязвимая плоскость атаки — это объём данных. При создании платформы для обнаружения мошенничества, аномалий или нарушения соответствия в таких корпоративных средах как Google Workspace или Microsoft 365, одной обработки данных недостаточно. Кроме этого приходится поглощать миллионы электронных сообщений, логов о прикреплённых файлах и записей о пользовательской активности.

Самый бесхитростный вариант — сбросить всё это в стандартную транзакционную базу данных, например, в PostgreSQL. Но тем самым вы подпишете приговор проекту. При использовании аналитических запросов (напр., “Найти все электронные сообщения от external-domain.com , отправленные в финансовый отдел за прошлый год") вы просто застопорите обычную OLTP-базу данных с построчной организацией записей. Напротив, попытка сконструировать полномасштабное озеро данных с применением Snowflake или BigQuery в качестве минимально жизнеспособного продукта (MVP) — это зачастую перебор, причём, запредельно дорогой.

В этой статье будет рассказано об архитектуре гибридного конвейера поглощения данных. Мы напишем систему, в которой на Golang написан код для высококонкурентного поглощения данных, а база данных DuckDB выступает в качестве встроенного высокопроизводительного аналитического хранилища данных.

1. Цель проекта и архитектура

Цель

Система должна действовать как «прицеп» к клиентской о��лачной среде. Она должна подключаться к клиентскому API, скачивать клиентскую историю переписки и хранить её в формате, оптимизированном для тяжеловесных операций чтения (сканирование на предмет мошеннических паттернов).

«Гибридное» решение

Мы совместили на уровне долговременного хранения данных две разные технологии::

1.       PostgreSQL (OLTP): хранит данные о «состоянии» — идентификатор клиента (Tenant ID), учётные данные API, состояние задачи и оповещения. Для этого требуется обеспечить соответствие с ACID и реляционную целостность.

2.      DuckDB (OLAP): хранит данные о «событиях»: электронные письма и пользовательские логи. DuckDB — это колоночная SQL-база данных, работающая во встроенном режиме (как SQLite), но векторизованная для обработки аналитических запросов.

Схематическое изображение системы

Ниже показана архитектура системы, которую мы стремимся построить. Планировщик (Scheduler) инициирует задания, Сервис поглощения (Ingestion Service) выбирает данные, а Аналитический сервис (Analysis Service) (на будущее) их потребляет.

2. Уровень провайдера: Паттерн Стратегия

Выстраивая масштабируемую систему, недопустимо связывать бизнес-логику с конкретными провайдерами. Сегодня в таком качестве выступают Google и Microsoft, а завтра могут оказаться tomorrow Slack или Zoom.

Эта проблема решается при помощи паттерна Стратегия. Мы определяем строгий интерфейс, обязательный к реализации любым провайдером..

Код (internal/ingestion/provider.go)

Этот интерфейс абстрагирует сложность OAuth, пагинации и квот API.

type ProviderClient interface {
    // Выбирает список пользователей (почтовых адресов) для клиента
    GetUsers(ctx context.Context, tenantID uuid.UUID) ([]domain.User, error)
    
    // Выбирает электронные сообщения по конкретному пользователю, но ТОЛЬКО спустя некоторое время
    GetEmails(ctx context.Context, tenantID uuid.UUID, externalUserID string, receivedAfter time.Time) ([]domain.Email, error)
}

Затем мы реализуем этот интерфейс для каждого провайдера. Обратите внимание, что в нашем минимально жизнеспособном продукте (MVP) часть функций имитируется, но в продакшне такой код содержал бы вызовы к клиентам, выполняемые по  HTTP.

// Образец реализации для Microsoft
type microsoftProvider struct{}
func (p *microsoftProvider) GetUsers(ctx context.Context, tenantID uuid.UUID) ([]domain.User, error) {
    // В продакшне: вызов графового API /v1.0/users
    return []domain.User{
        {ID: uuid.New(), Name: "Satya Nadella", Mail: "satya@msft.example.com"},
    }, nil
}

3. Основная логика: надёжная оркестрация

Структура Service действует как оркестратор. В ней содержатся ссылки на репозиторий хранилища и словарь провайдеров.

Логика синхронизации (internal/ingestion/service.go)

Критически важна функция SyncTenant. Она выполняет многоэтапный процесс синхронизации.

Отказоустойчивость и инкрементная синхронизация

Здесь реализуются два ключевых инженерных решения:

1.       Инкрементная синхронизация: Сначала запрашиваем GetLastSyncTime.  Это избавляет нас от необходимости повторно скачивать терабайты старых электронных сообщений.

2.      Обработка частичных отказов: при отказе одного пользователя (например, при ошибке доступа к API), логируем предупреждение (log.Warn) и переходим к следующему пользователю — continue. Таким образом, из-за отдельно взятого отказа всё задание не отменяется.

func (s *Service) SyncTenant(ctx context.Context, tenantID uuid.UUID, provider domain.Provider) error {
    // 1. Получаем контрольную точку
    lastSyncTime, err := s.repo.GetLastSyncTime(ctx, tenantID, provider)
    if err != nil {
        return err
    }
    // 2. Обновляем список пользователей 
    users, _ := client.GetUsers(ctx, tenantID)
    s.repo.SaveUsers(ctx, users)
    // 3. Попользовательская синхронизация электронных сообщений
    for _, u := range users {
        // Выбираем только дельту (электронные сообщения, поступившие после lastSyncTime)
        emails, err := client.GetEmails(ctx, tenantID, u.ExternalUserID, lastSyncTime)
        
        if err != nil {
            // САМОЕ ВАЖНОЕ: Логируем и продолжаем. Критического отказа не происходит.
            slog.Warn("Failed to get emails", "user", u.Email, "err", err)
            continue
        }
        if len(emails) > 0 {
            s.repo.SaveEmails(ctx, emails)
        }
    }
    return nil
}

4. Высокопроизводительное хранилище: внутреннее устройство DuckDB

Именно здесь происходит магия, связанная с производительностью. В DuckDB можно записывать данные существенно быстрее, чем позволяют стандартные вставки в SQL.

Оптимизация 1: Прикрепитель (Appender)

С таблицей users используем duckdb.Appender. Он работает, минуя парсер SQL, и записывает данные прямо в базовом колоночном формате.

// internal/storage/duckdb.go
func (r *duckDBRepo) SaveUsers(ctx context.Context, users []domain.User) error {
    conn, _ := r.db.Conn(ctx)
    defer conn.Close()
    // Инициализируем Прикрепитель, задавая ему в качестве значения таблицу "users" 
    app, _ := duckdb.NewAppenderFromConn(conn, "", "users")
    defer app.Close()
    for _, u := range users {
        // Прикрепление с нулевым копированием
        app.AppendRow(
            u.ID.String(),
            u.TenantID.String(),
            u.ExternalUserID,
            u.Email,
            u.Name,
            string(u.Provider),
        )
    }
    return app.Flush() // Фиксация пакета
}

Оптимизация 2: Разрешение конфликтов

При работе с электронными сообщениями окна синхронизации при выборке таких сообщений могут частично перекрываться, из-за чего одно и то же электронное письмо может быть выбрано дважды. Чтобы гарантировать идемпотентность, воспользуемся опцией ON CONFLICT DO NOTHING.

func (r *duckDBRepo) SaveEmails(ctx context.Context, emails []domain.Email) error {
    // ... Настройка транзакции ...
    query := `
        INSERT INTO emails (...) VALUES (...)
        ON CONFLICT (tenant_id, external_message_id, provider) DO NOTHING
    `
    // ... Выполнение ...
}

Оптимизация 3: Аналитическое индексирование

DuckDB автоматически создаёт для столбцов индексы “Min/Max”. Запрос с целью найти последнюю контрольную точку выполняется мгновенно (фактически, O(1)), так как DuckDB знает максимальное значение received_at, даже не сканируя таблицу.

SELECT MAX(received_at) FROM emails WHERE tenant_id = ? AND provider = ?

5. Разработка и тестирование

Логику мы реализуем через модульные тесты, а для интеграции воспользуемся Docker.

Модульное тестирование (internal/ingestion/service_test.go)

При помощи stretchr/testify сымитируем репозиторий и провайдер. Таким образом мы сможем протестировать логику оркестрации (напр., "Пропускается ли отказавший пользователь") без необходимости взаимодействовать с реальной базой данных.

func TestService_SyncTenant_Success(t *testing.T) {
    // Настройка моков
    mockRepo := new(MockRepository)
    mockProvider := new(MockProviderClient)
    service := NewService(mockRepo)
    // Определяем ожидания
    mockRepo.On("GetLastSyncTime", ...).Return(time.Time{}, nil)
    mockProvider.On("GetUsers", ...).Return([]domain.User{mockUser}, nil)
    
    // Выполняем логику
    err := service.SyncTenant(ctx, tenantID, provider)
    // Проверяем
    assert.NoError(t, err)
    mockRepo.AssertExpectations(t)
}

Сборка и запуск на локальной машине

1. Чистая сборка:

В этом проекте используется стандартный инструментарий языка Go.

# Инициализируем зависимости
go mod tidy
# Собираем бинарный файл
# CGO_ENABLED=1 требуется для DuckDB
export CGO_ENABLED=1 
go build -v -o ingestion-service ./cmd

2. Выполняем бинарный файл:

# Создаём каталог с данными
mkdir -p /data
export DUCKDB_PATH="/data/security.db"
# Выполняем
./ingestion-service

6. Развёртывание с Docker Compose

В качестве среды для «псевдоразвёртывания» воспользуемся Docker Compose. Так получится подобрать окружение (операционную систему, библиотеки), соответствующие продакшну.

Dockerfile

Применим многоэтапную сборку.

1.       Сборщик: golang:1.25 (Содержит компиляторы, большой).

2.      Выполнитель: debian:bookworm-slim (Маленький, готовый к использованию в продакшне).

Замечание: воспользоваться alpine будет не так просто, поскольку DuckDB требует glibc, отсутствующий в Alpine.

# Этап 1: Сборка
FROM golang:1.25.4-bookworm AS builder
WORKDIR /app
COPY . .
RUN go build -v -o /ingestion-service ./cmd
# Этап 2: Выполнение
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y libstdc++6 ca-certificates
COPY --from=builder /ingestion-service /usr/local/bin/ingestion-service
ENV DUCKDB_PATH="/data/security.db"
CMD ["/usr/local/bin/ingestion-service"]

Файл Compose

Он поднимает сервис и монтирует локальный том, поэтому информация надёжно сохранится в базе данных, даже если контейнер погибнет.

version: "3.8"
services:
  ingestor:
    build: .
    volumes:
      - ./data:/data  # Долговременное сохранение DuckDB на хосте
    environment:
      - DUCKDB_PATH=/data/security.db

Чтобы выполнить весь стек:

docker-compose up --build

7. Резюме

Данная архитектура демонстрирует современный подход к инженерии данных в Go.

  • Мы обошлись без таких сложных вещей как управление отдельным кластером Kafka и стеком Hadoop на этапе создания MVP.

  • Мы задействовали интерфейсы Go и на их основе создали поддающуюся тестированию модульную систему провайдеров.

  • Опираясь на DuckDB мы получили в рамках единственного бинарного файла такую аналитическую мощность, как в промышленном хранилище данных.

Такая конфигурация позволяет поглощать миллионы электронных сообщений силами всего одной маленькой виртуальной машины и является экономичной и мощной основой для SaaS в области безопасности.

Исходный код: https://github.com/neudinger/email-ingestion.git