Последнее время приходится писать «маленькие» серверы для многопоточной обработки относительно небольших объемов данных. Хочу поделится с хабрасообществом определенным приемом в написании таких приложений.
Все задачи можно формализовать в эти 3 пункта:
1. Есть набор данных.
2. Эти данные нужно обработать (нам не важно как и что обрабатывать, главное это делать параллельно).
3. Данные постоянно поступают новые.
Для иллюстрации отдельных моментов давайте решим задачу выборки данных из множества RSS каналов.
Решение этой задачи я привел в следующем коде, который можно просто скопировать и запустить, код с комментариями, которые, надеюсь доступно описывают отдельные моменты.
Данная статья ориентирована на тех людей, кто еще не до конца понимает как работать с пространством System.Threading. Если есть какие либо вопросы, задавайте их в коментариях, постараюсь ответить.
Все задачи можно формализовать в эти 3 пункта:
1. Есть набор данных.
2. Эти данные нужно обработать (нам не важно как и что обрабатывать, главное это делать параллельно).
3. Данные постоянно поступают новые.
Для иллюстрации отдельных моментов давайте решим задачу выборки данных из множества RSS каналов.
Решение этой задачи я привел в следующем коде, который можно просто скопировать и запустить, код с комментариями, которые, надеюсь доступно описывают отдельные моменты.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net;
using System.IO;
namespace ConsoleApplication1
{
class Program
{
internal class RSSServer
{
//Главный поток для выборки данных на обработку
private Thread _server;
//Переменная контролирующая главный цикл выборки данных
private bool _start = false;
//Результирующий список с данными RSS каналов
public List<string> ResultData
{
get;
set;
}
#region Счетчики для контроля установки событий
private int _countAll = 0; //Количество выбранных данных для обработки
private int _countEnd = 0; //Количество обработанных данных в текущей выборке
#endregion
//Событие с ручным сбросом - самый главный элемент для синхронизации потока выборки новых данных и обработки уже имеющихся
private ManualResetEvent _mre = new ManualResetEvent(false);
/// <summary>
/// Этот метод будет работать в главном потоке
/// </summary>
private void GetTask()
{
string[] _rssURLs;
/* Используем "бесконечный" цикл, этот прием наоболее оптимален в таких задачах */
while (_start)
{
_rssURLs = GetURLs(); //Получаем данные из источника
if (_rssURLs.Length > 0) //Проверка на то, что данные для обработки были получены
{
_mre.Reset(); //Сбрасываем событие, теперь пока оно не будет установлено, поток заснет при вызове метода WaitOne
_countAll = _rssURLs.Length; //Здесь нам сихронизация не нужна, т.к. значение будет изменяться только в этом потоке
_countEnd = 0; //Здесь тоже не синхронизируем переменную, т.к. она будет установлена в 0, уже после того как все остальные потоки ее меняющие уже закончат работу
ResultData = new List<string>(_countAll);
foreach (string s in _rssURLs) //Начинаем обработку данных каждый экземпляр отдельно.
{
ProccessingRSS(s);
}
_mre.WaitOne(); // Теперь ждем до тех пор, пока у нас не обработаются элементы данных
foreach (string x in ResultData)
{
Console.WriteLine(x);
}
/* В этом примере принудительно прерывается выполнение цикла,
* т.к. иначе данные в ResultData будут просто перезаписываться многократно
*
*/
break;
}
else
{
/* Здесь используется небольшая задержка в случае, если данных для обработки еще не поступило.
* 200 милисекунд здесь выбрано произвольно, если у вас данных не будет продолжительное время, то конструкция выше
* просто начнет нагружать процессоры практически на 50%, если не больше. Можете убедится убрав строчку ниже.
* В реальности данная задержка должна вычисляться в зависимости от различных параметров, я обычно использую алгоритм
* который после каждого пустого запроса ждет данных на 1 секунду дольше, это ограничено сверху определенным числом,
* которое либо устанавливается через параметры (предпочтительный вариант), либо сразу зашивается в код.
*/
Thread.Sleep(200);
}
}
}
private void ProccessingRSS(string _rssURL)
{
/*
* Вот здесь начинается самое интересное. Для обработки каждой порции данных используется встроенный пул потоков.
*/
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcRSS), _rssURL);
}
private void ProcRSS(object rss)
{
//Весь метод помещаем в try
try
{
string _rss = (string)rss;
HttpWebRequest hwr = (HttpWebRequest)HttpWebRequest.Create(_rss);
HttpWebResponse hwrr = (HttpWebResponse)hwr.GetResponse();
string response = (new StreamReader(hwrr.GetResponseStream())).ReadToEnd();
//Обязательно блокируем объект, в который будет помещаться результат
lock (ResultData)
{
ResultData.Add(response);
}
}
catch
{
//Игнорируем все исключения (в реальных задачах этого делать нельзя)
}
finally
{
Interlocked.Increment(ref _countEnd); //Увеличиваем переменную, специальным методом, т.к. она будет увеличиваться в различных потоках
if (_countEnd >= _countAll) //Если это был последний обрабатываемый поток, то устанавливаем событие, и в главном потоке начинается новая выборка данных
_mre.Set();
}
}
private string[] GetURLs()
{
return new string[2] { "http://habrahabr.ru/rss/", "http://www.cbr.ru/scripts/RssPress.asp" }; //Источник данных неважен, для примера это просто массив адресов.
}
public void StartServer()
{
_server = new Thread(new ThreadStart(GetTask));
//Если у потока будет установлено это свойство в true, то это поток завершится,
//если завершится работа главного потока, иначе возможно, что поток продолжит свою работу,
//даже если приложени будет "якобы" закрыто
_server.IsBackground = true;
//Это имя будет видно в отладчике,
//поэтому рекомендую всегда давать имя потокам, которые создаются явно
_server.Name = "GetTaskThread";
//Если поток еще не стартовал, то стартуем его
if ((_server.ThreadState & ThreadState.Unstarted) == ThreadState.Unstarted)
{
_start = true; //Устанавливаем свойство в true, что бы выборка данных была постоянной
_server.Start();
}
}
public void StopServer()
{
_start = false;
}
}
static void Main(string[] args)
{
RSSServer _rServer = new RSSServer();
_rServer.StartServer();
Console.Read();
_rServer.StopServer();
}
}
}
* This source code was highlighted with Source Code Highlighter.
Данная статья ориентирована на тех людей, кто еще не до конца понимает как работать с пространством System.Threading. Если есть какие либо вопросы, задавайте их в коментариях, постараюсь ответить.