Недавно столкнулся с проблемой подружить веб-фреймворк Axum и библиотеку rust-s3. Собственно, задача сделать 2 эндпойнта:
Загрузка файла в хранилище и получение ссылки
Скачка файла из хранилища по ссылке
Разумеется, без временных файлов и без удержания целиком всех данных файла в памяти.
Так как для работы с S3 нужны некоторые служебные объекты (настройки доступа к конкретному bucket), вынесем непосредственную работу в структуру UploadService:
#[derive(Clone)] pub struct UploadService { bucket: Arc<s3::Bucket> }
Для внедрения зависимости (DI) в обработчик эндпойнта, нам необходимо, чтобы наша структура реализовывала трейт Clone. Так как сервис будет клонироваться на каждый запрос, обернём s3::Bucket в Arc, чтобы клонирование было максимально дешёвым.
Теперь реализуем конструктор экземпляра сервиса:
use s3::{Bucket, Region}; use s3::creds::Credentials; ... impl UploadService { pub fn new() -> Self { let bucket_name = std::env::var("UPLOAD_BUCKET_NAME") .expect("Expected UPLOAD_BUCKET_NAME environment variable"); let region = Region::Custom { region: std::env::var("UPLOAD_BUCKET_REGION") .expect("Expected UPLOAD_BUCKET_REGION environment variable"), endpoint: std::env::var("UPLOAD_BUCKET_ENDPOINT") .expect("Expected UPLOAD_BUCKET_ENDPOINT environment variable") }; let credentials = Credentials::new( Some( &std::env::var("UPLOAD_BUCKET_ACCESS_KEY") .expect("Expected UPLOAD_BUCKET_ACCESS_KEY environment variable") ), Some( &std::env::var("UPLOAD_BUCKET_SECRET_KEY") .expect("Expected UPLOAD_BUCKET_SECRET_KEY environment variable") ), None, None, None ).unwrap(); let bucket = Bucket::new(&bucket_name, region, credentials).unwrap() .with_path_style(); Self { bucket: Arc::new(bucket) } } ...
Сервис конфигурируется с помощью переменных окружения UPLOAD_BUCKET_NAME, UPLOAD_BUCKET_REGION, UPLOAD_BUCKET_ACCESS_KEY, UPLOAD_BUCKET_SECRET_KEY и UPLOAD_BUCKET_ENDPOINT. Последний параметр необходим так как я использую не Amazon S3, а другого S3-совместимого провайдера (Scaleway). При использовании Amazon S3 можно явно задать нужный регион с помощью одного из значений из перечисления s3::Region (например, s3::Region::UsWest1), либо воспользоваться s3::Region::from_str для парсинга региона из строки типа us-west-1. Кстати, в наборе перечислений региона помимо стандартных регионов Amazon есть ещё Digital Ocean, Wasabi и Yandex.
Теперь самое сложное – функция загрузки файла в хранилище:
use std::sync::{Arc, Mutex}; use std::path::Path; use std::ffi::OsStr; use axum::http::StatusCode; use axum::extract::multipart::Field; use async_hash::{Sha256, Digest}; use async_compat::CompatExt; use futures::TryStreamExt; use uuid::Uuid; ... pub async fn upload<'a>(&self, field: Field<'a>) -> Result<String, StatusCode> { let orig_filename = field.file_name() .unwrap_or("file") .to_owned(); let mimetype = field.content_type() .unwrap_or("application/octet-stream") .to_owned(); let digest = Arc::new(Mutex::new(Sha256::new())); let mut reader = field .map_ok(|chunk| { if let Ok(mut digest) = digest.lock() { digest.update(&chunk); } chunk }) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) .into_async_read() .compat(); let tmp_filename = format!("tmp/{}.bin", Uuid::new_v4()); self.bucket.put_object_stream_with_content_type( &mut reader, &tmp_filename, &mimetype ) .await .map_err(|err| { log::error!("S3 upload error: {:?}", err); StatusCode::INTERNAL_SERVER_ERROR })?; drop(reader); // Release digest borrow let mut result = Err(StatusCode::INTERNAL_SERVER_ERROR); if let Some(digest) = Arc::into_inner(digest).and_then(|m| m.into_inner().ok()) { let digest = hex::encode(digest.finalize()); let ext = Path::new(&orig_filename).extension().and_then(OsStr::to_str); let mut filename = if let Some(ext) = ext { format!("{}.{}", digest, ext) } else { digest }; filename.make_ascii_lowercase(); match self.bucket.copy_object_internal(&tmp_filename, &filename).await { Ok(_) => result = Ok(format!("/uploads/{}", &filename)), Err(err) => log::error!("S3 copy error: {:?}", err) } } if let Err(err) = self.bucket.delete_object(&tmp_filename).await { log::error!("S3 delete error: {:?}", err); } result } ...
Функция принимает одно поле из multipart/form-data запроса (обработка запроса будет рассмотрена ниже), определяет исходное имя файла и mime-тип (если эти данные отсутствуют, используется “file” и “application/octet-stream” в качестве значений по умолчанию). Затем данные поля превращаются в AsyncRead с помощью библиотеки async-compat. При этом наш читатель потока по мере чтения потока вычисляет его SHA256 хеш (пригодится в будущем).
Теперь мы можем загрузить файл в S3-хранилище под временным именем “tmp/<UUID>.bin” (UUID генерируется случайным образом). Если в этот момент возникает ошибка, функция возвращает код Internal Server Error.
Мы имеем файл в S3-хранилище и посчитанный SHA256 от его данных. Теперь можно переименовать файл в его окончательное имя (я хочу использовать SHA256 в качестве имени файла, чтобы одинаковые файлы не дублировались в хранилище). Для этого я беру HEX-представление SHA256 и приписываю расширение файла взятое из оригинального имени (если оно там было). Результат приводится к нижнему регистру (на случай если расширение файла было не в нижнем регистре) и далее мы выполняем копирование S3-объекта (так как API S3 не имеет функции переименования). Если копирование успешно, то у нас получается результирующий URL-файла.
Наконец, можно удалить временный объект из S3. Это происходит в любом случае – и если копирование было успешным, и если нет.
Последняя функция нашего сервиса – отдача файла по ссылке (теоретически это можно делегировать веб-серверу, но как минимум удобно иметь эту функцию при локальной разработке, как максимум нам может требоваться реализовать какую-нибудь дополнительную бизнес-логику вроде проверки прав доступа к файлу):
use axum::response::IntoResponse; use axum::body::StreamBody; use s3::error::S3Error; ... pub async fn download( &self, filename: &str ) -> Result<impl IntoResponse, StatusCode> { let stream = self.bucket.get_object_stream(filename) .await .map_err(|err| match err { S3Error::HttpFailWithBody(status_code, body) => match status_code { 404 => StatusCode::NOT_FOUND, _ => { log::error!( "S3 download HTTP error with code={} and body={:?}", status_code, body ); StatusCode::INTERNAL_SERVER_ERROR } } err => { log::error!("S3 download error: {:?}", err); StatusCode::INTERNAL_SERVER_ERROR } })?; Ok(StreamBody::from(stream.bytes)) } }
Здесь всё просто – получаем стрим S3-объекта, маппим ошибку отсутствия файла на 404-ую ошибку Axum, а остальные ошибки на 500-ую, возвращаем StreamBody.
Остаётся реализовать обработчики самих эндпойнтов:
use axum::{Extension, Json}; use axum::extract::{Multipart, Path}; use axum::http::{HeaderMap, HeaderValue, StatusCode}; use axum::http::header::CACHE_CONTROL; use axum::response::IntoResponse; #[derive(Debug, serde::Serialize)] pub struct UploadResponse { pub url: String } pub async fn upload_file( Extension(upload_service): Extension<UploadService>, mut multipart: Multipart ) -> Result<impl IntoResponse, StatusCode> { while let Some(field) = multipart.next_field().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR )? { if let Some("upload") = field.name() { let url = upload_service.upload(field).await?; return Ok(Json(UploadResponse { url })); } } Err(StatusCode::BAD_REQUEST) } pub async fn download_file( Path(path): Path<String>, Extension(upload_service): Extension<UploadService> ) -> Result<impl IntoResponse, StatusCode> { let body = upload_service.download(&path).await?; let headers = HeaderMap::from_iter([ (CACHE_CONTROL, HeaderValue::from_str("max-age=31536000").unwrap()) // One year ]); Ok((headers, body)) }
Обработчик загрузки загружает по одному файлу за раз, при этом имя поля файла в отправленной форме ожидается “upload”. Обработчик скачки файла выставляет срок жизни файла в кеше один год, потому что изменения файла не предполагаются (если файл изменится, он будет иметь другой SHA256 и другое имя).
Последнее, что нам остаётся – создать роутер и запустить сервер:
use std::str::FromStr; use axum::extract::DefaultBodyLimit; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); let upload_service = UploadService::new(); let router = axum::Router::new() .route("/uploads", axum::routing::post(upload_file)) .route("/uploads/*path", axum::routing::get(download_file)) .layer(Extension(upload_service)) .layer(DefaultBodyLimit::max(8 * 1024 * 1024)); let address = std::env::var("HOST").expect("Expected HOST environment variable"); let port = std::env::var("PORT").expect("Expected PORT environment variable") .parse::<u16>().expect("PORT environment variable must be an integer"); log::info!("Listening on http://{}:{}/", address, port); axum::Server::bind( &std::net::SocketAddr::new( std::net::IpAddr::from_str(&address).unwrap(), port ) ).serve(router.into_make_service()).await?; Ok(()) }
Экземляр UploadService передаётся через Extension (механизм DI в Axum), также может быть полезно задать DefaultBodyLimit, потому что стандартное значение 1 МБ может подходить не для всех ситуаций. Хост и порт для прослушивания получаются из соответствующих переменных окружения.
Вероятно, нам также может требоваться добавить какую-нибудь проверку авторизации в эндпойнт загрузки (а, возможно, и скачки), но это зависит от конкретной функции конкретного сервиса.
Зависимости в Cargo.toml:
[package] name = "uploader" version = "0.1.0" edition = "2021" [dependencies] log = "0.4.20" env_logger = "0.10.0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } axum = { version = "0.6.20", features = ["multipart"] } serde = "1.0.188" uuid = { version = "1.4.1", features = ["v4"] } rust-s3 = "0.34.0-rc1" futures = "0.3.28" async-compat = "0.2.2" async-hash = "0.5.1" hex = "0.4.3"
В качестве бонуса пример браузерного кода на TypeScript выполняющего загрузку файла:
interface UploadResponse { url: string; } async function uploadFile(file: Blob, filename?: string): Promise<UploadResponse | "error"> { const data = new FormData(); data.append("upload", file, filename); const response = await fetch("/uploads", { method: "post", body: data }); if (response.status >= 200 && response.status <= 299) { return await response.json(); } else { return "error"; } }
Наш сервис готов.
