Pull to refresh

Начало работы с MongoDB и Redis на Rust

Reading time13 min
Views11K

В этой статье будет показано как создать Rust бэкэнд, который использует MongoDB, документо-ориентированную БД, для хранения данных и Redis для кэширования, ограничения количества HTTP запросов и нотификаций пользователя. Для большей наглядности созданное приложение также будет предоставлять REST API. В итоге будет получена следующая архитектура:


architecture


MongoDB является хранилищем, в то время как Redis используется для следующего:



Обратите внимание, что указанные сценарии использования не означают, что для похожего сценария вам нужно использовать подход, описанный в статье. Примеры в первую очередь имеют целью познакомить вас с MongoDB и Redis.


Проект реализован с помощью MongoDB Rust driver и крейта redis-rs.


Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.


Доменная модель включает данные о планетах Солнечной системы и их спутниках.


Запуск MongoDB и Redis


Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.


Обе тулы могут быть запущены как Docker контейнер:


docker-compose.yml


version: '3.8'

services:

  ...

  mongodb:
    image: mongo
    container_name: mongodb
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
      MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
      MONGO_INITDB_DATABASE: solar_system_info
    ports:
      - 27017:27017

  mongodb-seed:
    image: mongo
    container_name: mongodb-seed
    depends_on:
      - mongodb
    volumes:
      - ./mongodb-init:/mongodb-init
    links:
      - mongodb
    command:
      mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - 6379:6379

Назначение контейнера mongodb-seed будет показано далее.


Вы можете получить доступ к mongo shell с помощью следующей команды:


docker exec -it mongodb mongo --username admin --password password


(где mongodb — название Docker контейнера, mongo — shell)


Теперь вы можете выполнять команды, например:


  • получить список баз данных с помощью show dbs
  • получить все данные в определённой базе данных:
    • use solar_system_info
    • show collections
    • db.planets.find()

Доступ к Redis CLI может быть получен с помощью следующей команды:


docker exec -it redis redis-cli


Простейший пример команды выглядит так:


Пример команды Redis


> set mykey somevalue
OK
> get mykey
"somevalue"

Для получения списка ключей используйте команду keys *.


Вы можете найти больше примеров команд для Redis CLI в этом гайде.


Инициализация данных


MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb-seed и команды mongoimport:


docker-compose.yml


mongodb-seed:
  image: mongo
  container_name: mongodb-seed
  depends_on:
    - mongodb
  volumes:
    - ./mongodb-init:/mongodb-init
  links:
    - mongodb
  command:
    mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

Также инициализация БД может быть выполнена с использованием JavaScript файла.


Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующей команды:


mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg


Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images должна быть смонтирована как volume в определении Docker Compose сервиса)


Далее будет показано как использовать MongoDB и Redis в Rust приложении.


Имплементация приложения


Зависимости


Приложение имплементировано с помощью:



Cargo.toml


[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"

[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"

Структура проекта


Структура проекта


├───images
│
├───mongodb-init
│       init.json
│
└───src
        db.rs
        dto.rs
        errors.rs
        handlers.rs
        index.html
        main.rs
        model.rs
        redis.rs
        services.rs

Функция main


Функция main


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    dotenv::from_filename(".env.local").ok();
    env_logger::init();

    info!("Starting MongoDB & Redis demo server");

    let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
    let mongodb_client = MongoDbClient::new(mongodb_uri).await;

    let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
    let redis_client = redis::create_client(redis_uri)
        .await
        .expect("Can't create Redis client");
    let redis_connection_manager = redis_client
        .get_tokio_connection_manager()
        .await
        .expect("Can't create Redis connection manager");

    let planet_service = Arc::new(PlanetService::new(
        mongodb_client,
        redis_client,
        redis_connection_manager.clone(),
    ));
    let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));

    ...
}

Здесь определён кастомный MongoDbClient, клиент Redis и менеджер соединений Redis.


