Достаточно большое название? Да? В этом посте я покажу Вам альтернативный подход в создании простого событийно-ориентированного HTTP-сервера на C#, используя мощь Reactive Extensions.
Я не очень хорош в объяснениях, поэтому процитирую очень интересную статью от Dan York о событийной модели node.js:
Много вещей происходит вокруг этого в экосистеме .NET:
Используя класс HttpListener и Reactive Extensions, мы можем создать нечто наподобие этого:
Некоторые замечания к данному коду:
Вы можете создавать веб-приложения любого типа, основанные на данной концепции. Приложение уровня “hello world” будет выглядеть так:
Рекомендую, чтобы все, что Вы будете делать – было асинхронным. Например, если вы подключаетесь к базе данных, то это должно быть асинхронной операцией, и Вы должны будете удерживать вместе callbacks/observables/Tasks и т.п.
Существует еще более интересное применение, которым я бы хотел поделиться, и которое называется long polling:
Итак, перед нами наиболее простой пример long polling, работающего через вышеприведенный код:
Как Вы можете видеть, мы заставляем наблюдателей работать… При этом отсутствует какая-либо блокирующая операция. Даже чтение из потока – асинхронная операция.
Ниже приведено видео, демонстрирующее работу кода:

И, под конец, исходный код опубликован здесь под opensource, если Вы захотите углубиться в него шаг за шагом или просто изучить.
Отдельные благодарности Gustavo Machado, Silvio Massari и ребятам из Nancy framework за советы и часть кода, который я украл у них.
Введение
Я не очень хорош в объяснениях, поэтому процитирую очень интересную статью от Dan York о событийной модели node.js:
“Традиционный” режим веб-серверов всегда был основан на модели потоков. Когда Вы запускаете Apache или любой другой веб-сервер, он начинает принимать подключения. Когда он принимает подключение, то держит данное подключение открытым до тех пор, пока не закончит обработку страницы, либо другой транзакции. Если чтение страницы с диска или запись результатов в базу данных занимает несколько микросекунд, то веб-сервер блокируется для операций ввода/вывода. (Это именуется как “блокирующее I/O”). Для масштабирования такого типа серверов, Вам потребуется запустить дополнительные копии самого сервера (именуется как “на основе потоков”, т.к. каждая копия обычно требует дополнительный поток операционной системы).
В противоположность этому, Node.JS, использует событийно-ориентированную модель, при которой веб-сервер принимает запросы, быстро ставит их на обработку, затем принимается за следующий запрос. Когда изначальный запрос завершен, то он возвращается в очередь обработки и когда достигает конца очереди, результаты возвращаются обратно (или выполняется все, что потребует следующее действие). Данная модель весьма эффективна и масштабируема, потому что веб-сервер обычно всегда принимает запросы, т.к. не ждет завершения ни одной операции чтения или записи. (Данный метод называется как “неблокирующее I/O” или “событийно-ориентированное I/O”).
Что происходит в мире .NET?
Много вещей происходит вокруг этого в экосистеме .NET:
- Manos de mono (не для .NET, но близок к нему) был создан не так давно, следуя данной концепции
- Node.Net — имплементация Node.JS для среды выполнения .Net, используя JScript.Net
- Kayak – асинхронный HTTP-сервер, написанный на C#
- Frank является клоном Sinatra, написанный на F#
- Nancy будет поддерживать асинхронные обработчики скоро
Альтернативный подход
Используя класс HttpListener и Reactive Extensions, мы можем создать нечто наподобие этого:
public class HttpServer : IObservable<RequestContext>, IDisposable { private readonly HttpListener listener; private readonly IObservable<RequestContext> stream; public HttpServer(string url) { listener = new HttpListener(); listener.Prefixes.Add(url); listener.Start(); stream = ObservableHttpContext(); } private IObservable<RequestContext> ObservableHttpContext() { return Observable.Create<RequestContext>(obs => Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext, listener.EndGetContext)() .Select(c => new RequestContext(c.Request, c.Response)) .Subscribe(obs)) .Repeat() .Retry() .Publish() .RefCount(); } public void Dispose() { listener.Stop(); } public IDisposable Subscribe(IObserver<RequestContext> observer) { return stream.Subscribe(observer); } }
Некоторые замечания к данному коду:
- FromAsyncPattern – удобный метод, которые поставляются с Rx. Данный метод конвертирует сигнатуры Begin/End в IObservable
- RequestContext является легкой оберткой для работы с HttpListener. Я не собираюсь приводить здесь его код, однако Вы сможете посмотреть весь исходный код чуть позже.
- Повторюсь: если Вы когда-либо видели использование HttpListener, то уверен, вы видели код внутри while цикла. Это то же самое.
- Пробуйте еще раз: если мы получаем ошибку – тогда пробуем еще раз.
- Publish/Refcount: это поможет нам создать “теплых” наблюдателей из “холодных”. Они ведут себя наподобие “горячих”. Вы можете прочитать больше тут и тут.
Пример использования
Вы можете создавать веб-приложения любого типа, основанные на данной концепции. Приложение уровня “hello world” будет выглядеть так:
static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello")) .Subscribe(ctx => ctx.Respond(new StringResponse("world"))); Console.ReadLine(); handler.Dispose(); } }
Рекомендую, чтобы все, что Вы будете делать – было асинхронным. Например, если вы подключаетесь к базе данных, то это должно быть асинхронной операцией, и Вы должны будете удерживать вместе callbacks/observables/Tasks и т.п.
Существует еще более интересное применение, которым я бы хотел поделиться, и которое называется long polling:
Long polling является вариацией традиционной техники polling и позволяет эмулировать отправку информации от сервера клиенту. При long polling, клиент запрашивает информацию от сервера в той же манере, что и при нормальном запросе. Однако если сервер не имеет никакой доступной информации для клиента, то вместо отправки пустого ответа, сервер удерживает запрос и ждет доступности информации.
Итак, перед нами наиболее простой пример long polling, работающего через вышеприведенный код:
class Program { static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { //the listeners stream and subscription var listeners = server .Where(ctx => ctx.Request.HttpMethod == "GET") .Subscribe(ctx => subject.Take(1) //wait the next message to end the request .Subscribe(m => ctx.Respond(new StringResponse(m)))); //the publishing stream and subscrition var publisher = server .Where(ctx => ctx.Request.HttpMethod == "POST") .Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength) .Subscribe(bts => { ctx.Respond(new EmptyResponse(201)); subject.OnNext(Encoding.UTF8.GetString(bts)); })); Console.ReadLine(); listeners.Dispose(); publisher.Dispose(); } } }
Как Вы можете видеть, мы заставляем наблюдателей работать… При этом отсутствует какая-либо блокирующая операция. Даже чтение из потока – асинхронная операция.
Хотите увидеть работающий код?
Ниже приведено видео, демонстрирующее работу кода:

И, под конец, исходный код опубликован здесь под opensource, если Вы захотите углубиться в него шаг за шагом или просто изучить.
Отдельные благодарности Gustavo Machado, Silvio Massari и ребятам из Nancy framework за советы и часть кода, который я украл у них.
