AWS Glue: decrypting and converting CSV to Parquet
I was given a task: accept GPG-encrypted CSV files, decrypt them with a private key and passphrase stored in AWS Secrets Manager, and immediately convert them to Parquet, so that AWS Athena can be plugged in afterwards without worrying about transformations, partitions and so on. Everything has to start on its own as soon as a new file lands in S3.
S3
Since everything happens in AWS, I arm myself with Terraform and go create four buckets.
raw - the csv.gpg files are uploaded straight into this bucket, and events from it kick off the conversion process
decrypted - decrypted files end up in this bucket
parquet - this bucket stores the Parquet files converted from CSV
scripts - this bucket is needed for AWS Glue; it holds the code that Glue runs for decryption and conversion
resource "aws_s3_bucket" "raw" {
bucket = "raw-csv-gpg"
}
# Отправляем евенты в eventbridge, что бы выцепить создание объекта
resource "aws_s3_bucket_notification" "raw" {
bucket = aws_s3_bucket.raw.bucket
eventbridge = true
}
resource "aws_s3_bucket" "decrypted" {
bucket = "decrypted-csv"
}
resource "aws_s3_bucket" "parquet" {
bucket = "parquet"
}
resource "aws_s3_bucket" "scripts" {
bucket = "glue-scripts"
}
# Для простоты, просто загружаем готовые python файлы в бакет scripts
resource "aws_s3_object" "decrypt_script" {
bucket = aws_s3_bucket.scripts.bucket
key = "decrypt.py"
source = "${path.module}/src/decrypt.py"
}
resource "aws_s3_object" "convert_script" {
bucket = aws_s3_bucket.scripts.bucket
key = "convert.py"
source = "${path.module}/src/convert.py"
}
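A quick way to confirm that eventbridge = true actually took effect on the raw bucket is a small boto3 check. This is my own sketch, not part of the pipeline itself:

# check_notifications.py - verify EventBridge notifications are enabled on the raw bucket
import boto3

s3 = boto3.client("s3")

conf = s3.get_bucket_notification_configuration(Bucket="raw-csv-gpg")
# After `eventbridge = true` is applied, the response contains an (empty)
# EventBridgeConfiguration block.
print("EventBridge enabled:", "EventBridgeConfiguration" in conf)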
EventBridge
We create an EventBridge rule that watches the raw bucket: whenever a file with the .csv.gpg suffix is uploaded, it invokes a Lambda that starts the workflow. I can already foresee the question: "EventBridge can start a workflow by itself, why did you add a Lambda?" My answer: I couldn't find a way to pass Glue a parameter saying which file to pick up and process. And without such a parameter, every run would process all the files sitting in the bucket, which is exactly what I wanted to avoid.
resource "aws_cloudwatch_event_rule" "s3_upload" {
name = "s3-csv-gpg-trigger"
event_pattern = jsonencode({
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {
"bucket": {
"name": [aws_s3_bucket.raw.bucket]
},
"object": {
"key": [{ "suffix": "csv.gpg" }]
}
}
})
}
resource "aws_cloudwatch_event_target" "lambda_target" {
rule = aws_cloudwatch_event_rule.s3_upload.name
target_id = "InvokeLambdaStarter"
arn = aws_lambda_function.workflow_starter.arn
}
resource "aws_lambda_permission" "allow_cloudwatch" {
statement_id = "AllowExecutionFromCloudWatch"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.workflow_starter.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.s3_upload.arn
}
Lambda
A simple Lambda: it receives the EventBridge event saying an S3 object was created, pulls the file name out of the event, and starts the Glue workflow.
data "archive_file" "lambda_zip" {
type = "zip"
source_file = "lambda/lambda_handler.py"
output_path = "lambda/lambda_handler.zip"
}
resource "aws_lambda_function" "workflow_starter" {
function_name = "glue-workflow-start"
filename = data.archive_file.lambda_zip.output_path
source_code_hash = data.archive_file.lambda_zip.output_base64sha256
handler = "lambda_handler.lambda_handler"
runtime = "python3.14"
role = aws_iam_role.lambda_glue_invoker_role.arn
timeout = 60
environment {
variables = {
GLUE_WORKFLOW_NAME = aws_glue_workflow.convert_workflow.name
}
}
}
resource "aws_iam_role" "lambda_glue_invoker_role" {
name = "lambda-glue-invoker"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
}]
})
}
resource "aws_iam_policy" "lambda_glue_policy" {
name = "lambda-glue-invoker"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Effect = "Allow"
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = "glue:StartWorkflowRun"
Resource = aws_glue_workflow.convert_workflow.arn
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
role = aws_iam_role.lambda_glue_invoker_role.name
policy_arn = aws_iam_policy.lambda_glue_policy.arn
}
The Lambda function itself. The code was written by an AI, please don't judge too harshly; it does its job and I'm happy with it.
lambda_handler.py
import json
import os
import boto3
WORKFLOW_NAME = os.environ.get('GLUE_WORKFLOW_NAME')
glue_client = boto3.client('glue')
def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    if 'detail' not in event or 'bucket' not in event['detail'] or 'object' not in event['detail']:
        print("Event structure missing S3 details. Exiting.")
        return {'statusCode': 400, 'body': 'Invalid event structure'}

    s3_bucket = event['detail']['bucket']['name']
    s3_key = event['detail']['object']['key']

    if not s3_key.lower().endswith('.csv.gpg'):
        print(f"Skipping file: {s3_key}")
        return {'statusCode': 200, 'body': 'Skipped'}

    print(f"Starting workflow for s3://{s3_bucket}/{s3_key}")

    try:
        response = glue_client.start_workflow_run(
            Name=WORKFLOW_NAME,
            RunProperties={
                'input_bucket': s3_bucket,
                'input_key': s3_key
            }
        )
        print(f"Glue Workflow Run Started: {response['RunId']}")
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Workflow started', 'RunId': response['RunId']})
        }
    except Exception as e:
        print(f"Error starting Glue Workflow: {e}")
        raise e
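To poke at the handler without deploying anything, I can feed it a hand-written event that mimics the "Object Created" shape EventBridge delivers and mock out the Glue client. A minimal sketch of such a local test (the region, object key and workflow name below are placeholders):

# test_lambda_handler.py - local smoke test for the handler above, no AWS calls
import os
from unittest.mock import patch

os.environ.setdefault("AWS_DEFAULT_REGION", "eu-west-1")       # placeholder region so boto3.client() can be built
os.environ.setdefault("GLUE_WORKFLOW_NAME", "csv-gpg-convert")

import lambda_handler

fake_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "sample.csv.gpg"},   # hypothetical object key
    },
}

with patch.object(lambda_handler, "glue_client") as glue_mock:
    glue_mock.start_workflow_run.return_value = {"RunId": "wr_test"}
    result = lambda_handler.lambda_handler(fake_event, None)
    print(result)  # expect statusCode 200 and RunId "wr_test"
    glue_mock.start_workflow_run.assert_called_once_with(
        Name=lambda_handler.WORKFLOW_NAME,
        RunProperties={"input_bucket": "raw-csv-gpg", "input_key": "sample.csv.gpg"},
    )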
Glue
Now we create AWS Glue itself. We need the workflow that we will be starting, plus two jobs that handle decryption and conversion.
resource "aws_glue_workflow" "convert_workflow" {
name = "csv-gpg-convert"
}
resource "aws_glue_job" "decrypt" {
name = "decrypt-gpg"
role_arn = aws_iam_role.glue_decrypt.arn
glue_version = "4.0"
command {
name = "pythonshell"
python_version = "3.9"
script_location = "s3://${aws_s3_bucket.scripts.bucket}/decrypt.py"
}
default_arguments = {
"--RAW_BUCKET" = aws_s3_bucket.raw.bucket
"--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
"--GPG_KEY_SECRET" = "CSV_GPG_CONVERT/KEY"
"--GPG_PASSPHRASE_SECRET" = "CSV_GPG_CONVERT/PASSPHRASE"
"--additional-python-modules" = "python-gnupg==0.5.5"
}
}
resource "aws_glue_job" "convert" {
name = "csv-to-parquet"
role_arn = aws_iam_role.glue_convert.arn
glue_version = "4.0"
command {
name = "glueetl"
script_location = "s3://${aws_s3_bucket.scripts.bucket}/convert.py"
python_version = "3"
}
default_arguments = {
"--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
"--PARQUET_BUCKET" = aws_s3_bucket.parquet.bucket
}
}
# Запускаем расшифровку по команде из лямбды
resource "aws_glue_trigger" "start_decrypt" {
name = "trigger-start-decrypt"
type = "EVENT"
workflow_name = aws_glue_workflow.convert_workflow.name
actions {
job_name = aws_glue_job.decrypt.name
}
}
# Конвертация запустится, только в том случае если job decrypt перешла в статус SUCCEEDED
resource "aws_glue_trigger" "convert" {
name = "trigger-convert"
type = "CONDITIONAL"
workflow_name = aws_glue_workflow.convert_workflow.name
predicate {
conditions {
job_name = aws_glue_job.decrypt.name
state = "SUCCEEDED"
}
}
actions {
job_name = aws_glue_job.convert.name
}
}
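After terraform apply, an easy way to check that both triggers actually ended up attached to the workflow is to pull the workflow graph via boto3. Another small sketch of mine, not part of the pipeline:

# check_workflow.py - print the workflow graph: expect TRIGGER nodes for
# trigger-start-decrypt / trigger-convert and JOB nodes for decrypt-gpg / csv-to-parquet
import boto3

glue = boto3.client("glue")

wf = glue.get_workflow(Name="csv-gpg-convert", IncludeGraph=True)["Workflow"]
for node in wf["Graph"]["Nodes"]:
    print(node["Type"], node["Name"])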
resource "aws_iam_role" "glue_decrypt" {
name = "glue-decrypt-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "glue.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "glue_decrypt_policy" {
role = aws_iam_role.glue_decrypt.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:ListBucket"]
Resource = [
"${aws_s3_bucket.raw.arn}",
"${aws_s3_bucket.decrypted.arn}"
]
},
{
Effect = "Allow"
Action = ["s3:GetObject", "s3:PutObject"]
Resource = [
"${aws_s3_bucket.raw.arn}/*",
"${aws_s3_bucket.decrypted.arn}/*",
"${aws_s3_bucket.scripts.arn}/*"
]
},
{
Effect = "Allow"
Action = ["secretsmanager:GetSecretValue"]
Resource = "arn:aws:secretsmanager:${var.region}:${data.aws_caller_identity.current.account_id}:secret:CSV_GPG_CONVERT*"
},
{
Effect = "Allow"
Action = ["glue:GetWorkflowRunProperties"]
Resource = [aws_glue_workflow.convert_workflow.arn]
},
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Effect = "Allow"
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
resource "aws_iam_role" "glue_convert" {
name = "glue-convert-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "glue.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "glue_convert_policy" {
name = "glue-convert-policy"
role = aws_iam_role.glue_convert.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:ListBucket"]
Resource = [
"${aws_s3_bucket.decrypted.arn}",
"${aws_s3_bucket.parquet.arn}",
"${aws_s3_bucket.scripts.arn}"
]
},
{
Effect = "Allow"
Action = ["s3:GetObject"]
Resource = [
"${aws_s3_bucket.decrypted.arn}/*",
"${aws_s3_bucket.scripts.arn}/*"
]
},
{
Effect = "Allow"
Action = ["s3:PutObject"]
Resource = ["${aws_s3_bucket.parquet.arn}/*"]
},
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Effect = "Allow"
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = ["glue:GetWorkflowRunProperties"]
Resource = [aws_glue_workflow.convert_workflow.arn]
}
]
})
}
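The two secrets referenced above, CSV_GPG_CONVERT/KEY and CSV_GPG_CONVERT/PASSPHRASE, are not managed by this Terraform; they are created once by hand. A rough boto3 sketch of that one-off step (the key file path and passphrase value are placeholders; the private key is exported beforehand, e.g. with gpg --export-secret-keys --armor):

# seed_secrets.py - one-off creation of the GPG secrets in Secrets Manager
import boto3

secrets = boto3.client("secretsmanager")

# "private.asc" is a placeholder path to the ASCII-armored private key
with open("private.asc") as f:
    private_key = f.read()

secrets.create_secret(Name="CSV_GPG_CONVERT/KEY", SecretString=private_key)
secrets.create_secret(Name="CSV_GPG_CONVERT/PASSPHRASE", SecretString="my-passphrase")  # placeholder value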
I'm also attaching the decrypt.py and convert.py files that do the main work. This code was likewise written by an AI, please don't judge too harshly.
convert.py
import sys
import json
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "DECRYPTED_BUCKET",
        "PARQUET_BUCKET",
        "WORKFLOW_NAME",
        "WORKFLOW_RUN_ID"
    ]
)
workflow_name = args["WORKFLOW_NAME"]
workflow_run_id = args["WORKFLOW_RUN_ID"]
decrypted_bucket = args["DECRYPTED_BUCKET"]
parquet_bucket = args["PARQUET_BUCKET"]
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
glue_client = boto3.client("glue")
try:
    resp = glue_client.get_workflow_run_properties(
        Name=workflow_name,
        RunId=workflow_run_id
    )
    run_props = resp.get("RunProperties", {})
    print("Workflow Run Properties:")
    print(json.dumps(run_props, indent=2))
except Exception as e:
    print(f"[FATAL] Failed to read Workflow Run Properties: {e}")
    raise

input_key = run_props.get("input_key")
if not input_key:
    raise Exception(
        "[CRITICAL] Missing 'input_key' in Workflow Run Properties. "
        "Check the Lambda that starts the workflow."
    )

print(f"Input file (encrypted source): {input_key}")

csv_key = input_key.replace(".gpg", "")
input_path = f"s3://{decrypted_bucket}/{csv_key}"

parquet_folder = csv_key.replace(".csv", "")
output_path = f"s3://{parquet_bucket}/{parquet_folder}/"

print(f"Read from: {input_path}")
print(f"Write to: {output_path}")

try:
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(input_path)
    )
    df.write.mode("overwrite").parquet(output_path)
    print("\n✔ Conversion completed successfully.\n")
except Exception as e:
    print(f"[ERROR] Spark conversion error: {e}")
    raise

job.commit()
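Once converted files start showing up in the parquet bucket, they can be exposed to Athena. The table definition is not part of this article's Terraform; as a rough sketch, a table over one converted "folder" can be declared straight from boto3 (the database, table and column names and the results location are placeholders for whatever your CSV schema actually is):

# athena_table.py - sketch: point Athena at one converted Parquet prefix
import boto3

athena = boto3.client("athena")

ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS default.sample_export (
  id string,
  value string
)
STORED AS PARQUET
LOCATION 's3://parquet/sample/'
"""

resp = athena.start_query_execution(
    QueryString=ddl,
    ResultConfiguration={"OutputLocation": "s3://athena-query-results-placeholder/"},
)
print("QueryExecutionId:", resp["QueryExecutionId"])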
decrypt.py
import boto3
import os
import sys
import shutil
import gnupg
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, [
    "RAW_BUCKET",
    "DECRYPTED_BUCKET",
    "GPG_KEY_SECRET",
    "GPG_PASSPHRASE_SECRET",
    "WORKFLOW_NAME",
    "WORKFLOW_RUN_ID"
])
RAW_BUCKET = args["RAW_BUCKET"]
DECRYPTED_BUCKET = args["DECRYPTED_BUCKET"]
SECRET_KEY_NAME = args["GPG_KEY_SECRET"]
SECRET_PASSPHRASE_NAME = args["GPG_PASSPHRASE_SECRET"]
WORKFLOW_NAME = args['WORKFLOW_NAME']
RUN_ID = args['WORKFLOW_RUN_ID']
s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")
glue_client = boto3.client("glue")
GPG_HOME = "/tmp/gpg_home"
if os.path.exists(GPG_HOME):
    shutil.rmtree(GPG_HOME)
os.makedirs(GPG_HOME, exist_ok=True)
gpg = gnupg.GPG(gnupghome=GPG_HOME)
print("GPG configured at", GPG_HOME)
print("Loading GPG key...")
private_key_raw = secrets.get_secret_value(SecretId=SECRET_KEY_NAME)["SecretString"]
print("Loading passphrase...")
passphrase = secrets.get_secret_value(SecretId=SECRET_PASSPHRASE_NAME)["SecretString"]
private_key = private_key_raw.replace("\r", "").strip()
print("Importing key...")
import_result = gpg.import_keys(private_key)
if not import_result.count:
    raise Exception("Failed to import GPG key: " + str(import_result.stderr))
print("Key imported. Fingerprints:", import_result.fingerprints)
def decrypt_file(key):
    enc_name = os.path.basename(key)
    dec_name = enc_name.replace(".gpg", "")
    enc_path = f"/tmp/{enc_name}"
    dec_path = f"/tmp/{dec_name}"

    print(f"Downloading s3://{RAW_BUCKET}/{key}...")
    s3.download_file(RAW_BUCKET, key, enc_path)

    print(f"Decrypting {enc_name}...")
    with open(enc_path, "rb") as f:
        result = gpg.decrypt_file(f, passphrase=passphrase, output=dec_path)

    if not result.ok:
        print("GPG ERROR:", result.stderr)
        raise Exception("GPG decrypt failed")

    print("Upload decrypted:", dec_name)
    s3.upload_file(dec_path, DECRYPTED_BUCKET, dec_name)

    os.remove(enc_path)
    os.remove(dec_path)
def main():
    print(f"Fetching properties for Workflow: {WORKFLOW_NAME}, RunId: {RUN_ID}")
    try:
        run_props = glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        )['RunProperties']

        print("debug:")
        print(glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        ))

        input_key = run_props.get('input_key')
        if not input_key:
            print("Warning: No 'input_key' found in RunProperties. Nothing to process.")
            return

        print(f"Triggered for file: {input_key}")
        decrypt_file(input_key)
    except Exception as e:
        print(f"Error processing workflow properties: {str(e)}")
        raise e

    print("Cleaning up GPG home...")
    shutil.rmtree(GPG_HOME, ignore_errors=True)
    print("DONE")


if __name__ == "__main__":
    main()
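For an end-to-end check I just drop a test file into the raw bucket and look at the workflow runs. Something along these lines (sample.csv.gpg is a placeholder; it has to be encrypted for the key whose private half sits in Secrets Manager):

# smoke_test.py - sketch: push one test file through the whole pipeline and inspect the runs
import time
import boto3

s3 = boto3.client("s3")
glue = boto3.client("glue")

s3.upload_file("sample.csv.gpg", "raw-csv-gpg", "sample.csv.gpg")

time.sleep(60)  # give EventBridge, the Lambda and Glue a minute to react

for run in glue.get_workflow_runs(Name="csv-gpg-convert")["Runs"]:
    print(run["WorkflowRunId"], run["Status"], run.get("WorkflowRunProperties"))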
And that's basically it.