Как стать автором
Обновить

Как я построил serverless OCR-сервис на AWS Lambda и Amazon Bedrock

Уровень сложностиСредний
Время на прочтение13 мин
Количество просмотров1.1K

Недавно передо мной встала задача: организовать простое распознавание текста из загруженных документов (сканы PDF, изображения PNG/JPG) на ресурсах AWS. Классический подход – воспользоваться сервисом Amazon Textract или даже запустить Tesseract внутри Lambda. Однако, подход с использованием Amazon Bedrock мне показался более привлекательным и не требующим "танцев с бубном" вокруг структуры ответа Textract.

В этом посте я расскажу, как на практике за пару часов реализовал безсерверный (serverless) OCR-сервис на AWS, используя AWS Lambda и модель из Amazon Bedrock для извлечения текста. Статья ориентирована на опытных AWS-архитекторов, поэтому мы углубимся в архитектуру, покажу код (Terraform для инфраструктуры и Python для Lambda), обсудим масштабирование, ограничения и прикинем стоимость решения в регионе eu-central-1 (Франкфурт). Поехали!

Архитектура решения

Первоначальная идея заключалась в том, чтобы вызывать Lambda-функцию напрямую через S3 Event Notification при загрузке нового объекта. Но довольно быстро стало понятно: это может привести к серьёзным проблемам с троттлингом и превышением лимита одновременных вызовов Lambda (concurrent executions) на большом объеме данных.

Например, представим ситуацию: пользователь загружает пачку из 500 документов в S3. Это может вызвать 500 параллельных вызовов Lambda-функции. Если функция достаточно тяжёлая (например, делает вызов к Bedrock и работает по 5-10 секунд), то можно быстро упереться в лимит на одновременные вызовы (по умолчанию 1000 на аккаунт). К тому же, при резком всплеске входящего трафика S3 не имеет встроенного механизма контроля скорости (rate limiting) для событий – события просто полетят без удержания.

Как я решил эту проблему:

Я заменил прямой вызов Lambda с S3 на асинхронную буферизацию через EventBridge + SQS + SNS. Выглядит это так:

  1. Документ загружается в S3 – это триггерит стандартное событие в EventBridge. Многие не знают, но события S3 автоматически появляются в default event bus. Мы настраиваем EventBridge Rule, которое фильтрует нужные события (например, PutObject в нужном бакете с нужным префиксом uploads/).

  2. Это событие мы отправляем в SQS и далее уже в топик SNS:

    • отправляет одно сообщение в SQS – для буферизации и контроля потока;

    • вызывает Lambda-функцию, которая достаёт событие из топика SNS и запускает OCR-обработку.

Такой паттерн решает сразу несколько задач:

  • Позволяет отделить момент загрузки файла и начало его обработки.

  • Через SQS можно легко масштабировать обработку (например, если очередь растёт – можно добавить ещё одну Lambda или контейнер).

  • Можно делать дедупликацию, ретраи, или даже фильтровать ненужные события через EventBridge.

Вторая архитектурная проблема: размер сообщения в SQS

Изначально я хотел, чтобы результат OCR попадал сразу во вторую SQS из первой Lambda. Но тут мы наталкиваемся на суровое ограничение: размер одного сообщения в SQS — не более 256 КБ. Это может быть критичным, особенно если модель Bedrock вернёт длинный распознанный текст или structured output (например, JSON с кучей данных из PDF-документа).

Как обойти ограничение:

Я изменил паттерн следующим образом:

  • Результат работы Bedrock кладётся в S3 (например, в бакет ocr-results-bucket/uuid.json).

  • В SQS идёт только ссылка на файл результата: { "resultKey": "ocr-results/123.json", "sourceKey": "uploads/filename.png" }.

  • Lambda-подписчик, который читает из очереди, просто подтягивает JSON с результатом из S3 и пишет нужные поля в DynamoDB.

Это надёжно, масштабируемо и укладывается в лимиты. К тому же, если понадобятся ручные ретраи или проверка ошибок, удобно просто перечитать S3-файл.

Итоговая архитектура:

  1. Пользователь загружает файл в S3 в папку uploads/

  2. EventBridge ловит PutObject событие и отправляет его в SNS topic:

    • в очередь SQS (буфер);

    • и одновременно запускает Lambda InvokeBedrock (через SNS);

  3. Lambda получает путь к файлу, вызывает Bedrock, сохраняет результат OCR в S3 (в ocr-results/) и публикует сообщение в SQS c ссылкой на результат.

  4. Вторая Lambda SaveToDB обрабатывает очередь, подтягивает JSON с результатом из S3 и записывает в DynamoDB только ссылку на этот объект. Это позволяет избежать превышения лимитов размера записи в DynamoDB и снизить нагрузку при чтении, ведь полный результат можно получить по ссылке из S3.

Всё — мы минимизировали риски троттлинга, соблюли лимиты SQS и сохранили serverless-подход.

Terraform конфигурация

Теперь, когда архитектура проработана, можно описать всё это в Terraform. Нам понадобится:

  • S3-бакет для загрузки исходных документов

  • EventBridge Rule

  • SNS-топик и подписки (на Lambda и SQS)

  • Очередь SQS

  • Lambda-функции (InvokeBedrock и SaveToDB)

  • IAM роли и политики для Lambda

  • DynamoDB таблица

  • S3-бакет для хранения результатов OCR

Код ниже разворачивает инфраструктуру под нашу архитектуру и учитывает:

  • безопасную передачу данных через SQS

  • размещение результатов в S3

  • фан-аут через SNS

  • минимизацию рисков троттлинга

main.tf
provider "aws" {
  region = "eu-central-1"
}

# S3 buckets
resource "aws_s3_bucket" "uploads" {
  bucket = "ocr-input-documents"
}

resource "aws_s3_bucket" "results" {
  bucket = "ocr-processed-results"
}

# DynamoDB для хранения метаданных (ключей)
resource "aws_dynamodb_table" "ocr_index" {
  name         = "OCRResultsIndex"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "Id"
  attribute {
    name = "Id"
    type = "S"
  }
}

# SNS Topic
resource "aws_sns_topic" "ocr_topic" {
  name = "ocr-trigger-topic"
}

# SQS Queue
resource "aws_sqs_queue" "ocr_queue" {
  name = "ocr-processing-queue"
}

# Подписка SQS к SNS
resource "aws_sns_topic_subscription" "sqs_sub" {
  topic_arn = aws_sns_topic.ocr_topic.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.ocr_queue.arn
}

# Подписка Lambda на SNS (InvokeBedrock)
resource "aws_lambda_permission" "allow_sns_invoke_lambda" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.invoke_bedrock.function_name
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.ocr_topic.arn
}

resource "aws_sns_topic_subscription" "lambda_sub" {
  topic_arn = aws_sns_topic.ocr_topic.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.invoke_bedrock.arn
}

# EventBridge Rule: отлавливаем PutObject в бакете
resource "aws_cloudwatch_event_rule" "s3_put" {
  name        = "ocr-upload-trigger"
  event_pattern = jsonencode({
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
      "bucket": {"name": [aws_s3_bucket.uploads.bucket]},
      "object": {"key": [{"prefix": "uploads/"}]}
    }
  })
}

# Target — SNS
resource "aws_cloudwatch_event_target" "to_sns" {
  rule      = aws_cloudwatch_event_rule.s3_put.name
  target_id = "SendToSNS"
  arn       = aws_sns_topic.ocr_topic.arn
}

resource "aws_lambda_function" "invoke_bedrock" {
  function_name = "InvokeBedrockOCR"
  role          = aws_iam_role.lambda_role.arn
  handler       = "invoke.lambda_handler"
  runtime       = "python3.10"
  filename      = "lambda_invoke.zip"

  environment {
    variables = {
      RESULT_BUCKET = aws_s3_bucket.results.bucket
      QUEUE_URL     = aws_sqs_queue.ocr_queue.id
    }
  }
}

