Как стать автором
Обновить

Комментарии 21

А на кой вам вообще понадобилось создавать новый рантайм tokio, и чем не устроил тот, в котором выполняется main?


Если что, actix_rt::spawn — это синоним для tokio::task::spawn_local...

Наверное, потому что:

tokio::task::spawn_local запускает асинхронную задачу в текущем локальном контексте выполнения, который обычно ограничен одним потоком выполнения (thread). Это означает, что задача будет выполняться в пределах текущего потока выполнения без возможности переключения на другие потоки.

Так про tokio::task::spawn_local я написал просто для демонстрации связи между actix и tokio. Запускать задачу можно при помощи обычного tokio::spawn.


Зачем для запуска tokio::spawn создавать новый рантайм?

Да, можно. Это упрощает пример на 1 строчку кода. Для этого примера создавать новый runtime - избыточно. Лучшее, враг хорошего...

Зачем тут вообще два или более потока?

Уважаемый комментатор,

Пример кода, приведенный в статье, является упрощенным и синтетическим. Это сделано с целью воспроизведения проблемы и наглядной демонстрации ее решения. Сложности возникают, когда весь этот код работает в разных потоках.

Буду признателен, если Вы сможете переписать данный пример кода таким образом, чтобы продемонстрировать возникающую проблему и предложить оптимальный вариант ее решения. Это позволит читателям лучше понять суть обсуждаемого вопроса. Заранее благодарю за помощь в улучшении качества статьи.

Теперь по поводу вашего решения. При подобном использовании канала теряется возможность отмены задачи, если внешняя задача будет отменена — клиент продолжит выполнять запрос.


Документация на oneshot рекомендует делать как-то так:


    tokio::task::spawn_local(async move {
        tokio::select! {
            _ = sender.closed() => { }
            value = client.get(url).send().await.unwrap().body() => {
                let _ = sender.send(value);
            }
        }
    });

Кстати, почему бы не оформить это в виде универсальной функции?


pub fn send_future<F>(future: F)
    -> impl Future<Output = <F as Future>::Output> + Send
where 
    F: Future + 'static,
    <F as Future>::Output: Send
{
    let (mut tx, rx) = tokio::sync::oneshot::channel();
    tokio::task::spawn_local(async move {
        tokio::select! {
            _ = tx.closed() => { }
            value = future => {
                let _ = tx.send(value);
            }
        };
    });

    async move {
        rx.await.unwrap()
    }
}

// …

tokio::spawn(async move {
    send_future(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await?.body().await?;
    }).await;
});

LGTM

Так, я тут подумал и понял что все эти каналы нафиг не нужны.


Функция tokio::task::spawn_local, она же actix_rt::spawn, уже делает всё что нужно! Проверять уже лень, но вот так должно сработать:


    let http_task = actix_rt::spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await?.body().await?;
    });
    tokio::spawn(async move {
        http_task.await
    });

И даже вот так:


tokio::spawn(async move {
    actix_rt::spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await?.body().await?;
    }).await;
});

Хотя, в отличии от send_future из моего комментария выше, тут снова будет проблема с отменой...

Не проканало :)

