UDP и C# Reactive Extensions

  • Tutorial
Недавно прочитал пост о UDP и C# async/await, в котором описание решения несложной задачи по опросу устройств по UDP одним клиентом. Решение задачи с помощью async\await действительно сокращает объем кода, по сравнению с ручной реализацией асинхронных вызовов. С другой стороны создает много проблем с синхронизацией задач, конкурентным доступом к данным и обработкой исключений. Полученное решение очень подвержено ошибкам. Первоначальная версия автора содержала ошибки неосвобождения ресурсов.

Можно ли сделать проще и надежнее?


А в чем собственно проблема?


Проблема в методе UdpClient.Receive(-Async). Этот метод не реентераблен, то есть если клиент уже ждет прихода датаграммы, то нельзя вызвать этот метод еще раз. Даже если не выпадет ошибка, то вполне можно получить датаграмму, ожидаемую другим «потоком». Поэтому нужно писать дополнительный код, который синхронизирует действия пользователя и состояние UdpClient.

async\await и Tasks Parallel Library не имеет готовых средств синхронизации. Нужно или руками писать код, как в исходной статье, или использовать готовые библиотеки, вроде TPL Dataflow. Но, увы, Dataflow очень тяжеловесен.

Reactive Extensions


Вместо TPL Dataflow можно использовать Reactive Extensions (RX). RX описывает асинхронные потоки данных (асинхронные последовательности). В RX много функций, для создания потоков данных и манипуляции ими. RX позволяет работать не только с IO, но и «потоками событий», генерируемыми элементами интерфейса. Это позволяет всю программу описать в виде набора потоков данных.

Пример кода
Для решения исходной задачи понадобится в проект добавить библиотеку Rx-Main из NuGet и написать несколько хелперов:
public static IObservable<UdpReceiveResult> ReceiveObservable(this UdpClient client)
{
    return client.ReceiveAsync().ToObservable();
}

public static IObservable<int> SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port)
{
    return client.SendAsync(msg, bytes, ip, port).ToObservable();
}

public static IObservable<UdpReceiveResult> ReceiveStream(this UdpClient client)
{
    return Observable.Defer(() => client.ReceiveObservable()).Repeat();
}

Первые два хелпера превращают Task в IObservable (асинхронную последовательность из одного элемента) с помощью метода-расширения.
Последний хелпер как раз показывает пример манипуляции последовательностями.
Observable.Defer — откладывает вызов конструктора последовательности в параметре до того, как появится подписчик.
Метод-расширение .Repeat() повторяет бесконечно исходную последовательность.
Вместе два метода создают бесконечный цикл получения датаграмм из сокета.

Теперь метод отправки и получения данных:
public IObservable<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
    var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port)
            from r in receiveStream
            where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port
            select r.Buffer;

    return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut));
}

Да-да, RX поддерживает Linq для асинхронных последовательностей.
Это Linq выражение довольно тяжело для понимания без знания RX, но суть его очень простая: после получения результата из потока SendObservable подписаться на поток receiveStream и получить только те элементы, которые удовлетворяют предикату в where, вернуть буфер из полученной датаграммы. Далее берется один результат получившейся последовательности и навешивается тайм-аут.

Самая важная часть кода — определение receiveStream:
receiveStream = client.ReceiveStream().Publish().RefCount();


Горячие, холодные и теплые последовательности
Когда вы работаете с последовательностями RX, то важно знать их «температуру».

Холодные последовательности — те, которые появляются при появлении подписчика последовательности и исчезают когда подписчик перестает существовать.
Метод-расширение ReceiveStream возвращает как раз такую последовательность. Это значит, что у каждого подписчика будет своя последовательность, то есть будут параллельно происходить несколько вызов UdpClient.ReceiveAsync и проблема, описанная в начале, не решается.

Горячие последовательности — которые существуют независимо от подписчиков. Например последовательность движений мыши пользователя. Функция Publish в коде выше позволяет превратить холодную последовательность в горячую. Но это несет другую проблему. Если в конструкторе UdpClient не указать порт и вызывать Receive до вызова Send, то выпадет ошибка.

Поэтому нам нужен промежуточный вариант — последовательность должна быть общей для всех подписчиков и должна существовать, пока есть хотя бы один подписчик. Такая последовательность называется «теплой» и создается вызовом RefCount.

Подписка на события
Для тестирования я написал также функцию «сервера»:
public IDisposable Listen(Func<UdpReceiveResult, byte[]> process)
{
    return receiveStream.Subscribe(async r =>
    {
        var msg = process(r);
        await client.SendObservable(msg, msg.Length, r.RemoteEndPoint);
    });
}

Метод Subscribe позволяет указать действие, которое будет вызываться на каждый элемент асинхронной последовательности. Также можно задать действие для конца последовательности и для исключения.

Также стоит обратить внимание, что RX поддерживает async\await, то есть вам не надо знать RX, чтобы использовать код, построенный на основе асинхронных последовательностей.

Заключение


Получившийся код не содержит ни одного цикла, ни одной явной синхронизации, ни одного создания потока или задачи. При этом код полностью асинхронный и безопасный.
RX обязательно стоить изучить, даже если вы не будете его использовать. Основная часть Rx была придумана с помощью применения принципа двойственности монад к стандартным интерфейсам IEnumerable и IEnumerator, поэтому RX получился компактный и мощный. Кроме того RX есть и для JavaScript, C++, Java, Scala и Python, Ruby.

Исходный код вместе с клиентом и сервером выложил на github — github.com/gandjustas/UdpRxSample.
Поделиться публикацией
Ой, у вас баннер убежал!

Ну. И что?
Реклама
Комментарии 10
    +3
    Хочу отметить, что получившийся код проигрывает по скорости коду прошлого автора, если параллельно выполняется слишком много запросов. Существует ли способ избежать проверки всех прилетевших событий по-одному в блоке Where? Какой-нибудь GroupBy или Join?

    И еще такой вопрос. В какой момент SendReceiveUdpAsync подписывается на receiveStream? Такое ощущение, что уже после вызова SendObservable. Не получится ли, что ответ от сервера вдруг прилетит раньше, чем SendReceiveUdpAsync успеет подписаться на входящий поток?
      0
      Да, Rx скоростью не блещет. Как минимум в 2 раза медленнее TPL. Но если дело касается IO, то скорость работы малозначительна.
      Для High Frequency, увы, не подходит. Но есть RX для CPP, там компилятор успешно инлайнит большинство вызовов.

      Избежать проверки всех можно с помощью GroupBy до оператора Connect.

      SendReceiveUdpAsync подписывается на receiveStream после SendObservable, это моя ошибка, в которую я постоянно попадаю. Надо использовать Zip, а не SelectMany.
      Поправлю пост и код в ближайшее время.
        +1
        Я говорил не про какие-то два раза, а про отличие в алгоритмической сложности. На тысяче одновременных запросов ваше решение проиграет решению NeoNN в тысячу раз: узким местом является условие Where — точнее, та структура, которую оно строит.

        PS GroupBy, как и Join, действительно присутствуют — но они тут не помогут, поскольку негде сохранить накопленный результат. Для того, чтобы воспользоваться ими, надо иметь не набор одиночных запросов — а поток запросов, с которым можно будет слить поток ответов. Но я бы не сказал, что это выходит проще решения без Rx.
          0
          В реальном случае, когда много параллельных запросов, имеет смысл сделать не один метод SendReceiveUdpAsync, а асинхронную последовательность вызовов Send, сгрупированную по (ip,port) и джоинить на последовательность ответов, сгруппированную по тому же ключу. Внутри используется Dictionary для сопоставления, так что алгоритмическая сложность будет такая же.

          Но для понимания такой код будет слишком сложный.

          А если параллельных вызовов нет или мало, например каждый вызов запускается по таймеру, то никаких проблем нет.
      0
      Скажите пожалуйста а Rx подходит для веб-решений(asp .net mvc)? Нет ли противопоказаний?
        0
        Практически нет. В вебе все состояние хранится вне веб-сервера. А в RX состояние скрыто внутри подписчиков.

        Фактически единственная область применения RX — stateful приложение, которое вынуждена обрабатывать много потоков данных, получаемых по сети, от устройств от пользователя. При этом RX плохо (точнее никак) масштабируется. Вынести часть кода RX на другие серверы штатными способами нельзя.
          0
          Всё зависит от того, как готовить. Вот Netflix, например и веб-сервера и клиентов пишут на Rx (там Rx-Js и Rx-Java, но сути это не меняет). С нагрузками у них всё в порядке.
          В целом оно работает, но, конечно, такую мощную и развивающуюся штуку, как asp.net mvc переписывать на Rx — в 95% случаев — это перебор.
            0
            Когда в C# не было async\await я активно пользовался RX. С момента появления async\await нужда в нем почти отпала. В Java нету async\await, поэтому rx-java отлично применяется. UI на JS — отличный пример приложения со множеством потоков данных.
              0
              Согласен, для клиентской разработки применений гораздо больше.
          0
          Если на клиенте, то RxJS отлично подходит для многих задач. Особенно в связке с MVVM фреймворками.
          На сервере можно применять обычно нет нужды. Отлично подходит, когда есть много асинхронных потоков данных.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое