hyper - это низкоуровневая HTTP-библиотека на Rust. Обычно она служит строительным блоком для более высокоуровневых библиотек, таких как axum, tonic, reqwest и других. Но иногда знание hyper могут требовать в вакансиях.
hyper предоставляет api как для клиента, так и для сервера, но в этой статье мы сосредоточимся исключительно на серверной части.
Первый "Hello, World!"
Создаём новый проект:
cargo new hyper-server
Добавим зависимости в Cargo.toml:
[dependencies] hyper = { version = "1.6.0", features = ["server", "http1"] } hyper-util = { version = "0.1", features = ["full"] } http-body-util = "0.1.3" tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] }
Что делает каждая библиотека:
hyper- основная HTTP-библиотека, включаем фичи поддержки HTTP/1.1-сервераhyper-util- предоставляет утилиты и адаптеры поверхhyper, упрощающие его использованиеhttp-body-util- вспомогательная библиотека для работы с HTTP-bodytokio- асинхронный рантайм. С версии 1.0hyperбыл отвязан от рантаймаtokio, поэтому при желании можно использовать альтернативный рантайм
Необходимые импорты:
use hyper::{ Request, Response, body::{Bytes, Incoming}, server::conn::http1, service::service_fn, }; use hyper_util::{Full, rt::TokioIo}; use std::{convert::Infallible, net::SocketAddr}; use tokio::net::TcpListener;
Пояснения:
Request- представляет HTTP-запросResponse- представляет HTTP-ответBytes- тип для эффективного представления неизменяемых бинарных данныхIncoming- асинхронный поток байтов, представляющий тело HTTP-запроса, поступающее от клиентаhttp1- модуль для работы с HTTP/1.1-соединениямиservice_fn- функция адаптер, позволяющая создатьServiceиз обычной асинхронной функцииFull- вспомогательная обёртка для заранее известного соде��жимогоBodyTokioIo- адаптер, который оборачиваетtokio::net::TcpStreamв тип, совместимый сhyperInfallible- тип ошибки, которая никогда не происходитSocketAddr- структура, представляющая IP-адрес и портTcpListener- асинхронный TCP-сервер
Обработчик запроса:
async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello, World!")))) }
С первого взгляда обработчик может показаться достаточно громоздким:
_: Request<Incoming>- входящий HTTP-запрос от клиента. Используем нижнее подчёркивание, чтобы явно показать, что игнорируем содержимое запроса, так как ответ всегда один и тот жеResult<Response<Full<Bytes>>, Infallible>- тип возвращаемого значения обработчика:Response<Full<Bytes>>- HTTP-ответ с телом, представленным блоком байтInfallible- обозначает, что наш обработчик не может вернуть ошибку
Response::new(Full::new(Bytes::from("Hello, World!")))- оборачивает строку в байты и преобразует их в тело ответа
Запуск сервера:
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { // IP-адрес и порт, на которых будет работать сервер let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); // Создаём асинхронный TCP-сервер let listener = TcpListener::bind(addr).await?; // Бесконечный цикл: принимаем входящие соединения loop { // Ждём новое соединение от клиента let (stream, _) = listener.accept().await?; // Адаптер, который оборачивает `TcpStream` и делает его совместимым с hyper let io = TokioIo::new(stream); // Для каждого соединения создаётся отдельная асинхронная задача tokio::task::spawn(async move { // HTTP/1.1 сервер, привязанный к нашему обработчику `hello` if let Err(err) = http1::Builder::new() .serve_connection(io, service_fn(hello)) .await { // обрабатываем возможные ошибки eprintln!("Error serving connection: {:?}", err); } }); } }
Запускаем сервер:
cargo run
Проверяем работу сервера:
curl http://127.0.0.1:3000
В данном примере используется модуль http1. Если ван нужен http2, то вы можете посмотреть его реализацию в официальном примере.
Эхо сервер
Новые импорты:
use hyper::{ body::{Body, Frame}, Method, StatusCode } use http_body_util::{Empty, combinators::BoxBody, BodyExt};
Пояснения:
Body- трейт, описывающийHTTPтелоFrame- единица данных в теле HTTP-сообщения. Используется для представления либо части тела (Bytes), либо сигнала конца потокаMethod- перечисление всех возможных HTTP-методов (GET,POST,PUT,DELETEи т.д.)StatusCode- перечисление стандартных HTTP-статусов (200 OK,404 Not Found,500 Internal Server Errorи т.д.)Empty- вспомогательная обёртка для пустого содержимогоBodyBoxBody- обобщённый тип тела ответа. Позволяет вернуть из функции разные типыBodyBodyExt- набор удобных методов для работы с телом запроса
Вспомогательные функции для создания полных и пустых тел запросов:
/// Создаёт пустое тело ответа fn empty() -> BoxBody<Bytes, hyper::Error> { Empty::<Bytes>::new() .map_err(|never| match never {}) .boxed() } /// Оборачивает переданный chunk (например, `&str` или `Vec<u8>`) в тело ответа fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { Full::new(chunk.into()) .map_err(|never| match never {}) .boxed() }
Обработчики:
/// Возвращает клиенту тело запроса без изменений. fn echo(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { Ok(Response::new(req.into_body().boxed())) } /// Обработчик HTTP-запроса, который возвращает тело запроса, /// преобразованное в верхний ASCII-регистр async fn echo_uppercase(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { // Преобразуем поток тела запроса по частям let frame_stream = req.into_body().map_frame(|frame| { let frame = if let Ok(data) = frame.into_data() { // Преобразуем каждый байт в верхний ASCII-регистр data.iter() .map(|byte| byte.to_ascii_uppercase()) .collect::<Bytes>() } else { Bytes::new() }; Frame::data(frame) }); Ok(Response::new(frame_stream.boxed())) } /// Обработчик HTTP-запроса, который возвращает тело запроса в перевёрнутом виде async fn echo_reversed(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { // Защита от слишком больших тел let upper = req.body().size_hint().upper().unwrap_or(u64::MAX); // Если размер тела запроса превышает 64 КБ, возвращает ошибку `413 Payload Too Large` if upper > 1024 * 64 { let response = Response::builder() .status(StatusCode::PAYLOAD_TOO_LARGE) .body(full("Body too big")) .unwrap(); return Ok(response); } // Читаем всё тело целиком let whole_body = req.collect().await?.to_bytes(); // Разворачиваем байты в обратном порядке let reversed_body = whole_body.iter() .rev() .cloned() .collect::<Vec<u8>>(); Ok(Response::new(full(reversed_body))) } /// Возвращает ответ `404 Not Found` с пустым телом async fn not_found() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { let response = Response::builder() .status(StatusCode::NOT_FOUND) .body(empty()) .unwrap(); Ok(response) }
Как говорилось в начале статьи, hyper - это достаточно низкоуровневая библиотека, и в отличие от axum или actix-web, не предоставляет встроенную маршрутизацию. Поэтому её нужно реализовать вручную:
async fn router( req: Request<Incoming>, ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { // Сопоставляем HTTP-метод и путь match (req.method(), req.uri().path()) { (&Method::POST, "/echo") => echo(req), (&Method::POST, "/echo/uppercase") => echo_uppercase(req).await, (&Method::POST, "/echo/reversed") => echo_reversed(req).await, _ => not_found().await, } }
В функции main обновляем service_fn :
.serve_connection(io, service_fn(router))
Запускаем сервер и проверяем его работу:
curl -X POST http://127.0.0.1:3000/echo -d "hello world" curl -X POST http://127.0.0.1:3000/echo/uppercase -d "hello world" curl -X POST http://127.0.0.1:3000/echo/reversed -d "hello world"
Middleware
Добавим к текущему эхо серверу middleware, который будет логировать метод и путь каждого HTTP-запроса
Новая зависимость в Cargo.toml:
[dependencies] tower = "0.5.2" # предоставляет абстракции для middleware
Новые импорты:
use hyper::service::Service; use tower::ServiceBuilder;
Service- используется для подключения пользовательской логики к сетевому соединению (например,service_fn(router)создаёт имплементацию этого трейта)ServiceBuilder- позволяет подключить middleware к сервису
Пишем простой middleware для логирования запросов:
/// Простая обёртка, логирующая HTTP-запросы #[derive(Debug, Clone)] pub struct Logger<S> { /// Сервис, к которому делегируется обработка запроса inner: S, } impl<S> Logger<S> { /// Создание нового логера, оборачивающего другой сервис pub fn new(inner: S) -> Self { Logger { inner } } } type Req = Request<Incoming>; /// Реализация трейта `Service` для `Logger`, /// позволяющая использовать его как middleware impl<S> Service<Req> for Logger<S> where S: Service<Req>, // Внутренний сервис должен реализовывать трейт `Service` { // Тип ответа будет таким же, как у внутреннего сервиса type Response = S::Response; // Тип ошибки - тоже такой же type Error = S::Error; // Тип возвращаемого future (обработчика запроса) type Future = S::Future; /// Метод, вызываемый при каждом запросе fn call(&self, req: Req) -> Self::Future { // Логируем метод и путь запроса println!("processing request: {} {}", req.method(), req.uri().path()); // Передаём запрос дальше во внутренний сервис self.inner.call(req) } }
Обновляем обработчик в функции main:
tokio::task::spawn(async move { // Оборачиваем обработчик маршрутов в Service let svc = service_fn(router); // Добавляем наш middleware к обработчику let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc); // Ожидаем завершение соединения (или ошибку) if let Err(err) = http1::Builder::new().serve_connection(io, svc).await { eprintln!("server error: {}", err); } });
Теперь, при каждом запросе, будут выводиться записи вида:
processing request: POST /echo/reversed processing request: POST /echo
Для упрощения примера, запросы логируются с помощью println!. Если вам нужен полноценный логер, то ознакомьтесь с крейтом tracing
Gracefully Shutdown
Graceful shutdown - процесс, при котором сервер:
перестаёт принимать новые подключения
позволяет завершить текущие соединения
корректно освобождает ресурсы (файлы, БД, сокеты и т.п.)
Что нужно для реализации:
Сигнал завершения
Цикл, обрабатывающий входящие соединения
Наблюдатель, координирующий завершение соединений
Добавим в tokio новую фичу для обработки сигналов:
[dependencies] tokio = { version = "1.44.2", features = [..., "signal"] } # добавляем фичу для обработки сигналов
Обработка сигнала:
async fn shutdown_signal() { // Ожидание сигнала CTRL+C tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); }
Обновляем функцию main для отслеживания сигнала отключения и подключаем отслеживание сигнала завершения для соединений:
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { // IP-адрес и порт, где будет слушать сервер let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); // Создаём асинхронный TCP-сервер let listener = TcpListener::bind(addr).await?; // Создаём объект для управления "graceful shutdown" (мягкое завершение) let graceful = hyper_util::server::graceful::GracefulShutdown::new(); // Подключаем сигнал завершения // После его получения начнётся завершение работы сервера let mut signal = std::pin::pin!(shutdown_signal()); // Запускаем цикл, чтобы постоянно принимать входящие соединения loop { tokio::select! { // Обработка новых соединений Ok((stream, _addr)) = listener.accept() => { // Адаптер, который оборачивает `TcpStream` и делает его совместимым с hyper let io = TokioIo::new(stream); // Оборачиваем обработчик маршрутов в Service let svc = service_fn(router); // Добавляем наш middleware к обработчику let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc); // Создаём соединение HTTP/1.1 let conn = http1::Builder::new().serve_connection(io, svc); // Подключаем отслеживание сигнала завершения для соединения let fut = graceful.watch(conn); // Для каждого соединения создаётся отдельная асинхронная задача tokio::task::spawn(async move { // Ожидаем завершение соединения (или ошибку) if let Err(err) = fut.await { eprintln!("server error: {}", err); } }); }, // Получен сигнал завершения _ = &mut signal => { // Закрываем listener, чтобы не принимать новые соединения drop(listener); eprintln!("graceful shutdown signal received"); // Прерываем основной цикл break; } } } // После выхода из цикла — ждём завершения всех активных соединений tokio::select! { // Успешное завершение всех соединений _ = graceful.shutdown() => { eprintln!("all connections gracefully closed"); }, // Если соединения не закрылись за 10 секунд — принудительно завершаем _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { eprintln!("timed out wait for all connections to close"); } } Ok(()) }
Теперь, при сигнале отключения, будет выводиться:
graceful shutdown signal received all connections gracefully closed
Собираем сервер в docker контейнер
Dockerfile
FROM rust:1.86-slim AS builder RUN apt-get update && apt-get install musl-tools -y && rustup target add x86_64-unknown-linux-musl WORKDIR /usr/src/app COPY Cargo.toml Cargo.lock ./ COPY src src RUN cargo build --target x86_64-unknown-linux-musl --release FROM scratch COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/hyper-server /usr/local/bin/hyper-server EXPOSE 3000 CMD ["hyper-server"]
Сборка контейнера:
docker build -t hyper-server .
Запуск контейнера:
docker run --rm -p 3000:3000 hyper-server
Проверяем работу сервера:
curl -X POST http://127.0.0.1:3000/echo -d "hello world"
Заключение
Если вы хотите продолжить изучение hyper, то ознакомитесь с официальной документацией и примерами.
Это моя первая статья на Хабре, если вы нашли какие-то ошибки или неточности, буду рад уточнениям в комментариях.
