Фото: James P. Blair/National Geographic Creative
Вы когда-нибудь сталкивались со следующей проблемой в rust, когда использовали std::sync::Mutex
в асинхронном коде?
7 | tokio::spawn(/* some future here */);
| ^^^^^^^^^^^^ future returned by `fut` is not `Send`
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
Как указывает ошибка, это происходит из-за нереализованного типажа Send
для переданной футуры. В мире async-await это происходит из-за использования !Send
объекта между вызовами await (далее await-точки). В частности, один из подобных интересных случаев — это блокировка мьютекса из стандартной библиотеки.
Как только Mutex
заблокирован, можно работать с вложенным объектом с помощью обертки MutexGuard
. Это тот самый тип, который реализует !Send
и является причиной указанной ошибки. Мотивация в реализации этого типажа прямо следует из спецификаций соответствующих вызовов операционных систем (к примеру pthread_mutex_unlock и ReleaseMutex).
Давайте попробуем решить эту ошибку со следующим (неработающим) примером:
use std::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Mutex::new(0);
tokio::spawn(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});
}
Этот достаточно упрощенный пример (мьютекс без Arc
в большинстве случаев бесполезен) и еще вдобавок переусложнен (к примеру вместо числового типа за мьютексом может быть использован атомарный тип), но для иллюстрации сгодится. Далее примеры будут использовать tokio 2.0, по большей части из-за того что я просто привык к нему. Тем не менее, очень похожий код можно получить для futures 3.0, async-std, smol или возможно другого асинхронного рантайма, если не указано иначе.
Закрепление задачи за потоком
Поскольку Send
— требование для многопоточности, один из способов убрать ошибку компиляции — это закрепить задачу (прим.пер: task) за одним потоком, либо же воспользоваться специализированным однопоточным асинхронным рантаймом, к примеру futures::executor::LocalPool. В таком случае экзекьютор будет выполнять задачу только на том же самом потоке, где она была порождена. Тогда код будет выглядеть следующим образом:
// ...
let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});
// Need to be explicitly awaited unlike `tokio::spawn`
local.await;
Этот код, впрочем, имеет семантические отличия от от исходного. Если текущий поток перегружен, он не может выгрузить задачи на другие потоки, в крайних случаях это может привести к голоданию ядер процессора. Более того, это скорее обходной путь, нежели полноценное решение, которое вдобавок может привести к куда более серьезным проблемам. Как вы, возможно, знаете, rust не защищает от взаимных блокировок, и это один и из тех способов, где их очень легко получить.
Допустим другая задача также работает с тем же мьютексом и await-точка является ожиданием этой задачи. После переключение контекста, вторая задача будет заблокирована, что и приведет к состоянию взаимной блокировки. Так что в общем случае использование общего состояния вместе с !Send
-порождением задач — сомнительный подход в большинстве случаев. В однопоточном случае, как правило, достаточно RefCell
, в ином случае же придется воспользоваться другим решением.
Ограничение времени жизни
Другая уловка, нежели общее решение, является подход с пересмотром времени жизни MutexGuard
. В примере, вызов drop
происходил в конце области видимости, т.е. после await
. Поскольку переключение между задачами может происходить только в await-точках, все еще возможно использовать !Send
объекты в этих рамках. Конкретно для этого примера, будет работать ограничение жизни для MutexGuard
прямо перед вызовом асинхронной функции:
// ...
tokio::spawn(async move {
// For one-liner it looks better with omitted variable declaration.
let guard = counter.lock().unwrap();
// ..
drop(guard);
async {}.await;
});
На практике, указанный подход работает для многих случаев, тем не менее не для всех. К примеру, критическая секция может быть использована для нескольких операций для обеспечения транзакционности. В таком случае операционная система не может управлять мьютексом, а значит быть использован из стандартной библиотеке, вместо этого он должен быть реализован на основе среды выполнения асинхронного экзекьютора.
Асинхронный мьютекс
Последнее, и наконец общее, решение — это асинхронный мьютекс. Хочу отметить, что smol не представляет данного примитива, но он может быть реализован самостоятельно либо использован через сторонний крейт (например async_mutex). Вот полный пример с асинхронным мьютексом из токио:
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Mutex::new(0);
tokio::spawn(async move {
let _guard = counter.lock().await;
async {}.await;
});
}
Как можно заметить, помимо выделения await
части из метода lock()
, убран unwrap
. Для стандартного мьютекса блокировка возвращает Result
, который несет информацию о возникшей панике на предыдущих блокировках мьютекса в других потоках. В асинхронном мьютексе блокировка — это футура, разрешающаяся в тип аналогичный MutexGuard
. А значит в асинхронном случае информация о панике теряется, просто закрывая критическую секцию при раскрутке стэка. С одной стороны это достаточно удобно, поскольку не возникает взаимных блокировок, с другой достаточно просто получить неконсистентное состояние синхронизируемого объекта.
Другая проблема с асинхронным мьютексом — это поведение при взаимных блокировках. Как я уже упоминал, rust не защищает от них и само собой асинхронный подход не исключение. Однако достаточно важный момент, что отладка в последнем случае существенно сложнее. Взаимная блокировка асинхронного мьютекса блокирует задачу, а не поток. Так что при адекватной нагрузке на процессор можно не заметить заблокированные задачи. При взаимной блокировке в случае с потоками можно заметить, что процесс простаивает, и при подключении отладчиком можно увидеть в трассировке стека блокирующий системный вызов. В случае с асинхронным мьютексом, состояние блокировки хранится во внутренней структуре экзекьютора (или другого крейта) и отследить это состояние уже существенно сложнее.
Выводы
При выборе мьютекса в асинхронном коде необходимо определиться с компромиссами того или иного подхода. В первую очередь, стоит убедиться в необходимости мьютекса и многопоточного экзекьютора. После этого решение достаточно простое: либо использовать мьютекс из стандартной библиотеки, если он не используется между await-точками, иначе в лучше использовать асинхронную версию примитива. Последнее утверждение вероятно не всегда истинно, для места критичного к производительности имеет смысл написать релевантный бенчмарк.