resource "aws_lambda_function" "save_result" {
  function_name = "SaveOCRToDynamo"
  role          = aws_iam_role.lambda_role.arn
  handler       = "save.lambda_handler"
  runtime       = "python3.10"
  filename      = "lambda_save.zip"

  environment {
    variables = {
      TABLE_NAME    = aws_dynamodb_table.ocr_index.name
      RESULT_BUCKET = aws_s3_bucket.results.bucket
    }
  }
}

# Привязка очереди к функции сохранения
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn = aws_sqs_queue.ocr_queue.arn
  function_name    = aws_lambda_function.save_result.arn
  batch_size       = 1
}

# IAM Role + Policy
resource "aws_iam_role" "lambda_role" {
  name = "ocr_lambda_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect = "Allow",
      Principal = { Service = "lambda.amazonaws.com" },
      Action = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "lambda_policy" {
  name = "ocr_lambda_inline"
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = ["s3:GetObject", "s3:PutObject"],
        Resource = [
          "${aws_s3_bucket.uploads.arn}/*",
          "${aws_s3_bucket.results.arn}/*"
        ]
      },
      {
        Effect = "Allow",
        Action = ["bedrock:InvokeModel"],
        Resource = "*"
      },
      {
        Effect = "Allow",
        Action = ["dynamodb:PutItem"],
        Resource = aws_dynamodb_table.ocr_index.arn
      },
      {
        Effect = "Allow",
        Action = ["sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage"],
        Resource = aws_sqs_queue.ocr_queue.arn
      },
      {
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

Lambda-функция: обработка PDF и вызов Bedrock

Теперь разберём первую Lambda-функцию (InvokeBedrock), которая:

  1. Получает уведомление из SNS о новом объекте в S3

  2. Определяет, является ли файл PDF или изображением

  3. Если это PDF — передаёт его содержимое напрямую в Bedrock, так как модели Claude 3 умеют обрабатывать PDF-файлы без предварительной конвертации; если это изображение (JPEG, PNG), оно кодируется в base64 и также передаётся модели

  4. Вызывает Amazon Bedrock с содержимым файла и ожидает ответ

  5. Сохраняет результат в S3 и публикует ссылку на него в SQS

lambda_invoke_bedrock.py
import os
import json
import base64
import boto3
import mimetypes
from urllib.parse import unquote_plus
from aws_lambda_powertools import Logger

logger = Logger(service="ocr-bedrock")

s3_client = boto3.client("s3")
bedrock_client = boto3.client("bedrock-runtime")
sqs_client = boto3.client("sqs")

RESULT_BUCKET = os.environ.get("RESULT_BUCKET")
QUEUE_URL = os.environ.get("QUEUE_URL")
MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"


@logger.inject_lambda_context
def lambda_handler(event, context):  # noqa: E501
    """Lambda function to process S3 events and invoke Bedrock model for OCR.

    Args:
        event (dict): The event data from S3.
        context (LambdaContext): The context object for the Lambda function.

    Raises:
        Exception: If there is an error processing the file or invoking the model.
    """
    for record in event["Records"]:
        try:
            message = json.loads(record["Sns"]["Message"])
            bucket = message["detail"]["bucket"]["name"]
            key = unquote_plus(message["detail"]["object"]["key"])
            logger.info(f"Processing file: s3://{bucket}/{key}")

            obj = s3_client.get_object(Bucket=bucket, Key=key)
            content = obj["Body"].read()
            mime_type, _ = mimetypes.guess_type(key)

            prompt_text = "Extract all the text from the document and return it as plain text."

            if mime_type not in ["image/jpeg", "image/png", "application/pdf"]:
                logger.warning(f"Unsupported MIME type: {mime_type}")
                continue

            encoded = base64.b64encode(content).decode("utf-8")

            request_body = {
                "anthropic_version": "bedrock-2023-05-31",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt_text},
                            {
                                "type": "document" if mime_type in ["application/pdf"] else "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": mime_type,
                                    "data": encoded,
                                },
                            },
                        ],
                    },
                ],
            }

            response = bedrock_client.invoke_model(
                modelId=MODEL_ID, body=json.dumps(request_body), contentType="application/json"
            )

            response_body = response["body"].read().decode("utf-8")
            result_data = json.loads(response_body)

            output_text = result_data.get("outputs", [{}])[0].get("content", [{}])[0].get("text", "")

            result_key = key.replace("uploads/", "ocr-results/") + ".json"
            s3_client.put_object(
                Bucket=RESULT_BUCKET,
                Key=result_key,
                Body=json.dumps({"text": output_text, "source": key}, ensure_ascii=False).encode("utf-8"),
            )

            sqs_client.send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps({"result_key": result_key, "source_key": key}),
            )

            logger.info(f"Result saved to s3://{RESULT_BUCKET}/{result_key} and queued in SQS")

        except Exception as e:
            logger.exception(f"Error processing file: {e}")
            raise

Общий смысл работы InvokeBedrock Lambda

Функция запускается при получении события из SNS, читает загруженный файл из S3 (PDF или изображение), передаёт его в Amazon Bedrock (модель Claude 3), получает текст, сохраняет его в S3, а ссылку на результат публикует в очередь SQS.

Подробное объяснение по шагам:

  1. Логгирование и инициализация

    from aws_lambda_powertools import Logger
    logger = Logger(service="ocr-bedrock")

    Используем aws-lambda-powertools для структурированного логгирования. Все логи автоматически маркируются ID вызова Lambda, и это удобно при отладке.

  2. Извлекаем путь к файлу из события

    message = json.loads(record["Sns"]["Message"])
    bucket = message["detail"]["bucket"]["name"]
    key = unquote_plus(message["detail"]["object"]["key"])

    SNS сообщение содержит S3-событие. Мы достаём имя бакета и ключ файла.

  3. Получаем содержимое файла из S3

    obj = s3_client.get_object(Bucket=bucket, Key=key)
    content = obj["Body"].read()

    Загрузка файла в память для дальнейшей передачи в Bedrock.

  4. Определяем MIME-тип

    mime_type,  = mimetypes.guesstype(key)

    Это позволяет нам понять, что за тип файла — PDF или изображение.

  5. Кодируем содержимое

    encoded = base64.b64encode(content).decode("utf-8")

    Для моделей Bedrock мультимодальный ввод передаётся как base64.

  6. Формируем prompt и payload

    prompt_text = "Extract all the text ..."
    ...
    
    request_body = {"messages": [...]}

    Мы передаём модели текстовую инструкцию + изображение/документ.

  7. Вызов Bedrock

    response = bedrock_client.invoke_model(...)
    response_body = response["body"].read().decode("utf-8")

    Модель обрабатывает документ и возвращает результат (текст в JSON).

  8. Парсинг результата

    result_data = json.loads(response_body)
    output_text = result_data.get("outputs", ...).get("text", "")

    Мы пытаемся достать текст из структуры ответа модели.

  9. Сохраняем результат в S3

    result_key = key.replace("uploads/", "ocr-results/") + ".json"
    s3_client.put_object(..., Key=result_key, Body=json.dumps(...))

    Распознанный текст сохраняется в S3 как JSON-файл.

  10. Публикуем ссылку на результат в SQS

    sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"result_key": ..., "source_key": ...})
    )

    В очередь мы кладём только ссылку на результат, не сам результат — чтобы не превышать лимит в 256 КБ.

  11. Обработка ошибок

    except Exception as e:
        logger.exception(...)

    Ошибки логируются в CloudWatch с полным stack trace.

Lambda-функция: сохранение ссылки в DynamoDB

