Введение в CDRS, Apache Cassandra driver полностью написанный на Rust

CDRS (Apache Cassandra driver written in Rust) — это мой собственный open source проект, который я решился разрабатывать после того, как обнаружил, что в плане драйверов для Cassandra в Rust экосистеме образовался дефицит.


Конечно, я не скажу, что их совсем нет. Они есть, но одна часть это заброшенные в зачаточном состоянии Hello World пакеты, а вторая часть это, наверное, единственный binding к драйверу от DataStax, написанному на С++.


Что касается CDRS, то средствами Rust он полностью имплементирует спецификацию 4-й версии протокола.


cargo.toml


Чтобы включить драйвер в свой проект, как обычно, необходимо следующее.


Во-первых, добавить CDRS в секцию dependencies вашего cargo.toml файла:


[dependencies]
cdrs = "1.0.0-beta.1"

Это позволит использовать TCP соединение без шифрования.


Если вы намерены создавать SSL-шифрованное соединение со свое базой данных, то CDRS должен быть включен с фичей "ssl":


[dependencies]
openssl = "0.9.6"
[dependencies.cdrs]
version = "1.0.0-beta.1"
features = ["ssl"]

Во-вторых, добавить его в lib.rs


extern crate CDRS

Установка соединения


TCP соединение


Для установки не шифрованного соединения вам понадобятся следующие модули


use cdrs::client::CDRS;
use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator};
use cdrs::transport::TransportPlain;

Если так случилось, что ваш кластер не требует авторизации паролем, то соединение может быть установлено следующим образом:


let authenticator = NoneAuthenticator;
let addr = "127.0.0.1:9042";
let tcp_transport = TransportPlain::new(addr).unwrap();

// pass authenticator and transport into CDRS' constructor
let client = CDRS::new(tcp_transport, authenticator);
use cdrs::compression;
// start session without compression
let mut session = try!(client.start(compression::None));

Для установки соединения, требующего авторизации паролем, вместо NoneAuthenticator нужно использовать PasswordAuthenticator:


let authenticator = PasswordAuthenticator::new("user", "pass");

TLS соединение


Установление TLS соединение во многом похоже на процесс, описанный в предыдущем шаге, за исключением того, что вам понадобится PEM сертификат для создания SSL транспорта.


use cdrs::client::CDRS;
use cdrs::authenticators::PasswordAuthenticator;
use cdrs::transport::TransportTls;
use openssl::ssl::{SslConnectorBuilder, SslMethod};
use std::path::Path;

let authenticator = PasswordAuthenticator::new("user", "pass");
let addr = "127.0.0.1:9042";

// here needs to be a path of your SSL certificate
let path = Path::new("./node0.cer.pem");
let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
ssl_connector_builder.builder_mut().set_ca_file(path).unwrap();
let connector = ssl_connector_builder.build();

let ssl_transport = TransportTls::new(addr, &connector).unwrap();

// pass authenticator and SSL transport into CDRS' constructor
let client = CDRS::new(ssl_transport, authenticator);

Connection pool


Для более простого управления существующими соединениям CDRS содержит ConnectionManager, который по своей сути есть адаптор для r2d2.


use cdrs::connection_manager::ConnectionManager;
//...
let config = r2d2::Config::builder()
    .pool_size(3)
    .build();
let transport = TransportPlain::new(ADDR).unwrap();
let authenticator = PasswordAuthenticator::new(USER, PASS);
let manager = ConnectionManager::new(transport, authenticator, Compression::None);

let pool = r2d2::Pool::new(config, manager).unwrap();

for _ in 0..20 {
    let pool = pool.clone();
    thread::spawn(move || {
        let conn = pool.get().unwrap();
        // use the connection
        // it will be returned to the pool when it falls out of scope.
    });
}

Сжатие — lz4 и snappy


Чтобы использовать lz4 и snappy сжатие, достаточно передать в конструктор сессии желаемый декодер:


