Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.
Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.
Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.
Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):
Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно, либо при выброшенном исключении — удаляем из словарика.
Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):
Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.
А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.
Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.
Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо обернуть в try{} finally{}, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика.
Update 2
Использование Reactive Extensions для той же задачи
habrahabr.ru/post/238445
Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.
Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.
Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):
public async Task<byte[]> SendReceiveAsync(byte[] msg, string ip, int port, int timeOut)
{
var endPoint = new IPEndPoint(IPAddress.Parse(ip), port);
var tcs = new TaskCompletionSource<byte[]>();
try
{
var tokenSource = new CancellationTokenSource(timeOut);
var token = tokenSource.Token;
if (!_tcsDictionary.ContainsKey(endPoint)) _tcsDictionary.TryAdd(endPoint, tcs);
_client.Send(msg, msg.Length, ip, port);
var result = await tcs.Task.WithCancellation(token);
return result;
}
finally
{
_tcsDictionary.TryRemove(endPoint, out tcs);
}
}
Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно, либо при выброшенном исключении — удаляем из словарика.
Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):
private ConcurrentDictionary<Tuple<string,int>, TaskCompletionSource<byte[]>> _tcsDictionary;
Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.
static class TaskExtension
{
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
using (cancellationToken.Register(
s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
if (task != await Task.WhenAny(task, tcs.Task))
throw new OperationCanceledException(cancellationToken);
return await task;
}
}
А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.
Task.Run(() =>
{
IPEndPoint ipEndPoint = null;
while (true)
{
try
{
var receivedBytes = _client.Receive(ref ipEndPoint);
TaskCompletionSource<byte[]> tcs;
if (_tcsDictionary.TryGetValue(ipEndPoint, out tcs)) tcs.SetResult(receivedBytes);
}
catch (SocketException)
{
;//при невозможности соединения продолжаем работать
}
}
});
Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.
Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо обернуть в try{} finally{}, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика.
Update 2
Использование Reactive Extensions для той же задачи
habrahabr.ru/post/238445