Работа с MongoDB


Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:


Функция, возвращающая список планет


const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";

pub async fn get_planets(
    &self,
    planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
    let filter = planet_type.map(|pt| {
        doc! { "type": pt.to_string() }
    });

    let mut planets = self.get_planets_collection().find(filter, None).await?;

    let mut result: Vec<Planet> = Vec::new();
    while let Some(planet) = planets.next().await {
        result.push(planet?);
    }

    Ok(result)
}

fn get_planets_collection(&self) -> Collection<Planet> {
    self.client
        .database(DB_NAME)
        .collection::<Planet>(COLLECTION_NAME)
}

get_planets также включает пример фильтрации документов MongoDB по определённому критерию.


Модель данных выглядит так:


Модель данных


#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub name: String,
    pub r#type: PlanetType,
    pub mean_radius: f32,
    pub satellites: Option<Vec<Satellite>>,
}

#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
    TerrestrialPlanet,
    GasGiant,
    IceGiant,
    DwarfPlanet,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
    pub name: String,
    pub first_spacecraft_landing_date: Option<mongodb::bson::DateTime>,
}

Структуры содержат поля "обычных" типов (string, f32), а также:


  • ObjectId (Planet.id)
  • список (Planet.satellites)
  • дата/timestamp (Satellite.first_spacecraft_landing_date)
  • перечисление (Planet.type)
  • nullable поля (Planet.id, Planet.satellites)

Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:


  • получение всего списка


    GET http://localhost:9000/planets


    Пример с фильтрацией:


    GET http://localhost:9000/planets?type=IceGiant


  • создание


    POST http://localhost:9000/planets


    Body:


    {
        "name": "Pluto",
        "type": "DwarfPlanet",
        "mean_radius": 1188,
        "satellites": null
    }

  • получение по id


    GET http://localhost:9000/{planet_id}


  • обновление


    PUT http://localhost:9000/{planet_id}


    Body:


    {
        "name": "Mercury",
        "type": "TerrestrialPlanet",
        "mean_radius": 2439.7,
        "satellites": null
    }

  • удаление


    DELETE http://localhost:9000/{planet_id}


  • получение изображения планеты


    GET http://localhost:9000/planets/{planet_id}/image


    Используйте этот метод для тестирования кэширования с помощью Redis



MongoDB документы хранятся в формате BSON.


Работа с Redis


Redis клиент создаётся следующим образом:


Создание Redis клиента


pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
    Ok(Client::open(redis_uri)?)
}

Менеджер соединений Redis может быть создан так:


Получение менеджера соединений Redis


let redis_client = redis::create_client(redis_uri)
    .await
    .expect("Can't create Redis client");
let redis_connection_manager = redis_client
    .get_tokio_connection_manager()
    .await
    .expect("Can't create Redis connection manager");

Кэширование


Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:


Получение планеты по id


pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
    let cache_key = self.get_planet_cache_key(planet_id);
    let mut con = self.redis_client.get_async_connection().await?;

    let cached_planet = con.get(&cache_key).await?;
    match cached_planet {
        Value::Nil => {
            debug!("Use database to retrieve a planet by id: {}", &planet_id);
            let result: Planet = self
                .mongodb_client
                .get_planet(ObjectId::from_str(planet_id)?)
                .await?;

            let _: () = redis::pipe()
                .atomic()
                .set(&cache_key, &result)
                .expire(&cache_key, 60)
                .query_async(&mut con)
                .await?;

            Ok(result)
        }
        Value::Data(val) => {
            debug!("Use cache to retrieve a planet by id: {}", planet_id);
            Ok(serde_json::from_slice(&val)?)
        }
        _ => Err(RedisError {
            message: "Unexpected response from Redis".to_string(),
        }),
    }
}

Если ключа нет в кэше (ветвь Nil), пара ключ-значение помещается в Redis с помощью функции set (с автоэкспирацией (её назначение будет показано далее)); во второй ветви выражения match кэшированное значение конвертируется в целевую структуру.


Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs для структуры:


