Комментарии 14
+500 Статья именно того формата, который лично мне нужен от Хабра. Пишите о том, что вы делаете и о том, что вы знаете: как делали и что знаете. Не надо о том, как лучше сделать и о том, что все знают... Надо только про себя.
Автор, пеши еще!
Что-то у вас переделка read как-то наоборот вышла. Была проблема, которая и не проблема вовсе (обычно), вы сделали решение, которое всё ломает (обычно). Но у вас почему-то стало лучше, а не хуже. Возможно, что-то вы недоговариваете…
Вот у вас сервер не прислал очередного пакета, читать нечего. Раньше код просто спал и ждал пакет, теперь он сразу же возвращает управление. Внимание, вопрос: чем таким может заниматься читающая задача когда ей читать нечего? Обычно ей заниматься нечем, а потому блокировка (асинхронная) на чтении из потока — желательна. Если же эта задача занимается сразу двумя делами — почему бы не запустить две разные задачи?
Посмотрим код:
tokio::spawn(async move {
loop {
match &mut *reader.lock().await {
Some(reader) => {
if let Some(packets) = reader.read().ok() {
input_queue.lock().unwrap().push_back(packets);
}
},
None => {
message_income.send_error_message(String::from("Not connected to TCP"));
},
};
sleep(Duration::from_millis(READ_TIMEOUT)).await;
}
})
Как видно, ничем у вас задача не занята между чтениями, просто спит. Зачем спит-то?
Вам надо не try_read использовать, а вернуться к нормальному read, а сон убрать.
нормальный read в моем случае не работает. Вкратце, ситуация следующая: в процессе входа в мир клиент сначала логинится к LoginServer, после выбора рилма из списка - происходит коннект на WorldServer. Т.е. когда я делаю этот второй коннект, я подменяю текущий TCPStream на новый. И вот если при коннекте на мой локальный сервер (mangos) все ок, то при коннекте на любой удаленный - клиент просто висит, долго, а потом в конце-концов коннектится таки, но не может прочитать пакеты и падает (нормальный read прочел пакет с 0 длины почему-то). На мой взгляд, причина тут в том, что чтение происходит непрерывно, и когда я объявляю сон (либо просто использую try_read), то на клиентской стороне образуется зазор для соединения. В случае с нормальным read такого зазора нету, поэтому либо делать сон с минимально малым временем на соединение, либо - использовать try_read.
Что такое "зазор для соединения" и зачем он нужен-то?
Что-то мне кажется, проблема где-то в другом месте. Например, в механизме "подмены" TCPStream. Вы случайно не путаетесь "подменить его" во время выполнения вызова read? В таком случае вызов read у вас продолжает читать данные от LoginServer, которых нет (что не удивительно), из-за чего он и ждёт пока не закроется соединение (это объясняет возврат 0). Кстати, вы вообще закрываете соединение к LoginServer?
Просто отмените задачу чтения, дождитесь её окончания и запустите новую.
насчет сна - разве он не для того, чтобы снять нагрузку на CPU ? насколько я знаю, loop создает такую нагрузку.
Вторая придирка к тому как вы читаете и парсите пакеты. Давно уже мечтаю найти в каком таком учебнике учат читать данные из потока TCP таким образом, и сжечь всю эту ересь. Увы, уже слишком поздно, и остаётся только объяснять каждый раз одно и то же…
Вы исходите из того, что у вас каждый раз считывается целое количество пакетов. Но это не так. Если непрочитанных пакетов будет больше, чем вмещается в окно TCP или в ваш фиксированный буфер — вы прочитаете "обрывок" одного из пакетов, после чего весь ваш клиент упадёт (судя по обилию unwrap в коде парсера).
А ведь библиотека даёт вам все необходимые инструменты, чтобы можно было читать пакеты не боясь прочитать лишь обрывок! Смотрите:
Используем
BufReader<R>
с внутренним буфером, чтобы не бояться делать слишком частые вызовы read:
pub struct Reader { stream: BufReader<OwnedReadHalf>, // … } impl Reader { pub fn new(reader: OwnedReadHalf) -> Self { Self { stream: BufReader::new(reader), // … } } }
Читаем длину пакета прямо из потока:
let size = self.reader.read_u16().await? as usize;
Создаём буфер нужного размера и читаем сразу в него:
let buf = vec![0u8; size]; self.reader.read_exact(buf.as_mut_slice()).await?;
Всё! Если в BufReader есть свежий пакет — он будет прочитан сразу же, если в BufReader нет части данных — задача подождёт пока они появятся. Если в BufReader есть ещё пакеты — они дождутся своей очереди.
это интересное замечание, очень хочется проверить, только не уверен, что правильно вас до конца понимаю. Вот метод, который использует Reader
и как он используется в Client
:
// stream.rs
pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
let mut buffer = [0u8; 65536];
match self.stream.read(&mut buffer).await {
Ok(bytes_count) => {
// ...
},
Err(err) => {
// ...
},
}
}
// client.rs
fn handle_read(&mut self) -> JoinHandle<()> {
let input_queue = Arc::clone(&self._input_queue);
let reader = Arc::clone(&self._reader);
tokio::spawn(async move {
loop {
match &mut *reader.lock().await {
Some(reader) => {
if let Some(packets) = reader.read().await.ok() {
input_queue.lock().unwrap().push_back(packets);
}
},
None => {},
};
sleep(Duration::from_millis(READ_TIMEOUT)).await;
}
})
}
правильно ли я понимаю, что bytes_count
может быть длиной фрагмента, а не целого пакета ? (на практике такого еще не было, но мало ли)
далее, ваш второй пример, строка, на которой вы получаете `size` по всей видимости используется вне этого Reader (т.к. self.reader
я использую только в Client
):
let size = self.reader.read_u16().await? as usize;
но в таком случае, у меня не предусмотрен вызов read_u16
из Reader. Кажется, в вашем примере чего-то не хватает. Можете, пожалуйста, уточнить, где подразумевается использовать код начиная с пункта 2 ? Если внутри метода read
, то зачем нам получать size, если он уже возвращается в bytes_count
; если извне - то как в таком случае должен выглядеть Reader
? просто обернуть stream в BufReader
явно недостаточно
Да, bytes_count может не делиться на пакеты нацело.
Посмотрите в ваш метод parse_packets, он первым делом преобразует переданный ему вектор в курсор и читает из него пакеты. В начале каждого пакета идёт длина, потом количество байт соответствующее этой длине. Вот эту-то информацию я и предлагаю читать напрямую из self.reader, без промежуточного буфера (о буфере позаботится BufReader)
Консольный UI и отказ от асинхронного кода в idewave-cli