Pull to refresh

UDP и C# async/await

.NET *C# *
Tutorial
Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.



Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.

Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.

Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):

        public async Task<byte[]> SendReceiveAsync(byte[] msg, string ip, int port, int timeOut)
        {
            var endPoint = new IPEndPoint(IPAddress.Parse(ip), port);
            var tcs = new TaskCompletionSource<byte[]>();

            try
            {
                var tokenSource = new CancellationTokenSource(timeOut);
                var token = tokenSource.Token;
                if (!_tcsDictionary.ContainsKey(endPoint)) _tcsDictionary.TryAdd(endPoint, tcs);
                _client.Send(msg, msg.Length, ip, port);

                var result = await tcs.Task.WithCancellation(token);
                return result;
            }

            finally
            {
                _tcsDictionary.TryRemove(endPoint, out tcs);
            }
        }

Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно, либо при выброшенном исключении — удаляем из словарика.

Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):

private ConcurrentDictionary<Tuple<string,int>, TaskCompletionSource<byte[]>> _tcsDictionary;


Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.


    static class TaskExtension
    {
        public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<bool>();

            using (cancellationToken.Register(
                        s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
                if (task != await Task.WhenAny(task, tcs.Task))
                    throw new OperationCanceledException(cancellationToken);
            return await task;
        }
    }


А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.

            Task.Run(() =>
            {
                IPEndPoint ipEndPoint = null;

                while (true)
                {
                    try
                    {
                        var receivedBytes = _client.Receive(ref ipEndPoint);
                        TaskCompletionSource<byte[]> tcs;
                        if (_tcsDictionary.TryGetValue(ipEndPoint, out tcs)) tcs.SetResult(receivedBytes);
                    }
                    catch (SocketException)
                    {
                        ;//при невозможности соединения продолжаем работать
                    }

                }
            });

Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.

Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо обернуть в try{} finally{}, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика.

Update 2
Использование Reactive Extensions для той же задачи
habrahabr.ru/post/238445
Tags:
Hubs:
Total votes 25: ↑22 and ↓3 +19
Views 32K
Comments Comments 16