Имплементация трейта ToRedisArgs


impl ToRedisArgs for &Planet {
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
    }
}

В функции get_planet используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager, на примере очистки кэша с использованием функции del:


Пример очистки кэша


pub async fn update_planet(
    &self,
    planet_id: &str,
    planet: Planet,
) -> Result<Planet, CustomError> {
    let updated_planet = self
        .mongodb_client
        .update_planet(ObjectId::from_str(planet_id)?, planet)
        .await?;

    let cache_key = self.get_planet_cache_key(planet_id);
    self.redis_connection_manager.clone().del(cache_key).await?;

    Ok(updated_planet)
}

Есть вероятность, что что-то пойдёт не так после успешного обновления (или удаления) сущности; например, проблема с сетью, что приведёт к ошибке при вызове Redis, или выключение/перезапуск приложения так, что функция del не будет вызвана. Это может привести к некорректным данным в кэше; последствия этого могут быть уменьшены за счёт автоэкспирации кэшированных записей, указанной ранее.


ConnectionManager может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.


Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set/get):


Кэширование изображений


pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
    let cache_key = self.get_image_cache_key(planet_id);
    let mut redis_connection_manager = self.redis_connection_manager.clone();

    let cached_image = redis_connection_manager.get(&cache_key).await?;
    match cached_image {
        Value::Nil => {
            debug!(
                "Use database to retrieve an image of a planet by id: {}",
                &planet_id
            );
            let planet = self
                .mongodb_client
                .get_planet(ObjectId::from_str(planet_id)?)
                .await?;
            let result = crate::db::get_image_of_planet(&planet.name).await;

            let _: () = redis::pipe()
                .atomic()
                .set(&cache_key, result.clone())
                .expire(&cache_key, 60)
                .query_async(&mut redis_connection_manager)
                .await?;

            Ok(result)
        }
        Value::Data(val) => {
            debug!(
                "Use cache to retrieve an image of a planet by id: {}",
                &planet_id
            );
            Ok(val)
        }
        _ => Err(RedisError {
            message: "Unexpected response from Redis".to_string(),
        }),
    }
}

Кэширование может быть протестировано с использованием REST API, описанного выше.


Ограничение количества HTTP запросов


Эта фича реализована в соответствии с официальным гайдом следующим образом:


Имплементация rate limiter


#[derive(Clone)]
pub struct RateLimitingService {
    redis_connection_manager: ConnectionManager,
}

impl RateLimitingService {
    pub fn new(redis_connection_manager: ConnectionManager) -> Self {
        RateLimitingService {
            redis_connection_manager,
        }
    }

    pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
        let current_minute = Utc::now().minute();
        let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);

        let (count,): (u64,) = redis::pipe()
            .atomic()
            .incr(&rate_limit_key, 1)
            .expire(&rate_limit_key, 60)
            .ignore()
            .query_async(&mut self.redis_connection_manager.clone())
            .await?;

        if count > MAX_REQUESTS_PER_MINUTE {
            Err(TooManyRequests {
                actual_count: count,
                permitted_count: MAX_REQUESTS_PER_MINUTE,
            })
        } else {
            Ok(())
        }
    }
}

Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.


Rate limiter может быть использован в Actix обработчике следующим образом:


Использование rate limiter


pub async fn get_planets(
    req: HttpRequest,
    web::Query(query_params): web::Query<GetPlanetsQueryParams>,
    rate_limit_service: web::Data<Arc<RateLimitingService>>,
    planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
    rate_limit_service
        .assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
        .await?;

    let planets = planet_service.get_planets(query_params.r#type).await?;
    Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}

Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:


rate limiting


Нотификации


В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.


При создании сущности публикуется событие:


Публикация события в Redis


pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
    let planet = self.mongodb_client.create_planet(planet).await?;
    self.redis_connection_manager
        .clone()
        .publish(
            NEW_PLANETS_CHANNEL_NAME,
            serde_json::to_string(&PlanetMessage::from(&planet))?,
        )
        .await?;
    Ok(planet)
}

