Pull to refresh

UDP и C# Reactive Extensions

Reading time4 min
Views23K
Недавно прочитал пост о UDP и C# async/await, в котором описание решения несложной задачи по опросу устройств по UDP одним клиентом. Решение задачи с помощью async\await действительно сокращает объем кода, по сравнению с ручной реализацией асинхронных вызовов. С другой стороны создает много проблем с синхронизацией задач, конкурентным доступом к данным и обработкой исключений. Полученное решение очень подвержено ошибкам. Первоначальная версия автора содержала ошибки неосвобождения ресурсов.

Можно ли сделать проще и надежнее?


А в чем собственно проблема?


Проблема в методе UdpClient.Receive(-Async). Этот метод не реентераблен, то есть если клиент уже ждет прихода датаграммы, то нельзя вызвать этот метод еще раз. Даже если не выпадет ошибка, то вполне можно получить датаграмму, ожидаемую другим «потоком». Поэтому нужно писать дополнительный код, который синхронизирует действия пользователя и состояние UdpClient.

async\await и Tasks Parallel Library не имеет готовых средств синхронизации. Нужно или руками писать код, как в исходной статье, или использовать готовые библиотеки, вроде TPL Dataflow. Но, увы, Dataflow очень тяжеловесен.

Reactive Extensions


Вместо TPL Dataflow можно использовать Reactive Extensions (RX). RX описывает асинхронные потоки данных (асинхронные последовательности). В RX много функций, для создания потоков данных и манипуляции ими. RX позволяет работать не только с IO, но и «потоками событий», генерируемыми элементами интерфейса. Это позволяет всю программу описать в виде набора потоков данных.

Пример кода
Для решения исходной задачи понадобится в проект добавить библиотеку Rx-Main из NuGet и написать несколько хелперов:
public static IObservable<UdpReceiveResult> ReceiveObservable(this UdpClient client)
{
    return client.ReceiveAsync().ToObservable();
}

public static IObservable<int> SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port)
{
    return client.SendAsync(msg, bytes, ip, port).ToObservable();
}

public static IObservable<UdpReceiveResult> ReceiveStream(this UdpClient client)
{
    return Observable.Defer(() => client.ReceiveObservable()).Repeat();
}

Первые два хелпера превращают Task в IObservable (асинхронную последовательность из одного элемента) с помощью метода-расширения.
Последний хелпер как раз показывает пример манипуляции последовательностями.
Observable.Defer — откладывает вызов конструктора последовательности в параметре до того, как появится подписчик.
Метод-расширение .Repeat() повторяет бесконечно исходную последовательность.
Вместе два метода создают бесконечный цикл получения датаграмм из сокета.

Теперь метод отправки и получения данных:
public IObservable<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
    var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port)
            from r in receiveStream
            where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port
            select r.Buffer;

    return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut));
}

Да-да, RX поддерживает Linq для асинхронных последовательностей.
Это Linq выражение довольно тяжело для понимания без знания RX, но суть его очень простая: после получения результата из потока SendObservable подписаться на поток receiveStream и получить только те элементы, которые удовлетворяют предикату в where, вернуть буфер из полученной датаграммы. Далее берется один результат получившейся последовательности и навешивается тайм-аут.

Самая важная часть кода — определение receiveStream:
receiveStream = client.ReceiveStream().Publish().RefCount();


Горячие, холодные и теплые последовательности
Когда вы работаете с последовательностями RX, то важно знать их «температуру».

Холодные последовательности — те, которые появляются при появлении подписчика последовательности и исчезают когда подписчик перестает существовать.
Метод-расширение ReceiveStream возвращает как раз такую последовательность. Это значит, что у каждого подписчика будет своя последовательность, то есть будут параллельно происходить несколько вызов UdpClient.ReceiveAsync и проблема, описанная в начале, не решается.

Горячие последовательности — которые существуют независимо от подписчиков. Например последовательность движений мыши пользователя. Функция Publish в коде выше позволяет превратить холодную последовательность в горячую. Но это несет другую проблему. Если в конструкторе UdpClient не указать порт и вызывать Receive до вызова Send, то выпадет ошибка.

Поэтому нам нужен промежуточный вариант — последовательность должна быть общей для всех подписчиков и должна существовать, пока есть хотя бы один подписчик. Такая последовательность называется «теплой» и создается вызовом RefCount.

Подписка на события
Для тестирования я написал также функцию «сервера»:
public IDisposable Listen(Func<UdpReceiveResult, byte[]> process)
{
    return receiveStream.Subscribe(async r =>
    {
        var msg = process(r);
        await client.SendObservable(msg, msg.Length, r.RemoteEndPoint);
    });
}

Метод Subscribe позволяет указать действие, которое будет вызываться на каждый элемент асинхронной последовательности. Также можно задать действие для конца последовательности и для исключения.

Также стоит обратить внимание, что RX поддерживает async\await, то есть вам не надо знать RX, чтобы использовать код, построенный на основе асинхронных последовательностей.

Заключение


Получившийся код не содержит ни одного цикла, ни одной явной синхронизации, ни одного создания потока или задачи. При этом код полностью асинхронный и безопасный.
RX обязательно стоить изучить, даже если вы не будете его использовать. Основная часть Rx была придумана с помощью применения принципа двойственности монад к стандартным интерфейсам IEnumerable и IEnumerator, поэтому RX получился компактный и мощный. Кроме того RX есть и для JavaScript, C++, Java, Scala и Python, Ruby.

Исходный код вместе с клиентом и сервером выложил на github — github.com/gandjustas/UdpRxSample.
Tags:
Hubs:
Total votes 22: ↑20 and ↓2+18
Comments10

Articles