Вторая функция (SaveOCRToDynamo) срабатывает при поступлении сообщения в очередь SQS. Она очень простая по логике:

  1. Получает сообщение из очереди — это JSON с полями result_key и source_key

  2. Сохраняет эту информацию в таблицу DynamoDB:

    • Id — UUID или messageId

    • SourceFile — путь к исходному файлу в S3

    • ResultFile — путь к JSON с результатом в S3

Пример кода:

lambda_savedb.py
import os
import json
import boto3
from aws_lambda_powertools import Logger

logger = Logger(service="ocr-save")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("TABLE_NAME"))

@logger.inject_lambda_context
def lambda_handler(event, context):
    for record in event["Records"]:
        try:
            body = json.loads(record["body"])
            result_key = body["result_key"]
            source_key = body.get("source_key", "unknown")
            record_id = record.get("messageId")

            item = {
                "Id": record_id,
                "SourceFile": source_key,
                "ResultFile": result_key
            }
            table.put_item(Item=item)
            logger.info(f"Saved result reference for {source_key} to DynamoDB")

        except Exception as e:
            logger.exception(f"Error saving result to DynamoDB: {e}")

Таким образом, мы сохраняем только ссылку на результат, а не весь текст, что помогает избежать лимитов DynamoDB на размер записи (400 КБ) и избыточного чтения. Доступ к полному результату можно получить по ключу ResultFile (s3://...).

Масштабирование и ограничения

Вся архитектура построена по принципу "event-driven" и полностью serverless, что даёт отличные возможности масштабирования:

  • Lambda: автоматически масштабируется. Если одновременно загружается 1000+ документов, AWS поднимает столько экземпляров, сколько нужно (в пределах лимита concurrent executions — по умолчанию 1000). При необходимости лимит можно поднять через запрос в Support.

  • SQS: разгружает нагрузку и гарантирует буферизацию. Даже если Lambda временно не справляется, сообщения надёжно лежат в очереди.

  • SNS: фан-аут позволяет подключить дополнительных подписчиков — например, для мониторинга, аналитики или триггеров на дообработку.

  • DynamoDB: режим On-Demand позволяет не думать о настройке throughput. Важно лишь не превышать размер записи и следить за partition key.

  • S3: отлично масштабируется и служит как для хранения оригиналов, так и результатов.

  • Bedrock: основное узкое место. Важно понимать:

    • Ограничение на размер запроса — ~5MB

    • Токенные лимиты у модели (например, max_tokens=4096 или 8192)

    • Возможна задержка при инференсе (особенно на больших PDF)

    • Не все модели доступны в каждом регионе — лучше использовать eu-central-1 или us-east-1

Если объём запросов становится стабильным и большим — можно рассмотреть Provisioned Throughput для Bedrock, чтобы избежать холодных стартов и задержек.

Дополнительные ограничения Amazon Bedrock по нагрузке

Amazon Bedrock, несмотря на всю магию, всё же не бесконечен. У него есть:

  • Лимиты по числу запросов в минуту (TPS) на конкретную модель — это может стать узким горлышком при массовой обработке;

  • Лимиты по токенам: как входным (prompt + media), так и выходным (генерация ответа). У Sonnet, например, ~200k токенов на сутки по умолчанию на аккаунт (может варьироваться).

Решения и обходные пути:

  1. Запрос увеличения квот через Service Quotas — первый и обязательный шаг, если вы видите throttling.

  2. Кросс-региональный инференс (inference profiles) — фича, которая позволяет вам в своём регионе (например, eu-central-1) вызывать модель, размещённую в другом регионе, где доступна нужная квота. Это может сильно разгрузить точку доступа и обойти временные ограничения.

    Подробнее: Using Amazon Bedrock inference profiles

    Пример: вы можете создать Bedrock inference profile, который направляет вызов на модель в us-east-1, даже если ваша Lambda работает в eu-central-1. Это даёт больше гибкости и повышает устойчивость под нагрузкой.

  3. Rate Limiting + очередь — если нагрузка слишком велика, вы можете либо замедлять поступление запросов через throttling на уровне приложения, либо использовать очереди с visibility timeout, чтобы выравнивать поток.

Пример расчета стоимости (eu-central-1)

Допустим, мы обрабатываем 1000 документов в месяц:

Компонент

Кол-во

Оценка стоимости

Bedrock (Claude Sonnet)

~4000 токенов (вход + выход) на 1 документ

~$0.036 × 1000 = $36.00

Lambda (Invoke + Save)

до 6 сек × 1000 вызовов

~$0.10

SQS

1000 сообщений

~$0.01 (или в пределах Free Tier)

DynamoDB

1000 записей

<$0.01 (On-Demand)

S3 (хранение + запросы)

~2 ГБ + операции

~$0.05

Итого

~$36.16 / месяц

Если заменить Sonnet на Claude Haiku, стоимость может упасть до $10–12 за тот же объём (но качество может быть чуть ниже). Также можно использовать batch-инференс или комбинировать документы.

Заключение

Мы построили полнофункциональный serverless-сервис для OCR на базе Amazon Bedrock. Архитектура получилась простой, надёжной и масштабируемой. Вместо традиционного OCR мы используем LLM, что позволяет не просто "считывать текст", а делать это гибко, контекстно, с возможностью уточнять, резюмировать или извлекать только нужные поля.

Фишки подхода:

  • минимальные затраты на поддержку инфраструктуры

  • масштабируемость "из коробки"

  • понятная схема работы, легко расширяемая под новые задачи

  • возможность дообработки, A/B тестирования моделей и фан-аута через SNS


⚠️ Важно: В своем решении я не претендую на уникальность. Также, код в статье адаптирован для демонстрации архитектуры и упрощён для читаемости. Некоторые фрагменты могут быть не полностью production-ready — например, типы ошибок Bedrock, исключения, retry-логика, валидация параметров и пр.

В продакшене обязательно:

  • убедитесь в правильной настройке IAM-ролей,

  • проверьте лимиты и квоты в Bedrock, Lambda и SQS,

  • добавьте retry/timeout/логгирование,

  • протестируйте код под реальную нагрузку.

Если будете использовать код как основу — тестируйте и дорабатывайте под свой проект. Всё, что здесь показано, не содержит чувствительных данных и подготовлено исключительно в ознакомительных целях.


Bonus: Step Functions

Все вышеперечисленное можно описать в одной простой State Machine:

Comment: >-
  A Bedrock OCR state machine that invokes a Bedrock model and saves the
  results to S3 and DynamoDB.
StartAt: Prepare Bedrock payload
States:
  Prepare Bedrock payload:
    Type: Pass
    Next: Bedrock InvokeModel
  Bedrock InvokeModel:
    Type: Task
    Resource: arn:aws:states:::bedrock:invokeModel
    Parameters: {}
    Next: Save to s3
  Save to s3:
    Type: Task
    Resource: arn:aws:states:::lambda:invoke
    OutputPath: $.Payload
    Parameters:
      Payload.$: $
    Retry:
      - ErrorEquals:
          - Lambda.ServiceException
          - Lambda.AWSLambdaException
          - Lambda.SdkClientException
          - Lambda.TooManyRequestsException
        IntervalSeconds: 1
        MaxAttempts: 3
        BackoffRate: 2
        JitterStrategy: FULL
    Next: Parallel State
  Parallel State:
    Comment: >-
      A Parallel state can be used to create parallel branches of execution in
      your state machine.
    Type: Parallel
    Branches:
      - StartAt: DynamoDB PutItem
        States:
          DynamoDB PutItem:
            Type: Task
            Resource: arn:aws:states:::dynamodb:putItem
            Parameters:
              TableName: MyDynamoDBTable
              Item:
                Column:
                  S: MyEntry
            End: true
      - StartAt: Do whatever else
        States:
          Do whatever else:
            Type: Pass
            End: true
    End: true
Теги:
Хабы:
0
Комментарии0

Публикации

Работа

Data Scientist
41 вакансия

Ближайшие события