Подписка реализуется так:


Пример подписки в Redis


pub async fn get_new_planets_stream(
    &self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);

    tx.send(Ok(Bytes::from("data: Connected\n\n")))
        .await
        .expect("Can't send a message to the stream");

    let mut pubsub_con = self
        .redis_client
        .get_async_connection()
        .await?
        .into_pubsub();
    pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;

    tokio::spawn(async move {
        while let Some(msg) = pubsub_con.on_message().next().await {
            let payload = msg.get_payload().expect("Can't get payload of message");
            let payload: String = FromRedisValue::from_redis_value(&payload)
                .expect("Can't convert from Redis value");
            let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
            tx.send(Ok(msg))
                .await
                .expect("Can't send a message to the stream");
        }
    });

    Ok(rx)
}

Подписка используется в Actix обработчике так:


Пример SSE handler


pub async fn sse(
    planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
    let new_planets_stream = planet_service.get_new_planets_stream().await?;
    let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);

    Ok(HttpResponse::build(StatusCode::OK)
        .insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
        .streaming(response_stream))
}

Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:


  • подписка из браузера


    Перейдите на http://localhost:9000/, где находится HTML страница:


    sse notifications browser


  • подписка из командной строки с использованием cURL


    Используйте curl -X GET localhost:9000/events:


    sse notifications curl



Для генерации события используется следующий cURL запрос:


Запрос для тестирования нотификаций


curl -X POST -H 'Content-Type: application/json' -d '{
    \"name\": \"Pluto\",
    \"type\": \"DwarfPlanet\",
    \"mean_radius\": 1188,
    \"satellites\": null
}' localhost:9000/planets

Веб приложение


Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.


REST API обработчики


REST API обработчики определены так:


Определение REST API обработчиков


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    ...

    let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
        .expect("ENABLE_WRITE_HANDLERS env var should be specified")
        .parse::<bool>()
        .expect("Can't parse ENABLE_WRITE_HANDLERS");

    HttpServer::new(move || {
        let mut app = App::new()
            .route("/planets", web::get().to(handlers::get_planets))
            .route("/planets/{planet_id}", web::get().to(handlers::get_planet))
            .route(
                "/planets/{planet_id}/image",
                web::get().to(handlers::get_image_of_planet),
            )
            .route("/events", web::get().to(handlers::sse))
            .route("/", web::get().to(handlers::index))
            .data(Arc::clone(&planet_service))
            .data(Arc::clone(&rate_limiting_service));

        if enable_write_handlers {
            app = app
                .route("/planets", web::post().to(handlers::create_planet))
                .route(
                    "/planets/{planet_id}",
                    web::put().to(handlers::update_planet),
                )
                .route(
                    "/planets/{planet_id}",
                    web::delete().to(handlers::delete_planet),
                );
        }

        app
    })
    .bind("0.0.0.0:9000")?
    .run()
    .await
}

Обработка ошибок


Обработка ошибок имплементирована в соответствии с документацией.


Запуск и тестирование


Локально проект может быть запущен двумя способами:


  • с использованием Docker Compose (docker-compose.yml):


    docker compose up (или docker-compose up в более старых версиях Docker)


  • без использования Docker


    Запустите приложение с помощью cargo run (в этом случае сервис mongodb-redis в docker-compose.yml должен быть отключён)



CI/CD


CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.


Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET эндпоинтов, например:


GET http://demo.romankudryashov.com:9000/planets


Пишущие методы REST API недоступны на production среде.


Заключение


В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!


Полезные ссылки


Tags:
Hubs:
Total votes 12: ↑12 and ↓0+12
Comments13

Articles