Как стать автором
Обновить

System.Threading.Channels — высокопроизводительный производитель-потребитель и асинхронность без аллокаций и стэк дайва

Время на прочтение18 мин
Количество просмотров38K
Всего голосов 25: ↑25 и ↓0+25
Комментарии7

Комментарии 7

Спасибо, интересно, хоть и, как выясняется — не совсем ново. Еще в 2018 году были анонсы.
Вопрос по теме: сделал тестовое приложение на винформс, в форме1: создается канал, стартует читатель и ждет данных, открывается форма2(параметр — созданный канал), открывается писатель и начинает писать, читатель получает, вычитывает все данные, выходит из функции.
Но, когда я при открытых формах пытаюсь повторно начать писать в тот же канал — получаю ошибку, что канал закрыт. Как быть в таком случае, когда через паузу снова появились данные для записи? Создавать новый канал?

Правильнее не закрывать канал.

Я понимаю, что где-то мой пробой, но хотелось бы выяснить для себя:
Вот мои писатель и читатель в разных формах, где закрытие и как избежать?
//========================
public async Task ChannelRunWriter(int delayMs, int numberOfReaders,
	int howManyMessages = 10, int maxCapacity = 10)
{
		_channel = Channel.CreateBounded<string>(maxCapacity);
	var writer = _channel.Writer;
	for (int i = 0; i < howManyMessages; i++)
	{
		Console.WriteLine($"2)Writing at {DateTime.Now.ToLongTimeString()}");
		await writer.WriteAsync($"[2]SomeText message '{i}");
	}
	writer.Complete();
}		

//============================
public static async Task ChannelRunReader(int delayMs, int numberOfReaders,
            int howManyMessages = 100, int maxCapacity = 10)
        {
            var finalDelayMs = 25;
            var finalNumberOfReaders = 1;
            var reader = channel.Reader;
            async Task Read(ChannelReader<string> theReader, int readerNumber)
            {
                while (await theReader.WaitToReadAsync())
                {
                    while (theReader.TryRead(out var theMessage))
                    {
                        Console.WriteLine($"*** Reader {readerNumber} read '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                        await Task.Delay(delayMs);
                    }
                }
            }
            var tasks = new List<Task>();
            for (int i = 0; i < finalNumberOfReaders; i++)
            {
                tasks.Add(Task.Run(() => Read(reader, i + 1)));
                await Task.Delay(10);
            }
            await reader.Completion;
            await Task.WhenAll(tasks);
        }


Вот тут закрытие: writer.Complete();. Очевидно, что после Complete в канал ничего записать уже нельзя, для того этот метод и сделан. Как сообщает Капитан Очевидность, если в канал планируется записывать что-то ещё — не надо вызывать Complete.


Кстати, делать WaitToReadAsync + TryRead в нескольких потоках — не лучшая идея, это ведет к Thundering herd problem, в таких случаях лучше вызывать простой ReadAsync.

Спасибо, понял, в чем ошибка.
Я хочу приспособить канал в приложении, которое читает данные из файловой системы (список файлов, фолдеров, может быть много), по ходу чтения в цикле проверяет существует ли файл, его размер и пишет в базу.
Хочу сделать так: приложение не пишет сразу в базу по одной записи на файл, а пишет в канал, на другом конце читатель вычитывает и пишет в базу.
Как можно использовать каналы для такой задачи вместо Reactive:
приложение, каком-то (нескольких) местах изменяются настройки приложения, нужно, чтобы активные формы перечитали конфиг и отреагировали на изменения.
Один писатель и читатели в разных местах, где надо реагировать? но если канал один, то первый же читатель вычитает сообщение и другие не смогут.

Никак, каналы не для этого предназначены.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории