Комментарии 21
А на кой вам вообще понадобилось создавать новый рантайм tokio, и чем не устроил тот, в котором выполняется main?
Если что, actix_rt::spawn — это синоним для tokio::task::spawn_local...
Наверное, потому что:
tokio::task::spawn_local
запускает асинхронную задачу в текущем локальном контексте выполнения, который обычно ограничен одним потоком выполнения (thread). Это означает, что задача будет выполняться в пределах текущего потока выполнения без возможности переключения на другие потоки.
Так про tokio::task::spawn_local я написал просто для демонстрации связи между actix и tokio. Запускать задачу можно при помощи обычного tokio::spawn.
Зачем для запуска tokio::spawn создавать новый рантайм?
Зачем тут вообще два или более потока?
Уважаемый комментатор,
Пример кода, приведенный в статье, является упрощенным и синтетическим. Это сделано с целью воспроизведения проблемы и наглядной демонстрации ее решения. Сложности возникают, когда весь этот код работает в разных потоках.
Буду признателен, если Вы сможете переписать данный пример кода таким образом, чтобы продемонстрировать возникающую проблему и предложить оптимальный вариант ее решения. Это позволит читателям лучше понять суть обсуждаемого вопроса. Заранее благодарю за помощь в улучшении качества статьи.
Теперь по поводу вашего решения. При подобном использовании канала теряется возможность отмены задачи, если внешняя задача будет отменена — клиент продолжит выполнять запрос.
Документация на oneshot рекомендует делать как-то так:
tokio::task::spawn_local(async move {
tokio::select! {
_ = sender.closed() => { }
value = client.get(url).send().await.unwrap().body() => {
let _ = sender.send(value);
}
}
});
Кстати, почему бы не оформить это в виде универсальной функции?
pub fn send_future<F>(future: F)
-> impl Future<Output = <F as Future>::Output> + Send
where
F: Future + 'static,
<F as Future>::Output: Send
{
let (mut tx, rx) = tokio::sync::oneshot::channel();
tokio::task::spawn_local(async move {
tokio::select! {
_ = tx.closed() => { }
value = future => {
let _ = tx.send(value);
}
};
});
async move {
rx.await.unwrap()
}
}
// …
tokio::spawn(async move {
send_future(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await?.body().await?;
}).await;
});
Так, я тут подумал и понял что все эти каналы нафиг не нужны.
Функция tokio::task::spawn_local, она же actix_rt::spawn, уже делает всё что нужно! Проверять уже лень, но вот так должно сработать:
let http_task = actix_rt::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await?.body().await?;
});
tokio::spawn(async move {
http_task.await
});
И даже вот так:
tokio::spawn(async move {
actix_rt::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await?.body().await?;
}).await;
});
Хотя, в отличии от send_future из моего комментария выше, тут снова будет проблема с отменой...
Не проканало :)
error[E0277]: `Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
--> src\main.rs:17:26
|
17 | let r = tokio::spawn(async move {
| _____________------------_^
| | |
| | required by a bound introduced by this call
18 | | actix_rt::spawn(async move {
19 | | let client = Client::new();
20 | | let url = "http://127.0.0.1:8080/hello";
21 | | client.get(url).send().await.unwrap().body().await.unwrap()
22 | | }).await
23 | | }).await;
| | ^
| | |
| |_____`Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
| within this `[async block@src\main.rs:17:26: 23:6]`
Вот так точно проканать должно:
pub fn send_future<F>(future: F)
-> impl Future<Output = Result<<F as Future>::Output, JoinError>> + Send
where
F: Future + 'static,
<F as Future>::Output: Send
{
SentFuture(spawn_local(future))
}
pub struct SentFuture<T>(JoinHandle<T>);
impl<T> Future for SentFuture<T> {
type Output = <JoinHandle<T> as Future>::Output;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = Pin::into_inner(self);
let inner = Pin::new(&mut inner.0);
Future::poll(inner, ctx)
}
}
impl<T> Drop for SentFuture<T> {
fn drop(&mut self) {
self.0.abort();
}
}
Работает! Респект! Вот итоговый код
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
use tokio::task::{JoinError, JoinHandle, spawn_local};
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let res = send_future(
async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await.unwrap().body().await.unwrap()
}
).await.unwrap();
println!("Response: {:?}", res);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
pub fn send_future<F>(future: F)
-> impl Future<Output = Result<<F as Future>::Output, JoinError>> + Send
where
F: Future + 'static,
<F as Future>::Output: Send
{
SentFuture(spawn_local(future))
}
pub struct SentFuture<T>(JoinHandle<T>);
impl<T> Future for SentFuture<T> {
type Output = <JoinHandle<T> as Future>::Output;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = Pin::into_inner(self);
let inner = Pin::new(&mut inner.0);
Future::poll(inner, ctx)
}
}
impl<T> Drop for SentFuture<T> {
fn drop(&mut self) {
self.0.abort();
}
}
по факту получается не городим каналы а просто структура SentFuture
используется для обёртывания (экранирования) асинхронной задачи, представленной JoinHandle
, и предоставляет интерфейс, который позволяет использовать эту задачу как будущее (Future
). Она не добавляет новую функциональность к будущему, а просто оборачивает JoinHandle
для интеграции с асинхронным кодом и предоставляет совместимость с Future
. Я все верно понял?
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| App::new().service(web::resource("/hello").route(web::get().to(ok))))
.bind("127.0.0.1:8080")?
.run()
.await
});
let r = actix_rt::spawn(async move {
// ^^^^^^^^^^^^^^^ вместо tokio::spawn
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await.unwrap().body().await.unwrap()
})
.await
.unwrap();
println!("{:?}", r);
}
Проверил, отлично работает. tokio тут в зависимости вообще не нужен. У автора кривые руки, лучше бы на Го писал
Уважаемый комментатор, благодарю Вас за предложенный вариант кода и замечания.
Вы правы, что в Вашем коде создание задачи происходит в текущей thread. Однако если изменить его, создав задачу в новой thread, то проблема опять проявится - мы получим ту же ошибку "future cannot be sent between threads safely".
Цель моей статьи - показать типичную проблему при взаимодействии библиотек Tokio и Actix, когда из Tokio нужно вызвать код Actix. Ваш код не демонстрирует эту проблему.
Пример кода в статье максимально упрощен, чтобы наглядно воспроизвести ошибку для читателей. Конечно, в реальном приложении код будет сложнее. Но суть проблемы остается той же.
Я ценю, что Вы уделили время, чтобы предложить альтернативный подход. Но в данном случае он не решает рассматриваемую в статье проблему взаимодействия Tokio и Actix. Еще раз спасибо за отзыв, обсуждение таких нюансов всегда полезно для углубления понимания темы.
А вас не смутило что этот код приведён в статье как пример работающего, но делающего не то что нужно?
Уважаемый, благодарю Вас за комментарий. К сожалению, я не совсем понял Вашу мысль о том, что приведенный мною пример кода работает неправильно.
Я представил работающий фрагмент кода для иллюстрации определенной концепции. Затем я показал, как этот же код может сломаться при использовании в многопоточной среде и не скомпилируется, что демонстрирует важность учета многопоточности.
Если я неверно истолковал Ваш комментарий, прошу прощения. Буду благодарен, если Вы уточните свою мысль, чтобы я мог лучше понять Вашу позицию и внести необходимые исправления в статью. Мне важно представить информацию максимально корректно для читателей.
Я к тому, что если бы actix_rt::spawn делал ровно то что вам нужно — статьи бы не было
actix_rt::spawn запускает задачу в текущем потоке (thread). Например:
use actix_rt::{spawn, Arbiter};
fn main() {
spawn(|| {
println!("Running in the main thread");
});
}
Здесь мы запускаем задачу println в основном потоке приложения.
Если нужен новый поток, используем Arbiter:
let arbiter = Arbiter::new();
arbiter.spawn(|| {
println!("Running in a separate thread");
});
Arbiter создает пул потоков и позволяет запускать задачи в отдельных потоках из этого пула.
Таким образом, actix_rt::spawn для текущего потока, а Arbiter - для создания новых потоков.
Другими словами actix_rt::spawn не делает то что не нужно. А если нужно несколько thread то нужно использовать иной механизм.
Похоже у вас проблемы с пониманием что такое thread... В моей статье как раз и разбирается кейс когда не 1 thread, а несколько, начинаются проблемы...
Проблема «error: future cannot be sent between threads safely» при использовании Rust-библиотек Tokio и Actix