// session without compression
let mut session_res = client.start(compression::None);
// session with lz4 compression
let mut session_res = client.start(compression::Lz4);
// session with snappy compression
let mut session_res = client.start(compression::Snappy);

Далее CDRS самостоятельно сообщит кластеру, что он готов принимать информацию в сжатом виде с выбранным декодером. Дальнейшая распаковка будет проходить автоматически и не требует каких-либо дальнейших действий от разработчика.


Выполнение запросов


Выполнение запросов к Cassandra серверу проходит исключительно в рамках существующей сессии, после выбора методов авторизации, сжатия, а также типа транспорта.


Для выполнения того или иного запроса необходимо создать объект Query, который с первого взгляда может показаться несколько избыточным для простых запросов, посколько содержит множество параметров, которые, вероятно, используются не так часто.


По этой причине был создан builder, который упрощает процесс конфигурирования запроса. Например, для простого 'USE my_namespace;' достаточно просто


let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize();
let with_tracing = false;
let with_warnings = false;

let switched = session.query(create_query, with_tracing, with_warnings).is_ok();

Создание новой таблицы


Чтобы создать новую таблицу в Cassandra кластере, как и раньше, необходимо вначале сконфигурировать Queryи после этого выполнить запрос:


use std::default::Default;
use cdrs::query::{Query, QueryBuilder};
use cdrs::consistency::Consistency;

let mut create_query: Query = QueryBuilder::new("CREATE TABLE keyspace.authors (
    id int,
    name text,
    messages list<text>,
    PRIMARY KEY (id)
    );")
    .consistency(Consistency::One)
    .finalize();
let with_tracing = false;
let with_warnings = false;

let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();

Что касается самого CQL запроса создания новой таблицы, то за более полной информацией лучше обратиться к специализированным ресурсам, например DataStax.


SELECT запрос и маппинг результатов


Предположим, что в нашей базе данных существует таблица авторов, при чем каждый автор имеет список своих сообщений. Пусть эти сообщения хранятся внутри list-колонки. В терминах Rust автор должен иметь следующий вид:


struct Author {
    pub name: String,
    pub messages: Vec<String>
}

Сам запрос может быть выполнен через Session::query метод, как это было сделано в случае создания таблицы. Естественно, CQL должен быть в данном случае чем-то вроде 'SELECT * FROM keyspace.authors;'. Если таблица содержит данные о каких-то авторах, мы можем попытаться отобразить полученные данные в коллекцию Rust структур, типа 'Vec<Author>'


//...
use cdrs::error::{Result as CResult};
let res_body = parsed.get_body();
let rows = res_body.into_rows().unwrap();
let messages: Vec<Author> = rows
    .iter()
    .map(|row| {
        let name: String = row.get_by_name("name").unwrap();
        let messages: Vec<String> = row
            // unwrap Option<CResult<T>>, where T implements AsRust
            .get_by_name("messages").unwrap().unwrap()
            .as_rust().unwrap();
        return Author {
            author: name,
            text: messages
        };
    })
    .collect();

Во время отображения результатов следует обратить внимание на следующие трейты:


  1. IntoRustByName. Говоря простым языком, этот трейт применяется по отношению к сложным типам Cassandra таким, как row (которая, строго говоря не является отдельным типом, определенным в спецификации, но по своему внутреннему устройству может рассматриваться, как что-то близкое к User Defined Type) и UDT. Грубо говоря, get_by_name пытается отыскать "свойство" по его имени, и если находит, то возвращает результат преобразования этого свойства к Rust типу или к CDRS типам, таким как List, 'Map', UDT. Сами же эти типы есть отображение соответствующих типов данных определенных в спецификации.


  2. AsRust. Этот трейт предназначен для конечного отображения в Rust типы. Полный список имплиментаторов можно увидеть в приведенной ссылке.

Prepare & Execute


Иногда бывает удобным вначале единожды подготовить сложный запрос, а после этого выполнить его несколько раз с различными данными в разное время. Для этого прекрасно подходит Prepare & Execute.


// prepare just once
let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values  (?, ?, ?, ?, ?)";

let prepared = session.prepare(insert_table_cql.to_string(), true, true)
    .unwrap()
    .get_body()
    .into_prepared()
    .unwrap();

// execute later and possible few times with different values
let v: Vec<Value> = vec![Value::new_normal(String::from("john").into_bytes()),
                             Value::new_normal(String::from("pwd").into_bytes()),
                             Value::new_normal(String::from("male").into_bytes()),
                             Value::new_normal(String::from("09000").into_bytes()),
                             Value::new_normal(String::from("FL").into_bytes())];
let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize();
// without tracing and warnings
let executed = session.execute(prepared.id, execution_params, false, false);

Также имеет смысл комбинировать Prepare & Batch для выполнения сразу нескольких подготовленных запросов. Простейший пример Batch также можно найти в примерах.


Cassandra events


Кроме всего вышеописанного, CDRS предоставляет возможность подписаться и следить за событиями, которые публикует сервер.


let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap();

thread::spawn(move || listener.start(&Compression::None).unwrap());

let topology_changes = stream
    // inspects all events in a stream
    .inspect(|event| println!("inspect event {:?}", event))
    // filter by event's type: topology changes
    .filter(|event| event == &SimpleServerEvent::TopologyChange)
    // filter by event's specific information: new node was added
    .filter(|event| {
        match event {
            &ServerEvent::TopologyChange(ref event) => {
                event.change_type == TopologyChangeType::NewNode
            },
            _ => false
        }
    });

println!("Start listen for server events");

for change in topology_changes {
    println!("server event {:?}", change);
}

Чтобы найти полный список событий лучше всего обратиться в саму спецификацию, а также к документации драйвера.


В будущем есть планы использовать события для "умного" load balancing.


Полезные ссылки


  • +33
  • 4.6k
  • 9
Share post

Similar posts

AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 9

    +2
    Чистая реализация во многих случаях оказывается удобнее сторонних биндингов, хорошее дело делаете.

    Я делаю примерно тоже самое для PHP (ровно по тем же самым причинам — драйвера есть, но подходящего нет): https://github.com/Tatikoma/react-cassandra, но вы дальше меня продвинулись…
      0
      Я не видел содержания самих бенчмарков, но один из контрибьюторов приводил сравнение скорости работы по сравнению с биндингом. Вселяет оптимизм, но опять-таки без самих бенчмарков о чем-то определенном говорить пока сложно.

      unning 2 tests
      test bench_c_driver ... bench: 15,921,280 ns/iter (+/- 9,756,013)
      test bench_rust_driver ... bench: 848,061 ns/iter (+/- 505,422)

        0
        если это «настоящие» цифры, то очевидно есть куда стремится. Будем надеяться на лучшее.
          +1
          я один раз решил сранвить как быстро либа mysql на расте работает сравнительно с php. выиграл php с огромным отрывом. Написал разработчику — оказалось что он не что-то не буферизовал, и это сильно тормозило. Может быть вам тоже стоит покапать в этом направлении? (если это возможно)
            +1
            Конечно. В этом есть смысл, поскольку, один из важных критериев, почему некоторые выбирают рас это быстродействие. Сейчас как раз все усилия направлены на написание тестов и попутную ревизию излишних аллокаций памяти (которые, сам знаю, там присутствуют). Надеюсь в ближайшее время, когда появятся первые бенчмарки, можно говорить более конкретно.
        +1
        Нужны тесты, а вообще идея очень хорошая. Rust как никогда к месту, безопасный, быстрый.
          0
          Собственно, именно к написанию тестов сейчас прилагается больше всего усилий.
            0
            и эти тесты тоже (все-таки критически важная часть любого проекта), однако я имел ввиду тесты скорости записи/занимаемой памяти/отклика по сравнению с java реализацией, интересно было бы посмотреть какой выигрыш.
              0
              В этом плане, безусловно.

              К сожадению, на этот счет пока никаких конкретных замеров мною не производилось, потому ничего определенно сейчас сказать не могу.

        Only users with full accounts can post comments. Log in, please.