Как стать автором
Обновить

Делайте что угодно со своими файлами, ну почти

Уровень сложностиСредний
Время на прочтение8 мин
Количество просмотров8.7K

У меня есть решение для тех случаев, когда ваш процессинг файлов довольно стандартный. Решение пока сыроватое, но уже способное делать интересные и полезные вещи.

Для начала уточню, что когда я говорю стандартный процессинг файлов, я имею в виду что-то вроде получить файл от клиента, проверить размер и тип, загрузить в S3. Туда же относятся некоторые задачи, для решения которых обычно пишут скриптики, которые проходят по директориям с файлами и, к примеру, запускают какую-нибудь команду для обработки каждого файла или группы файлов.

То, о чем я хочу рассказать называется Capyfile. Я бы назвал это конвейером для обработки файлов, который можно конфигурировать под свои самые разнообразные нужды. Код можно найти на GitHub. Написано все на Go. И для начала работы с ним, нужны всего две вещи:

  1.  конфигурационный файл, определяющий что конвейер будет делать

  2. раннер который будет обрабатывать файлы в соответствии с инструкциями из конфигурационного файла

Теперь подробности.

Конфигурационный файл конвейера

Как уже говорилось ранее, первое, что нам нужно это конфигурационный файл. С помощью этого файла вы можете структурировать конвейеры и конфигурировать операции которые к ним принадлежат. А в данный момент есть поддержка двух форматов - YAML и JSON.

Типичный файл конфигурации выглядит так.

---
version: '1.2'
name: photos
processors:
  - name: archive
    operations:
      # read the files from the directory
      - name: filesystem_input_read
        params:
          target:
            sourceType: value
            source: "/home/user/Photos/*"
      # check the file type
      - name: file_type_validate
        params:
          allowedMimeTypes:
            sourceType: value
            source:
              - image/jpeg
              - image/x-canon-cr2
              - image/heic
              - image/heif
      # if the file type is right, upload the file to S3
      - name: s3_upload
        targetFiles: without_errors
        params:
          accessKeyId:
            sourceType: env_var
            source: AWS_ACCESS_KEY_ID
          secretAccessKey:
            sourceType: env_var
            source: AWS_SECRET_ACCESS_KEY
          endpoint:
            sourceType: value
            source: "s3.amazonaws.com"
          region:
            sourceType: value
            source: "us-east-1"
          bucket:
            sourceType: env_var
            source: AWS_PHOTOS_BUCKET
      # if the file type is right, and it is successfully uploaded to S3,
      # remove the file from the filesystem
      - name: filesystem_input_remove
        targetFiles: without_errors

Здесь видны три основные сущности. Самая верхняя – это сервис. Затем следуют процессоры, и конечно же операции принадлежащие каждому процессору. Таким образом каждый конвейер имеет своего рода идентификатор, по которому к нему можно обращаться. Он состоит из имени сервиса и имени процессора, между которыми разделитель (двоеточие для консоли и слэш для сервера). Вам решать, как их именовать. К примеру, обратиться к конвейеру из конфигурационного файла выше можно используя идентификатор photos:archive.

А теперь перейдём к операциям. Каждой конвейер может состоять из множества операции, и вы можете размещать их в любом порядке который в вашем случае имеет смысл. В данный момент доступны следующие из них:

  • http_multipart_form_input_read - читаем HTTP запрос как multipart/form-data

  • http_octet_stream_input_read - читаем HTTP запрос как application/octet-stream

  • filesystem_input_read - читаем из локальной файловой системы

  • filesystem_input_write - пишем в локальную файловую систему

  • filesystem_input_remove - удаляем из локальной файловой системы

  • file_size_validate - проверяем размер файла

  • file_type_validate - проверяем MIME-тип файла

  • file_time_validate - проверяем временные метки файла

  • exiftool_metadata_cleanup - чистим метаданные файла (нужен exiftool)

  • image_convert - конвертируем изображение в другой формат (нужен libvips)

  • s3_upload - загружаем файл в S3

  • command_exec - выполняем произвольную команду