error[E0277]: `Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
   --> src\main.rs:17:26
    |
17  |       let r = tokio::spawn(async move {
    |  _____________------------_^
    | |             |
    | |             required by a bound introduced by this call
18  | |         actix_rt::spawn(async move {
19  | |             let client = Client::new();
20  | |             let url = "http://127.0.0.1:8080/hello";
21  | |             client.get(url).send().await.unwrap().body().await.unwrap()
22  | |         }).await
23  | |     }).await;
    | |     ^
    | |     |
    | |_____`Rc<(dyn actix_web::dev::Service<ConnectRequest, Future = Pin<Box<(dyn std::future::Future<Output = Result<ConnectResponse, SendRequestError>> + 'static)>>, Response = ConnectResponse, Error = SendRequestError> + 'static)>` cannot be sent between threads safely
    |       within this `[async block@src\main.rs:17:26: 23:6]`

Вот так точно проканать должно:


pub fn send_future<F>(future: F)
    -> impl Future<Output = Result<<F as Future>::Output, JoinError>> + Send
where 
    F: Future + 'static,
    <F as Future>::Output: Send
{
    SentFuture(spawn_local(future))
}

pub struct SentFuture<T>(JoinHandle<T>);

impl<T> Future for SentFuture<T> {
    type Output = <JoinHandle<T> as Future>::Output;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::into_inner(self);
        let inner = Pin::new(&mut inner.0);
        Future::poll(inner, ctx)
    }
}

impl<T> Drop for SentFuture<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

Работает! Респект! Вот итоговый код

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
use tokio::task::{JoinError, JoinHandle, spawn_local};

#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let res = send_future(
        async move {
                let client = Client::new();
                let url = "http://127.0.0.1:8080/hello";
                client.get(url).send().await.unwrap().body().await.unwrap()
            }

    ).await.unwrap();
    println!("Response: {:?}", res);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}


pub fn send_future<F>(future: F)
                      -> impl Future<Output = Result<<F as Future>::Output, JoinError>> + Send
    where
        F: Future + 'static,
        <F as Future>::Output: Send
{
    SentFuture(spawn_local(future))
}

pub struct SentFuture<T>(JoinHandle<T>);

impl<T> Future for SentFuture<T> {
    type Output = <JoinHandle<T> as Future>::Output;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::into_inner(self);
        let inner = Pin::new(&mut inner.0);
        Future::poll(inner, ctx)
    }
}

impl<T> Drop for SentFuture<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

по факту получается не городим каналы а просто структура SentFuture используется для обёртывания (экранирования) асинхронной задачи, представленной JoinHandle, и предоставляет интерфейс, который позволяет использовать эту задачу как будущее (Future). Она не добавляет новую функциональность к будущему, а просто оборачивает JoinHandle для интеграции с асинхронным кодом и предоставляет совместимость с Future. Я все верно понял?

Не совсем так, вся интеграция с асинхронным кодом у JoinHandle уже есть. Обёртка SentFuture тут нужна исключительно чтобы вызвать `abort()` в деструкторе.

#[actix_rt::main]
async fn main() {
    actix_rt::spawn(async {
        HttpServer::new(|| App::new().service(web::resource("/hello").route(web::get().to(ok))))
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let r = actix_rt::spawn(async move {
        //  ^^^^^^^^^^^^^^^ вместо tokio::spawn
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await.unwrap().body().await.unwrap()
    })
    .await
    .unwrap();
    println!("{:?}", r);
}

Проверил, отлично работает. tokio тут в зависимости вообще не нужен. У автора кривые руки, лучше бы на Го писал

Уважаемый комментатор, благодарю Вас за предложенный вариант кода и замечания.

Вы правы, что в Вашем коде создание задачи происходит в текущей thread. Однако если изменить его, создав задачу в новой thread, то проблема опять проявится - мы получим ту же ошибку "future cannot be sent between threads safely".

Цель моей статьи - показать типичную проблему при взаимодействии библиотек Tokio и Actix, когда из Tokio нужно вызвать код Actix. Ваш код не демонстрирует эту проблему.

Пример кода в статье максимально упрощен, чтобы наглядно воспроизвести ошибку для читателей. Конечно, в реальном приложении код будет сложнее. Но суть проблемы остается той же.

Я ценю, что Вы уделили время, чтобы предложить альтернативный подход. Но в данном случае он не решает рассматриваемую в статье проблему взаимодействия Tokio и Actix. Еще раз спасибо за отзыв, обсуждение таких нюансов всегда полезно для углубления понимания темы.


А вас не смутило что этот код приведён в статье как пример работающего, но делающего не то что нужно?

Уважаемый, благодарю Вас за комментарий. К сожалению, я не совсем понял Вашу мысль о том, что приведенный мною пример кода работает неправильно.

Я представил работающий фрагмент кода для иллюстрации определенной концепции. Затем я показал, как этот же код может сломаться при использовании в многопоточной среде и не скомпилируется, что демонстрирует важность учета многопоточности.

Если я неверно истолковал Ваш комментарий, прошу прощения. Буду благодарен, если Вы уточните свою мысль, чтобы я мог лучше понять Вашу позицию и внести необходимые исправления в статью. Мне важно представить информацию максимально корректно для читателей.

Я к тому, что если бы actix_rt::spawn делал ровно то что вам нужно — статьи бы не было

actix_rt::spawn запускает задачу в текущем потоке (thread). Например:

use actix_rt::{spawn, Arbiter}; 

fn main() {
  spawn(|| {
    println!("Running in the main thread"); 
  });
}

Здесь мы запускаем задачу println в основном потоке приложения.

Если нужен новый поток, используем Arbiter:

let arbiter = Arbiter::new();

arbiter.spawn(|| {
  println!("Running in a separate thread");
});

Arbiter создает пул потоков и позволяет запускать задачи в отдельных потоках из этого пула.

Таким образом, actix_rt::spawn для текущего потока, а Arbiter - для создания новых потоков.

Другими словами actix_rt::spawn не делает то что не нужно. А если нужно несколько thread то нужно использовать иной механизм.

Похоже у вас проблемы с пониманием что такое thread... В моей статье как раз и разбирается кейс когда не 1 thread, а несколько, начинаются проблемы...

Похоже у вас проблемы с пониманием что такое thread…

Похоже, у вас проблемы с чтением комментариев...

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории