В одном из проектов встала следующая задача: пользователь загружает пачку файлов через клиента (CloudBerry Explorer, к примеру) в S3 бакет, мы копируем эти файлы в архив и шлем SNS уведомление о том, что все сделано. Перекладывать файлы в архив нужно начинать только тогда, когда пользователь загрузит все, что хотел. Пользователей мало и загружают батчи они довольно редко. Но файлов может быть много.
Чтобы понять, что пора начинать архивацию, зададим определенную структуру каталогов и будем просить пользователя загружать триггер-файлы с расширением .trigger когда он закончит. Этакая эмуляция кнопки Done. Структура каталогов будет такой:
<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>
Как видим, для каждой пачки создается свой каталог <batch_name> с подкаталогом files, в который и заливаются уже пользовательские файлы с каталогами и именами, которые он хочет. Триггер-файл загружается в <batch_name> и по ключу этого файла можно понять какие конкретно файлы нужно отправить в архив. Но здесь есть один нюанс, мы хотим при копировании в архив вырезать каталог files. Т.е. файл <batch_name>/files/<file_key_1> скопировать в <batch_name>/<file_key_1>.
К счастью, S3 позволяет отслеживать загрузку файлов с определенным суффиксом и отправлять уведомления при наструплении этого события. В качестве получаетеля этих уведомлений можно указать аж 3 сервиса: SNS, SQS и Lambda-функцию. Но тут не без нюансов. Так, первые 2 типа поддерживают только стандартные очереди и SNS, а FIFO не поддерживают, увы.
Мы же, в случае загрузки триггера, будем ставить задачу в стандартную очередь SQS и в приложении будем эту очередь опрашивать. На питоне это выглядит примерно так:
… session = boto3.Session(profile_name=profile) queue = session.resource('sqs', region_name=region).Queue(queue_url) while True: # Здесь можно усыпить процесс, если хочется messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20) if not messages: continue message = messages[0] # Process message
Окей, мы разобрались как понять, что пользователь закончил заливать свои файлы и пора их архивировать.
Теперь нам нужно скопировать файлы в архив и отправить SNS уведомление. Уведомление нужно отправлять только после того как все файлы уедут в архив. Что нам может предложить для такого случая Амазон?
S3 Batch Operations
Амазон позволяет копировать объекты батчами между бакетами и даже между аккаунтами. Но функционал этот довольно ограничен. Рассмотрим чуть подробнее.
Поставить задачу на копирование можно двумя способами: с использованием отчета об инвентаризации и с помощью CSV манифеста.
Оба способа очень похожи друг на друга и обладают одним и тем же недостатком - они оба недостаточно гибкие. Так к примеру, при постановке задачи на копирование мы можем указать список копируемых файлов из батча в CSV манифесте, а так же можем указать каталог куда все эти файлы скопировать, но выбрать произвольное имя или вообще как-то модифицировать ключ, увы, нельзя. К примеру, если у нас в файле указаны загруженные пользователем файлы из примера выше,<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>
то можно все эти файлы загрузить в каталог, скажем, archive и получить пути вида archive/<batch_name>/files/<file_key_1> , но вот каталог files из пути не выдрать, т.е. файл <batch_name>/files/<file_key_1> скопировать в archive/<batch_name>/<file_key_1>, увы, нельзя.
В случае с использованием сервиса инвентаризации все то же самое, только вместо манифеста - отчет с похожим содержимым. А очень бы хотелось на самом деле.
К слову, задачи на копирование батчами можно ставить как программно, так и через Консоль.
SDK Python
Когда гибкости стандартных S3 Batch Operations не хватает, Амазон советует пользоваться SDK, что мы и сделаем. У нас питон, а для питона синхронная либа boto3. boto3 под капотом делает вызовы к REST API амазоновских сервисов, поэтому операция копирования файла сводится к отправке HTTP запроса, а это значит, что мы можем немного распараллелить процесс копирования. Однопоточный синхронный код значительно проще конечно, что мне очень импонирует, но его производительности, к сожалению, будет не достаточно, т.к. файлов может быть много, а очень долго ждать не хочется. В итоге имеем вот такой простенький скриптик:
from concurrent.futures import as_completed from concurrent.futures.thread import ThreadPoolExecutor import boto3 import click def get_source_prefix(message_body: str) -> str: ... def modify_key(key) -> str: ... @click.command() @click.option('--queue-url') @click.option('--source-bucket') @click.option('--target-bucket') @click.option('--target-prefix') @click.option('--max-workers') @click.option('--region') @click.option('--profile') def main(queue_url, source_bucket, target_bucket, target_prefix, max_workers, region, profile): session = boto3.Session(profile_name=profile) queue = session.resource('sqs', region_name=region).Queue(queue_url) s3_client = session.client('s3') paginator = s3_client.get_paginator('list_objects_v2') while True: # Здесь можно усыпить процесс, если очень хочется # По умолчанию, метод возвращает одно сообщение messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20) if not messages: continue message = messages[0] kwargs = {'Bucket': source_bucket, 'Prefix': f'{get_source_prefix(message.body)}/files'} keys = [k for page in paginator.paginate(**kwargs) for k in page.get('Contents', [])] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures_to_keys = { executor.submit( s3_client.copy, {'Bucket': source_bucket, 'Key': key['Key']}, target_bucket, f'{target_prefix}/{modify_key(key["Key"])}' ): key for key in keys if not key['Key'].endswith('.trigger') } as_completed(futures_to_keys) # шлем SNS уведомление ... message.delete() if __name__ == '__main__': main()
