У меня есть решение для тех случаев, когда ваш процессинг файлов довольно стандартный. Решение пока сыроватое, но уже способное делать интересные и полезные вещи.
Для начала уточню, что когда я говорю стандартный процессинг файлов, я имею в виду что-то вроде получить файл от клиента, проверить размер и тип, загрузить в S3. Туда же относятся некоторые задачи, для решения которых обычно пишут скриптики, которые проходят по директориям с файлами и, к примеру, запускают какую-нибудь команду для обработки каждого файла или группы файлов.
То, о чем я хочу рассказать называется Capyfile. Я бы назвал это конвейером для обработки файлов, который можно конфигурировать под свои самые разнообразные нужды. Код можно найти на GitHub. Написано все на Go. И для начала работы с ним, нужны всего две вещи:
конфигурационный файл, определяющий что конвейер будет делать
раннер который будет обрабатывать файлы в соответствии с инструкциями из конфигурационного файла
Теперь подробности.
Конфигурационный файл конвейера
Как уже говорилось ранее, первое, что нам нужно это конфигурационный файл. С помощью этого файла вы можете структурировать конвейеры и конфигурировать операции которые к ним принадлежат. А в данный момент есть поддержка двух форматов - YAML и JSON.
Типичный файл конфигурации выглядит так.
--- version: '1.2' name: photos processors: - name: archive operations: # read the files from the directory - name: filesystem_input_read params: target: sourceType: value source: "/home/user/Photos/*" # check the file type - name: file_type_validate params: allowedMimeTypes: sourceType: value source: - image/jpeg - image/x-canon-cr2 - image/heic - image/heif # if the file type is right, upload the file to S3 - name: s3_upload targetFiles: without_errors params: accessKeyId: sourceType: env_var source: AWS_ACCESS_KEY_ID secretAccessKey: sourceType: env_var source: AWS_SECRET_ACCESS_KEY endpoint: sourceType: value source: "s3.amazonaws.com" region: sourceType: value source: "us-east-1" bucket: sourceType: env_var source: AWS_PHOTOS_BUCKET # if the file type is right, and it is successfully uploaded to S3, # remove the file from the filesystem - name: filesystem_input_remove targetFiles: without_errors
Здесь видны три основные сущности. Самая верхняя – это сервис. Затем следуют процессоры, и конечно же операции принадлежащие каждому процессору. Таким образом каждый конвейер имеет своего рода идентификатор, по которому к нему можно обращаться. Он состоит из имени сервиса и имени процессора, между которыми разделитель (двоеточие для консоли и слэш для сервера). Вам решать, как их именовать. К примеру, обратиться к конвейеру из конфигурационного файла выше можно используя идентификатор photos:archive.
А теперь перейдём к операциям. Каждой конвейер может состоять из множества операции, и вы можете размещать их в любом порядке который в вашем случае имеет смысл. В данный момент доступны следующие из них:
http_multipart_form_input_read- читаем HTTP запрос какmultipart/form-datahttp_octet_stream_input_read- читаем HTTP запрос какapplication/octet-streamfilesystem_input_read- читаем из локальной файловой системыfilesystem_input_write- пишем в локальную файловую системуfilesystem_input_remove- удаляем из локальной файловой системыfile_size_validate- проверяем размер файлаfile_type_validate- проверяем MIME-тип файлаfile_time_validate- проверяем временные метки файлаexiftool_metadata_cleanup- чистим метаданные файла (нужен exiftool)image_convert- конвертируем изображение в другой формат (нужен libvips)s3_upload- загружаем файл в S3command_exec- выполняем произвольную команду
Каждая из операций принимает такие параметры:
targetFiles- указать операции какие файлы она должна обрабатыватьwithout_errors(по умолчанию) - операция обработает файлы без ошибок (ошибок валидации либо любых других)with_errors- операция обработает лишь файлы с ошибкамиall- операция обработает все файлы
cleanupPolicy- указать операции, что делать с созданными ею файлами когда пришло время подчищать мусорkeep_files(по умолчанию) - оставляем созданные операцией файлыremove_files- удаляем созданные операцией файлы
maxPacketSize- указать операции количество файлов которые она будет обрабатывать за раз
И конечно же, операция сама по себе должна быть сконфигурирована. Здесь у каждой операции свои параметры, которые она принимает через свойство params. Список параметров можно найти в документации на GitHub. Значения для них можно получить из следующих источников:
value- берем значение из конфигурационного файлаenv_var- берем значение из переменной окруженияsecret- берем значение из секретаfile- берем значение из файлаhttp_get- берем значение из HTTP GET параметраhttp_post- берем значение из HTTP POST параметраhttp_header- берем значение из HTTP заголовкаetcd- берем значение из etcd
Это практически все, что нужно знать о конфигурационном файле для конвейера.
Запуск конвейера
Допустим, мы написали конфигурационный файл для нашего конвейера. Как его теперь запустить? В данный момент доступно три варианта запуска:
capysvr- HTTP сервер запускающий конвейер под каждый запросcapycmd- консольное приложение для запуска конвейераcapyworker- воркер запускающий конвейер с определенной периодичностью
Каждый из раннеров принимает как аргумент файл конфигурации. И для каждого из них есть возможность включить конкурентный режим.
Теперь, когда мы знаем как конвейер настроить и запустить, перейдем к примерам.
Пример 1: архивируем логи
К примеру, есть куча ротированных логов. И логи которые старше месяца, нужно сжать и архивировать в S3.
Напишем конфиг:
--- version: '1.2' name: logs processors: - name: archive operations: - name: filesystem_input_read cleanupPolicy: keep_files params: target: sourceType: env_var source: INPUT_READ_TARGET - name: file_time_validate params: maxMtime: sourceType: env_var source: MAX_LOG_FILE_TIME_RFC3339 - name: command_exec cleanupPolicy: remove_files params: commandName: sourceType: value source: gzip commandArgs: sourceType: value source: ["{{.AbsolutePath}}"] outputFileDestination: sourceType: value source: "{{.AbsolutePath}}.gz" - name: command_exec params: commandName: sourceType: value source: aws commandArgs: sourceType: value source: [ "s3", "cp", "{{.AbsolutePath}}", "s3://MY_LOGS_BUCKET_HERE/{{.Filename}}", ] - name: filesystem_input_remove params: removeOriginalFile: sourceType: value source: false
И запустим через capycmd:
INPUT_READ_TARGET=/var/logs/rotated-logs/* \ MAX_LOG_FILE_TIME_RFC3339=$(date -d "30 days ago" -u +"%Y-%m-%dT%H:%M:%SZ") \ capycmd -f logs.pipeline.yml -c logs:archive
Вывод будет выглядеть как-то так:
Running logs:archive service processor... [/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_read FINISHED file read finished [/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_read FINISHED file read finished [/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_read FINISHED file read finished [/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_read FINISHED file read finished [/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate STARTED file time validation started [/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate FINISHED file mtime is too new [/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate STARTED file time validation started [/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate FINISHED file time is valid [/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate STARTED file time validation started [/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate FINISHED file time is valid [/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate STARTED file time validation started [/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate FINISHED file mtime is too new [/var/logs/rotated-logs/access-2023-09-26.log] command_exec STARTED command execution has started [/var/logs/rotated-logs/access-2023-09-26.log] command_exec FINISHED command execution has finished [/var/logs/rotated-logs/access-2023-09-28.log] command_exec STARTED command execution has started [/var/logs/rotated-logs/access-2023-09-28.log] command_exec FINISHED command execution has finished [/var/logs/rotated-logs/access-2023-12-28.log] command_exec SKIPPED skipped due to "without_errors" target files policy [/var/logs/rotated-logs/access-2023-12-23.log] command_exec SKIPPED skipped due to "without_errors" target files policy [/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write STARTED file write started [/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write STARTED file write started [/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write FINISHED file write finished [/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write FINISHED file write finished [/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy [/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy [/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove STARTED file remove started [/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove FINISHED file remove finished [/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove STARTED file remove started [/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove FINISHED file remove finished [/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy [/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy ...
Пример 2: загружаем документы по HTTP
Допустим, нам нужен HTTP эндпоинт принимающий лишь документы, которые не больше 10МБ, и затем загружать их в S3 хранилище.
Как обычно, пишем конфиг:
version: '1.2' name: documents processors: - name: upload operations: - name: http_multipart_form_input_read - name: file_size_validate params: maxFileSize: sourceType: value source: 1048576 - name: file_type_validate params: allowedMimeTypes: sourceType: value source: - application/pdf - application/msword - application/vnd.openxmlformats-officedocument.wordprocessingml.document - name: s3_upload params: accessKeyId: sourceType: secret source: aws_access_key_id secretAccessKey: sourceType: secret source: aws_secret_access_key endpoint: sourceType: env_var source: AWS_ENDPOINT region: sourceType: env_var source: AWS_REGION bucket: sourceType: env_var source: AWS_DOCUMENTS_BUCKET
И запускаем с помощью capysvr завернутого в Docker:
docker run \ --name capyfile_server \ --mount type=bind,source=./docs.pipeline.yml,target=/etc/capyfile/docs.pipeline.yml \ --env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/docs.pipeline.yml \ --env AWS_ENDPOINT=s3.amazonaws.com \ --env AWS_REGION=us-west-1 \ --env AWS_DOCUMENTS_BUCKET=my_documents_bucket \ --secret aws_access_key_id \ --secret aws_secret_access_key \ -p 8024:80 \ capyfile/capysvr:latest
Теперь обратиться к нему по HTTP можно так:
curl \ -F "file1=@$HOME/Documents/document.pdf" \ -F "file2=@$HOME/Documents/document.docx" \ -F "file3=@$HOME/Documents/very-large-document.pdf" \ -F "file4=@$HOME/Documents/program.run" \ http://localhost/documents/upload
В ответ сервер вернет JSON подобного формата:
{ "status": "PARTIAL", "code": "PARTIAL", "message": "successfully uploaded 2 of 4 files", "files": [ { "url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWse.pdf", "filename": "abcdKDNJW_DDWse.pdf", "originalFilename": "document.pdf", "mime": "application/pdf", "size": 5892728, "status": "SUCCESS", "code": "FILE_SUCCESSFULLY_UPLOADED", "message": "file successfully uploaded" }, { "url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWsd.docx", "filename": "abcdKDNJW_DDWsd.docx", "originalFilename": "document.docx", "mime": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "size": 3145728, "status": "SUCCESS", "code": "FILE_SUCCESSFULLY_UPLOADED", "message": "file successfully uploaded" } ], "errors": [ { "originalFilename": "very-large-document.pdf", "status": "ERROR", "code": "FILE_IS_TOO_BIG", "message": "file size can not be greater than 10 MB" }, { "originalFilename": "program.run", "status": "ERROR", "code": "FILE_MIME_TYPE_IS_NOT_ALLOWED", "message": "file MIME type \"application/x-makeself\" is not allowed" } ], "meta": { "totalUploads": 4, "successfulUploads": 2, "failedUploads": 2 } }
В конце
Ну в общем такое я пишу по выходным. Роботы там еще много, но уже что-то вырисовывается, чему я даже немного рад. Кому идея понравилась, загляните в репозиторий.
