Введение: два источника правды - одна большая проблема
Представьте: вы строите систему верификации дипломов. Требования простые - данные должны быть неизменяемыми (привет, блокчейн) и при этом быстро доступными для запросов (привет, PostgreSQL). Казалось бы, идеальное решение - писать в оба хранилища. Но дьявол, как всегда, кроется в деталях.
Наш проект использует паттерн двойной записи (Dual-Write):
Solana — гарантирует неизменность и прозрачность данных о выданных дипломах
PostgreSQL (Supabase) — обеспечивает быстрые выборки и сложные запросы
Звучит красиво на архитектурных диаграммах, но в production всё не так радужно. Главная проблема — частичные сбои. Транзакция в Solana прошла успешно, диплом записан в блокчейн навечно, а вот запись в PostgreSQL упала. Пользователь получил подтверждение, но половина системы о его дипломе не знает.
Сегодня я покажу, как мы столкнулись с этой проблемой лицом к лицу и какие паттерны применили для её решения.
Анатомия сбоя: где именно всё ломается?
Давайте посмотрим на реальный код из нашего internal/api/handlers.rs. Функция issue_diploma - это то место, где происходит магия... и где всё может пойти не так:
pub async fn issue_diploma( State(state): State<Arc<AppState>>, mut multipart: Multipart, ) -> Result<Json<IssueResponse>, AppError> { // ... парсинг multipart данных ... // Генерируем хеш диплома let hash = hashing::generate_hash( &file_bytes, &req.issuer_id, &req.recipient_id, issued_at, req.serial.as_deref(), ); // Подписываем хеш приватным ключом let signature = hashing::sign_hash(&hash, &state.issuer_keypair)?; let diploma = Diploma { hash: hash.clone(), issuer_id: req.issuer_id.clone(), recipient_id: req.recipient_id.clone(), signature: Some(signature.clone()), issued_at, serial: req.serial.clone(), ipfs_cid: None, }; // КРИТИЧЕСКАЯ ТОЧКА №1: Запись в блокчейн Solana // Эта операция может занять 1-3 секунды и стоит денег (газ) let chain_record = state.chain_client.write_hash(&hash, &diploma).await?; // Подготавливаем данные для базы let credential_data = serde_json::json!({ "hash": &diploma.hash, "issuer_id": &diploma.issuer_id, "recipient_id": &diploma.recipient_id, "solana_tx_id": &chain_record.tx_id, "issued_at": diploma.issued_at.to_rfc3339(), }); // КРИТИЧЕСКАЯ ТОЧКА №2: Запись в PostgreSQL // А вот здесь может произойти катастрофа let db_response = state .db_client .from("credentials") .insert(credential_data.to_string()) .execute() .await; // Обработка ошибки базы данных let db_response = match db_response { Ok(response) => response, Err(e) => { tracing::error!("Database request failed: {}", e); return Err(AppError::Database(format!("Database request failed: {}", e))); } }; if !db_response.status().is_success() { let status = db_response.status(); let error_body = db_response.text().await.unwrap_or_default(); // ВОТ ОНА - КРИТИЧЕСКАЯ НЕСОГЛАСОВАННОСТЬ! tracing::error!( "CRITICAL INCONSISTENCY: Failed to save to Supabase after successful Solana transaction. \ tx_id: {}, hash: {}. Status: {}. Body: {}", chain_record.tx_id, hash, status, error_body ); // Мы уже записали в блокчейн, откатить нельзя! return Err(AppError::Internal( "Failed to save credential record after blockchain confirmation.".to_string(), )); } // Если всё хорошо - возвращаем успешный ответ Ok(Json(IssueResponse { hash, tx_id: chain_record.tx_id, signature: Some(signature), issued_at, })) }
Вот как выглядит последовательность операций и точка отказа:
┌──────────┐ ┌────────────┐ ┌─────────┐ ┌────────────┐ │ Клиент │────▶│ Rust API │───▶│ Solana │────▶│ УСПЕХ │ └──────────┘ └────────────┘ └─────────┘ └────────────┘ │ │ │ ▼ │ ┌────────────┐ └───────────────────────────▶│ PostgreSQL │ └────────────┘ │ ▼ ┌────────────┐ │ СБОЙ! │ └────────────┘ │ ▼ ┌──────────────────────┐ │ РАССИНХРОН: │ │ • Блокчейн: ✓ есть | │ • База: ✗ нет | └──────────────────────┘
Последствия рассинхрона
Что происходит после такого сбоя? У нас есть несколько неприятных сценариев:
Пользователь не может найти свой диплом через API запросы к базе данных
Невозможность построить аналитику — данные в базе неполные
Проблемы с аудитом — в блокчейне есть запись, в отчётах её нет
Дублирование при повторной попытке — пользователь может попробовать выпустить диплом ещё раз
Теория на практике: паттерны для спасения данных
Проблема согласованности: выбираем стратегию
В распределённых системах есть два основных подхода к согласованности:
1. Строгая согласованность (Strong Consistency) - все узлы видят одинаковые данные в один момент времени. Это дорого и сложно, особенно когда один из узлов - публичный блокчейн.
2. Итоговая согласованность (Eventual Consistency) - данные могут временно различаться, но в конечном итоге придут к согласованному состоянию.
Мы выбрали итоговую согласованность. Почему? Откатить транзакцию в Solana после подтверждения - невозможно. Значит, нужно гарантировать, что PostgreSQL рано или поздно получит эти данные.
Паттерн Saga: длинные транзакции с компенсацией
Паттерн Saga разбивает длинную распределённую транзакцию на последовательность локальных транзакций. Каждый шаг может иметь компенсирующую транзакцию для отката.
Как это могло бы выглядеть в нашем случае:
// Псевдокод Saga для выпуска диплома enum SagaStep { SaveToDatabase, // Шаг 1 WriteToBlockchain, // Шаг 2 UpdateDatabaseStatus // Шаг 3 } async fn issue_diploma_saga(diploma: Diploma) -> Result<(), SagaError> { // Шаг 1: Сохраняем в БД со статусом "pending" let db_record = match save_to_database_with_status(&diploma, "pending").await { Ok(record) => record, Err(e) => { // Ничего откатывать не нужно, просто выходим return Err(SagaError::DatabaseFailed(e)); } }; // Шаг 2: Пишем в блокчейн let tx_id = match write_to_blockchain(&diploma).await { Ok(tx) => tx, Err(e) => { // Компенсирующая транзакция: помечаем запись как неудачную mark_database_record_failed(&db_record.id).await?; return Err(SagaError::BlockchainFailed(e)); } }; // Шаг 3: Обновляем статус в БД на "confirmed" match update_database_status(&db_record.id, "confirmed", &tx_id).await { Ok(_) => Ok(()), Err(e) => { // Здесь компенсация сложная: в блокчейне уже есть запись // Можно только пометить в БД для ручного вмешательства mark_for_manual_reconciliation(&db_record.id, &tx_id).await?; Err(SagaError::InconsistentState(e)) } } }
Проблема с Saga в блокчейне: Компенсирующие транзакции в Solana стоят денег (газ) и не отменяют предыдущие записи, а добавляют новые. Это делает паттерн дорогим и сложным.
Идемпотентность и повторные попытки
Идемпотентность - это свойство операции давать один и тот же результат при повторных вызовах. В нашем контексте это критически важно.
Вот как мы могли бы добавить механизм повторных попыток:
use tokio::time::{sleep, Duration}; async fn write_to_database_with_retry( db_client: &Postgrest, data: serde_json::Value, max_retries: u32, ) -> Result<(), AppError> { let mut retries = 0; let mut backoff = Duration::from_millis(100); loop { match db_client .from("credentials") .insert(data.to_string()) .execute() .await { Ok(response) if response.status().is_success() => { return Ok(()); } Ok(response) if response.status() == 409 => { // Конфликт - запись уже существует (идемпотентность!) tracing::info!("Record already exists, considering it success"); return Ok(()); } Ok(_) | Err(_) if retries < max_retries => { retries += 1; tracing::warn!( "Database write failed, retry {}/{} after {:?}", retries, max_retries, backoff ); sleep(backoff).await; backoff *= 2; // Экспоненциальная задержка } _ => { return Err(AppError::Database( "Failed after maximum retries".to_string() )); } } } }
Недостаток: Если база недоступна долго (например, плановое обслуживание), пользователь будет ждать. А блокчейн-транзакция уже выполнена!
Наше решение: паттерн Outbox и фоновый Reconciliation Job
После анализа различных подходов, мы остановились на комбинации двух паттернов:
Паттерн Transactional Outbox
Суть паттерна Outbox - вместо записи напрямую в две системы, мы делаем одну атомарную транзакцию в первичное хранилище, включая событие в таблицу outbox.
Вот как изменится наша архитектура:
// Новая структура для outbox #[derive(Serialize, Deserialize)] struct OutboxEvent { id: Uuid, event_type: String, payload: serde_json::Value, status: String, // "pending", "processing", "completed", "failed" created_at: DateTime<Utc>, processed_at: Option<DateTime<Utc>>, retry_count: u32, error_message: Option<String>, } // Изменённая функция issue_diploma pub async fn issue_diploma_with_outbox( State(state): State<Arc<AppState>>, mut multipart: Multipart, ) -> Result<Json<IssueResponse>, AppError> { // ... парсинг и генерация хеша как раньше ... // ВАЖНО: Сначала пишем в БД в одной транзакции let mut transaction = state.db_client.begin_transaction().await?; // Сохраняем диплом со статусом "pending_blockchain" let credential_data = serde_json::json!({ "hash": &diploma.hash, "issuer_id": &diploma.issuer_id, "recipient_id": &diploma.recipient_id, "status": "pending_blockchain", "issued_at": diploma.issued_at.to_rfc3339(), }); transaction .from("credentials") .insert(credential_data.to_string()) .execute() .await?; // Добавляем событие в outbox let outbox_event = serde_json::json!({ "id": Uuid::new_v4(), "event_type": "WRITE_TO_BLOCKCHAIN", "payload": serde_json::to_value(&diploma)?, "status": "pending", "created_at": Utc::now(), "retry_count": 0, }); transaction .from("outbox_events") .insert(outbox_event.to_string()) .execute() .await?; // Коммитим транзакцию - либо всё, либо ничего! transaction.commit().await?; // Возвращаем ответ пользователю Ok(Json(IssueResponse { hash: diploma.hash, tx_id: "pending".to_string(), // Будет обновлён асинхронно signature: Some(signature), issued_at: diploma.issued_at, })) }
Теперь нужен фоновый процесс для обработки событий из outbox:
// Фоновый воркер для обработки outbox async fn outbox_processor(state: Arc<AppState>) { loop { // Получаем необработанные события let events = fetch_pending_outbox_events(&state.db_client).await; for event in events { match event.event_type.as_str() { "WRITE_TO_BLOCKCHAIN" => { process_blockchain_write(event, &state).await; } _ => { tracing::warn!("Unknown event type: {}", event.event_type); } } } // Спим перед следующей итерацией tokio::time::sleep(Duration::from_secs(5)).await; } } async fn process_blockchain_write( event: OutboxEvent, state: &Arc<AppState> ) { let diploma: Diploma = serde_json::from_value(event.payload.clone()) .expect("Failed to deserialize diploma"); // Пытаемся записать в блокчейн match state.chain_client.write_hash(&diploma.hash, &diploma).await { Ok(chain_record) => { // Успех! Обновляем статусы let mut transaction = state.db_client.begin_transaction().await.unwrap(); // Обновляем credential transaction .from("credentials") .update(serde_json::json!({ "status": "confirmed", "solana_tx_id": chain_record.tx_id, }).to_string()) .eq("hash", &diploma.hash) .execute() .await .unwrap(); // Помечаем событие как обработанное transaction .from("outbox_events") .update(serde_json::json!({ "status": "completed", "processed_at": Utc::now(), }).to_string()) .eq("id", event.id.to_string()) .execute() .await .unwrap(); transaction.commit().await.unwrap(); } Err(e) => { // Ошибка - увеличиваем счётчик попыток update_outbox_event_retry(&state.db_client, event.id, e.to_string()).await; } } }
Reconciliation Job: сверочный процесс
Даже с Outbox паттерном что-то может пойти не так. Поэтому мы добавили фоновый процесс сверки:
// Периодическая сверка данных между Solana и PostgreSQL async fn reconciliation_job(state: Arc<AppState>) { loop { tracing::info!("Starting reconciliation check..."); // Получаем последние транзакции из Solana за последний час let cutoff_time = Utc::now() - Duration::from_secs(3600); let blockchain_records = fetch_recent_blockchain_transactions( &state.chain_client, cutoff_time ).await; for record in blockchain_records { // Проверяем, есть ли запись в PostgreSQL let db_result = state .db_client .from("credentials") .select("hash") .eq("hash", &record.hash) .single() .execute() .await; if db_result.is_err() || !db_result.unwrap().status().is_success() { // Запись отсутствует в БД - добавляем tracing::warn!( "Found orphaned blockchain record: hash={}, tx_id={}", record.hash, record.tx_id ); // Восстанавливаем запись из блокчейна let recovery_data = serde_json::json!({ "hash": record.hash, "solana_tx_id": record.tx_id, "status": "recovered_from_blockchain", "recovered_at": Utc::now(), // Остальные поля берём из метаданных транзакции }); match state .db_client .from("credentials") .insert(recovery_data.to_string()) .execute() .await { Ok(_) => { tracing::info!("Successfully recovered record: {}", record.hash); // Отправляем алерт команде send_alert( "Data inconsistency detected and fixed", &format!("Recovered hash {} from blockchain", record.hash) ).await; } Err(e) => { tracing::error!("Failed to recover record: {}", e); } } } } // Запускаем сверку каждые 5 минут tokio::time::sleep(Duration::from_secs(300)).await; } }
Визуализация нового подхода:
┌──────────┐ ┌────────────┐ ┌─────────────┐ │ Клиент │────▶│ Rust API │────▶│ PostgreSQL │ └──────────┘ └────────────┘ │ + Outbox │ └─────────────┘ │ ▼ ┌─────────────┐ │ УСПЕХ │ │ (Атомарно) │ └─────────────┘ │ ┌──────────────────┼──────────────────┐ ▼ ▼ ▼ ┌─────────────────┐ ┌──────────────┐ ┌──────────────┐ │ Outbox Processor│ │Reconciliation│ │ Monitoring │ │ (Async) │ │ Job │ │ & Alerts │ └─────────────────┘ └──────────────┘ └──────────────┘ │ │ ▼ ▼ ┌──────────┐ ┌──────────┐ │ Solana │◀─────│ Проверка │ └──────────┘ └──────────┘
Что можно улучшить: взгляд в будущее
Очередь с повторными попытками
Вместо простого outbox в БД, можно использовать полноценную очередь сообщений:
// Интеграция с Redis Streams для более надёжной доставки use redis::AsyncCommands; async fn publish_to_queue( redis_client: &redis::Client, diploma: &Diploma, ) -> Result<(), AppError> { let mut conn = redis_client.get_async_connection().await?; let event = serde_json::json!({ "type": "WRITE_TO_BLOCKCHAIN", "payload": diploma, "timestamp": Utc::now().to_rfc3339(), "retry_count": 0, }); // Добавляем в Redis Stream с автогенерацией ID conn.xadd( "diploma:outbox", "*", &[("event", serde_json::to_string(&event)?)], ).await?; Ok(()) } // Консьюмер с группой для гарантированной доставки async fn consume_from_queue(redis_client: &redis::Client, state: Arc<AppState>) { let mut conn = redis_client.get_async_connection().await.unwrap(); // Создаём consumer группу let _: Result<(), _> = conn.xgroup_create_mkstream( "diploma:outbox", "blockchain_writers", "$", ).await; loop { // Читаем события из очереди let events: Vec<StreamReadReply> = conn.xreadgroup( &["diploma:outbox"], "blockchain_writers", "worker_1", &[">"], Some(1), None, ).await.unwrap(); for event in events { // Обрабатываем и подтверждаем process_event(event, &state).await; conn.xack("diploma:outbox", "blockchain_writers", &[event.id]).await.unwrap(); } } }
Мониторинг и алертинг
Критически важно отслеживать состояние системы:
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; lazy_static! { static ref INCONSISTENCY_COUNTER: CounterVec = register_counter_vec!( "diploma_inconsistencies_total", "Total number of data inconsistencies detected", &["type"] ).unwrap(); static ref RECONCILIATION_DURATION: HistogramVec = register_histogram_vec!( "reconciliation_duration_seconds", "Time taken to reconcile records", &["status"] ).unwrap(); } // Используем метрики в коде async fn monitor_inconsistency(inconsistency_type: &str) { INCONSISTENCY_COUNTER .with_label_values(&[inconsistency_type]) .inc(); // Алерт если слишком много несогласованностей let total = INCONSISTENCY_COUNTER .with_label_values(&[inconsistency_type]) .get(); if total > 10.0 { send_critical_alert( "High inconsistency rate detected", &format!("Type: {}, Count: {}", inconsistency_type, total) ).await; } }
Event Sourcing для полной трассировки
Можно пойти ещё дальше и хранить все события как неизменяемый лог:
#[derive(Serialize, Deserialize)] enum DiplomaEvent { Created { hash: String, issuer_id: String, recipient_id: String, timestamp: DateTime<Utc>, }, BlockchainWriteRequested { hash: String, timestamp: DateTime<Utc>, }, BlockchainWriteCompleted { hash: String, tx_id: String, timestamp: DateTime<Utc>, }, BlockchainWriteFailed { hash: String, error: String, retry_count: u32, timestamp: DateTime<Utc>, }, ReconciliationDetected { hash: String, source: String, // "blockchain" или "database" timestamp: DateTime<Utc>, }, } // Это даёт нам полную историю каждого диплома async fn append_event( db_client: &Postgrest, event: DiplomaEvent, ) -> Result<(), AppError> { let event_data = serde_json::json!({ "event_type": event.variant_name(), "payload": serde_json::to_value(&event)?, "timestamp": Utc::now(), }); db_client .from("diploma_events") .insert(event_data.to_string()) .execute() .await?; Ok(()) }
Заключение: уроки, извлечённые из транзакций
Работая с паттерном двойной записи между Solana и PostgreSQL, мы извлекли несколько важных уроков:
Никогда не полагайтесь на последовательные вызовы — если первый прошёл успешно, это не гарантирует успех второго. Особенно когда первый — это необратимая операция в блокчейне.
Проектируйте с учётом сбоев — вопрос не в том, упадёт ли система, а в том, когда это произойдёт. Паттерн Outbox и фоновая сверка — это не избыточность, а необходимость.
Итоговая согласованность — ваш друг, не пытайтесь достичь строгой согласованности между блокчейном и традиционной БД. Это дорого, сложно и часто невозможно.
Мониторинг критичен — лучше получить алерт о рассинхроне через минуту, чем узнать о проблеме от пользователя через неделю.
Идемпотентность спасает — проектируйте операции так, чтобы их можно было безопасно повторять. Это упрощает восстановление после сбоев.
Для коллег, работающих с Web3-бэкендами на Rust: блокчейн — это не серебряная пуля. Это мощный инструмент, но он требует тщательного проектирования всей системы. Двойная запись кажется простым решением, пока вы не столкнётесь с первым production‑сбоем в 3 часа ночи.
Помните: в распределённых системах всё, что может пойти не так, обязательно пойдёт не так. Проектируйте соответственно.
Полезные ссылки
Telegram — автор
Если у вас есть опыт решения подобных проблем или вопросы по реализации - давайте обсудим в комментариях. Особенно интересно услышать про альтернативные подходы к синхронизации блокчейна с традиционными БД.