Каждая из операций принимает такие параметры:

  • targetFiles - указать операции какие файлы она должна обрабатывать

    • without_errors (по умолчанию) - операция обработает файлы без ошибок (ошибок валидации либо любых других)

    • with_errors - операция обработает лишь файлы с ошибками

    • all - операция обработает все файлы

  • cleanupPolicy - указать операции, что делать с созданными ею файлами когда пришло время подчищать мусор

    • keep_files (по умолчанию) - оставляем созданные операцией файлы

    • remove_files - удаляем созданные операцией файлы

  • maxPacketSize - указать операции количество файлов которые она будет обрабатывать за раз

И конечно же, операция сама по себе должна быть сконфигурирована. Здесь у каждой операции свои параметры, которые она принимает через свойство params. Список параметров можно найти в документации на GitHub. Значения для них можно получить из следующих источников:

  • value - берем значение из конфигурационного файла

  • env_var - берем значение из переменной окружения

  • secret - берем значение из секрета

  • file - берем значение из файла

  • http_get - берем значение из HTTP GET параметра

  • http_post - берем значение из HTTP POST параметра

  • http_header - берем значение из HTTP заголовка

  • etcd - берем значение из etcd

Это практически все, что нужно знать о конфигурационном файле для конвейера.

Запуск конвейера

Допустим, мы написали конфигурационный файл для нашего конвейера. Как его теперь запустить? В данный момент доступно три варианта запуска: 

  1. capysvr - HTTP сервер запускающий конвейер под каждый запрос

  2. capycmd - консольное приложение для запуска конвейера

  3. capyworker - воркер запускающий конвейер с определенной периодичностью

Каждый из раннеров принимает как аргумент файл конфигурации. И для каждого из них есть возможность включить конкурентный режим.

Теперь, когда мы знаем как конвейер настроить и запустить, перейдем к примерам.

Пример 1: архивируем логи

К примеру, есть куча ротированных логов. И логи которые старше месяца, нужно сжать и архивировать в S3.

Напишем конфиг:

---
version: '1.2'
name: logs
processors:
  - name: archive
    operations:
      - name: filesystem_input_read
        cleanupPolicy: keep_files
        params:
          target:
            sourceType: env_var
            source: INPUT_READ_TARGET
      - name: file_time_validate
        params:
          maxMtime:
            sourceType: env_var
            source: MAX_LOG_FILE_TIME_RFC3339
      - name: command_exec
        cleanupPolicy: remove_files
        params:
          commandName:
            sourceType: value
            source: gzip
          commandArgs:
            sourceType: value
            source: ["{{.AbsolutePath}}"]
          outputFileDestination:
            sourceType: value
            source: "{{.AbsolutePath}}.gz"
      - name: command_exec
        params:
          commandName:
            sourceType: value
            source: aws
          commandArgs:
            sourceType: value
            source: [
              "s3",
              "cp", "{{.AbsolutePath}}",
              "s3://MY_LOGS_BUCKET_HERE/{{.Filename}}",
            ]
      - name: filesystem_input_remove
        params:
          removeOriginalFile:
            sourceType: value
            source: false

И запустим через capycmd:

INPUT_READ_TARGET=/var/logs/rotated-logs/* \
MAX_LOG_FILE_TIME_RFC3339=$(date -d "30 days ago" -u +"%Y-%m-%dT%H:%M:%SZ") \
capycmd -f logs.pipeline.yml -c logs:archive

Вывод будет выглядеть как-то так:

Running logs:archive service processor...

[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate FINISHED file mtime is too new
[/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate FINISHED file time is valid
[/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate FINISHED file time is valid
[/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate FINISHED file mtime is too new
[/var/logs/rotated-logs/access-2023-09-26.log] command_exec STARTED command execution has started
[/var/logs/rotated-logs/access-2023-09-26.log] command_exec FINISHED command execution has finished
[/var/logs/rotated-logs/access-2023-09-28.log] command_exec STARTED command execution has started
[/var/logs/rotated-logs/access-2023-09-28.log] command_exec FINISHED command execution has finished
[/var/logs/rotated-logs/access-2023-12-28.log] command_exec SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] command_exec SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write STARTED file write started
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write STARTED file write started
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write FINISHED file write finished
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write FINISHED file write finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove STARTED file remove started
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove FINISHED file remove finished
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove STARTED file remove started
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove FINISHED file remove finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
...

Пример 2: загружаем документы по HTTP

Допустим, нам нужен HTTP эндпоинт принимающий лишь документы, которые не больше 10МБ, и затем загружать их в S3 хранилище.

Как обычно, пишем конфиг:

version: '1.2'
name: documents
processors:
  - name: upload
    operations:
      - name: http_multipart_form_input_read
      - name: file_size_validate
        params:
          maxFileSize:
            sourceType: value
            source: 1048576
      - name: file_type_validate
        params:
          allowedMimeTypes:
            sourceType: value
            source:
              - application/pdf
              - application/msword
              - application/vnd.openxmlformats-officedocument.wordprocessingml.document
      - name: s3_upload
        params:
          accessKeyId:
            sourceType: secret
            source: aws_access_key_id
          secretAccessKey:
            sourceType: secret
            source: aws_secret_access_key
          endpoint:
            sourceType: env_var
            source: AWS_ENDPOINT
          region:
            sourceType: env_var
            source: AWS_REGION
          bucket:
            sourceType: env_var
            source: AWS_DOCUMENTS_BUCKET

И запускаем с помощью capysvr завернутого в Docker:

docker run \
    --name capyfile_server \
    --mount type=bind,source=./docs.pipeline.yml,target=/etc/capyfile/docs.pipeline.yml \
    --env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/docs.pipeline.yml \
    --env AWS_ENDPOINT=s3.amazonaws.com \
    --env AWS_REGION=us-west-1 \
    --env AWS_DOCUMENTS_BUCKET=my_documents_bucket \
    --secret aws_access_key_id \
    --secret aws_secret_access_key \
    -p 8024:80 \
    capyfile/capysvr:latest

Теперь обратиться к нему по HTTP можно так:

curl \
    -F "file1=@$HOME/Documents/document.pdf" \
    -F "file2=@$HOME/Documents/document.docx" \
    -F "file3=@$HOME/Documents/very-large-document.pdf" \
    -F "file4=@$HOME/Documents/program.run" \
    http://localhost/documents/upload 

В ответ сервер вернет JSON подобного формата:

{
  "status": "PARTIAL",
  "code": "PARTIAL",
  "message": "successfully uploaded 2 of 4 files",
  "files": [
    {
      "url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWse.pdf",
      "filename": "abcdKDNJW_DDWse.pdf",
      "originalFilename": "document.pdf",
      "mime": "application/pdf",
      "size": 5892728,
      "status": "SUCCESS",
      "code": "FILE_SUCCESSFULLY_UPLOADED",
      "message": "file successfully uploaded"
    },
    {
      "url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWsd.docx",
      "filename": "abcdKDNJW_DDWsd.docx",
      "originalFilename": "document.docx",
      "mime": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
      "size": 3145728,
      "status": "SUCCESS",
      "code": "FILE_SUCCESSFULLY_UPLOADED",
      "message": "file successfully uploaded"
    }
  ],
  "errors": [
    {
      "originalFilename": "very-large-document.pdf",
      "status": "ERROR",
      "code": "FILE_IS_TOO_BIG",
      "message": "file size can not be greater than 10 MB"
    },
    {
      "originalFilename": "program.run",
      "status": "ERROR",
      "code": "FILE_MIME_TYPE_IS_NOT_ALLOWED",
      "message": "file MIME type \"application/x-makeself\" is not allowed"
    }
  ],
  "meta": {
    "totalUploads": 4,
    "successfulUploads": 2,
    "failedUploads": 2
  }
}

В конце

Ну в общем такое я пишу по выходным. Роботы там еще много, но уже что-то вырисовывается, чему я даже немного рад. Кому идея понравилась, загляните в репозиторий.

Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+1
Комментарии16

Публикации