Недавно передо мной встала задача: организовать простое распознавание текста из загруженных документов (сканы PDF, изображения PNG/JPG) на ресурсах AWS. Классический подход – воспользоваться сервисом Amazon Textract или даже запустить Tesseract внутри Lambda. Однако, подход с использованием Amazon Bedrock мне показался более привлекательным и не требующим "танцев с бубном" вокруг структуры ответа Textract.
В этом посте я расскажу, как на практике за пару часов реализовал безсерверный (serverless) OCR-сервис на AWS, используя AWS Lambda и модель из Amazon Bedrock для извлечения текста. Статья ориентирована на опытных AWS-архитекторов, поэтому мы углубимся в архитектуру, покажу код (Terraform для инфраструктуры и Python для Lambda), обсудим масштабирование, ограничения и прикинем стоимость решения в регионе eu-central-1 (Франкфурт). Поехали!
Архитектура решения
Первоначальная идея заключалась в том, чтобы вызывать Lambda-функцию напрямую через S3 Event Notification при загрузке нового объекта. Но довольно быстро стало понятно: это может привести к серьёзным проблемам с троттлингом и превышением лимита одновременных вызовов Lambda (concurrent executions) на большом объеме данных.
Например, представим ситуацию: пользователь загружает пачку из 500 документов в S3. Это может вызвать 500 параллельных вызовов Lambda-функции. Если функция достаточно тяжёлая (например, делает вызов к Bedrock и работает по 5-10 секунд), то можно быстро упереться в лимит на одновременные вызовы (по умолчанию 1000 на аккаунт). К тому же, при резком всплеске входящего трафика S3 не имеет встроенного механизма контроля скорости (rate limiting) для событий – события просто полетят без удержания.
Как я решил эту проблему:
Я заменил прямой вызов Lambda с S3 на асинхронную буферизацию через EventBridge + SQS + SNS. Выглядит это так:
Документ загружается в S3 – это триггерит стандартное событие в EventBridge. Многие не знают, но события S3 автоматически появляются в default event bus. Мы настраиваем EventBridge Rule, которое фильтрует нужные события (например,
PutObjectв нужном бакете с нужным префиксомuploads/).Это событие мы отправляем в SQS и далее уже в топик SNS:
отправляет одно сообщение в SQS – для буферизации и контроля потока;
вызывает Lambda-функцию, которая достаёт событие из топика SNS и запускает OCR-обработку.
Такой паттерн решает сразу несколько задач:
Позволяет отделить момент загрузки файла и начало его обработки.
Через SQS можно легко масштабировать обработку (например, если очередь растёт – можно добавить ещё одну Lambda или контейнер).
Можно делать дедупликацию, ретраи, или даже фильтровать ненужные события через EventBridge.
Вторая архитектурная проблема: размер сообщения в SQS
Изначально я хотел, чтобы результат OCR попадал сразу во вторую SQS из первой Lambda. Но тут мы наталкиваемся на суровое ограничение: размер одного сообщения в SQS — не более 256 КБ. Это может быть критичным, особенно если модель Bedrock вернёт длинный распознанный текст или structured output (например, JSON с кучей данных из PDF-документа).
Как обойти ограничение:
Я изменил паттерн следующим образом:
Результат работы Bedrock кладётся в S3 (например, в бакет
ocr-results-bucket/uuid.json).В SQS идёт только ссылка на файл результата:
{ "resultKey": "ocr-results/123.json", "sourceKey": "uploads/filename.png" }.Lambda-подписчик, который читает из очереди, просто подтягивает JSON с результатом из S3 и пишет нужные поля в DynamoDB.
Это надёжно, масштабируемо и укладывается в лимиты. К тому же, если понадобятся ручные ретраи или проверка ошибок, удобно просто перечитать S3-файл.
Итоговая архитектура:
Пользователь загружает файл в S3 в папку
uploads/EventBridge ловит
PutObjectсобытие и отправляет его в SNS topic:в очередь SQS (буфер);
и одновременно запускает Lambda
InvokeBedrock(через SNS);
Lambda получает путь к файлу, вызывает Bedrock, сохраняет результат OCR в S3 (в
ocr-results/) и публикует сообщение в SQS c ссылкой на результат.Вторая Lambda
SaveToDBобрабатывает очередь, подтягивает JSON с результатом из S3 и записывает в DynamoDB только ссылку на этот объект. Это позволяет избежать превышения лимитов размера записи в DynamoDB и снизить нагрузку при чтении, ведь полный результат можно получить по ссылке из S3.
Всё — мы минимизировали риски троттлинга, соблюли лимиты SQS и сохранили serverless-подход.
Terraform конфигурация
Теперь, когда архитектура проработана, можно описать всё это в Terraform. Нам понадобится:
S3-бакет для загрузки исходных документов
EventBridge Rule
SNS-топик и подписки (на Lambda и SQS)
Очередь SQS
Lambda-функции (InvokeBedrock и SaveToDB)
IAM роли и политики для Lambda
DynamoDB таблица
S3-бакет для хранения результатов OCR
Код ниже разворачивает инфраструктуру под нашу архитектуру и учитывает:
безопасную передачу данных через SQS
размещение результатов в S3
фан-аут через SNS
минимизацию рисков троттлинга
main.tf
provider "aws" { region = "eu-central-1" } # S3 buckets resource "aws_s3_bucket" "uploads" { bucket = "ocr-input-documents" } resource "aws_s3_bucket" "results" { bucket = "ocr-processed-results" } # DynamoDB для хранения метаданных (ключей) resource "aws_dynamodb_table" "ocr_index" { name = "OCRResultsIndex" billing_mode = "PAY_PER_REQUEST" hash_key = "Id" attribute { name = "Id" type = "S" } } # SNS Topic resource "aws_sns_topic" "ocr_topic" { name = "ocr-trigger-topic" } # SQS Queue resource "aws_sqs_queue" "ocr_queue" { name = "ocr-processing-queue" } # Подписка SQS к SNS resource "aws_sns_topic_subscription" "sqs_sub" { topic_arn = aws_sns_topic.ocr_topic.arn protocol = "sqs" endpoint = aws_sqs_queue.ocr_queue.arn } # Подписка Lambda на SNS (InvokeBedrock) resource "aws_lambda_permission" "allow_sns_invoke_lambda" { statement_id = "AllowExecutionFromSNS" action = "lambda:InvokeFunction" function_name = aws_lambda_function.invoke_bedrock.function_name principal = "sns.amazonaws.com" source_arn = aws_sns_topic.ocr_topic.arn } resource "aws_sns_topic_subscription" "lambda_sub" { topic_arn = aws_sns_topic.ocr_topic.arn protocol = "lambda" endpoint = aws_lambda_function.invoke_bedrock.arn } # EventBridge Rule: отлавливаем PutObject в бакете resource "aws_cloudwatch_event_rule" "s3_put" { name = "ocr-upload-trigger" event_pattern = jsonencode({ "source": ["aws.s3"], "detail-type": ["Object Created"], "detail": { "bucket": {"name": [aws_s3_bucket.uploads.bucket]}, "object": {"key": [{"prefix": "uploads/"}]} } }) } # Target — SNS resource "aws_cloudwatch_event_target" "to_sns" { rule = aws_cloudwatch_event_rule.s3_put.name target_id = "SendToSNS" arn = aws_sns_topic.ocr_topic.arn } resource "aws_lambda_function" "invoke_bedrock" { function_name = "InvokeBedrockOCR" role = aws_iam_role.lambda_role.arn handler = "invoke.lambda_handler" runtime = "python3.10" filename = "lambda_invoke.zip" environment { variables = { RESULT_BUCKET = aws_s3_bucket.results.bucket QUEUE_URL = aws_sqs_queue.ocr_queue.id } } } resource "aws_lambda_function" "save_result" { function_name = "SaveOCRToDynamo" role = aws_iam_role.lambda_role.arn handler = "save.lambda_handler" runtime = "python3.10" filename = "lambda_save.zip" environment { variables = { TABLE_NAME = aws_dynamodb_table.ocr_index.name RESULT_BUCKET = aws_s3_bucket.results.bucket } } } # Привязка очереди к функции сохранения resource "aws_lambda_event_source_mapping" "sqs_trigger" { event_source_arn = aws_sqs_queue.ocr_queue.arn function_name = aws_lambda_function.save_result.arn batch_size = 1 } # IAM Role + Policy resource "aws_iam_role" "lambda_role" { name = "ocr_lambda_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [{ Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, Action = "sts:AssumeRole" }] }) } resource "aws_iam_role_policy" "lambda_policy" { name = "ocr_lambda_inline" role = aws_iam_role.lambda_role.id policy = jsonencode({ Version = "2012-10-17", Statement = [ { Effect = "Allow", Action = ["s3:GetObject", "s3:PutObject"], Resource = [ "${aws_s3_bucket.uploads.arn}/*", "${aws_s3_bucket.results.arn}/*" ] }, { Effect = "Allow", Action = ["bedrock:InvokeModel"], Resource = "*" }, { Effect = "Allow", Action = ["dynamodb:PutItem"], Resource = aws_dynamodb_table.ocr_index.arn }, { Effect = "Allow", Action = ["sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage"], Resource = aws_sqs_queue.ocr_queue.arn }, { Effect = "Allow", Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], Resource = "arn:aws:logs:*:*:*" } ] }) }
Lambda-функция: обработка PDF и вызов Bedrock
Теперь разберём первую Lambda-функцию (InvokeBedrock), которая:
Получает уведомление из SNS о новом объекте в S3
Определяет, является ли файл PDF или изображением
Если это PDF — передаёт его содержимое напрямую в Bedrock, так как модели Claude 3 умеют обрабатывать PDF-файлы без предварительной конвертации; если это изображение (JPEG, PNG), оно кодируется в base64 и также передаётся модели
Вызывает Amazon Bedrock с содержимым файла и ожидает ответ
Сохраняет результат в S3 и публикует ссылку на него в SQS
lambda_invoke_bedrock.py
import os import json import base64 import boto3 import mimetypes from urllib.parse import unquote_plus from aws_lambda_powertools import Logger logger = Logger(service="ocr-bedrock") s3_client = boto3.client("s3") bedrock_client = boto3.client("bedrock-runtime") sqs_client = boto3.client("sqs") RESULT_BUCKET = os.environ.get("RESULT_BUCKET") QUEUE_URL = os.environ.get("QUEUE_URL") MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0" @logger.inject_lambda_context def lambda_handler(event, context): # noqa: E501 """Lambda function to process S3 events and invoke Bedrock model for OCR. Args: event (dict): The event data from S3. context (LambdaContext): The context object for the Lambda function. Raises: Exception: If there is an error processing the file or invoking the model. """ for record in event["Records"]: try: message = json.loads(record["Sns"]["Message"]) bucket = message["detail"]["bucket"]["name"] key = unquote_plus(message["detail"]["object"]["key"]) logger.info(f"Processing file: s3://{bucket}/{key}") obj = s3_client.get_object(Bucket=bucket, Key=key) content = obj["Body"].read() mime_type, _ = mimetypes.guess_type(key) prompt_text = "Extract all the text from the document and return it as plain text." if mime_type not in ["image/jpeg", "image/png", "application/pdf"]: logger.warning(f"Unsupported MIME type: {mime_type}") continue encoded = base64.b64encode(content).decode("utf-8") request_body = { "anthropic_version": "bedrock-2023-05-31", "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt_text}, { "type": "document" if mime_type in ["application/pdf"] else "image", "source": { "type": "base64", "media_type": mime_type, "data": encoded, }, }, ], }, ], } response = bedrock_client.invoke_model( modelId=MODEL_ID, body=json.dumps(request_body), contentType="application/json" ) response_body = response["body"].read().decode("utf-8") result_data = json.loads(response_body) output_text = result_data.get("outputs", [{}])[0].get("content", [{}])[0].get("text", "") result_key = key.replace("uploads/", "ocr-results/") + ".json" s3_client.put_object( Bucket=RESULT_BUCKET, Key=result_key, Body=json.dumps({"text": output_text, "source": key}, ensure_ascii=False).encode("utf-8"), ) sqs_client.send_message( QueueUrl=QUEUE_URL, MessageBody=json.dumps({"result_key": result_key, "source_key": key}), ) logger.info(f"Result saved to s3://{RESULT_BUCKET}/{result_key} and queued in SQS") except Exception as e: logger.exception(f"Error processing file: {e}") raise
Общий смысл работы InvokeBedrock Lambda
Функция запускается при получении события из SNS, читает загруженный файл из S3 (PDF или изображение), передаёт его в Amazon Bedrock (модель Claude 3), получает текст, сохраняет его в S3, а ссылку на результат публикует в очередь SQS.
Подробное объяснение по шагам:
Логгирование и инициализация
from aws_lambda_powertools import Logger logger = Logger(service="ocr-bedrock")Используем aws-lambda-powertools для структурированного логгирования. Все логи автоматически маркируются ID вызова Lambda, и это удобно при отладке.
Извлекаем путь к файлу из события
message = json.loads(record["Sns"]["Message"]) bucket = message["detail"]["bucket"]["name"] key = unquote_plus(message["detail"]["object"]["key"])SNS сообщение содержит S3-событие. Мы достаём имя бакета и ключ файла.
Получаем содержимое файла из S3
obj = s3_client.get_object(Bucket=bucket, Key=key) content = obj["Body"].read()Загрузка файла в память для дальнейшей передачи в Bedrock.
Определяем MIME-тип
mime_type, = mimetypes.guesstype(key)Это позволяет нам понять, что за тип файла — PDF или изображение.
Кодируем содержимое
encoded = base64.b64encode(content).decode("utf-8")Для моделей Bedrock мультимодальный ввод передаётся как base64.
Формируем prompt и payload
prompt_text = "Extract all the text ..." ... request_body = {"messages": [...]}Мы передаём модели текстовую инструкцию + изображение/документ.
Вызов Bedrock
response = bedrock_client.invoke_model(...) response_body = response["body"].read().decode("utf-8")Модель обрабатывает документ и возвращает результат (текст в JSON).
Парсинг результата
result_data = json.loads(response_body) output_text = result_data.get("outputs", ...).get("text", "")Мы пытаемся достать текст из структуры ответа модели.
Сохраняем результат в S3
result_key = key.replace("uploads/", "ocr-results/") + ".json" s3_client.put_object(..., Key=result_key, Body=json.dumps(...))Распознанный текст сохраняется в S3 как JSON-файл.
Публикуем ссылку на результат в SQS
sqs_client.send_message( QueueUrl=QUEUE_URL, MessageBody=json.dumps({"result_key": ..., "source_key": ...}) )В очередь мы кладём только ссылку на результат, не сам результат — чтобы не превышать лимит в 256 КБ.
Обработка ошибок
except Exception as e: logger.exception(...)Ошибки логируются в CloudWatch с полным stack trace.
Lambda-функция: сохранение ссылки в DynamoDB
Вторая функция (SaveOCRToDynamo) срабатывает при поступлении сообщения в очередь SQS. Она очень простая по логике:
Получает сообщение из очереди — это JSON с полями
result_keyиsource_keyСохраняет эту информацию в таблицу DynamoDB:
Id— UUID илиmessageIdSourceFile— путь к исходному файлу в S3ResultFile— путь к JSON с ��езультатом в S3
Пример кода:
lambda_savedb.py
import os import json import boto3 from aws_lambda_powertools import Logger logger = Logger(service="ocr-save") dynamodb = boto3.resource("dynamodb") table = dynamodb.Table(os.environ.get("TABLE_NAME")) @logger.inject_lambda_context def lambda_handler(event, context): for record in event["Records"]: try: body = json.loads(record["body"]) result_key = body["result_key"] source_key = body.get("source_key", "unknown") record_id = record.get("messageId") item = { "Id": record_id, "SourceFile": source_key, "ResultFile": result_key } table.put_item(Item=item) logger.info(f"Saved result reference for {source_key} to DynamoDB") except Exception as e: logger.exception(f"Error saving result to DynamoDB: {e}")
Таким образом, мы сохраняем только ссылку на результат, а не весь текст, что помогает избежать лимитов DynamoDB на размер записи (400 КБ) и избыточного чтения. Доступ к полному результату можно получить по ключу ResultFile (s3://...).
Масштабирование и ограничения
Вся архитектура построена по принципу "event-driven" и полностью serverless, что даёт отличные возможности масштабирования:
Lambda: автоматически масштабируется. Если одновременно загружается 1000+ документов, AWS поднимает столько экземпляров, сколько нужно (в пределах лимита concurrent executions — по умолчанию 1000). При необходимости лимит можно поднять через запрос в Support.
SQS: разгружает нагрузку и гарантирует буферизацию. Даже если Lambda временно не справляется, сообщения надёжно лежат в очереди.
SNS: фан-аут позволяет подключить дополнительных подписчиков — например, для мониторинга, аналитики или триггеров на дообработку.
DynamoDB: режим On-Demand позволяет не думать о настройке throughput. Важно лишь не превышать размер записи и следить за partition key.
S3: отлично масштабируется и служит как для хранения оригиналов, так и результатов.
Bedrock: основное узкое место. Важно понимать:
Ограничение на размер запроса — ~5MB
Токенные лимиты у модели (например, max_tokens=4096 или 8192)
Возможна задержка при инференсе (особенно на больших PDF)
Не все модели доступны в каждом регионе — лучше использовать
eu-central-1илиus-east-1
Если объём запросов становится стабильным и большим — можно рассмотреть Provisioned Throughput для Bedrock, чтобы избежать холодных стартов и задержек.
Дополнительные ограничения Amazon Bedrock по нагрузке
Amazon Bedrock, несмотря на всю магию, всё же не бесконечен. У него есть:
Лимиты по числу запросов в минуту (TPS) на конкретную модель — это может стать узким горлышком при массовой обработке;
Лимиты по токенам: как входным (prompt + media), так и выходным (генерация ответа). У Sonnet, например, ~200k токенов на сутки по умолчанию на аккаунт (может варьироваться).
Решения и обходные пути:
Запрос увеличения квот через Service Quotas — первый и обязательный шаг, если вы видите throttling.
Кросс-региональный инференс (inference profiles) — фича, которая позволяет вам в своём регионе (например,
eu-central-1) вызывать модель, размещённую в другом регионе, где доступна нужная квота. Это может сильно разгрузить точку доступа и обойти временные ограничения.Подробнее: Using Amazon Bedrock inference profiles
Пример: вы можете создать Bedrock inference profile, который направляет вызов на модель в
us-east-1, даже если ваша Lambda работает вeu-central-1. Это даёт больше гибкости и повышает устойчивость под нагрузкой.Rate Limiting + очередь — если нагрузка слишком велика, вы можете либо замедлять поступление запросов через throttling на уровне приложения, либо использовать очереди с
visibility timeout, чтобы выравнивать поток.
Пример расчета стоимости (eu-central-1)
Допустим, мы обрабатываем 1000 документов в месяц:
Компонент | Кол-во | Оценка стоимости |
|---|---|---|
Bedrock (Claude Sonnet) | ~4000 токенов (вход + выход) на 1 документ | ~$0.036 × 1000 = $36.00 |
Lambda (Invoke + Save) | до 6 сек × 1000 вызовов | ~$0.10 |
SQS | 1000 сообщений | ~$0.01 (или в пределах Free Tier) |
DynamoDB | 1000 записей | <$0.01 (On-Demand) |
S3 (хранение + запросы) | ~2 ГБ + операции | ~$0.05 |
Итого | ~$36.16 / месяц |
Если заменить Sonnet на Claude Haiku, стоимость может упасть до $10–12 за тот же объём (но качество может быть чуть ниже). Также можно использовать batch-инференс или комбинировать документы.
Заключение
Мы построили полнофункциональный serverless-сервис для OCR на базе Amazon Bedrock. Архитектура получилась простой, надёжной и масштабируемой. Вместо традиционного OCR мы используем LLM, что позволяет не просто "считывать текст", а делать это гибко, контекстно, с возможностью уточнять, резюмировать или извлекать только нужные поля.
Фишки подхода:
минимальные затраты на поддержку инфраструктуры
масштабируемость "из коробки"
понятная схема работы, легко расширяемая под новые задачи
возможность дообработки, A/B тестирования моделей и фан-аута через SNS
⚠️ Важно: В своем решении я не претендую на уникальность. Также, код в статье адаптирован для демонстрации архитектуры и упрощён для читаемости. Некоторые фрагменты могут быть не полностью production-ready — например, типы ошибок Bedrock, исключения, retry-логика, валидация параметров и пр.
В продакшене обязательно:
убедитесь в правильной настройке IAM-ролей,
проверьте лимиты и квоты в Bedrock, Lambda и SQS,
добавьте retry/timeout/логгирование,
протестируйте код под реальную нагрузку.
Если будете использовать код как основу — тестируйте и дорабатывайте под свой проект. Всё, что здесь показано, не содержит чувствительных данных и подготовлено исключительно в ознакомительных целях.
Bonus: Step Functions
Все вышеперечисленное можно описать в одной простой State Machine:

Comment: >- A Bedrock OCR state machine that invokes a Bedrock model and saves the results to S3 and DynamoDB. StartAt: Prepare Bedrock payload States: Prepare Bedrock payload: Type: Pass Next: Bedrock InvokeModel Bedrock InvokeModel: Type: Task Resource: arn:aws:states:::bedrock:invokeModel Parameters: {} Next: Save to s3 Save to s3: Type: Task Resource: arn:aws:states:::lambda:invoke OutputPath: $.Payload Parameters: Payload.$: $ Retry: - ErrorEquals: - Lambda.ServiceException - Lambda.AWSLambdaException - Lambda.SdkClientException - Lambda.TooManyRequestsException IntervalSeconds: 1 MaxAttempts: 3 BackoffRate: 2 JitterStrategy: FULL Next: Parallel State Parallel State: Comment: >- A Parallel state can be used to create parallel branches of execution in your state machine. Type: Parallel Branches: - StartAt: DynamoDB PutItem States: DynamoDB PutItem: Type: Task Resource: arn:aws:states:::dynamodb:putItem Parameters: TableName: MyDynamoDBTable Item: Column: S: MyEntry End: true - StartAt: Do whatever else States: Do whatever else: Type: Pass End: true End: true
