Как стать автором
Обновить

Копируем файлы пачками в AWS S3

Время на прочтение4 мин
Количество просмотров4.5K

В одном из проектов встала следующая задача: пользователь загружает пачку файлов через клиента (CloudBerry Explorer, к примеру) в S3 бакет, мы копируем эти файлы в архив и шлем SNS уведомление о том, что все сделано. Перекладывать файлы в архив нужно начинать только тогда, когда пользователь загрузит все, что хотел. Пользователей мало и загружают батчи они довольно редко. Но файлов может быть много.

Чтобы понять, что пора начинать архивацию, зададим определенную структуру каталогов и будем просить пользователя загружать триггер-файлы с расширением .trigger когда он закончит. Этакая эмуляция кнопки Done. Структура каталогов будет такой:

<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>

Как видим, для каждой пачки создается свой каталог <batch_name> с подкаталогом files, в который и заливаются уже пользовательские файлы с каталогами и именами, которые он хочет. Триггер-файл загружается в <batch_name> и по ключу этого  файла можно понять какие конкретно файлы нужно отправить в архив. Но здесь есть один нюанс, мы хотим при копировании в архив вырезать каталог files. Т.е. файл <batch_name>/files/<file_key_1> скопировать в <batch_name>/<file_key_1>.

К счастью, S3 позволяет отслеживать загрузку файлов с определенным суффиксом и отправлять уведомления при наструплении этого события. В качестве получаетеля этих уведомлений можно указать аж 3 сервиса: SNS, SQS и Lambda-функцию. Но тут не без нюансов. Так, первые 2 типа поддерживают только стандартные очереди и SNS, а FIFO не поддерживают, увы.

Мы же, в случае загрузки триггера, будем ставить задачу в стандартную очередь SQS и в приложении будем эту очередь опрашивать. На питоне это выглядит примерно так:

…
session = boto3.Session(profile_name=profile)
queue = session.resource('sqs', region_name=region).Queue(queue_url)
while True:
   # Здесь можно усыпить процесс, если хочется
   messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20)
   if not messages:
       continue
   message = messages[0]
   # Process message

Окей, мы разобрались как понять, что пользователь закончил заливать свои файлы и пора их архивировать.

Теперь нам нужно скопировать файлы в архив и отправить SNS уведомление. Уведомление нужно отправлять только после того как все файлы уедут в архив. Что нам может предложить для такого случая Амазон?

S3 Batch Operations

Амазон позволяет копировать объекты батчами между бакетами и даже между аккаунтами. Но функционал этот довольно ограничен. Рассмотрим чуть подробнее.

Поставить задачу на копирование можно двумя способами: с использованием отчета об инвентаризации и с помощью CSV манифеста.

Оба способа очень похожи друг на друга и обладают одним и тем же недостатком - они оба недостаточно гибкие. Так к примеру, при постановке задачи на копирование мы можем указать список копируемых файлов из батча в CSV манифесте, а так же можем указать каталог куда все эти файлы скопировать, но выбрать произвольное имя или вообще как-то модифицировать ключ, увы, нельзя. К примеру, если у нас в файле указаны загруженные пользователем файлы из примера выше,

<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>

то можно все эти файлы загрузить в каталог, скажем, archive и получить пути вида archive/<batch_name>/files/<file_key_1> , но вот каталог files из пути не выдрать, т.е. файл <batch_name>/files/<file_key_1> скопировать в archive/<batch_name>/<file_key_1>, увы, нельзя.

В случае с использованием сервиса инвентаризации все то же самое, только вместо манифеста - отчет с похожим содержимым. А очень бы хотелось на самом деле. 

К слову, задачи на копирование батчами можно ставить как программно, так и через Консоль.

SDK Python

Когда гибкости стандартных S3 Batch Operations не хватает, Амазон советует пользоваться SDK, что мы и сделаем. У нас питон, а для питона синхронная либа boto3. boto3 под капотом делает вызовы к REST API амазоновских сервисов, поэтому операция копирования файла сводится к отправке HTTP запроса, а это значит, что мы можем немного распараллелить процесс копирования. Однопоточный синхронный код значительно проще конечно, что мне очень импонирует, но его производительности, к сожалению, будет не достаточно, т.к. файлов может быть много, а очень долго ждать не хочется. В итоге имеем вот такой простенький скриптик:

from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor

import boto3
import click


def get_source_prefix(message_body: str) -> str:
    ...


def modify_key(key) -> str:
    ...


@click.command()
@click.option('--queue-url')
@click.option('--source-bucket')
@click.option('--target-bucket')
@click.option('--target-prefix')
@click.option('--max-workers')
@click.option('--region')
@click.option('--profile')
def main(queue_url, source_bucket, target_bucket, target_prefix, max_workers, region, profile):
    session = boto3.Session(profile_name=profile)
    queue = session.resource('sqs', region_name=region).Queue(queue_url)
    s3_client = session.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')

    while True:
        # Здесь можно усыпить процесс, если очень хочется
        # По умолчанию, метод возвращает одно сообщение
        messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20)
        if not messages:
            continue
        message = messages[0]

        kwargs = {'Bucket': source_bucket, 'Prefix': f'{get_source_prefix(message.body)}/files'}
        keys = [k for page in paginator.paginate(**kwargs) for k in page.get('Contents', [])]

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures_to_keys = {
                executor.submit(
                    s3_client.copy,
                    {'Bucket': source_bucket, 'Key': key['Key']},
                    target_bucket,
                    f'{target_prefix}/{modify_key(key["Key"])}'
                ): key
                for key in keys if not key['Key'].endswith('.trigger')
            }
            as_completed(futures_to_keys)

            # шлем SNS уведомление
            ...

        message.delete()


if __name__ == '__main__':
    main()
Теги:
Хабы:
Всего голосов 3: ↑0 и ↓3-3
Комментарии4

Публикации

Истории

Работа

Python разработчик
136 вакансий
Data Scientist
60 вакансий

Ближайшие события