Практика использования пространства System.Threading при написании многопоточных приложений в .NET.

    Последнее время приходится писать «маленькие» серверы для многопоточной обработки относительно небольших объемов данных. Хочу поделится с хабрасообществом определенным приемом в написании таких приложений.

    Все задачи можно формализовать в эти 3 пункта:
    1. Есть набор данных.
    2. Эти данные нужно обработать (нам не важно как и что обрабатывать, главное это делать параллельно).
    3. Данные постоянно поступают новые.
    Для иллюстрации отдельных моментов давайте решим задачу выборки данных из множества RSS каналов.
    Решение этой задачи я привел в следующем коде, который можно просто скопировать и запустить, код с комментариями, которые, надеюсь доступно описывают отдельные моменты.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Net;
    using System.IO;

    namespace ConsoleApplication1
    {
     class Program
     {
      internal class RSSServer
      {
       //Главный поток для выборки данных на обработку
       private Thread _server;

       //Переменная контролирующая главный цикл выборки данных
       private bool _start = false;

       //Результирующий список с данными RSS каналов
       public List<string> ResultData
       {
        get;
        set;
       }

       #region Счетчики для контроля установки событий

       private int _countAll = 0; //Количество выбранных данных для обработки
       private int _countEnd = 0; //Количество обработанных данных в текущей выборке

       #endregion

       //Событие с ручным сбросом - самый главный элемент для синхронизации потока выборки новых данных и обработки уже имеющихся
       private ManualResetEvent _mre = new ManualResetEvent(false);

       /// <summary>
       /// Этот метод будет работать в главном потоке
       /// </summary>
       private void GetTask()
       {
        string[] _rssURLs;

        /* Используем "бесконечный" цикл, этот прием наоболее оптимален в таких задачах */
        while (_start)
        {
         _rssURLs = GetURLs(); //Получаем данные из источника
         if (_rssURLs.Length > 0) //Проверка на то, что данные для обработки были получены
         {
          _mre.Reset(); //Сбрасываем событие, теперь пока оно не будет установлено, поток заснет при вызове метода WaitOne
          _countAll = _rssURLs.Length; //Здесь нам сихронизация не нужна, т.к. значение будет изменяться только в этом потоке
          _countEnd = 0; //Здесь тоже не синхронизируем переменную, т.к. она будет установлена в 0, уже после того как все остальные потоки ее меняющие уже закончат работу
          ResultData = new List<string>(_countAll);
          foreach (string s in _rssURLs) //Начинаем обработку данных каждый экземпляр отдельно.
          {
           ProccessingRSS(s);
          }
          _mre.WaitOne(); // Теперь ждем до тех пор, пока у нас не обработаются элементы данных
          foreach (string x in ResultData)
          {
           Console.WriteLine(x);
          }
          /* В этом примере принудительно прерывается выполнение цикла,
           * т.к. иначе данные в ResultData будут просто перезаписываться многократно
           *
           */
          break;
         }
         else
         {
          /* Здесь используется небольшая задержка в случае, если данных для обработки еще не поступило.
           * 200 милисекунд здесь выбрано произвольно, если у вас данных не будет продолжительное время, то конструкция выше
           * просто начнет нагружать процессоры практически на 50%, если не больше. Можете убедится убрав строчку ниже.
           * В реальности данная задержка должна вычисляться в зависимости от различных параметров, я обычно использую алгоритм
           * который после каждого пустого запроса ждет данных на 1 секунду дольше, это ограничено сверху определенным числом,
           * которое либо устанавливается через параметры (предпочтительный вариант), либо сразу зашивается в код.
          */
          Thread.Sleep(200);
         }

        }
       }

       private void ProccessingRSS(string _rssURL)
       {
        /*
         * Вот здесь начинается самое интересное. Для обработки каждой порции данных используется встроенный пул потоков.
         */
        ThreadPool.QueueUserWorkItem(new WaitCallback(ProcRSS), _rssURL);
       }

       private void ProcRSS(object rss)
       {
        //Весь метод помещаем в try
        try
        {
         string _rss = (string)rss;
         HttpWebRequest hwr = (HttpWebRequest)HttpWebRequest.Create(_rss);
         HttpWebResponse hwrr = (HttpWebResponse)hwr.GetResponse();
         string response = (new StreamReader(hwrr.GetResponseStream())).ReadToEnd();
         //Обязательно блокируем объект, в который будет помещаться результат
         lock (ResultData)
         {
          ResultData.Add(response);
         }
        }
        catch
        {
         //Игнорируем все исключения (в реальных задачах этого делать нельзя)
        }
        finally
        {
         Interlocked.Increment(ref _countEnd); //Увеличиваем переменную, специальным методом, т.к. она будет увеличиваться в различных потоках
         if (_countEnd >= _countAll) //Если это был последний обрабатываемый поток, то устанавливаем событие, и в главном потоке начинается новая выборка данных
          _mre.Set();
        }
       }

       private string[] GetURLs()
       {
        return new string[2] { "http://habrahabr.ru/rss/", "http://www.cbr.ru/scripts/RssPress.asp" }; //Источник данных неважен, для примера это просто массив адресов.
       }

       public void StartServer()
       {
        _server = new Thread(new ThreadStart(GetTask));

        //Если у потока будет установлено это свойство в true, то это поток завершится,
        //если завершится работа главного потока, иначе возможно, что поток продолжит свою работу,
        //даже если приложени будет "якобы" закрыто
        _server.IsBackground = true;

        //Это имя будет видно в отладчике,
        //поэтому рекомендую всегда давать имя потокам, которые создаются явно
        _server.Name = "GetTaskThread";

        //Если поток еще не стартовал, то стартуем его
        if ((_server.ThreadState & ThreadState.Unstarted) == ThreadState.Unstarted)
        {
         _start = true; //Устанавливаем свойство в true, что бы выборка данных была постоянной
         _server.Start();
        }
       }

       public void StopServer()
       {
        _start = false;
       }
      }

      static void Main(string[] args)
      {
       RSSServer _rServer = new RSSServer();
       _rServer.StartServer();
       Console.Read();
       _rServer.StopServer();
      }
     }
    }

    * This source code was highlighted with Source Code Highlighter.


    Данная статья ориентирована на тех людей, кто еще не до конца понимает как работать с пространством System.Threading. Если есть какие либо вопросы, задавайте их в коментариях, постараюсь ответить.

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 18

      +6
      автор вам, конечно, спасибо
      но это не статья это исходный код с комментариями :)

      один вопрос, может быть для данной задачи имеет смысл сделать загрузку фидов по таймеру (скажем раз в минуту), а не в бесконечном цикле с паузой потока? я спросил исключительно из-за фразы «наоболее оптимален в таких задачах», интересно почему наимболее оптимален?
        0
        Я все-таки видимо не удачный пример взял… обычно такую схему я использую для «уведомления» Web-сервисов, о наступлении каких-то событий в БД (новые данные, который нужно обработать и т.п.). Можно использовать и таймер, здесь просто пример циклом, как наиболее распространенный подход (которые я встречал).

        А по поводу исходного кода я отметил отдельно, что в коментариях рассказывается как используются определенные элементы.
          +1
          Для «слежения» за бд есть штука — Sqldependency.
            0
            А еще есть Web-сервисы, с которых я получаю данные… Источник данных не важен, важно что данные там могут появится в любую минуту, и должны быть почти сразу обработаны. С другой стороны постоянные запросы к Web серверам, например, создадут ненужную нагрузку, но такие моменты можно всегда регулировать.
        +1
        Как нормально обрабатывать интерфейс во время работы, например, цикла из System.Threading?
        Только запускать последний в фоновом потоке?
          0
          Тут задача решается другими способами. Я для этого обычно использую асинхронный вызов делегата, если нужно выполнить свой метод, либо использую методы для асинхронных вызовов в классе, которые их предоставляет.
          +3
          А теперь с использованием Parallel Extensions CTP:
          using System;
          using System.IO;
          using System.Linq;
          using System.Net;
          using System.Threading;

          namespace ParallelRSS
          {
            class Program
            {
              private static string ProcessRSS(string feedUrl)
              {
                HttpWebRequest request = (HttpWebRequest)WebRequest.Create(feedUrl);
                var response = request.GetResponse();
                return (new StreamReader(response.GetResponseStream())).ReadToEnd();
              }

              private static string[] GetURLs()
              {
                return new string[] {
                  "habrahabr.ru/rss/",
                  "www.cbr.ru/scripts/RssPress.asp"
                };
              }

              static void Main(string[] args)
              {
                try
                {
                  var data = GetURLs().AsParallel().Select(url => ProcessRSS(url));
                  foreach (string x in data)
                  {
                    Console.WriteLine(x);
                  }
                }
                catch (AggregateException e)
                {
                  
                }
                Console.Read();
              }
            }
          }

          * This source code was highlighted with Source Code Highlighter.

          Я не профессиональный .NET-разработчик, и код вполне может содержать фатальные недостатки по сравнению с примером в статье. Но концепция мне очень нравится :-)
            –1
            LINQ для процессов? 8-))) Хахаха, это похоже на MSStyle 8-). Прикольная концепция. Но не очень проста для понимания.
              +2
              Симпатично… по моему все понятно.
              0
              Для подобных задач намного лучше подходить ThreadPool. Когда потоки простые и однотипные, вы просто создаёте ThreadPool и размещаете в нём задания. Он сам парралелит на оптимальное или указанное число потоков ваши задания, и вам не надо следить за количеством тредов. + ресайклинг.

              Подробно в MSDN

              Вот собственно говоря, оттуда самый простой пример работы.

              class MainApp
              {
                static void Main()
                {
                 for(int i=0;i<30;i++)
                 {
                   ThreadPool.QueueUserWorkItem(new WaitCallback(PoolFunc));
                 }
                 Console.ReadLine();
                }

                static void PoolFunc(object state)
                {
                 int workerThreads,completionPortThreads;
                 ThreadPool.GetAvailableThreads(out workerThreads,
                   out completionPortThreads);
                 Console.WriteLine("WorkerThreads: {0}, CompletionPortThreads: {1}",
                   workerThreads, completionPortThreads);

                 Thread.Sleep(15000);
                 ConnectionSocket connection = new ConnectionSocket();
                 connection.Connect();
                }
              }


              * This source code was highlighted with Source Code Highlighter.

                +1
                Так вы видимо не заметили, что для обработки каждого элемента данных и используется ThreadPool. Отдельный поток только для того, что бы не блокировался основной поток приложения.
                  0
                  Ну, вот как-то странно выходит. Главный поток не поточить через него?
                    0
                    Блин, из за вас теперь сижу в студии, и думаю, как оптимизировать 8-)))
                      0
                      С технической точки зрения — всё правильно. Спорно конечно про бесконечный цикл, но на это можно сейчас забить. Есть объекты синхронизации, правильные инкременты. Но, вот просто мне думается, что это можно как-то упростить.
                        0
                        Использование ThreadPool не приведёт к блокировке основного потока.
                        Думаю, лучше использовать System.Threading.Timer в качестве альтернативы вашему основному потоку. Всё же выглядит лучше, чем while (true).
                          0
                          ThreadPool не блокирует, а вот постоянная выборка данных блокирует, именно она вынесена в отдельный поток.

                          Можно использовать Timer, но тогда нельзя будет использовать собственные алгоритмы задержки… А менять постоянно число таймера… какой смысл тогда в нем? У меня например есть задача проверки обработанных данных на удаленном Web сервисе, сначало через 1 минуту, затем через 5 минут, потом через 10 минут, потом снова через 1 минуту, потом еще через 30 секунд… Думаете с таймером будет удобнее?
                            0
                            Всё зависит от задачи. В приведённом коде таймер оптимальней.
                      +1
                      Кстати для синхронизации доступа к List стоит использовать SyncRoot:
                      lock (ResultData.SyncRoot)
                      {
                      ResultData.Add(response);
                      }

                      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                      Самое читаемое