Pull to refresh

Comments 14

+500 Статья именно того формата, который лично мне нужен от Хабра. Пишите о том, что вы делаете и о том, что вы знаете: как делали и что знаете. Не надо о том, как лучше сделать и о том, что все знают... Надо только про себя.

рад, что статья вам понравилась !

Что-то у вас переделка read как-то наоборот вышла. Была проблема, которая и не проблема вовсе (обычно), вы сделали решение, которое всё ломает (обычно). Но у вас почему-то стало лучше, а не хуже. Возможно, что-то вы недоговариваете…


Вот у вас сервер не прислал очередного пакета, читать нечего. Раньше код просто спал и ждал пакет, теперь он сразу же возвращает управление. Внимание, вопрос: чем таким может заниматься читающая задача когда ей читать нечего? Обычно ей заниматься нечем, а потому блокировка (асинхронная) на чтении из потока — желательна. Если же эта задача занимается сразу двумя делами — почему бы не запустить две разные задачи?


Посмотрим код:


        tokio::spawn(async move {
            loop {
                match &mut *reader.lock().await {
                    Some(reader) => {
                        if let Some(packets) = reader.read().ok() {
                            input_queue.lock().unwrap().push_back(packets);
                        }
                    },
                    None => {
                        message_income.send_error_message(String::from("Not connected to TCP"));
                    },
                };

                sleep(Duration::from_millis(READ_TIMEOUT)).await;
            }
        })

Как видно, ничем у вас задача не занята между чтениями, просто спит. Зачем спит-то?


Вам надо не try_read использовать, а вернуться к нормальному read, а сон убрать.

нормальный read в моем случае не работает. Вкратце, ситуация следующая: в процессе входа в мир клиент сначала логинится к LoginServer, после выбора рилма из списка - происходит коннект на WorldServer. Т.е. когда я делаю этот второй коннект, я подменяю текущий TCPStream на новый. И вот если при коннекте на мой локальный сервер (mangos) все ок, то при коннекте на любой удаленный - клиент просто висит, долго, а потом в конце-концов коннектится таки, но не может прочитать пакеты и падает (нормальный read прочел пакет с 0 длины почему-то). На мой взгляд, причина тут в том, что чтение происходит непрерывно, и когда я объявляю сон (либо просто использую try_read), то на клиентской стороне образуется зазор для соединения. В случае с нормальным read такого зазора нету, поэтому либо делать сон с минимально малым временем на соединение, либо - использовать try_read.

Что такое "зазор для соединения" и зачем он нужен-то?


Что-то мне кажется, проблема где-то в другом месте. Например, в механизме "подмены" TCPStream. Вы случайно не путаетесь "подменить его" во время выполнения вызова read? В таком случае вызов read у вас продолжает читать данные от LoginServer, которых нет (что не удивительно), из-за чего он и ждёт пока не закроется соединение (это объясняет возврат 0). Кстати, вы вообще закрываете соединение к LoginServer?


Просто отмените задачу чтения, дождитесь её окончания и запустите новую.

вы правы, я не закрываю соединение. Ваше объяснение очень похоже на действительное положение дел. Я попробую отрефакторить и напишу, что в итоге получилось.

насчет сна - разве он не для того, чтобы снять нагрузку на CPU ? насколько я знаю, loop создает такую нагрузку.

Если использовать нормальное ожидание — не будет никакой избыточной нагрузки.

можете уточнить, что вы подразумеваете под "нормальным ожиданием" ?

Вторая придирка к тому как вы читаете и парсите пакеты. Давно уже мечтаю найти в каком таком учебнике учат читать данные из потока TCP таким образом, и сжечь всю эту ересь. Увы, уже слишком поздно, и остаётся только объяснять каждый раз одно и то же…


Вы исходите из того, что у вас каждый раз считывается целое количество пакетов. Но это не так. Если непрочитанных пакетов будет больше, чем вмещается в окно TCP или в ваш фиксированный буфер — вы прочитаете "обрывок" одного из пакетов, после чего весь ваш клиент упадёт (судя по обилию unwrap в коде парсера).


А ведь библиотека даёт вам все необходимые инструменты, чтобы можно было читать пакеты не боясь прочитать лишь обрывок! Смотрите:


  1. Используем BufReader<R> с внутренним буфером, чтобы не бояться делать слишком частые вызовы read:


    pub struct Reader {
        stream: BufReader<OwnedReadHalf>,
        // …
    }
    
    impl Reader {
        pub fn new(reader: OwnedReadHalf) -> Self {
            Self {
                stream:  BufReader::new(reader),
                // …
            }
        }
    }

  2. Читаем длину пакета прямо из потока:


    let size = self.reader.read_u16().await? as usize;

  3. Создаём буфер нужного размера и читаем сразу в него:


    let buf = vec![0u8; size];
    self.reader.read_exact(buf.as_mut_slice()).await?;


Всё! Если в BufReader есть свежий пакет — он будет прочитан сразу же, если в BufReader нет части данных — задача подождёт пока они появятся. Если в BufReader есть ещё пакеты — они дождутся своей очереди.

это интересное замечание, очень хочется проверить, только не уверен, что правильно вас до конца понимаю. Вот метод, который использует Reader и как он используется в Client :

// stream.rs
pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
	let mut buffer = [0u8; 65536];

	match self.stream.read(&mut buffer).await {
		Ok(bytes_count) => {
			// ...
		},
		Err(err) => {
			// ...
		},
	}
}

// client.rs
fn handle_read(&mut self) -> JoinHandle<()> {
	let input_queue = Arc::clone(&self._input_queue);
	let reader = Arc::clone(&self._reader);

	tokio::spawn(async move {
		loop {
			match &mut *reader.lock().await {
				Some(reader) => {
					if let Some(packets) = reader.read().await.ok() {
						input_queue.lock().unwrap().push_back(packets);
					}
				},
				None => {},
			};

			sleep(Duration::from_millis(READ_TIMEOUT)).await;
		}
	})
}

правильно ли я понимаю, что bytes_count может быть длиной фрагмента, а не целого пакета ? (на практике такого еще не было, но мало ли)

далее, ваш второй пример, строка, на которой вы получаете `size` по всей видимости используется вне этого Reader (т.к. self.reader я использую только в Client ):

let size = self.reader.read_u16().await? as usize;

но в таком случае, у меня не предусмотрен вызов read_u16 из Reader. Кажется, в вашем примере чего-то не хватает. Можете, пожалуйста, уточнить, где подразумевается использовать код начиная с пункта 2 ? Если внутри метода read , то зачем нам получать size, если он уже возвращается в bytes_count; если извне - то как в таком случае должен выглядеть Reader ? просто обернуть stream в BufReader явно недостаточно

Да, bytes_count может не делиться на пакеты нацело.


Посмотрите в ваш метод parse_packets, он первым делом преобразует переданный ему вектор в курсор и читает из него пакеты. В начале каждого пакета идёт длина, потом количество байт соответствующее этой длине. Вот эту-то информацию я и предлагаю читать напрямую из self.reader, без промежуточного буфера (о буфере позаботится BufReader)

Sign up to leave a comment.

Articles