В этой статье будет показано как создать Rust бэкэнд, который использует MongoDB, документо-ориентированную БД, для хранения данных и Redis для кэширования, ограничения количества HTTP запросов и нотификаций пользователя. Для большей наглядности созданное приложение также будет предоставлять REST API. В итоге будет получена следующая архитектура:
MongoDB является хранилищем, в то время как Redis используется для следующего:
- кэш (включая изображения)
- ограничение количества HTTP запросов
- нотификации с использованием паттерна publish-subscribe
Обратите внимание, что указанные сценарии использования не означают, что для похожего сценария вам нужно использовать подход, описанный в статье. Примеры в первую очередь имеют целью познакомить вас с MongoDB и Redis.
Проект реализован с помощью MongoDB Rust driver и крейта redis-rs
.
Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.
Доменная модель включает данные о планетах Солнечной системы и их спутниках.
Запуск MongoDB и Redis
Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.
Обе тулы могут быть запущены как Docker контейнер:
version: '3.8'
services:
...
mongodb:
image: mongo
container_name: mongodb
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
MONGO_INITDB_DATABASE: solar_system_info
ports:
- 27017:27017
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
redis:
image: redis:alpine
container_name: redis
ports:
- 6379:6379
Назначение контейнера mongodb-seed
будет показано далее.
Вы можете получить доступ к mongo
shell с помощью следующей команды:
docker exec -it mongodb mongo --username admin --password password
(где mongodb
— название Docker контейнера, mongo
— shell)
Теперь вы можете выполнять команды, например:
- получить список баз данных с помощью
show dbs
- получить все данные в определённой базе данных:
use solar_system_info
show collections
db.planets.find()
Доступ к Redis CLI может быть получен с помощью следующей команды:
docker exec -it redis redis-cli
Простейший пример команды выглядит так:
Пример команды Redis
> set mykey somevalue
OK
> get mykey
"somevalue"
Для получения списка ключей используйте команду keys *
.
Вы можете найти больше примеров команд для Redis CLI в этом гайде.
Инициализация данных
MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb-seed
и команды mongoimport
:
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
Также инициализация БД может быть выполнена с использованием JavaScript файла.
Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующей команды:
mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg
Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images
должна быть смонтирована как volume в определении Docker Compose сервиса)
Далее будет показано как использовать MongoDB и Redis в Rust приложении.
Имплементация приложения
Зависимости
Приложение имплементировано с помощью:
[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"
[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"
Структура проекта
├───images
│
├───mongodb-init
│ init.json
│
└───src
db.rs
dto.rs
errors.rs
handlers.rs
index.html
main.rs
model.rs
redis.rs
services.rs
Функция main
#[actix_web::main]
async fn main() -> std::io::Result<()> {
dotenv::from_filename(".env.local").ok();
env_logger::init();
info!("Starting MongoDB & Redis demo server");
let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
let mongodb_client = MongoDbClient::new(mongodb_uri).await;
let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
let planet_service = Arc::new(PlanetService::new(
mongodb_client,
redis_client,
redis_connection_manager.clone(),
));
let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));
...
}
Здесь определён кастомный MongoDbClient
, клиент Redis и менеджер соединений Redis.
Работа с MongoDB
Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:
Функция, возвращающая список планет
const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";
pub async fn get_planets(
&self,
planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
let filter = planet_type.map(|pt| {
doc! { "type": pt.to_string() }
});
let mut planets = self.get_planets_collection().find(filter, None).await?;
let mut result: Vec<Planet> = Vec::new();
while let Some(planet) = planets.next().await {
result.push(planet?);
}
Ok(result)
}
fn get_planets_collection(&self) -> Collection<Planet> {
self.client
.database(DB_NAME)
.collection::<Planet>(COLLECTION_NAME)
}
get_planets
также включает пример фильтрации документов MongoDB по определённому критерию.
Модель данных выглядит так:
#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub name: String,
pub r#type: PlanetType,
pub mean_radius: f32,
pub satellites: Option<Vec<Satellite>>,
}
#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
TerrestrialPlanet,
GasGiant,
IceGiant,
DwarfPlanet,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
pub name: String,
pub first_spacecraft_landing_date: Option<mongodb::bson::DateTime>,
}
Структуры содержат поля "обычных" типов (string
, f32
), а также:
ObjectId
(Planet.id
)- список (
Planet.satellites
) - дата/timestamp (
Satellite.first_spacecraft_landing_date
) - перечисление (
Planet.type
) - nullable поля (
Planet.id
,Planet.satellites
)
Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:
получение всего списка
GET
http://localhost:9000/planets
Пример с фильтрацией:
GET
http://localhost:9000/planets?type=IceGiant
создание
POST
http://localhost:9000/planets
Body:
{ "name": "Pluto", "type": "DwarfPlanet", "mean_radius": 1188, "satellites": null }
получение по id
GET
http://localhost:9000/{planet_id}
обновление
PUT
http://localhost:9000/{planet_id}
Body:
{ "name": "Mercury", "type": "TerrestrialPlanet", "mean_radius": 2439.7, "satellites": null }
удаление
DELETE
http://localhost:9000/{planet_id}
получение изображения планеты
GET
http://localhost:9000/planets/{planet_id}/image
Используйте этот метод для тестирования кэширования с помощью Redis
MongoDB документы хранятся в формате BSON.
Работа с Redis
Redis клиент создаётся следующим образом:
pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
Ok(Client::open(redis_uri)?)
}
Менеджер соединений Redis может быть создан так:
Получение менеджера соединений Redis
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
Кэширование
Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:
pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
let cache_key = self.get_planet_cache_key(planet_id);
let mut con = self.redis_client.get_async_connection().await?;
let cached_planet = con.get(&cache_key).await?;
match cached_planet {
Value::Nil => {
debug!("Use database to retrieve a planet by id: {}", &planet_id);
let result: Planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let _: () = redis::pipe()
.atomic()
.set(&cache_key, &result)
.expire(&cache_key, 60)
.query_async(&mut con)
.await?;
Ok(result)
}
Value::Data(val) => {
debug!("Use cache to retrieve a planet by id: {}", planet_id);
Ok(serde_json::from_slice(&val)?)
}
_ => Err(RedisError {
message: "Unexpected response from Redis".to_string(),
}),
}
}
Если ключа нет в кэше (ветвь Nil
), пара ключ-значение помещается в Redis с помощью функции set
(с автоэкспирацией (её назначение будет показано далее)); во второй ветви выражения match
кэшированное значение конвертируется в целевую структуру.
Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs
для структуры:
Имплементация трейта ToRedisArgs
impl ToRedisArgs for &Planet {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
}
}
В функции get_planet
используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager
, на примере очистки кэша с использованием функции del
:
pub async fn update_planet(
&self,
planet_id: &str,
planet: Planet,
) -> Result<Planet, CustomError> {
let updated_planet = self
.mongodb_client
.update_planet(ObjectId::from_str(planet_id)?, planet)
.await?;
let cache_key = self.get_planet_cache_key(planet_id);
self.redis_connection_manager.clone().del(cache_key).await?;
Ok(updated_planet)
}
Есть вероятность, что что-то пойдёт не так после успешного обновления (или удаления) сущности; например, проблема с сетью, что приведёт к ошибке при вызове Redis, или выключение/перезапуск приложения так, что функция del
не будет вызвана. Это может привести к некорректным данным в кэше; последствия этого могут быть уменьшены за счёт автоэкспирации кэшированных записей, указанной ранее.
ConnectionManager
может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.
Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set
/get
):
pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
let cache_key = self.get_image_cache_key(planet_id);
let mut redis_connection_manager = self.redis_connection_manager.clone();
let cached_image = redis_connection_manager.get(&cache_key).await?;
match cached_image {
Value::Nil => {
debug!(
"Use database to retrieve an image of a planet by id: {}",
&planet_id
);
let planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let result = crate::db::get_image_of_planet(&planet.name).await;
let _: () = redis::pipe()
.atomic()
.set(&cache_key, result.clone())
.expire(&cache_key, 60)
.query_async(&mut redis_connection_manager)
.await?;
Ok(result)
}
Value::Data(val) => {
debug!(
"Use cache to retrieve an image of a planet by id: {}",
&planet_id
);
Ok(val)
}
_ => Err(RedisError {
message: "Unexpected response from Redis".to_string(),
}),
}
}
Кэширование может быть протестировано с использованием REST API, описанного выше.
Ограничение количества HTTP запросов
Эта фича реализована в соответствии с официальным гайдом следующим образом:
#[derive(Clone)]
pub struct RateLimitingService {
redis_connection_manager: ConnectionManager,
}
impl RateLimitingService {
pub fn new(redis_connection_manager: ConnectionManager) -> Self {
RateLimitingService {
redis_connection_manager,
}
}
pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
let current_minute = Utc::now().minute();
let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);
let (count,): (u64,) = redis::pipe()
.atomic()
.incr(&rate_limit_key, 1)
.expire(&rate_limit_key, 60)
.ignore()
.query_async(&mut self.redis_connection_manager.clone())
.await?;
if count > MAX_REQUESTS_PER_MINUTE {
Err(TooManyRequests {
actual_count: count,
permitted_count: MAX_REQUESTS_PER_MINUTE,
})
} else {
Ok(())
}
}
}
Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded
значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.
Rate limiter может быть использован в Actix обработчике следующим образом:
pub async fn get_planets(
req: HttpRequest,
web::Query(query_params): web::Query<GetPlanetsQueryParams>,
rate_limit_service: web::Data<Arc<RateLimitingService>>,
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
rate_limit_service
.assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
.await?;
let planets = planet_service.get_planets(query_params.r#type).await?;
Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}
Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:
Нотификации
В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.
При создании сущности публикуется событие:
pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
let planet = self.mongodb_client.create_planet(planet).await?;
self.redis_connection_manager
.clone()
.publish(
NEW_PLANETS_CHANNEL_NAME,
serde_json::to_string(&PlanetMessage::from(&planet))?,
)
.await?;
Ok(planet)
}
Подписка реализуется так:
pub async fn get_new_planets_stream(
&self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);
tx.send(Ok(Bytes::from("data: Connected\n\n")))
.await
.expect("Can't send a message to the stream");
let mut pubsub_con = self
.redis_client
.get_async_connection()
.await?
.into_pubsub();
pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;
tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
tx.send(Ok(msg))
.await
.expect("Can't send a message to the stream");
}
});
Ok(rx)
}
Подписка используется в Actix обработчике так:
pub async fn sse(
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
let new_planets_stream = planet_service.get_new_planets_stream().await?;
let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);
Ok(HttpResponse::build(StatusCode::OK)
.insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
.streaming(response_stream))
}
Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:
подписка из браузера
Перейдите на
http://localhost:9000/
, где находится HTML страница:
подписка из командной строки с использованием cURL
Используйте
curl -X GET localhost:9000/events
:
Для генерации события используется следующий cURL запрос:
Запрос для тестирования нотификаций
curl -X POST -H 'Content-Type: application/json' -d '{
\"name\": \"Pluto\",
\"type\": \"DwarfPlanet\",
\"mean_radius\": 1188,
\"satellites\": null
}' localhost:9000/planets
Веб приложение
Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.
REST API обработчики
REST API обработчики определены так:
Определение REST API обработчиков
#[actix_web::main]
async fn main() -> std::io::Result<()> {
...
let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
.expect("ENABLE_WRITE_HANDLERS env var should be specified")
.parse::<bool>()
.expect("Can't parse ENABLE_WRITE_HANDLERS");
HttpServer::new(move || {
let mut app = App::new()
.route("/planets", web::get().to(handlers::get_planets))
.route("/planets/{planet_id}", web::get().to(handlers::get_planet))
.route(
"/planets/{planet_id}/image",
web::get().to(handlers::get_image_of_planet),
)
.route("/events", web::get().to(handlers::sse))
.route("/", web::get().to(handlers::index))
.data(Arc::clone(&planet_service))
.data(Arc::clone(&rate_limiting_service));
if enable_write_handlers {
app = app
.route("/planets", web::post().to(handlers::create_planet))
.route(
"/planets/{planet_id}",
web::put().to(handlers::update_planet),
)
.route(
"/planets/{planet_id}",
web::delete().to(handlers::delete_planet),
);
}
app
})
.bind("0.0.0.0:9000")?
.run()
.await
}
Обработка ошибок
Обработка ошибок имплементирована в соответствии с документацией.
Запуск и тестирование
Локально проект может быть запущен двумя способами:
с использованием Docker Compose (
docker-compose.yml
):
docker compose up
(илиdocker-compose up
в более старых версиях Docker)
без использования Docker
Запустите приложение с помощью
cargo run
(в этом случае сервисmongodb-redis
вdocker-compose.yml
должен быть отключён)
CI/CD
CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.
Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET
эндпоинтов, например:
GET http://demo.romankudryashov.com:9000/planets
Пишущие методы REST API недоступны на production среде.
Заключение
В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!