UPDATE: Я не мог отказать себе в удовольствии исправить свой ошибочный код. Добавил раздел «Работа над ошибками», в котором привожу исправленный код и описание исправлений, основанное на полученных комментариях.
Это скорее пост-вопрос к специалистам, нежели просто кусок полезной информации. Приглашаю к дискуссии.
Недавно я имел счастье послать своё резюме в Связной на позицию .NET разработчика. В ответ меня попросили сделать тестовое задание на знание многопоточности. Я не могу назвать себя экспертом в этой области, но, тем не менее, прекрасно понял, как мне показалось, как реализовать следующие требования:
Требуется реализация класса на языке C#, аналогичного FixedThreadPool в Java, со следующими требованиями:
Поскольку в задании не было сказано какими примитивами я должен пользоваться, должен ли сделать всё на простейших Thread или же использовать ThreadPool, TPL и т.п., я решил, что задание предполагает использование самых базовых элементов: Thread, ManualResetEvents и т.п. Написал за несколько часов, отослал. Сегодня позвонил и получил ответ через кадровика, который звучал примерно так: «это даже не близко к том, что надо». Это меня озадачило, ибо код работает и протестирован, явных огрехов, на мой взгляд нету.
Итак, на ваш суд представляю мою реализацию FixedThreadPool и сопутствующих классов. Сразу предупреждаю, что, по их мнению реализация ошибочна, и, соответственно, брать мою идею за основу не стоит. Некоторые коментарии по коду:
Спасибо большое за комментарии и конструктивную критику. Я решил, что тема будет не закрыта, если не опубликовать исправленное решение. Для начала перечислю рекомендации и опишу какие вошли в код, а какие нет, и почему.
Тестовый проект с исправлениями можно загрузить здесь. Исправленный код:
Это скорее пост-вопрос к специалистам, нежели просто кусок полезной информации. Приглашаю к дискуссии.
Недавно я имел счастье послать своё резюме в Связной на позицию .NET разработчика. В ответ меня попросили сделать тестовое задание на знание многопоточности. Я не могу назвать себя экспертом в этой области, но, тем не менее, прекрасно понял, как мне показалось, как реализовать следующие требования:
Требуется реализация класса на языке C#, аналогичного FixedThreadPool в Java, со следующими требованиями:
- В конструктор этого класса должно передаваться количество потоков, которые будут выполнять задачи.
- Интерфейс класса должен предоставлять методы: boolean execute(Task task, Priority priority) и void stop()
- Интерфейс Task должен содержать один метод: void execute(), который вызывается в произвольном потоке.
- Тип Priority — это перечисление из трёх приоритетов: HIGH, NORMAL, LOW. При этом во время выбора следующего задания из очереди действуют такие правила: на три задачи с приоритетом HIGH выполняется одна задача с приоритетом NORMAL, задачи с приоритетом LOW не выполняются, пока в очереди есть хоть одна задача с другим приоритетом.
- До вызова метода stop() задачи ставятся в очередь на выполнение и метод boolean execute(Task task, Priority priority) сразу же возвращает true, не дожидаясь завершения выполнения задачи; а после вызова stop() новые задачи не добавляются в очередь на выполнение, и метод boolean execute(Task task, Priority priority) сразу же возвращает false.
- Метод stop() ожидает завершения всех текущих задач (не очищая очередь).
Поскольку в задании не было сказано какими примитивами я должен пользоваться, должен ли сделать всё на простейших Thread или же использовать ThreadPool, TPL и т.п., я решил, что задание предполагает использование самых базовых элементов: Thread, ManualResetEvents и т.п. Написал за несколько часов, отослал. Сегодня позвонил и получил ответ через кадровика, который звучал примерно так: «это даже не близко к том, что надо». Это меня озадачило, ибо код работает и протестирован, явных огрехов, на мой взгляд нету.
Итак, на ваш суд представляю мою реализацию FixedThreadPool и сопутствующих классов. Сразу предупреждаю, что, по их мнению реализация ошибочна, и, соответственно, брать мою идею за основу не стоит. Некоторые коментарии по коду:
- я решил инкапсулиро��ать потоки задач в самом классе задачи,
- два параметра с типом ILog нужны только для тестовых целей, к основной функциональности они, понятное дело, отношения не имеют,
- весь проект, включая тестовое приложение можно загрузкить по ссылке (27 килобайт): тестовый проект на ifolder
FixedThreadPool
/// <summary> /// Пул потоков, выполняющий одновременно не более определённого количество задач с учётом их /// приоритетов. /// </summary> /// <remarks> /// Тестовый проект для компании Связной. /// </remarks> public sealed class FixedThreadPool { #region Constructors /// <summary> /// Инициализирует новый экземпляр пул потоков максимальным количеством одновременно /// выполняемых задач. /// </summary> /// <param name="aConcurrentTaskNumber"> /// Максимальное количество одновременно выполняемых задач. /// </param> /// <param name="aLog"> /// Сервис логгирования. /// </param> /// <param name="aPriorityLog"> /// Лог для вывода приоритета отобранной для выполнения задачи. /// </param> /// <exception cref="ArgumentOutOfRangeException"> /// Неправильно заданное максимальное количество одновременно выполняемых задач. /// </exception> public FixedThreadPool (int aConcurrentTaskNumber, ILog aLog = null, ILog aPriorityLog = null) { if (aConcurrentTaskNumber <= 1) { throw new ArgumentOutOfRangeException( "aConcurrentTaskNumber", "Количество одновременно выполняемых задач должно быть больше единицы."); } Log = aLog; PriorityLog = aPriorityLog; mConcurrentTaskNumber = aConcurrentTaskNumber; LogMessage("Создан поток планировщика."); Thread lTaskSchedulerThread = new Thread(TaskSchedulerLogic) {Name = "Поток планировщика."}; lTaskSchedulerThread.Start(); LogMessage("Поток планировщика запущен."); } #endregion #region Public methods /// <summary> /// Ставит задачу <paramref name="aTask"/> в очередь на выполнение с приоритетом /// <paramref name="aTaskPriority"/>. /// </summary> /// <param name="aTask"> /// Задача для постановки в очередь на выполнения.. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> /// <returns> /// <see langword="true"/> - задача поставлена в очередь на выполнение. /// <see langword="false"/> - задача не была поставлена в очередь на выполнение, так как /// работа пула потока была остановлена. /// </returns> /// <exception cref="ArgumentNullException"> /// Задача для постановки в очередь на выполнения не задана. /// </exception> public bool Execute(Task aTask, TaskPriority aTaskPriority) { if (aTask == null) { throw new ArgumentNullException( "aTask", "Задача для постановки в очередь на выполнения не задана."); } LogMessage("Получена новая задача для выполнения."); lock (mIsStoppedLock) { if (IsStopped) { // Запрошена остановка. LogPriority(aTaskPriority, ConsoleColor.DarkGray); // Отклонять новые задачи. return false; } } // Добавить задачу в очередь. EnqueueTask(aTask, aTaskPriority); LogMessage("Задача добавлена в очередь задач.", ConsoleColor.DarkYellow); return true; } /// <summary> /// Останавливает добавлении задач в очередь пула потоков, очередь не очищается. Возвращает /// выполнение только после окончания всех имеющихся задач в очереди. /// </summary> /// <remarks> /// После вызова этого метода дальнейшее добавление задач в очередь на выполнение не /// возможно и метод <see cref="Execute"/> будет возвращать <see langword="false"/>. /// Имеющиеся, на момент выполнения этого метода, задачи в очереди будут выполнены. /// </remarks> public void Stop() { // Выставить признак окончания работы пула. lock (mIsStoppedLock) { IsStopped = true; LogMessage("Запрошена остановка пула."); } // Дождаться окончания выполнения всех задач, оставшихся в очереди. LogMessage( "Начато ожидание завершения выполнения всех оставшихся задач в очереди.", ConsoleColor.DarkRed); lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } mPoolStoppedGate.WaitOne(); LogMessage("Дождались окончания выполнения всех задач в очереди.", ConsoleColor.DarkRed); } #endregion #region Private properties /// <summary> /// Получает/устанавливает признак того, остановлена ли работа пула. /// </summary> /// <value> /// <see langword="true"/> - работа пула остановлена, дальнейшее добавление задач в очередь /// не возможно. <see langword="false"/> - работа пула продолжается. /// </value> /// <remarks> /// Объект синхронизации доступа <see cref="mIsStoppedLock"/>. /// </remarks> private bool IsStopped { get; set; } /// <summary> /// Получает/устанавливает сервис логгирования приоритета задачи. /// </summary> private ILog PriorityLog { get; set; } /// <summary> /// Получает/устанавливает сервис логгирования сообщений. /// </summary> private ILog Log { get; set; } #endregion #region Private methods /// <summary> /// Логика работы планировщика выполнения задач. Выполняется в отдельном потоке. /// </summary> private void TaskSchedulerLogic() { lock (mTaskSchedulerLock) { while (true) { // Отпускаем монитор и ждём сигнала, поданного через mTaskSchedulerLock. Monitor.Wait(mTaskSchedulerLock); lock (mQueuedTasksLock) { lock (mIsStoppedLock) { lock (mRunningTasksLock) { if (IsStopped && !mRunningTasks.Any() && !mQueuedTasks.Any()) { LogMessage( "Запрошена остановка пула и больше нет задач в очереди на выполнение."); LogMessage("Планировщик - Выход из потока планировщика."); // Сигнализировать об окончании выполнения последней задачи. mPoolStoppedGate.Set(); return; } } } } lock (mQueuedTasksLock) { if (!mQueuedTasks.Any()) { // Очередь задач пуста. // Продолжить ожидание. continue; } } lock (mRunningTasksLock) { if (mRunningTasks.Count >= mConcurrentTaskNumber) { // Список выполняемых задач полон. // Продолжить ожидание. continue; } } LogMessage( "Дождались появления задачи в очереди задач.", ConsoleColor.DarkRed); // Дождаться и получить следующую задачу для выполнения. TaskListEntry lTask = DequeueTask(); LogMessage("Планировщик - Получена новая задача для выполнения."); // Подписаться на событие завершения выполнения задачи. lTask.Task.Finished += OnTaskFinished; // Добавить задачу в список выполняемых задач. lock (mRunningTasksLock) { mRunningTasks.Add(lTask); } if (lTask.TaskPriority == TaskPriority.High) { // Запущена задача с высоким приоритетом. // Увеличить значение счётчика запущенных задач с высоким приоритетом на единицу. Interlocked.Increment(ref mQueuedHighPriorityTaskCounter); } else if (lTask.TaskPriority == TaskPriority.Normal) { // Запущена задача с обычным приоритетом. // Уменьшить значение счётчика запущенных задач с высоким приоритетом на HighPriorityTaskFactor. Interlocked.Add( ref mQueuedHighPriorityTaskCounter, -HighPriorityTaskFactor); } // Запустить задачу на выполнение. lTask.Task.Execute(); LogMessage( string.Format( "Планировщик - Запущена задача с приоритетом {0}.", lTask.TaskPriority), ConsoleColor.DarkYellow); lock (mRunningTasksLock) { LogMessage( string.Format( "В списке выполняющихся задач {0} задач.", mRunningTasks.Count)); } } } } /// <summary> /// Обрабатывает событие завершения задачи. /// </summary> /// <param name="aSender"> /// Задача-источник события. /// </param> /// <param name="aEventArgs"> /// Параметры события. /// </param> private void OnTaskFinished(object aSender, EventArgs aEventArgs) { Task lSender = aSender as Task; Debug.Assert( lSender != null, "В параметре aSender задача должна передавать ссылку на себя."); // Отписаться от события завершения, чтобы задача могла быть убрана сборщиком мусора. lSender.Finished -= OnTaskFinished; // Удалить задачу из списка выполняющихся. lock (mRunningTasksLock) { // Найти и удалить закончившуюся задачу из списка выполняющихся задач. TaskListEntry lEntry = mRunningTasks.First(aEntry => aEntry.Task == lSender); mRunningTasks.Remove(lEntry); LogMessage( string.Format("Задача с приоритетом {0} завершена.", lEntry.TaskPriority), ConsoleColor.Red); } lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } } /// <summary> /// Добавляет задачу <paramref name="aTask"/> в очередь задач на выполнение. /// </summary> /// <param name="aTask"> /// Задача, добавляемая в очередь задач на выполнение. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> private void EnqueueTask(Task aTask, TaskPriority aTaskPriority) { TaskListEntry lEntry = new TaskListEntry(aTask, aTaskPriority); LogPriority(aTaskPriority, ConsoleColor.Green); lock (mQueuedTasksLock) { // Добавить задачу в очередь задач на выполнение. mQueuedTasks.Add(lEntry); LogMessage( string.Format( "В очередь добавлена задача с приоритетом {0}", lEntry.TaskPriority), ConsoleColor.Green); // Поднять барьер доступа к очереди задач. LogMessage("Поднять барьер доступа к очереди задач.", ConsoleColor.DarkRed); } lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } } /// <summary> /// Ожидает появления в очереди задач хотя бы одной задачи. Дождавшись изымает следующую /// задачу из очереди, учитывая правила приоритетов. /// </summary> /// <returns> /// Задача, изъятая из очереди задач на выполнение. /// </returns> private TaskListEntry DequeueTask() { TaskListEntry lNextTask; lock (mQueuedTasksLock) { lNextTask = FindNextTaskUsingPriorityRules(); LogPriority(lNextTask.TaskPriority, ConsoleColor.Red); LogMessage( string.Format( "Получена задача из очереди задач с приоритетом {0}.", lNextTask.TaskPriority), ConsoleColor.DarkRed); mQueuedTasks.Remove(lNextTask); } lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } return lNextTask; } /// <summary> /// Находит следующую задачу с учётом правил приоритезации. /// </summary> /// <returns> /// Найденная следующая задача для выполнения. /// </returns> /// <remarks> /// ЗАМЕЧАНИЯ ПО ПРАВИЛАМ ПРИОРИТЕЗАЦИИ: Полученные мной правила выбора следующей задачи /// определены в задании не полностью. В частности, не было сказано что делать, если в /// очереди задач есть только задачи с обычным приоритетом. Также не определено, что нужно /// делать, если было выполнено больше, чем три задачи с высоким приоритетом. Поэтому я /// додумал эти правила, исходя из предположения, что лучше пусть выполняется хоть что-то, /// чем ждать появления задач с каким-то определённым приоритетом. /// </remarks> private TaskListEntry FindNextTaskUsingPriorityRules() { TaskListEntry lNextTask; lock (mQueuedTasksLock) { Debug.Assert( mQueuedTasks.Count > 0, "Метод FindNextTaskUsingPriorityRules не должен вызываться, если очередь задач пуста."); // По умолчанию будет выполняться задача с высоким приоритетом. TaskPriority lNextTaskPriority = TaskPriority.High; // Проверить возможность выполнения задачи с более низкими приоритетами. if (mQueuedTasks.All(aEntry => aEntry.TaskPriority == TaskPriority.Low)) { // В очереди задач все задачи с низким приоритетом. // Следующая задача будет с низким приоритетом. lNextTaskPriority = TaskPriority.Low; } else { // Условие для задач с низким приоритетом не выполняется. if (mQueuedTasks.Any( aEntry => aEntry.TaskPriority == TaskPriority.Normal) && (mQueuedTasks.All( aEntry => aEntry.TaskPriority != TaskPriority.High) || Interlocked.CompareExchange(ref mQueuedHighPriorityTaskCounter, 0, 0) >= HighPriorityTaskFactor)) { // В списке задач на выполнение есть задачи с обычным приоритетом и // выполнено достаточное количество задач с высоким приоритетом или // в очереди задач все задачи имеют приоритет ниже высокого. // Следующая задача будет с обычным приоритетом. lNextTaskPriority = TaskPriority.Normal; } } lNextTask = mQueuedTasks.First( aEntry => aEntry.TaskPriority == lNextTaskPriority); } return lNextTask; } /// <summary> /// Выводит сообщение в лог. /// </summary> /// <param name="aMessage"> /// Сообщение. /// </param> /// <param name="aColor"> /// Цвет сообщения. /// </param> private void LogMessage(string aMessage, ConsoleColor aColor = ConsoleColor.Yellow) { if (Log == null) { return; } Log.WriteMessage(aMessage, aColor); } /// <summary> /// Выводит приоритет задачи в лог. /// </summary> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> /// <param name="aColor"> /// Цвет сообщения. /// </param> private void LogPriority(TaskPriority aTaskPriority, ConsoleColor aColor) { if (PriorityLog == null) { return; } string lPriority = aTaskPriority == TaskPriority.High ? "H" : aTaskPriority == TaskPriority.Normal ? "N" : "L"; PriorityLog.WriteMessage(lPriority, aColor); } #endregion #region Private data /// <summary> /// Количество задач с высоким приоритетом, которое должно быть поставлено в очередь до /// того, как можно будет поставить задачу с обычным приоритетом, если в очереди имеются /// задачи с высоким приоритетом. /// </summary> private const int HighPriorityTaskFactor = 3; /// <summary> /// Максимальное количество одновременно выполняемых задач. Установка значения возможна /// только в конструкторе. /// </summary> private readonly int mConcurrentTaskNumber; /// <summary> /// Объект синхронизации доступа к свойству <see cref="IsStopped"/>. /// </summary> private readonly object mIsStoppedLock = new object(); /// <summary> /// Барьер остановки работы пула. Поднимается, когда запрошена остановка и все задачи /// завершили своё выполнение. /// </summary> private readonly ManualResetEvent mPoolStoppedGate = new ManualResetEvent(false); /// <summary> /// Список задач, поставленных в очередь на выполнение. /// </summary> /// <remarks> /// Объект синхронизации доступа <see cref="mQueuedTasksLock"/>. /// </remarks> private readonly IList<TaskListEntry> mQueuedTasks = new List<TaskListEntry>(); /// <summary> /// Объект синхронизации доступа к списку задач <see cref="mQueuedTasks"/>. /// </summary> private readonly object mQueuedTasksLock = new object(); /// <summary> /// Список выполняющихся в настоящий момент задач. /// </summary> /// <remarks> /// Объект синхронизации доступа <see cref="mRunningTasksLock"/>. /// </remarks> private readonly IList<TaskListEntry> mRunningTasks = new List<TaskListEntry>(); /// <summary> /// Объект синхронизации доступа к <see cref="mRunningTasks"/>. /// </summary> private readonly object mRunningTasksLock = new object(); /// <summary> /// Объект синхронизации, используемый для блокировки/запуска потока планировщика. /// </summary> private readonly object mTaskSchedulerLock = new object(); /// <summary> /// Счётчик задач с высоким приоритетом, запущенных на выполнение. Каждая запущенная задача /// с высоким приоритетом увеличивает это значение на единицу, каждая запущенная задача с /// обычным приоритетом уменьшает это значение на <see cref="HighPriorityTaskFactor"/>. /// </summary> /// <remarks> /// Синхронизация доступа должна выполняться по средствам использования методов класса /// <see cref="Interlocked"/>. /// </remarks> private int mQueuedHighPriorityTaskCounter; #endregion #region Nested type: TaskListEntry /// <summary> /// Элемент списка задач. /// </summary> /// <remarks> /// Объекты после создания не изменяемы. /// </remarks> private struct TaskListEntry { #region Constructors /// <summary> /// Инициализирует новый экземпляр элемента списка задач задачей и её приоритетом. /// </summary> /// <param name="aTask"> /// Задача. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> public TaskListEntry(Task aTask, TaskPriority aTaskPriority) { mTask = aTask; mTaskPriority = aTaskPriority; } #endregion #region Public properties /// <summary> /// Задача. /// </summary> public Task Task { get { return mTask; } } /// <summary> /// Приоритет задачи. /// </summary> public TaskPriority TaskPriority { get { return mTaskPriority; } } #endregion #region Private data private readonly Task mTask; private readonly TaskPriority mTaskPriority; #endregion } #endregion }
Task
/// <summary> /// Задача для выполнения в <see cref="FixedThreadPool1"/>. /// </summary> public class Task { #region Constructors /// <summary> /// Инициализирует новый экземпляр задачи для выполнения в <see cref="FixedThreadPool1"/> /// делегатом тела задачи. /// </summary> /// <param name="aTaskBody"> /// Делегат тела задачи. /// </param> /// <exception cref="ArgumentNullException"> /// Делегат тела задачи не задан. /// </exception> public Task(Action aTaskBody) { if (aTaskBody == null) { throw new ArgumentNullException("aTaskBody", "Делегат тела задачи не задан."); } TaskBody = aTaskBody; } #endregion #region Public properties /// <summary> /// Получает/устанавливает делегат тела задачи. /// </summary> public Action TaskBody { get; private set; } #endregion #region Events /// <summary> /// Событие, сообщающее о завершении выполнения задачи. /// </summary> public event EventHandler Finished; #endregion #region Public methods /// <summary> /// Начинает выполнение задачи. /// </summary> public void Execute() { Thread lTaskThread = new Thread( () => { // Выполнить задачу. TaskBody(); // Уведомить об её окончании. EventHandler lFinished = Finished; if (lFinished != null) { lFinished(this, EventArgs.Empty); } }) {Name = "Task thread."}; lTaskThread.Start(); } #endregion }
TaskPriority
/// <summary> /// Приоритет задачи. /// </summary> public enum TaskPriority { /// <summary> /// Высокий приоритет. /// </summary> High = 0, /// <summary> /// Обычный приоритет. /// </summary> Normal, /// <summary> /// Низкий приоритет. /// </summary> Low }
Работа над ошибками
Спасибо большое за комментарии и конструктивную критику. Я решил, что тема будет не закрыта, если не опубликовать исправленное решение. Для начала перечислю рекомендации и опишу какие вошли в код, а какие нет, и почему.
- Создание потока — операция очень затратная. Пул потоков нужен для того, чтобы хранить в нём несколько уже созданных потоков. Это было моей главной ошибкой и, судя по всему, именно это вызвало столь негативную реакцию со стороны технического специалиста, проверяющего моё решение. Я просто слишком сконцентрировался на условиях задачи и проглядел требование, которое следовало из самого названия класса. Спасибо, Unrul, iaroshenko. Теперь я создаю необходимое количество потоков в конструкторе пула. Класс Task больше не занимается созданием потоков.
- Поток планировщика не нужен. Действительно, после того, как создание потока было перенесено из класса задачи в класс пула, стало возможно переложить функцию извлечения задач из очереди на сами потоки. Спасибо, iaroshenko, romik.
- В интерфейсе Task не должно быть event. Поскольку класс задачи больше не занимается созданием потоков, событие окончания обработки задачи теперь действительно не нужен. Спасибо, Nagg.
- Блокировку на IsStopped можно заменить обычным volatile поле. С этим я не согласен. Несмотря на утверждение MSDN о том, что благодаря модификатору volatile читающий значение такого поля код будет получать самое актуальное значение, это не так. На самом деле, если говорить упрощённо, volatile позволяет гарантировать то, что компилятор, CLR и процессор не поменяют местами чтение/запись в volatile поле и другие поля. Более подробно можно почитать здесь. Однако в моём случае блокировка IsStopped нужна для другого. Без неё потенциально возможно, что в очередь попадёт новая задача после фактической остановки пула, а также возможен случай, когда поток задачи останется в состоянии ожидания, не смотря на то, что был дан сигнал завершения. В любом случае, небольшая перестраховка в многопоточном коде никогда не повредит. Это лучше, чем потом ловить нерегулярные ошибки. Не стану с пеной у рта защищать это решение. Если кто сможет опровергнуть мой довод на основе кода, буду признателен. Спасибо, elw00d.
- Очередь с приоритетами реализовать отдельно от логики собственно ThreadPool'а. Этого я тоже не стал делать. Фактически пул содержит не так много кода, там больше комментариев и логгирования. Но, тем не менее, основная причина не в этом. Представьте, что условие отбора изменилось. Или же нужно обеспечить возможность настройки принципа выбора следующей задачи в момент создания экземпляра пула. Придётся создавать новый класс сортированной коллекции (очереди) и, соответственно, плодить по числу классов коллекций классы пулов. Это неправильное решение. В данном случае будет лучше вынести алгоритм выбора следующей задачи в отдельный класс, реализовав шаблон Strategy. Объект «отборщика» в таком случае будет передаваться в конструктор. Но я не буду усложнять своё решение, так как по условию задачи это не требуется. Спасибо, elw00d.
- Interlocked-методы тоже неясно зачем (если уже используются мониторы и event'ы). Эту часть я тоже не стал трогать. Interlocked-методы позволяют избежать нарушение условий отбора задач с приоритетами High и Normal. Поскольку нет блокировок на доступ к счётчику mQueuedHighPriorityTaskCounter я предпочёл на всякий случай перестраховаться, использовав Interlocked-методы. Возможно более глубокий анализ кода выявит мою неправоту, но как я уже говорил, я не самый большой специалист в многопоточности и предпочитаю подстелить соломку, тем более когда она стоит дёшево. Спасибо, elw00d.
- Было бы очень интересно, если бы автор решил озвученные в комментариях недочеты и снова отправил в Связной обновленную версию решения. К коду это конечно имеет лишь опосредованное отношение, но тем не менее. Думаю, я не стану отправлять исправленное решение в Связной. Это будет нечестно, поскольку я воспользовался вашей помощью. Скорее всего, я отправлю им ссылку на эту статью, а там пусть сами решают нужен ли им «уже готовый специалист» или подходит человек, который умеет признавать свои ошибки и развиваться, двигаться дальше. Спасибо за поддержку, @ARC_Programmer.
Тестовый проект с исправлениями можно загрузить здесь. Исправленный код:
FixedThreadPool
// Проект: Eshva.Threading // Имя файла: FixedThreadPool.cs // GUID файла: 7F1EECB7-F28A-4A20-9536-26D174BCD437 // Автор: Mike Eshva (mike@eshva.ru) // Дата создания: 04.06.2012 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; namespace Eshva.Threading.Framework { /// <summary> /// Пул потоков, выполняющий одновременно не более определённого количество задач с учётом их /// приоритетов. /// </summary> /// <remarks> /// Тестовый проект для компании Связной. /// </remarks> public sealed class FixedThreadPool { #region Constructors /// <summary> /// Инициализирует новый экземпляр пул потоков максимальным количеством одновременно /// выполняемых задач. /// </summary> /// <param name="aConcurrentTaskNumber"> /// Максимальное количество одновременно выполняемых задач. /// </param> /// <param name="aLog"> /// Сервис логгирования. /// </param> /// <param name="aPriorityLog"> /// Лог для вывода приоритета отобранной для выполнения задачи. /// </param> /// <exception cref="ArgumentOutOfRangeException"> /// Неправильно заданное максимальное количество одновременно выполняемых задач. /// </exception> public FixedThreadPool (int aConcurrentTaskNumber, ILog aLog = null, ILog aPriorityLog = null) { if (aConcurrentTaskNumber <= 1) { throw new ArgumentOutOfRangeException( "aConcurrentTaskNumber", "Количество одновременно выполняемых задач должно быть больше единицы."); } Log = aLog; PriorityLog = aPriorityLog; for (int lThreadIndex = 0; lThreadIndex < aConcurrentTaskNumber; lThreadIndex++) { string lThreadName = string.Format("Task thread #{0}", lThreadIndex); Thread lTaskThread = new Thread(TaskThreadLogic) {Name = lThreadName}; lTaskThread.Start(); } } #endregion #region Public methods /// <summary> /// Ставит задачу <paramref name="aTask"/> в очередь на выполнение с приоритетом /// <paramref name="aTaskPriority"/>. /// </summary> /// <param name="aTask"> /// Задача для постановки в очередь на выполнения.. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> /// <returns> /// <see langword="true"/> - задача поставлена в очередь на выполнение. /// <see langword="false"/> - задача не была поставлена в очередь на выполнение, так как /// работа пула потока была остановлена. /// </returns> /// <exception cref="ArgumentNullException"> /// Задача для постановки в очередь на выполнения не задана. /// </exception> public bool Execute(Task aTask, TaskPriority aTaskPriority) { if (aTask == null) { throw new ArgumentNullException( "aTask", "Задача для постановки в очередь на выполнения не задана."); } LogMessage("Получена новая задача для выполнения."); lock (mIsStoppedLock) { if (IsStopped) { // Запрошена остановка. LogPriority(aTaskPriority, ConsoleColor.DarkGray); // Отклонять новые задачи. return false; } } // Добавить задачу в очередь. EnqueueTask(aTask, aTaskPriority); LogMessage("Задача добавлена в очередь задач.", ConsoleColor.DarkYellow); return true; } /// <summary> /// Останавливает добавлении задач в очередь пула потоков, очередь не очищается. Возвращает /// выполнение только после окончания всех имеющихся задач в очереди. /// </summary> /// <remarks> /// После вызова этого метода дальнейшее добавление задач в очередь на выполнение не /// возможно и метод <see cref="Execute"/> будет возвращать <see langword="false"/>. /// Имеющиеся, на момент выполнения этого метода, задачи в очереди будут выполнены. /// </remarks> public void Stop() { // Выставить признак окончания работы пула. lock (mIsStoppedLock) { IsStopped = true; LogMessage("Запрошена остановка пула."); } // Дождаться окончания выполнения всех задач, оставшихся в очереди. LogMessage( "Начато ожидание завершения выполнения всех оставшихся задач в очереди.", ConsoleColor.DarkRed); lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.PulseAll(mTaskSchedulerLock); } mPoolStoppedGate.WaitOne(); LogMessage("Дождались окончания выполнения всех задач в очереди.", ConsoleColor.DarkRed); } #endregion #region Private properties /// <summary> /// Получает/устанавливает признак того, остановлена ли работа пула. /// </summary> /// <value> /// <see langword="true"/> - работа пула остановлена, дальнейшее добавление задач в очередь /// не возможно. <see langword="false"/> - работа пула продолжается. /// </value> /// <remarks> /// Объект синхронизации доступа <see cref="mIsStoppedLock"/>. /// </remarks> private bool IsStopped { get; set; } /// <summary> /// Получает/устанавливает сервис логгирования приоритета задачи. /// </summary> private ILog PriorityLog { get; set; } /// <summary> /// Получает/устанавливает сервис логгирования сообщений. /// </summary> private ILog Log { get; set; } #endregion #region Private methods private void TaskThreadLogic() { lock (mTaskSchedulerLock) { while (true) { // Отпускаем монитор и ждём сигнала, поданного через mTaskSchedulerLock. Monitor.Wait(mTaskSchedulerLock); lock (mQueuedTasksLock) { if (!mQueuedTasks.Any()) { lock (mIsStoppedLock) { if (IsStopped) { LogMessage( "Запрошена остановка пула и больше нет задач в очереди на выполнение."); LogMessage("Планировщик - Выход из потока планировщика."); // Сигнализировать об окончании выполнения последней задачи. mPoolStoppedGate.Set(); return; } } // Задач в очереди нет, но не было запроса на остановку пула. // Ждём дальше появления задач. continue; } } LogMessage( "Дождались появления задачи в очереди задач.", ConsoleColor.DarkRed); // Дождаться и получить следующую задачу для выполнения. TaskListEntry lTask = DequeueTask(); LogMessage("Планировщик - Получена новая задача для выполнения."); switch (lTask.TaskPriority) { case TaskPriority.High: Interlocked.Increment(ref mQueuedHighPriorityTaskCounter); break; case TaskPriority.Normal: Interlocked.Add( ref mQueuedHighPriorityTaskCounter, -HighPriorityTaskFactor); break; } // Запустить задачу на выполнение. lTask.Task.Execute(); LogMessage( string.Format( "Планировщик - Запущена задача с приоритетом {0}.", lTask.TaskPriority), ConsoleColor.DarkYellow); lock (mQueuedTasksLock) { lock (mIsStoppedLock) { if (IsStopped && !mQueuedTasks.Any()) { LogMessage( "Запрошена остановка пула и больше нет задач в очереди на выполнение."); LogMessage("Планировщик - Выход из потока планировщика."); // Сигнализировать об окончании выполнения последней задачи. mPoolStoppedGate.Set(); return; } } } } } } /// <summary> /// Добавляет задачу <paramref name="aTask"/> в очередь задач на выполнение. /// </summary> /// <param name="aTask"> /// Задача, добавляемая в очередь задач на выполнение. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> private void EnqueueTask(Task aTask, TaskPriority aTaskPriority) { TaskListEntry lEntry = new TaskListEntry(aTask, aTaskPriority); LogPriority(aTaskPriority, ConsoleColor.Green); lock (mQueuedTasksLock) { // Добавить задачу в очередь задач на выполнение. mQueuedTasks.Add(lEntry); LogMessage( string.Format( "В очередь добавлена задача с приоритетом {0}", lEntry.TaskPriority), ConsoleColor.Green); // Поднять барьер доступа к очереди задач. LogMessage("Поднять барьер доступа к очереди задач.", ConsoleColor.DarkRed); } lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } } /// <summary> /// Ожидает появления в очереди задач хотя бы одной задачи. Дождавшись изымает следующую /// задачу из очереди, учитывая правила приоритетов. /// </summary> /// <returns> /// Задача, изъятая из очереди задач на выполнение. /// </returns> private TaskListEntry DequeueTask() { TaskListEntry lNextTask; lock (mQueuedTasksLock) { lNextTask = FindNextTaskUsingPriorityRules(); LogPriority(lNextTask.TaskPriority, ConsoleColor.Red); LogMessage( string.Format( "Получена задача из очереди задач с приоритетом {0}.", lNextTask.TaskPriority), ConsoleColor.DarkRed); mQueuedTasks.Remove(lNextTask); } lock (mTaskSchedulerLock) { // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock. Monitor.Pulse(mTaskSchedulerLock); } return lNextTask; } /// <summary> /// Находит следующую задачу с учётом правил приоритезации. /// </summary> /// <returns> /// Найденная следующая задача для выполнения. /// </returns> /// <remarks> /// ЗАМЕЧАНИЯ ПО ПРАВИЛАМ ПРИОРИТЕЗАЦИИ: Полученные мной правила выбора следующей задачи /// определены в задании не полностью. В частности, не было сказано что делать, если в /// очереди задач есть только задачи с обычным приоритетом. Также не определено, что нужно /// делать, если было выполнено больше, чем три задачи с высоким приоритетом. Поэтому я /// додумал эти правила, исходя из предположения, что лучше пусть выполняется хоть что-то, /// чем ждать появления задач с каким-то определённым приоритетом. /// </remarks> private TaskListEntry FindNextTaskUsingPriorityRules() { TaskListEntry lNextTask; lock (mQueuedTasksLock) { Debug.Assert( mQueuedTasks.Count > 0, "Метод FindNextTaskUsingPriorityRules не должен вызываться, если очередь задач пуста."); // По умолчанию будет выполняться задача с высоким приоритетом. TaskPriority lNextTaskPriority = TaskPriority.High; // Проверить возможность выполнения задачи с более низкими приоритетами. if (mQueuedTasks.All(aEntry => aEntry.TaskPriority == TaskPriority.Low)) { // В очереди задач все задачи с низким приоритетом. // Следующая задача будет с низким приоритетом. lNextTaskPriority = TaskPriority.Low; } else { // Условие для задач с низким приоритетом не выполняется. if (mQueuedTasks.Any( aEntry => aEntry.TaskPriority == TaskPriority.Normal) && (mQueuedTasks.All( aEntry => aEntry.TaskPriority != TaskPriority.High) || Interlocked.CompareExchange(ref mQueuedHighPriorityTaskCounter, 0, 0) >= HighPriorityTaskFactor)) { // В списке задач на выполнение есть задачи с обычным приоритетом и // выполнено достаточное количество задач с высоким приоритетом или // в очереди задач все задачи имеют приоритет ниже высокого. // Следующая задача будет с обычным приоритетом. lNextTaskPriority = TaskPriority.Normal; } } lNextTask = mQueuedTasks.First( aEntry => aEntry.TaskPriority == lNextTaskPriority); } return lNextTask; } /// <summary> /// Выводит сообщение в лог. /// </summary> /// <param name="aMessage"> /// Сообщение. /// </param> /// <param name="aColor"> /// Цвет сообщения. /// </param> private void LogMessage(string aMessage, ConsoleColor aColor = ConsoleColor.Yellow) { if (Log == null) { return; } Log.WriteMessage(aMessage, aColor); } /// <summary> /// Выводит приоритет задачи в лог. /// </summary> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> /// <param name="aColor"> /// Цвет сообщения. /// </param> private void LogPriority(TaskPriority aTaskPriority, ConsoleColor aColor) { if (PriorityLog == null) { return; } string lPriority = aTaskPriority == TaskPriority.High ? "H" : aTaskPriority == TaskPriority.Normal ? "N" : "L"; PriorityLog.WriteMessage(lPriority, aColor); } #endregion #region Private data /// <summary> /// Количество задач с высоким приоритетом, которое должно быть поставлено в очередь до /// того, как можно будет поставить задачу с обычным приоритетом, если в очереди имеются /// задачи с высоким приоритетом. /// </summary> private const int HighPriorityTaskFactor = 3; /// <summary> /// Объект синхронизации доступа к свойству <see cref="IsStopped"/>. /// </summary> private readonly object mIsStoppedLock = new object(); /// <summary> /// Барьер остановки работы пула. Поднимается, когда запрошена остановка и все задачи /// завершили своё выполнение. /// </summary> private readonly ManualResetEvent mPoolStoppedGate = new ManualResetEvent(false); /// <summary> /// Список задач, поставленных в очередь на выполнение. /// </summary> /// <remarks> /// Объект синхронизации доступа <see cref="mQueuedTasksLock"/>. /// </remarks> private readonly IList<TaskListEntry> mQueuedTasks = new List<TaskListEntry>(); /// <summary> /// Объект синхронизации доступа к списку задач <see cref="mQueuedTasks"/>. /// </summary> private readonly object mQueuedTasksLock = new object(); /// <summary> /// Объект синхронизации, используемый для блокировки/запуска потока планировщика. /// </summary> private readonly object mTaskSchedulerLock = new object(); /// <summary> /// Счётчик задач с высоким приоритетом, запущенных на выполнение. Каждая запущенная задача /// с высоким приоритетом увеличивает это значение на единицу, каждая запущенная задача с /// обычным приоритетом уменьшает это значение на <see cref="HighPriorityTaskFactor"/>. /// </summary> /// <remarks> /// Синхронизация доступа должна выполняться по средствам использования методов класса /// <see cref="Interlocked"/>. /// </remarks> private int mQueuedHighPriorityTaskCounter; #endregion #region Nested type: TaskListEntry /// <summary> /// Элемент списка задач. /// </summary> /// <remarks> /// Объекты после создания не изменяемы. /// </remarks> private struct TaskListEntry { #region Constructors /// <summary> /// Инициализирует новый экземпляр элемента списка задач задачей и её приоритетом. /// </summary> /// <param name="aTask"> /// Задача. /// </param> /// <param name="aTaskPriority"> /// Приоритет задачи. /// </param> public TaskListEntry(Task aTask, TaskPriority aTaskPriority) { mTask = aTask; mTaskPriority = aTaskPriority; } #endregion #region Public properties /// <summary> /// Задача. /// </summary> public Task Task { get { return mTask; } } /// <summary> /// Приоритет задачи. /// </summary> public TaskPriority TaskPriority { get { return mTaskPriority; } } #endregion #region Private data private readonly Task mTask; private readonly TaskPriority mTaskPriority; #endregion } #endregion } }
Task
// Проект: Eshva.Threading // Имя файла: Task.cs // GUID файла: 292467E7-4816-4407-BB9B-3309D13C8614 // Автор: Mike Eshva (mike@eshva.ru) // Дата создания: 04.06.2012 using System; namespace Eshva.Threading.Framework { /// <summary> /// Задача для выполнения в <see cref="FixedThreadPool"/>. /// </summary> public class Task { #region Constructors /// <summary> /// Инициализирует новый экземпляр задачи для выполнения в <see cref="FixedThreadPool"/> /// делегатом тела задачи. /// </summary> /// <param name="aTaskBody"> /// Делегат тела задачи. /// </param> /// <exception cref="ArgumentNullException"> /// Делегат тела задачи не задан. /// </exception> public Task(Action aTaskBody) { if (aTaskBody == null) { throw new ArgumentNullException("aTaskBody", "Делегат тела задачи не задан."); } TaskBody = aTaskBody; } #endregion #region Public methods /// <summary> /// Начинает выполнение задачи. /// </summary> public void Execute() { TaskBody(); } #endregion #region Private properties /// <summary> /// Получает/устанавливает делегат тела задачи. /// </summary> private Action TaskBody { get; set; } #endregion } }
