Достаточно большое название? Да? В этом посте я покажу Вам альтернативный подход в создании простого событийно-ориентированного HTTP-сервера на C#, используя мощь Reactive Extensions.
Я не очень хорош в объяснениях, поэтому процитирую очень интересную статью от Dan York о событийной модели node.js:
Много вещей происходит вокруг этого в экосистеме .NET:
Используя класс HttpListener и Reactive Extensions, мы можем создать нечто наподобие этого:
Некоторые замечания к данному коду:
Вы можете создавать веб-приложения любого типа, основанные на данной концепции. Приложение уровня “hello world” будет выглядеть так:
Рекомендую, чтобы все, что Вы будете делать – было асинхронным. Например, если вы подключаетесь к базе данных, то это должно быть асинхронной операцией, и Вы должны будете удерживать вместе callbacks/observables/Tasks и т.п.
Существует еще более интересное применение, которым я бы хотел поделиться, и которое называется long polling:
Итак, перед нами наиболее простой пример long polling, работающего через вышеприведенный код:
Как Вы можете видеть, мы заставляем наблюдателей работать… При этом отсутствует какая-либо блокирующая операция. Даже чтение из потока – асинхронная операция.
Ниже приведено видео, демонстрирующее работу кода:

И, под конец, исходный код опубликован здесь под opensource, если Вы захотите углубиться в него шаг за шагом или просто изучить.
Отдельные благодарности Gustavo Machado, Silvio Massari и ребятам из Nancy framework за советы и часть кода, который я украл у них.
Введение
Я не очень хорош в объяснениях, поэтому процитирую очень интересную статью от Dan York о событийной модели node.js:
“Традиционный” режим веб-серверов всегда был основан на модели потоков. Когда Вы запускаете Apache или любой другой веб-сервер, он начинает принимать подключения. Когда он принимает подключение, то держит данное подключение открытым до тех пор, пока не закончит обработку страницы, либо другой транзакции. Если чтение страницы с диска или запись результатов в базу данных занимает несколько микросекунд, то веб-сервер блокируется для операций ввода/вывода. (Это именуется как “блокирующее I/O”). Для масштабирования такого типа серверов, Вам потребуется запустить дополнительные копии самого сервера (именуется как “на основе потоков”, т.к. каждая копия обычно требует дополнительный поток операционной системы).
В противоположность этому, Node.JS, использует событийно-ориентированную модель, при которой веб-сервер принимает запросы, быстро ставит их на обработку, затем принимается за следующий запрос. Когда изначальный запрос завершен, то он возвращается в очередь обработки и когда достигает конца очереди, результаты возвращаются обратно (или выполняется все, что потребует следующее действие). Данная модель весьма эффективна и масштабируема, потому что веб-сервер обычно всегда принимает запросы, т.к. не ждет завершения ни одной операции чтения или записи. (Данный метод называется как “неблокирующее I/O” или “событийно-ориентированное I/O”).
Что происходит в мире .NET?
Много вещей происходит вокруг этого в экосистеме .NET:
- Manos de mono (не для .NET, но близок к нему) был создан не так давно, следуя данной концепции
- Node.Net — имплементация Node.JS для среды выполнения .Net, используя JScript.Net
- Kayak – асинхронный HTTP-сервер, написанный на C#
- Frank является клоном Sinatra, написанный на F#
- Nancy будет поддерживать асинхронные обработчики скоро
Альтернативный подход
Используя класс HttpListener и Reactive Extensions, мы можем создать нечто наподобие этого:
public class HttpServer : IObservable<RequestContext>, IDisposable
{
private readonly HttpListener listener;
private readonly IObservable<RequestContext> stream;
public HttpServer(string url)
{
listener = new HttpListener();
listener.Prefixes.Add(url);
listener.Start();
stream = ObservableHttpContext();
}
private IObservable<RequestContext> ObservableHttpContext()
{
return Observable.Create<RequestContext>(obs =>
Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext,
listener.EndGetContext)()
.Select(c => new RequestContext(c.Request, c.Response))
.Subscribe(obs))
.Repeat()
.Retry()
.Publish()
.RefCount();
}
public void Dispose()
{
listener.Stop();
}
public IDisposable Subscribe(IObserver<RequestContext> observer)
{
return stream.Subscribe(observer);
}
}
Некоторые замечания к данному коду:
- FromAsyncPattern – удобный метод, которые поставляются с Rx. Данный метод конвертирует сигнатуры Begin/End в IObservable
- RequestContext является легкой оберткой для работы с HttpListener. Я не собираюсь приводить здесь его код, однако Вы сможете посмотреть весь исходный код чуть позже.
- Повторюсь: если Вы когда-либо видели использование HttpListener, то уверен, вы видели код внутри while цикла. Это то же самое.
- Пробуйте еще раз: если мы получаем ошибку – тогда пробуем еще раз.
- Publish/Refcount: это поможет нам создать “теплых” наблюдателей из “холодных”. Они ведут себя наподобие “горячих”. Вы можете прочитать больше тут и тут.
Пример использования
Вы можете создавать веб-приложения любого типа, основанные на данной концепции. Приложение уровня “hello world” будет выглядеть так:
static void Main()
{
//a stream os messages
var subject = new Subject<string>();
using(var server = new HttpServer("http://*:5555/"))
{
var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello"))
.Subscribe(ctx => ctx.Respond(new StringResponse("world")));
Console.ReadLine();
handler.Dispose();
}
}
Рекомендую, чтобы все, что Вы будете делать – было асинхронным. Например, если вы подключаетесь к базе данных, то это должно быть асинхронной операцией, и Вы должны будете удерживать вместе callbacks/observables/Tasks и т.п.
Существует еще более интересное применение, которым я бы хотел поделиться, и которое называется long polling:
Long polling является вариацией традиционной техники polling и позволяет эмулировать отправку информации от сервера клиенту. При long polling, клиент запрашивает информацию от сервера в той же манере, что и при нормальном запросе. Однако если сервер не имеет никакой доступной информации для клиента, то вместо отправки пустого ответа, сервер удерживает запрос и ждет доступности информации.
Итак, перед нами наиболее простой пример long polling, работающего через вышеприведенный код:
class Program
{
static void Main()
{
//a stream os messages
var subject = new Subject<string>();
using(var server = new HttpServer("http://*:5555/"))
{
//the listeners stream and subscription
var listeners = server
.Where(ctx => ctx.Request.HttpMethod == "GET")
.Subscribe(ctx => subject.Take(1) //wait the next message to end the request
.Subscribe(m => ctx.Respond(new StringResponse(m))));
//the publishing stream and subscrition
var publisher = server
.Where(ctx => ctx.Request.HttpMethod == "POST")
.Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength)
.Subscribe(bts =>
{
ctx.Respond(new EmptyResponse(201));
subject.OnNext(Encoding.UTF8.GetString(bts));
}));
Console.ReadLine();
listeners.Dispose();
publisher.Dispose();
}
}
}
Как Вы можете видеть, мы заставляем наблюдателей работать… При этом отсутствует какая-либо блокирующая операция. Даже чтение из потока – асинхронная операция.
Хотите увидеть работающий код?
Ниже приведено видео, демонстрирующее работу кода:

И, под конец, исходный код опубликован здесь под opensource, если Вы захотите углубиться в него шаг за шагом или просто изучить.
Отдельные благодарности Gustavo Machado, Silvio Massari и ребятам из Nancy framework за советы и часть кода, который я украл у них.