Привет, Хабр!
Если бы мне сказали, что я однажды заменю привычный Python или Scala для работы с данными на Rust, я бы, пожалуй, ухмыльнулся и продолжил привычное дело. Но времена меняются, и Rust вполне уверенно пробивает себе дорогу в мир больших данных. Сегодня я расскажу вам о трех фреймворках, которые делают Rust конкурентом в обработке данных.
И первый фреймворк – Polars.
Polars
В отличие от pandas, где нагрузка на процессор и память может быть ощутима, Polars использует фичи Rust для параллельных вычислений и оптимизированного использования ресурсов.
Главные преимущества Polars:
Быстрая обработка больших данных: Polars написан на Rust и использует многопоточность.
Поддержка многомерных данных: Polars поддерживает работу с многомерными массивами.
Ленивая и физическая вычислительная модель: Polars поддерживает ленивые вычисления
Работа с различными форматами данных: Поддержка CSV, JSON, Parquet, IPC, Avro и других форматов данных.
Примеры использования
Чтение и обработка CSV-файла
Загрузим данные из CSV-файла, отфильтруем строки и выполним групповую агрегацию:
use polars::prelude::*;
use std::fs::File;
fn main() -> Result<()> {
// чтение CSV-файла
let file = File::open("data.csv")?;
let df = CsvReader::new(file)
.infer_schema(None)
.has_header(true)
.finish()?;
// фильтрация данных по условию
let filtered_df = df
.lazy()
.filter(col("column_name").gt(lit(10)))
.collect()?;
// группировка и агрегация
let result_df = filtered_df
.lazy()
.groupby([col("group_column")])
.agg([col("value_column").mean()])
.collect()?;
println!("{:?}", result_df);
Ok(())
}
Создание DataFrame и выполнение операций
Создадим DF и выполним операции сложения и фильтрации.
use polars::prelude::*;
fn main() -> Result<()> {
// создание DataFrame
let df = df![
"column1" => &[1, 2, 3, 4, 5],
"column2" => &[10, 20, 30, 40, 50]
]?;
// добавление нового столбца с результатом сложения
let df = df.lazy()
.with_column((col("column1") + col("column2")).alias("sum"))
.collect()?;
// фильтрация строк, где сумма больше 30
let filtered_df = df.lazy()
.filter(col("sum").gt(lit(30)))
.collect()?;
println!("{:?}", filtered_df);
Ok(())
}
Ленивые вычисления и работа с JSON
use polars::prelude::*;
fn main() -> Result<()> {
let json_data = r#"
[
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Charlie", "age": 35}
]
"#;
// чтение JSON данных
let df = JsonReader::new(json_data.as_bytes()).finish()?;
// ленивые вычисления: фильтрация и вычисление среднего возраста
let result = df.lazy()
.filter(col("age").gt(lit(25)))
.select([col("age").mean()])
.collect()?;
println!("{:?}", result);
Ok(())
}
Подробне с Polars можно ознакомиться здесь.
Arroyo
Arroyo — это распределенный движок потоковой обработки, ориентированный на stateful вычисления с поддержкой как ограниченных, так и неограниченных потоков данных.
Arroyo разработан с использованием модели Dataflow, что позволяет управлять состояниями потоков данных, делая возможными различные сложные вычисления, такие как оконные агрегации, join-операции и многое другое. Весь этот функционал так же реализован на основе Rust.
Примеры использования Arroyo
Реализуем базовую настройку для обработки потока событий и подсчет количества событий в окне времени:
use arroyo::pipeline::Pipeline;
use arroyo::window::TumblingWindow;
fn main() {
// инициализируем конвейер обработки данных
let mut pipeline = Pipeline::new();
// источник данных
let source = pipeline.add_source("source_name");
// применяем оконную функцию с временем окна в 5 минут
let windowed = source.window(TumblingWindow::minutes(5))
.count();
// выводим результат в консоль
pipeline.add_sink(windowed, |result| println!("Количество событий: {:?}", result));
// запуск конвейера
pipeline.run();
}
Теперь реализуем stateful обработку, где нужно будет отслеживать состояние между событиями, например, вычисление среднего значения на основе предыдущих данных:
use arroyo::state::StatefulOperator;
use arroyo::pipeline::Pipeline;
struct AverageState {
sum: f64,
count: u64,
}
impl StatefulOperator for AverageState {
type Input = f64;
type Output = f64;
fn process(&mut self, value: Self::Input) -> Option<Self::Output> {
self.sum += value;
self.count += 1;
Some(self.sum / self.count as f64)
}
}
fn main() {
let mut pipeline = Pipeline::new();
// инициализируем источник данных
let source = pipeline.add_source("numeric_data");
// применяем stateful операцию для вычисления среднего значения
let averaged = source.stateful_operator(AverageState { sum: 0.0, count: 0 });
// отправляем результат в консоль
pipeline.add_sink(averaged, |avg| println!("Среднее значение: {:?}", avg));
// запуск конвейера
pipeline.run();
}
Теперь рассмотрим как Arroyo может использоваться для более сложных задач, например объединение нескольких потоков данных и выполнение оконных агрегаций:
use arroyo::pipeline::Pipeline;
use arroyo::window::SlidingWindow;
fn main() {
let mut pipeline = Pipeline::new();
// инициализация двух источников данных
let source1 = pipeline.add_source("source1");
let source2 = pipeline.add_source("source2");
// оконные операции на двух потоках данных
let windowed1 = source1.window(SlidingWindow::minutes(10)).sum();
let windowed2 = source2.window(SlidingWindow::minutes(10)).sum();
// join двух потоков данных по ключу
let joined = windowed1.join(windowed2, |key1, key2| key1 == key2);
// обработка результата
pipeline.add_sink(joined, |result| println!("Join result: {:?}", result));
// запуск конвейера
pipeline.run();
}
Подробнее с Arryo можно ознакомиться здесь.
Timber
Timber — это простой логгер, разработанный для приложений, работающих в условиях многопоточности. Его основное назначение — упрощение процесса логирования в параллельных задачах. Timber позволяет вести логи как в стандартный вывод, так и в указанный файл.
Основные фичи Timber:
Timber поддерживает уровни логирования, которые можно настроить через макросы.
По умолчанию Timber выводит логи в stdout, но может быть легко перенастроен на запись в файл.
Пример использования Timber в приложении, которое может переключаться между режимами отладки и релиза, изменяя уровни логирования:
#[macro_use(timber)]
use timber;
#[cfg(debug)]
pub mod level {
pub const ERR: i32 = 1;
pub const DEB: i32 = 2;
pub const INF: i32 = 7;
}
#[cfg(not(debug))]
pub mod level {
pub const ERR: i32 = 1;
pub const DEB: i32 = 0;
pub const INF: i32 = 3;
}
// макросы для упрощения логирования
macro_rules! log_err{($($arg:tt)*) => {timber!($crate::level::ERR, "ERR", $($arg)*)}}
macro_rules! log_deb{($($arg:tt)*) => {timber!($crate::level::DEB, "DEB", $($arg)*)}}
macro_rules! log_inf{($($arg:tt)*) => {timber!($crate::level::INF, "INF", $($arg)*)}}
fn main() {
timber::init("log.txt").unwrap(); // инициализация логирования в файл
log_err!("Ошибка! Этот лог будет виден всегда.");
log_deb!("Отладка. Этот лог виден только в режиме отладки.");
log_inf!("Информация. Этот лог будет виден и в релизе, и в отладке.");
}
Можно определить константы уровней логов ERR
, DEB
, INF
, и компилятор будет игнорировать ненужные строки в релизной сборке.
Больше практических инструментов коллеги из OTUS рассматривают в рамках практических онлайн-курсов от экспертов рынка. Подробнее ознакомиться с курсами можно в каталоге.