I was handed a task: accept GPG-encrypted CSVs, decrypt them with a private key and passphrase stored in AWS Secrets Manager, and then immediately convert them to Parquet, so that I can later hook up AWS Athena and not think about transformations, partitions and so on. Everything should kick off on its own whenever a new file lands in S3.

S3

Since everything lives in AWS, I arm myself with Terraform and go create four buckets.

  • raw - the csv.gpg files are uploaded straight into this bucket, and events from it kick off the conversion process

  • decrypted - decrypted files are dropped into this bucket

  • parquet - this bucket stores the Parquet files converted from CSV

  • scripts - this bucket is needed for AWS Glue; it holds the code that Glue runs for decryption and conversion

resource "aws_s3_bucket" "raw" {
  bucket = "raw-csv-gpg"
}

# Send events to EventBridge so we can catch object creation
resource "aws_s3_bucket_notification" "raw" {
  bucket = aws_s3_bucket.raw.bucket
  eventbridge = true
}

resource "aws_s3_bucket" "decrypted" {
  bucket = "decrypted-csv"
}

resource "aws_s3_bucket" "parquet" {
  bucket = "parquet"
}

resource "aws_s3_bucket" "scripts" {
  bucket = "glue-scripts"
}

# For simplicity, we just upload the ready-made Python files to the scripts bucket
resource "aws_s3_object" "decrypt_script" {
  bucket = aws_s3_bucket.scripts.bucket
  key = "decrypt.py"
  source = "${path.module}/src/decrypt.py"
}

resource "aws_s3_object" "convert_script" {
  bucket = aws_s3_bucket.scripts.bucket
  key = "convert.py"
  source = "${path.module}/src/convert.py"
}

EventBridge

We create an EventBridge rule that watches the raw bucket, and when it sees that a file with the .csv.gpg suffix has been uploaded, it invokes a Lambda that starts the workflow. I can already hear the question: "EventBridge can start a workflow by itself, why did you add a Lambda?" My answer: I couldn't find a way to tell Glue, as a parameter, which file to pick up and process. Without that, every run would process all the files sitting in the bucket, and that's exactly what I wanted to avoid.

resource "aws_cloudwatch_event_rule" "s3_upload" {
  name = "s3-csv-gpg-trigger"

  event_pattern = jsonencode({
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
      "bucket": {
        "name": [aws_s3_bucket.raw.bucket]
      },
      "object": {
        "key": [{ "suffix": "csv.gpg" }]
      }
    }
  })
}

resource "aws_cloudwatch_event_target" "lambda_target" {
  rule      = aws_cloudwatch_event_rule.s3_upload.name
  target_id = "InvokeLambdaStarter"
  arn       = aws_lambda_function.workflow_starter.arn
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.workflow_starter.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.s3_upload.arn
}
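
For reference, the event that this rule matches and hands to the Lambda looks roughly like this (a trimmed sketch; the bucket name matches the one above, the key and size are made up):

# Roughly what arrives at the Lambda from EventBridge (trimmed, values are examples)
sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "export-2024-01-01.csv.gpg", "size": 1048576},
    },
}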

Lambda

A simple Lambda that listens to EventBridge, sees the S3 event saying a file has been created, pulls the file name out of the event, and starts the Glue workflow.

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "lambda/lambda_handler.py"
  output_path = "lambda/lambda_handler.zip"
}

resource "aws_lambda_function" "workflow_starter" {
  function_name    = "glue-workflow-start"
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  handler          = "lambda_handler.lambda_handler"
  runtime          = "python3.13"
  role             = aws_iam_role.lambda_glue_invoker_role.arn
  timeout          = 60
  environment {
      variables = {
        GLUE_WORKFLOW_NAME = aws_glue_workflow.convert_workflow.name
      }
    }
}

resource "aws_iam_role" "lambda_glue_invoker_role" {
  name = "lambda-glue-invoker"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_policy" "lambda_glue_policy" {
  name = "lambda-glue-invoker"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect   = "Allow"
        Action   = "glue:StartWorkflowRun"
        Resource = aws_glue_workflow.convert_workflow.arn
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_glue_invoker_role.name
  policy_arn = aws_iam_policy.lambda_glue_policy.arn
}

The Lambda function itself. The code was written by an AI, please don't judge too harshly; it does its job and that works for me.

lambda_handler.py

import json
import os
import boto3

WORKFLOW_NAME = os.environ.get('GLUE_WORKFLOW_NAME')

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    if 'detail' not in event or 'bucket' not in event['detail'] or 'object' not in event['detail']:
        print("Event structure missing S3 details. Exiting.")
        return {'statusCode': 400, 'body': 'Invalid event structure'}

    s3_bucket = event['detail']['bucket']['name']
    s3_key = event['detail']['object']['key']

    if not s3_key.lower().endswith('.csv.gpg'):
        print(f"Skipping file: {s3_key}")
        return {'statusCode': 200, 'body': 'Skipped'}

    print(f"Starting workflow for s3://{s3_bucket}/{s3_key}")

    try:
        response = glue_client.start_workflow_run(
            Name=WORKFLOW_NAME,
            RunProperties={
                'input_bucket': s3_bucket,
                'input_key': s3_key
            }
        )
        print(f"Glue Workflow Run Started: {response['RunId']}")

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Workflow started', 'RunId': response['RunId']})
        }

    except Exception as e:
        print(f"Error starting Glue Workflow: {e}")
        raise e
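
To sanity-check the handler locally without touching AWS, I feed it a minimal hand-written event. In this sketch the key deliberately does not end in .csv.gpg, so the handler returns "Skipped" and never calls the Glue API (the region and workflow name are set only so that boto3 can build its client):

import os

# These must be set before importing the module, since it reads them at import time
os.environ.setdefault("AWS_DEFAULT_REGION", "eu-west-1")       # any region will do here
os.environ.setdefault("GLUE_WORKFLOW_NAME", "csv-gpg-convert")

import lambda_handler

event = {
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "export-2024-01-01.csv"},  # no .gpg suffix -> will be skipped
    }
}

print(lambda_handler.lambda_handler(event, None))
# -> {'statusCode': 200, 'body': 'Skipped'}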

Glue

Now we create the AWS Glue part. We need the workflow itself, which we will be starting, and two jobs that handle decryption and conversion.

resource "aws_glue_workflow" "convert_workflow" {
  name = "csv-gpg-convert"
}

resource "aws_glue_job" "decrypt" {
  name         = "decrypt-gpg"
  role_arn     = aws_iam_role.glue_decrypt.arn
  glue_version = "3.0"

  command {
    name            = "pythonshell"
    python_version  = "3.9"
    script_location = "s3://${aws_s3_bucket.scripts.bucket}/decrypt.py"
  }

  default_arguments = {
    "--RAW_BUCKET"                = aws_s3_bucket.raw.bucket
    "--DECRYPTED_BUCKET"          = aws_s3_bucket.decrypted.bucket
    "--GPG_KEY_SECRET"            = "CSV_GPG_CONVERT/KEY"
    "--GPG_PASSPHRASE_SECRET"     = "CSV_GPG_CONVERT/PASSPHRASE"
    "--additional-python-modules" = "python-gnupg==0.5.5"
  }
}

resource "aws_glue_job" "convert" {
  name         = "csv-to-parquet"
  role_arn     = aws_iam_role.glue_convert.arn
  glue_version = "4.0"

  command {
    name            = "glueetl"
    script_location = "s3://${aws_s3_bucket.scripts.bucket}/convert.py"
    python_version  = "3"
  }

  default_arguments = {
    "--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
    "--PARQUET_BUCKET"   = aws_s3_bucket.parquet.bucket
  }
}

# Start decryption on command from the Lambda
resource "aws_glue_trigger" "start_decrypt" {
  name          = "trigger-start-decrypt"
  type          = "EVENT"
  workflow_name = aws_glue_workflow.convert_workflow.name

  actions {
    job_name = aws_glue_job.decrypt.name
  }
}

# Conversion only runs if the decrypt job has reached the SUCCEEDED state
resource "aws_glue_trigger" "convert" {
  name          = "trigger-convert"
  type          = "CONDITIONAL"
  workflow_name = aws_glue_workflow.convert_workflow.name

  predicate {
    conditions {
      job_name = aws_glue_job.decrypt.name
      state    = "SUCCEEDED"
    }
  }

  actions {
    job_name = aws_glue_job.convert.name
  }
}

resource "aws_iam_role" "glue_decrypt" {
  name = "glue-decrypt-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
      Action = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "glue_decrypt_policy" {
  role = aws_iam_role.glue_decrypt.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["s3:ListBucket"]
        Resource = [
          "${aws_s3_bucket.raw.arn}",
          "${aws_s3_bucket.decrypted.arn}"
        ]
      },
      {
        Effect   = "Allow"
        Action   = ["s3:GetObject", "s3:PutObject"]
        Resource = [
          "${aws_s3_bucket.raw.arn}/*",
          "${aws_s3_bucket.decrypted.arn}/*",
          "${aws_s3_bucket.scripts.arn}/*"
        ]
      },
      {
        Effect   = "Allow"
        Action   = ["secretsmanager:GetSecretValue"]
        Resource = "arn:aws:secretsmanager:${var.region}:${data.aws_caller_identity.current.account_id}:secret:CSV_GPG_CONVERT*"
      },
      {
        Effect   = "Allow"
        Action   = ["glue:GetWorkflowRunProperties"]
        Resource = [aws_glue_workflow.convert_workflow.arn]
      },
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

resource "aws_iam_role" "glue_convert" {
  name = "glue-convert-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
      Action = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "glue_convert_policy" {
  name = "glue-convert-policy"
  role = aws_iam_role.glue_convert.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["s3:ListBucket"]
        Resource = [
            "${aws_s3_bucket.decrypted.arn}",
            "${aws_s3_bucket.parquet.arn}",
            "${aws_s3_bucket.scripts.arn}"
          ]
      },
      {
        Effect   = "Allow"
        Action   = ["s3:GetObject"]
        Resource = [
            "${aws_s3_bucket.decrypted.arn}/*",
            "${aws_s3_bucket.scripts.arn}/*"
          ]
      },
      {
        Effect   = "Allow"
        Action   = ["s3:PutObject"]
        Resource = ["${aws_s3_bucket.parquet.arn}/*"]
      },
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect   = "Allow"
        Action   = ["glue:GetWorkflowRunProperties"]
        Resource = [aws_glue_workflow.convert_workflow.arn]
      }
    ]
  })
}
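
The jobs expect two secrets, CSV_GPG_CONVERT/KEY (the ASCII-armored private key) and CSV_GPG_CONVERT/PASSPHRASE, which the Terraform above only references; I create them by hand. A rough boto3 sketch of seeding them (file path, region and passphrase are just examples):

import boto3

secrets = boto3.client("secretsmanager", region_name="eu-west-1")

# The key is exported beforehand with: gpg --export-secret-keys --armor <key-id>
with open("private-key.asc") as f:
    secrets.create_secret(Name="CSV_GPG_CONVERT/KEY", SecretString=f.read())

secrets.create_secret(Name="CSV_GPG_CONVERT/PASSPHRASE", SecretString="my-gpg-passphrase")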

I'm also attaching the decrypt.py and convert.py files that do the main work. This code was written by an AI as well, please don't judge too harshly.

convert.py

import sys
import os
import json
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "DECRYPTED_BUCKET",
        "PARQUET_BUCKET",
        "WORKFLOW_NAME",
        "WORKFLOW_RUN_ID"
    ]
)

workflow_name = args["WORKFLOW_NAME"]
workflow_run_id = args["WORKFLOW_RUN_ID"]
decrypted_bucket = args["DECRYPTED_BUCKET"]
parquet_bucket = args["PARQUET_BUCKET"]

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glue_client = boto3.client("glue")

try:
    resp = glue_client.get_workflow_run_properties(
        Name=workflow_name,
        RunId=workflow_run_id
    )
    run_props = resp.get("RunProperties", {})
    print("Workflow Run Properties:")
    print(json.dumps(run_props, indent=2))

except Exception as e:
    print(f"[FATAL] Failed to read Workflow Run Properties: {e}")
    raise

input_key = run_props.get("input_key")

if not input_key:
    raise Exception(
        "[CRITICAL] Missing 'input_key' in Workflow Run Properties. "
        "Check EventBridge input_template formatting."
    )

print(f"Input file (encrypted source): {input_key}")


# decrypt.py uploads only the basename of the key, so mirror that here
csv_key = os.path.basename(input_key).replace(".gpg", "")
input_path = f"s3://{decrypted_bucket}/{csv_key}"

parquet_folder = csv_key.replace(".csv", "")
output_path = f"s3://{parquet_bucket}/{parquet_folder}/"

print(f"Read from:  {input_path}")
print(f"Write to:   {output_path}")

try:
    df = (
        spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(input_path)
    )

    df.write.mode("overwrite").parquet(output_path)

    print("\n✔ Conversion completed successfully.\n")

except Exception as e:
    print(f"[ERROR] Spark conversion error: {e}")
    raise


job.commit()

decrypt.py

import boto3
import os
import sys
import shutil
import gnupg
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, [
    "RAW_BUCKET",
    "DECRYPTED_BUCKET",
    "GPG_KEY_SECRET",
    "GPG_PASSPHRASE_SECRET",
    "WORKFLOW_NAME",
    "WORKFLOW_RUN_ID"
])

RAW_BUCKET = args["RAW_BUCKET"]
DECRYPTED_BUCKET = args["DECRYPTED_BUCKET"]
SECRET_KEY_NAME = args["GPG_KEY_SECRET"]
SECRET_PASSPHRASE_NAME = args["GPG_PASSPHRASE_SECRET"]
WORKFLOW_NAME = args['WORKFLOW_NAME']
RUN_ID = args['WORKFLOW_RUN_ID']

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")
glue_client = boto3.client("glue")

GPG_HOME = "/tmp/gpg_home"

if os.path.exists(GPG_HOME):
    shutil.rmtree(GPG_HOME)

os.makedirs(GPG_HOME, exist_ok=True)

gpg = gnupg.GPG(gnupghome=GPG_HOME)

print("GPG configured at", GPG_HOME)

print("Loading GPG key...")
private_key_raw = secrets.get_secret_value(SecretId=SECRET_KEY_NAME)["SecretString"]

print("Loading passphrase...")
passphrase = secrets.get_secret_value(SecretId=SECRET_PASSPHRASE_NAME)["SecretString"]

private_key = private_key_raw.replace("\r", "").strip()

print("Importing key...")
import_result = gpg.import_keys(private_key)

if not import_result.count:
    raise Exception("Failed to import GPG key: " + str(import_result.stderr))

print("Key imported. Fingerprints:", import_result.fingerprints)


def decrypt_file(key):
    enc_name = os.path.basename(key)
    dec_name = enc_name.replace(".gpg", "")

    enc_path = f"/tmp/{enc_name}"
    dec_path = f"/tmp/{dec_name}"

    print(f"Downloading s3://{RAW_BUCKET}/{key}...")
    s3.download_file(RAW_BUCKET, key, enc_path)

    print(f"Decrypting {enc_name}...")
    with open(enc_path, "rb") as f:
        result = gpg.decrypt_file(f, passphrase=passphrase, output=dec_path)

    if not result.ok:
        print("GPG ERROR:", result.stderr)
        raise Exception("GPG decrypt failed")

    print("Upload decrypted:", dec_name)
    s3.upload_file(dec_path, DECRYPTED_BUCKET, dec_name)

    os.remove(enc_path)
    os.remove(dec_path)

def main():
    print(f"Fetching properties for Workflow: {WORKFLOW_NAME}, RunId: {RUN_ID}")

    try:
        run_props = glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        )['RunProperties']

        print("debug:")
        print(glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        ))

        input_key = run_props.get('input_key')

        if not input_key:
            print("Warning: No 'input_key' found in RunProperties. Nothing to process.")
            return

        print(f"Triggered for file: {input_key}")
        decrypt_file(input_key)

    except Exception as e:
        print(f"Error processing workflow properties: {str(e)}")
        raise e

    print("Cleaning up GPG home...")
    shutil.rmtree(GPG_HOME, ignore_errors=True)

    print("DONE")


if __name__ == "__main__":
    main()
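
When something doesn't fire, it's quicker to check the last few workflow runs and their job statistics from boto3 than to click through the console. A small debugging sketch (region is an example):

import boto3

glue = boto3.client("glue", region_name="eu-west-1")

runs = glue.get_workflow_runs(Name="csv-gpg-convert", MaxResults=5)
for run in runs["Runs"]:
    stats = run["Statistics"]
    print(
        run["WorkflowRunId"],
        run["Status"],
        f"succeeded={stats['SucceededActions']}",
        f"failed={stats['FailedActions']}",
        run.get("WorkflowRunProperties", {}),
    )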

And that's basically it.