Pull to refresh

Тестовое задание для Связного FixedThreadPool на C#. Что здесь не так? UPD

Reading time 25 min
Views 11K
UPDATE: Я не мог отказать себе в удовольствии исправить свой ошибочный код. Добавил раздел «Работа над ошибками», в котором привожу исправленный код и описание исправлений, основанное на полученных комментариях.

Это скорее пост-вопрос к специалистам, нежели просто кусок полезной информации. Приглашаю к дискуссии.
Недавно я имел счастье послать своё резюме в Связной на позицию .NET разработчика. В ответ меня попросили сделать тестовое задание на знание многопоточности. Я не могу назвать себя экспертом в этой области, но, тем не менее, прекрасно понял, как мне показалось, как реализовать следующие требования:

Требуется реализация класса на языке C#, аналогичного FixedThreadPool в Java, со следующими требованиями:
  • В конструктор этого класса должно передаваться количество потоков, которые будут выполнять задачи.
  • Интерфейс класса должен предоставлять методы: boolean execute(Task task, Priority priority) и void stop()
  • Интерфейс Task должен содержать один метод: void execute(), который вызывается в произвольном потоке.
  • Тип Priority — это перечисление из трёх приоритетов: HIGH, NORMAL, LOW. При этом во время выбора следующего задания из очереди действуют такие правила: на три задачи с приоритетом HIGH выполняется одна задача с приоритетом NORMAL, задачи с приоритетом LOW не выполняются, пока в очереди есть хоть одна задача с другим приоритетом.
  • До вызова метода stop() задачи ставятся в очередь на выполнение и метод boolean execute(Task task, Priority priority) сразу же возвращает true, не дожидаясь завершения выполнения задачи; а после вызова stop() новые задачи не добавляются в очередь на выполнение, и метод boolean execute(Task task, Priority priority) сразу же возвращает false.
  • Метод stop() ожидает завершения всех текущих задач (не очищая очередь).


Поскольку в задании не было сказано какими примитивами я должен пользоваться, должен ли сделать всё на простейших Thread или же использовать ThreadPool, TPL и т.п., я решил, что задание предполагает использование самых базовых элементов: Thread, ManualResetEvents и т.п. Написал за несколько часов, отослал. Сегодня позвонил и получил ответ через кадровика, который звучал примерно так: «это даже не близко к том, что надо». Это меня озадачило, ибо код работает и протестирован, явных огрехов, на мой взгляд нету.

Итак, на ваш суд представляю мою реализацию FixedThreadPool и сопутствующих классов. Сразу предупреждаю, что, по их мнению реализация ошибочна, и, соответственно, брать мою идею за основу не стоит. Некоторые коментарии по коду:

  • я решил инкапсулировать потоки задач в самом классе задачи,
  • два параметра с типом ILog нужны только для тестовых целей, к основной функциональности они, понятное дело, отношения не имеют,
  • весь проект, включая тестовое приложение можно загрузкить по ссылке (27 килобайт): тестовый проект на ifolder


FixedThreadPool
    /// <summary>
    /// Пул потоков, выполняющий одновременно не более определённого количество задач с учётом их
    /// приоритетов.
    /// </summary>
    /// <remarks>
    /// Тестовый проект для компании Связной.
    /// </remarks>
    public sealed class FixedThreadPool
    {
        #region Constructors

        /// <summary>
        /// Инициализирует новый экземпляр пул потоков максимальным количеством одновременно
        /// выполняемых задач.
        /// </summary>
        /// <param name="aConcurrentTaskNumber">
        /// Максимальное количество одновременно выполняемых задач.
        /// </param>
        /// <param name="aLog">
        /// Сервис логгирования.
        /// </param>
        /// <param name="aPriorityLog">
        /// Лог для вывода приоритета отобранной для выполнения задачи.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Неправильно заданное максимальное количество одновременно выполняемых задач.
        /// </exception>
        public FixedThreadPool
            (int aConcurrentTaskNumber, ILog aLog = null, ILog aPriorityLog = null)
        {
            if (aConcurrentTaskNumber <= 1)
            {
                throw new ArgumentOutOfRangeException(
                    "aConcurrentTaskNumber",
                    "Количество одновременно выполняемых задач должно быть больше единицы.");
            }

            Log = aLog;
            PriorityLog = aPriorityLog;

            mConcurrentTaskNumber = aConcurrentTaskNumber;

            LogMessage("Создан поток планировщика.");
            Thread lTaskSchedulerThread =
                new Thread(TaskSchedulerLogic) {Name = "Поток планировщика."};
            lTaskSchedulerThread.Start();
            LogMessage("Поток планировщика запущен.");
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Ставит задачу <paramref name="aTask"/> в очередь на выполнение с приоритетом 
        /// <paramref name="aTaskPriority"/>.
        /// </summary>
        /// <param name="aTask">
        /// Задача для постановки в очередь на выполнения..
        /// </param>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        /// <returns>
        /// <see langword="true"/> - задача поставлена в очередь на выполнение. 
        /// <see langword="false"/> - задача не была поставлена в очередь на выполнение, так как
        /// работа пула потока была остановлена.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// Задача для постановки в очередь на выполнения не задана.
        /// </exception>
        public bool Execute(Task aTask, TaskPriority aTaskPriority)
        {
            if (aTask == null)
            {
                throw new ArgumentNullException(
                    "aTask", "Задача для постановки в очередь на выполнения не задана.");
            }

            LogMessage("Получена новая задача для выполнения.");
            lock (mIsStoppedLock)
            {
                if (IsStopped)
                {
                    // Запрошена остановка.
                    LogPriority(aTaskPriority, ConsoleColor.DarkGray);
                    // Отклонять новые задачи.
                    return false;
                }
            }

            // Добавить задачу в очередь.
            EnqueueTask(aTask, aTaskPriority);
            LogMessage("Задача добавлена в очередь задач.", ConsoleColor.DarkYellow);

            return true;
        }

        /// <summary>
        /// Останавливает добавлении задач в очередь пула потоков, очередь не очищается. Возвращает
        /// выполнение только после окончания всех имеющихся задач в очереди.
        /// </summary>
        /// <remarks>
        /// После вызова этого метода дальнейшее добавление задач в очередь на выполнение не
        /// возможно и метод <see cref="Execute"/> будет возвращать <see langword="false"/>.
        /// Имеющиеся, на момент выполнения этого метода, задачи в очереди будут выполнены.
        /// </remarks>
        public void Stop()
        {
            // Выставить признак окончания работы пула.
            lock (mIsStoppedLock)
            {
                IsStopped = true;
                LogMessage("Запрошена остановка пула.");
            }

            // Дождаться окончания выполнения всех задач, оставшихся в очереди.
            LogMessage(
                "Начато ожидание завершения выполнения всех оставшихся задач в очереди.",
                ConsoleColor.DarkRed);
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }

            mPoolStoppedGate.WaitOne();

            LogMessage("Дождались окончания выполнения всех задач в очереди.", ConsoleColor.DarkRed);
        }

        #endregion

        #region Private properties

        /// <summary>
        /// Получает/устанавливает признак того, остановлена ли работа пула.
        /// </summary>
        /// <value>
        /// <see langword="true"/> - работа пула остановлена, дальнейшее добавление задач в очередь
        /// не возможно. <see langword="false"/> - работа пула продолжается.
        /// </value>
        /// <remarks>
        /// Объект синхронизации доступа <see cref="mIsStoppedLock"/>.
        /// </remarks>
        private bool IsStopped { get; set; }

        /// <summary>
        /// Получает/устанавливает сервис логгирования приоритета задачи.
        /// </summary>
        private ILog PriorityLog { get; set; }

        /// <summary>
        /// Получает/устанавливает сервис логгирования сообщений.
        /// </summary>
        private ILog Log { get; set; }

        #endregion

        #region Private methods

        /// <summary>
        /// Логика работы планировщика выполнения задач. Выполняется в отдельном потоке.
        /// </summary>
        private void TaskSchedulerLogic()
        {
            lock (mTaskSchedulerLock)
            {
                while (true)
                {
                    // Отпускаем монитор и ждём сигнала, поданного через mTaskSchedulerLock.
                    Monitor.Wait(mTaskSchedulerLock);
                    lock (mQueuedTasksLock)
                    {
                        lock (mIsStoppedLock)
                        {
                            lock (mRunningTasksLock)
                            {
                                if (IsStopped &&
                                    !mRunningTasks.Any() &&
                                    !mQueuedTasks.Any())
                                {
                                    LogMessage(
                                        "Запрошена остановка пула и больше нет задач в очереди на выполнение.");
                                    LogMessage("Планировщик - Выход из потока планировщика.");
                                    // Сигнализировать об окончании выполнения последней задачи.
                                    mPoolStoppedGate.Set();
                                    return;
                                }
                            }
                        }
                    }

                    lock (mQueuedTasksLock)
                    {
                        if (!mQueuedTasks.Any())
                        {
                            // Очередь задач пуста.
                            // Продолжить ожидание.
                            continue;
                        }
                    }

                    lock (mRunningTasksLock)
                    {
                        if (mRunningTasks.Count >= mConcurrentTaskNumber)
                        {
                            // Список выполняемых задач полон.
                            // Продолжить ожидание.
                            continue;
                        }
                    }

                    LogMessage(
                        "Дождались появления задачи в очереди задач.",
                        ConsoleColor.DarkRed);
                    // Дождаться и получить следующую задачу для выполнения.
                    TaskListEntry lTask = DequeueTask();
                    LogMessage("Планировщик - Получена новая задача для выполнения.");
                    // Подписаться на событие завершения выполнения задачи.
                    lTask.Task.Finished += OnTaskFinished;
                    // Добавить задачу в список выполняемых задач.
                    lock (mRunningTasksLock)
                    {
                        mRunningTasks.Add(lTask);
                    }
                    if (lTask.TaskPriority ==
                        TaskPriority.High)
                    {
                        // Запущена задача с высоким приоритетом.
                        // Увеличить значение счётчика запущенных задач с высоким приоритетом на единицу.
                        Interlocked.Increment(ref mQueuedHighPriorityTaskCounter);
                    }
                    else if (lTask.TaskPriority ==
                             TaskPriority.Normal)
                    {
                        // Запущена задача с обычным приоритетом.
                        // Уменьшить значение счётчика запущенных задач с высоким приоритетом на HighPriorityTaskFactor.
                        Interlocked.Add(
                            ref mQueuedHighPriorityTaskCounter, -HighPriorityTaskFactor);
                    }

                    // Запустить задачу на выполнение.
                    lTask.Task.Execute();
                    LogMessage(
                        string.Format(
                            "Планировщик - Запущена задача с приоритетом {0}.",
                            lTask.TaskPriority),
                        ConsoleColor.DarkYellow);

                    lock (mRunningTasksLock)
                    {
                        LogMessage(
                            string.Format(
                                "В списке выполняющихся задач {0} задач.",
                                mRunningTasks.Count));
                    }
                }
            }
        }

        /// <summary>
        /// Обрабатывает событие завершения задачи.
        /// </summary>
        /// <param name="aSender">
        /// Задача-источник события.
        /// </param>
        /// <param name="aEventArgs">
        /// Параметры события.
        /// </param>
        private void OnTaskFinished(object aSender, EventArgs aEventArgs)
        {
            Task lSender = aSender as Task;
            Debug.Assert(
                lSender != null, "В параметре aSender задача должна передавать ссылку на себя.");
            // Отписаться от события завершения, чтобы задача могла быть убрана сборщиком мусора.
            lSender.Finished -= OnTaskFinished;
            // Удалить задачу из списка выполняющихся.
            lock (mRunningTasksLock)
            {
                // Найти и удалить закончившуюся задачу из списка выполняющихся задач.
                TaskListEntry lEntry = mRunningTasks.First(aEntry => aEntry.Task == lSender);
                mRunningTasks.Remove(lEntry);
                LogMessage(
                    string.Format("Задача с приоритетом {0} завершена.", lEntry.TaskPriority),
                    ConsoleColor.Red);
            }

            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }
        }

        /// <summary>
        /// Добавляет задачу <paramref name="aTask"/> в очередь задач на выполнение.
        /// </summary>
        /// <param name="aTask">
        /// Задача, добавляемая в очередь задач на выполнение.
        /// </param>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        private void EnqueueTask(Task aTask, TaskPriority aTaskPriority)
        {
            TaskListEntry lEntry = new TaskListEntry(aTask, aTaskPriority);
            LogPriority(aTaskPriority, ConsoleColor.Green);
            lock (mQueuedTasksLock)
            {
                // Добавить задачу в очередь задач на выполнение.
                mQueuedTasks.Add(lEntry);
                LogMessage(
                    string.Format(
                        "В очередь добавлена задача с приоритетом {0}", lEntry.TaskPriority),
                    ConsoleColor.Green);
                // Поднять барьер доступа к очереди задач.
                LogMessage("Поднять барьер доступа к очереди задач.", ConsoleColor.DarkRed);
            }
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }
        }

        /// <summary>
        /// Ожидает появления в очереди задач хотя бы одной задачи. Дождавшись изымает следующую
        /// задачу из очереди, учитывая правила приоритетов.
        /// </summary>
        /// <returns>
        /// Задача, изъятая из очереди задач на выполнение.
        /// </returns>
        private TaskListEntry DequeueTask()
        {
            TaskListEntry lNextTask;
            lock (mQueuedTasksLock)
            {
                lNextTask = FindNextTaskUsingPriorityRules();
                LogPriority(lNextTask.TaskPriority, ConsoleColor.Red);
                LogMessage(
                    string.Format(
                        "Получена задача из очереди задач с приоритетом {0}.",
                        lNextTask.TaskPriority),
                    ConsoleColor.DarkRed);
                mQueuedTasks.Remove(lNextTask);
            }
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }

            return lNextTask;
        }

        /// <summary>
        /// Находит следующую задачу с учётом правил приоритезации.
        /// </summary>
        /// <returns>
        /// Найденная следующая задача для выполнения.
        /// </returns>
        /// <remarks>
        /// ЗАМЕЧАНИЯ ПО ПРАВИЛАМ ПРИОРИТЕЗАЦИИ: Полученные мной правила выбора следующей задачи
        /// определены в задании не полностью. В частности, не было сказано что делать, если в
        /// очереди задач есть только задачи с обычным приоритетом. Также не определено, что нужно
        /// делать, если было выполнено больше, чем три задачи с высоким приоритетом. Поэтому я
        /// додумал эти правила, исходя из предположения, что лучше пусть выполняется хоть что-то,
        /// чем ждать появления задач с каким-то определённым приоритетом.
        /// </remarks>
        private TaskListEntry FindNextTaskUsingPriorityRules()
        {
            TaskListEntry lNextTask;
            lock (mQueuedTasksLock)
            {
                Debug.Assert(
                    mQueuedTasks.Count > 0,
                    "Метод FindNextTaskUsingPriorityRules не должен вызываться, если очередь задач пуста.");
                // По умолчанию будет выполняться задача с высоким приоритетом.
                TaskPriority lNextTaskPriority = TaskPriority.High;
                // Проверить возможность выполнения задачи с более низкими приоритетами.
                if (mQueuedTasks.All(aEntry => aEntry.TaskPriority == TaskPriority.Low))
                {
                    // В очереди задач все задачи с низким приоритетом.
                    // Следующая задача будет с низким приоритетом.
                    lNextTaskPriority = TaskPriority.Low;
                }
                else
                {
                    // Условие для задач с низким приоритетом не выполняется.
                    if (mQueuedTasks.Any(
                        aEntry => aEntry.TaskPriority == TaskPriority.Normal) &&
                        (mQueuedTasks.All(
                            aEntry => aEntry.TaskPriority != TaskPriority.High) ||
                         Interlocked.CompareExchange(ref mQueuedHighPriorityTaskCounter, 0, 0) >=
                         HighPriorityTaskFactor))
                    {
                        // В списке задач на выполнение есть задачи с обычным приоритетом и
                        // выполнено достаточное количество задач с высоким приоритетом или
                        // в очереди задач все задачи имеют приоритет ниже высокого.
                        // Следующая задача будет с обычным приоритетом.
                        lNextTaskPriority = TaskPriority.Normal;
                    }
                }

                lNextTask = mQueuedTasks.First(
                    aEntry => aEntry.TaskPriority == lNextTaskPriority);
            }

            return lNextTask;
        }

        /// <summary>
        /// Выводит сообщение в лог.
        /// </summary>
        /// <param name="aMessage">
        /// Сообщение.
        /// </param>
        /// <param name="aColor">
        /// Цвет сообщения.
        /// </param>
        private void LogMessage(string aMessage, ConsoleColor aColor = ConsoleColor.Yellow)
        {
            if (Log == null)
            {
                return;
            }

            Log.WriteMessage(aMessage, aColor);
        }

        /// <summary>
        /// Выводит приоритет задачи в лог.
        /// </summary>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        /// <param name="aColor">
        /// Цвет сообщения.
        /// </param>
        private void LogPriority(TaskPriority aTaskPriority, ConsoleColor aColor)
        {
            if (PriorityLog == null)
            {
                return;
            }

            string lPriority = aTaskPriority == TaskPriority.High
                                   ? "H"
                                   : aTaskPriority == TaskPriority.Normal ? "N" : "L";
            PriorityLog.WriteMessage(lPriority, aColor);
        }

        #endregion

        #region Private data

        /// <summary>
        /// Количество задач с высоким приоритетом, которое должно быть поставлено в очередь до
        /// того, как можно будет поставить задачу с обычным приоритетом, если в очереди имеются
        /// задачи с высоким приоритетом.
        /// </summary>
        private const int HighPriorityTaskFactor = 3;

        /// <summary>
        /// Максимальное количество одновременно выполняемых задач. Установка значения возможна
        /// только в конструкторе.
        /// </summary>
        private readonly int mConcurrentTaskNumber;

        /// <summary>
        /// Объект синхронизации доступа к свойству <see cref="IsStopped"/>.
        /// </summary>
        private readonly object mIsStoppedLock = new object();

        /// <summary>
        /// Барьер остановки работы пула. Поднимается, когда запрошена остановка и все задачи
        /// завершили своё выполнение.
        /// </summary>
        private readonly ManualResetEvent mPoolStoppedGate = new ManualResetEvent(false);

        /// <summary>
        /// Список задач, поставленных в очередь на выполнение.
        /// </summary>
        /// <remarks>
        /// Объект синхронизации доступа <see cref="mQueuedTasksLock"/>.
        /// </remarks>
        private readonly IList<TaskListEntry> mQueuedTasks = new List<TaskListEntry>();

        /// <summary>
        /// Объект синхронизации доступа к списку задач <see cref="mQueuedTasks"/>.
        /// </summary>
        private readonly object mQueuedTasksLock = new object();

        /// <summary>
        /// Список выполняющихся в настоящий момент задач.
        /// </summary>
        /// <remarks>
        /// Объект синхронизации доступа <see cref="mRunningTasksLock"/>.
        /// </remarks>
        private readonly IList<TaskListEntry> mRunningTasks = new List<TaskListEntry>();

        /// <summary>
        /// Объект синхронизации доступа к <see cref="mRunningTasks"/>.
        /// </summary>
        private readonly object mRunningTasksLock = new object();

        /// <summary>
        /// Объект синхронизации, используемый для блокировки/запуска потока планировщика.
        /// </summary>
        private readonly object mTaskSchedulerLock = new object();

        /// <summary>
        /// Счётчик задач с высоким приоритетом, запущенных на выполнение. Каждая запущенная задача
        /// с высоким приоритетом увеличивает это значение на единицу, каждая запущенная задача с
        /// обычным приоритетом уменьшает это значение на <see cref="HighPriorityTaskFactor"/>.
        /// </summary>
        /// <remarks>
        /// Синхронизация доступа должна выполняться по средствам использования методов класса 
        /// <see cref="Interlocked"/>.
        /// </remarks>
        private int mQueuedHighPriorityTaskCounter;

        #endregion

        #region Nested type: TaskListEntry

        /// <summary>
        /// Элемент списка задач. 
        /// </summary>
        /// <remarks>
        /// Объекты после создания не изменяемы.
        /// </remarks>
        private struct TaskListEntry
        {
            #region Constructors

            /// <summary>
            /// Инициализирует новый экземпляр элемента списка задач задачей и её приоритетом.
            /// </summary>
            /// <param name="aTask">
            /// Задача.
            /// </param>
            /// <param name="aTaskPriority">
            /// Приоритет задачи.
            /// </param>
            public TaskListEntry(Task aTask, TaskPriority aTaskPriority)
            {
                mTask = aTask;
                mTaskPriority = aTaskPriority;
            }

            #endregion

            #region Public properties

            /// <summary>
            /// Задача.
            /// </summary>
            public Task Task
            {
                get { return mTask; }
            }

            /// <summary>
            /// Приоритет задачи.
            /// </summary>
            public TaskPriority TaskPriority
            {
                get { return mTaskPriority; }
            }

            #endregion

            #region Private data

            private readonly Task mTask;
            private readonly TaskPriority mTaskPriority;

            #endregion
        }

        #endregion
    }



Task
    /// <summary>
    /// Задача для выполнения в <see cref="FixedThreadPool1"/>.
    /// </summary>
    public class Task
    {
        #region Constructors

        /// <summary>
        /// Инициализирует новый экземпляр задачи для выполнения в <see cref="FixedThreadPool1"/>
        /// делегатом тела задачи.
        /// </summary>
        /// <param name="aTaskBody">
        /// Делегат тела задачи.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// Делегат тела задачи не задан.
        /// </exception>
        public Task(Action aTaskBody)
        {
            if (aTaskBody == null)
            {
                throw new ArgumentNullException("aTaskBody", "Делегат тела задачи не задан.");
            }

            TaskBody = aTaskBody;
        }

        #endregion

        #region Public properties

        /// <summary>
        /// Получает/устанавливает делегат тела задачи.
        /// </summary>
        public Action TaskBody { get; private set; }

        #endregion

        #region Events

        /// <summary>
        /// Событие, сообщающее о завершении выполнения задачи.
        /// </summary>
        public event EventHandler Finished;

        #endregion

        #region Public methods

        /// <summary>
        /// Начинает выполнение задачи.
        /// </summary>
        public void Execute()
        { 
            Thread lTaskThread =
                new Thread(
                    () =>
                        {
                            // Выполнить задачу.
                            TaskBody();
                            // Уведомить об её окончании.
                            EventHandler lFinished = Finished;
                            if (lFinished != null)
                            {
                                lFinished(this, EventArgs.Empty);
                            }
                        }) {Name = "Task thread."};
            lTaskThread.Start();
        }

        #endregion
    }



TaskPriority

    /// <summary>
    /// Приоритет задачи.
    /// </summary>
    public enum TaskPriority
    {
        /// <summary>
        /// Высокий приоритет.
        /// </summary>
        High = 0,

        /// <summary>
        /// Обычный приоритет.
        /// </summary>
        Normal,

        /// <summary>
        /// Низкий приоритет.
        /// </summary>
        Low
    }




Работа над ошибками


Спасибо большое за комментарии и конструктивную критику. Я решил, что тема будет не закрыта, если не опубликовать исправленное решение. Для начала перечислю рекомендации и опишу какие вошли в код, а какие нет, и почему.
  • Создание потока — операция очень затратная. Пул потоков нужен для того, чтобы хранить в нём несколько уже созданных потоков. Это было моей главной ошибкой и, судя по всему, именно это вызвало столь негативную реакцию со стороны технического специалиста, проверяющего моё решение. Я просто слишком сконцентрировался на условиях задачи и проглядел требование, которое следовало из самого названия класса. Спасибо, Unrul, iaroshenko. Теперь я создаю необходимое количество потоков в конструкторе пула. Класс Task больше не занимается созданием потоков.
  • Поток планировщика не нужен. Действительно, после того, как создание потока было перенесено из класса задачи в класс пула, стало возможно переложить функцию извлечения задач из очереди на сами потоки. Спасибо, iaroshenko, romik.
  • В интерфейсе Task не должно быть event. Поскольку класс задачи больше не занимается созданием потоков, событие окончания обработки задачи теперь действительно не нужен. Спасибо, Nagg.
  • Блокировку на IsStopped можно заменить обычным volatile поле. С этим я не согласен. Несмотря на утверждение MSDN о том, что благодаря модификатору volatile читающий значение такого поля код будет получать самое актуальное значение, это не так. На самом деле, если говорить упрощённо, volatile позволяет гарантировать то, что компилятор, CLR и процессор не поменяют местами чтение/запись в volatile поле и другие поля. Более подробно можно почитать здесь. Однако в моём случае блокировка IsStopped нужна для другого. Без неё потенциально возможно, что в очередь попадёт новая задача после фактической остановки пула, а также возможен случай, когда поток задачи останется в состоянии ожидания, не смотря на то, что был дан сигнал завершения. В любом случае, небольшая перестраховка в многопоточном коде никогда не повредит. Это лучше, чем потом ловить нерегулярные ошибки. Не стану с пеной у рта защищать это решение. Если кто сможет опровергнуть мой довод на основе кода, буду признателен. Спасибо, elw00d.
  • Очередь с приоритетами реализовать отдельно от логики собственно ThreadPool'а. Этого я тоже не стал делать. Фактически пул содержит не так много кода, там больше комментариев и логгирования. Но, тем не менее, основная причина не в этом. Представьте, что условие отбора изменилось. Или же нужно обеспечить возможность настройки принципа выбора следующей задачи в момент создания экземпляра пула. Придётся создавать новый класс сортированной коллекции (очереди) и, соответственно, плодить по числу классов коллекций классы пулов. Это неправильное решение. В данном случае будет лучше вынести алгоритм выбора следующей задачи в отдельный класс, реализовав шаблон Strategy. Объект «отборщика» в таком случае будет передаваться в конструктор. Но я не буду усложнять своё решение, так как по условию задачи это не требуется. Спасибо, elw00d.
  • Interlocked-методы тоже неясно зачем (если уже используются мониторы и event'ы). Эту часть я тоже не стал трогать. Interlocked-методы позволяют избежать нарушение условий отбора задач с приоритетами High и Normal. Поскольку нет блокировок на доступ к счётчику mQueuedHighPriorityTaskCounter я предпочёл на всякий случай перестраховаться, использовав Interlocked-методы. Возможно более глубокий анализ кода выявит мою неправоту, но как я уже говорил, я не самый большой специалист в многопоточности и предпочитаю подстелить соломку, тем более когда она стоит дёшево. Спасибо, elw00d.
  • Было бы очень интересно, если бы автор решил озвученные в комментариях недочеты и снова отправил в Связной обновленную версию решения. К коду это конечно имеет лишь опосредованное отношение, но тем не менее. Думаю, я не стану отправлять исправленное решение в Связной. Это будет нечестно, поскольку я воспользовался вашей помощью. Скорее всего, я отправлю им ссылку на эту статью, а там пусть сами решают нужен ли им «уже готовый специалист» или подходит человек, который умеет признавать свои ошибки и развиваться, двигаться дальше. Спасибо за поддержку, @ARC_Programmer.

Тестовый проект с исправлениями можно загрузить здесь. Исправленный код:

FixedThreadPool
// Проект: Eshva.Threading
// Имя файла: FixedThreadPool.cs
// GUID файла: 7F1EECB7-F28A-4A20-9536-26D174BCD437
// Автор: Mike Eshva (mike@eshva.ru)
// Дата создания: 04.06.2012

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;


namespace Eshva.Threading.Framework
{
    /// <summary>
    /// Пул потоков, выполняющий одновременно не более определённого количество задач с учётом их
    /// приоритетов.
    /// </summary>
    /// <remarks>
    /// Тестовый проект для компании Связной.
    /// </remarks>
    public sealed class FixedThreadPool
    {
        #region Constructors

        /// <summary>
        /// Инициализирует новый экземпляр пул потоков максимальным количеством одновременно
        /// выполняемых задач.
        /// </summary>
        /// <param name="aConcurrentTaskNumber">
        /// Максимальное количество одновременно выполняемых задач.
        /// </param>
        /// <param name="aLog">
        /// Сервис логгирования.
        /// </param>
        /// <param name="aPriorityLog">
        /// Лог для вывода приоритета отобранной для выполнения задачи.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Неправильно заданное максимальное количество одновременно выполняемых задач.
        /// </exception>
        public FixedThreadPool
            (int aConcurrentTaskNumber, ILog aLog = null, ILog aPriorityLog = null)
        {
            if (aConcurrentTaskNumber <= 1)
            {
                throw new ArgumentOutOfRangeException(
                    "aConcurrentTaskNumber",
                    "Количество одновременно выполняемых задач должно быть больше единицы.");
            }

            Log = aLog;
            PriorityLog = aPriorityLog;

            for (int lThreadIndex = 0; lThreadIndex < aConcurrentTaskNumber; lThreadIndex++)
            {
                string lThreadName = string.Format("Task thread #{0}", lThreadIndex);
                Thread lTaskThread = new Thread(TaskThreadLogic) {Name = lThreadName};
                lTaskThread.Start();
            }
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Ставит задачу <paramref name="aTask"/> в очередь на выполнение с приоритетом 
        /// <paramref name="aTaskPriority"/>.
        /// </summary>
        /// <param name="aTask">
        /// Задача для постановки в очередь на выполнения..
        /// </param>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        /// <returns>
        /// <see langword="true"/> - задача поставлена в очередь на выполнение. 
        /// <see langword="false"/> - задача не была поставлена в очередь на выполнение, так как
        /// работа пула потока была остановлена.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// Задача для постановки в очередь на выполнения не задана.
        /// </exception>
        public bool Execute(Task aTask, TaskPriority aTaskPriority)
        {
            if (aTask == null)
            {
                throw new ArgumentNullException(
                    "aTask", "Задача для постановки в очередь на выполнения не задана.");
            }

            LogMessage("Получена новая задача для выполнения.");
            lock (mIsStoppedLock)
            {
                if (IsStopped)
                {
                    // Запрошена остановка.
                    LogPriority(aTaskPriority, ConsoleColor.DarkGray);
                    // Отклонять новые задачи.
                    return false;
                }
            }

            // Добавить задачу в очередь.
            EnqueueTask(aTask, aTaskPriority);
            LogMessage("Задача добавлена в очередь задач.", ConsoleColor.DarkYellow);

            return true;
        }

        /// <summary>
        /// Останавливает добавлении задач в очередь пула потоков, очередь не очищается. Возвращает
        /// выполнение только после окончания всех имеющихся задач в очереди.
        /// </summary>
        /// <remarks>
        /// После вызова этого метода дальнейшее добавление задач в очередь на выполнение не
        /// возможно и метод <see cref="Execute"/> будет возвращать <see langword="false"/>.
        /// Имеющиеся, на момент выполнения этого метода, задачи в очереди будут выполнены.
        /// </remarks>
        public void Stop()
        {
            // Выставить признак окончания работы пула.
            lock (mIsStoppedLock)
            {
                IsStopped = true;
                LogMessage("Запрошена остановка пула.");
            }

            // Дождаться окончания выполнения всех задач, оставшихся в очереди.
            LogMessage(
                "Начато ожидание завершения выполнения всех оставшихся задач в очереди.",
                ConsoleColor.DarkRed);
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.PulseAll(mTaskSchedulerLock);
            }

            mPoolStoppedGate.WaitOne();

            LogMessage("Дождались окончания выполнения всех задач в очереди.", ConsoleColor.DarkRed);
        }

        #endregion

        #region Private properties

        /// <summary>
        /// Получает/устанавливает признак того, остановлена ли работа пула.
        /// </summary>
        /// <value>
        /// <see langword="true"/> - работа пула остановлена, дальнейшее добавление задач в очередь
        /// не возможно. <see langword="false"/> - работа пула продолжается.
        /// </value>
        /// <remarks>
        /// Объект синхронизации доступа <see cref="mIsStoppedLock"/>.
        /// </remarks>
        private bool IsStopped { get; set; }

        /// <summary>
        /// Получает/устанавливает сервис логгирования приоритета задачи.
        /// </summary>
        private ILog PriorityLog { get; set; }

        /// <summary>
        /// Получает/устанавливает сервис логгирования сообщений.
        /// </summary>
        private ILog Log { get; set; }

        #endregion

        #region Private methods

        private void TaskThreadLogic()
        {
            lock (mTaskSchedulerLock)
            {
                while (true)
                {
                    // Отпускаем монитор и ждём сигнала, поданного через mTaskSchedulerLock.
                    Monitor.Wait(mTaskSchedulerLock);
                    lock (mQueuedTasksLock)
                    {
                        if (!mQueuedTasks.Any())
                        {
                            lock (mIsStoppedLock)
                            {
                                if (IsStopped)
                                {
                                    LogMessage(
                                        "Запрошена остановка пула и больше нет задач в очереди на выполнение.");
                                    LogMessage("Планировщик - Выход из потока планировщика.");
                                    // Сигнализировать об окончании выполнения последней задачи.
                                    mPoolStoppedGate.Set();
                                    return;
                                }
                            }
                            // Задач в очереди нет, но не было запроса на остановку пула.
                            // Ждём дальше появления задач.
                            continue;
                        }
                    }

                    LogMessage(
                        "Дождались появления задачи в очереди задач.",
                        ConsoleColor.DarkRed);
                    // Дождаться и получить следующую задачу для выполнения.
                    TaskListEntry lTask = DequeueTask();
                    LogMessage("Планировщик - Получена новая задача для выполнения.");

                    switch (lTask.TaskPriority)
                    {
                        case TaskPriority.High:
                            Interlocked.Increment(ref mQueuedHighPriorityTaskCounter);
                            break;
                        case TaskPriority.Normal:
                            Interlocked.Add(
                                ref mQueuedHighPriorityTaskCounter, -HighPriorityTaskFactor);
                            break;
                    }

                    // Запустить задачу на выполнение.
                    lTask.Task.Execute();
                    LogMessage(
                        string.Format(
                            "Планировщик - Запущена задача с приоритетом {0}.",
                            lTask.TaskPriority),
                        ConsoleColor.DarkYellow);

                    lock (mQueuedTasksLock)
                    {
                        lock (mIsStoppedLock)
                        {
                            if (IsStopped &&
                                !mQueuedTasks.Any())
                            {
                                LogMessage(
                                    "Запрошена остановка пула и больше нет задач в очереди на выполнение.");
                                LogMessage("Планировщик - Выход из потока планировщика.");
                                // Сигнализировать об окончании выполнения последней задачи.
                                mPoolStoppedGate.Set();
                                return;
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Добавляет задачу <paramref name="aTask"/> в очередь задач на выполнение.
        /// </summary>
        /// <param name="aTask">
        /// Задача, добавляемая в очередь задач на выполнение.
        /// </param>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        private void EnqueueTask(Task aTask, TaskPriority aTaskPriority)
        {
            TaskListEntry lEntry = new TaskListEntry(aTask, aTaskPriority);
            LogPriority(aTaskPriority, ConsoleColor.Green);
            lock (mQueuedTasksLock)
            {
                // Добавить задачу в очередь задач на выполнение.
                mQueuedTasks.Add(lEntry);
                LogMessage(
                    string.Format(
                        "В очередь добавлена задача с приоритетом {0}", lEntry.TaskPriority),
                    ConsoleColor.Green);
                // Поднять барьер доступа к очереди задач.
                LogMessage("Поднять барьер доступа к очереди задач.", ConsoleColor.DarkRed);
            }
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }
        }

        /// <summary>
        /// Ожидает появления в очереди задач хотя бы одной задачи. Дождавшись изымает следующую
        /// задачу из очереди, учитывая правила приоритетов.
        /// </summary>
        /// <returns>
        /// Задача, изъятая из очереди задач на выполнение.
        /// </returns>
        private TaskListEntry DequeueTask()
        {
            TaskListEntry lNextTask;
            lock (mQueuedTasksLock)
            {
                lNextTask = FindNextTaskUsingPriorityRules();
                LogPriority(lNextTask.TaskPriority, ConsoleColor.Red);
                LogMessage(
                    string.Format(
                        "Получена задача из очереди задач с приоритетом {0}.",
                        lNextTask.TaskPriority),
                    ConsoleColor.DarkRed);
                mQueuedTasks.Remove(lNextTask);
            }
            lock (mTaskSchedulerLock)
            {
                // Сигнализировать об изменении в условии блокировки по mTaskSchedulerLock.
                Monitor.Pulse(mTaskSchedulerLock);
            }

            return lNextTask;
        }

        /// <summary>
        /// Находит следующую задачу с учётом правил приоритезации.
        /// </summary>
        /// <returns>
        /// Найденная следующая задача для выполнения.
        /// </returns>
        /// <remarks>
        /// ЗАМЕЧАНИЯ ПО ПРАВИЛАМ ПРИОРИТЕЗАЦИИ: Полученные мной правила выбора следующей задачи
        /// определены в задании не полностью. В частности, не было сказано что делать, если в
        /// очереди задач есть только задачи с обычным приоритетом. Также не определено, что нужно
        /// делать, если было выполнено больше, чем три задачи с высоким приоритетом. Поэтому я
        /// додумал эти правила, исходя из предположения, что лучше пусть выполняется хоть что-то,
        /// чем ждать появления задач с каким-то определённым приоритетом.
        /// </remarks>
        private TaskListEntry FindNextTaskUsingPriorityRules()
        {
            TaskListEntry lNextTask;
            lock (mQueuedTasksLock)
            {
                Debug.Assert(
                    mQueuedTasks.Count > 0,
                    "Метод FindNextTaskUsingPriorityRules не должен вызываться, если очередь задач пуста.");
                // По умолчанию будет выполняться задача с высоким приоритетом.
                TaskPriority lNextTaskPriority = TaskPriority.High;
                // Проверить возможность выполнения задачи с более низкими приоритетами.
                if (mQueuedTasks.All(aEntry => aEntry.TaskPriority == TaskPriority.Low))
                {
                    // В очереди задач все задачи с низким приоритетом.
                    // Следующая задача будет с низким приоритетом.
                    lNextTaskPriority = TaskPriority.Low;
                }
                else
                {
                    // Условие для задач с низким приоритетом не выполняется.
                    if (mQueuedTasks.Any(
                        aEntry => aEntry.TaskPriority == TaskPriority.Normal) &&
                        (mQueuedTasks.All(
                            aEntry => aEntry.TaskPriority != TaskPriority.High) ||
                         Interlocked.CompareExchange(ref mQueuedHighPriorityTaskCounter, 0, 0) >=
                         HighPriorityTaskFactor))
                    {
                        // В списке задач на выполнение есть задачи с обычным приоритетом и
                        // выполнено достаточное количество задач с высоким приоритетом или
                        // в очереди задач все задачи имеют приоритет ниже высокого.
                        // Следующая задача будет с обычным приоритетом.
                        lNextTaskPriority = TaskPriority.Normal;
                    }
                }

                lNextTask = mQueuedTasks.First(
                    aEntry => aEntry.TaskPriority == lNextTaskPriority);
            }

            return lNextTask;
        }

        /// <summary>
        /// Выводит сообщение в лог.
        /// </summary>
        /// <param name="aMessage">
        /// Сообщение.
        /// </param>
        /// <param name="aColor">
        /// Цвет сообщения.
        /// </param>
        private void LogMessage(string aMessage, ConsoleColor aColor = ConsoleColor.Yellow)
        {
            if (Log == null)
            {
                return;
            }

            Log.WriteMessage(aMessage, aColor);
        }

        /// <summary>
        /// Выводит приоритет задачи в лог.
        /// </summary>
        /// <param name="aTaskPriority">
        /// Приоритет задачи.
        /// </param>
        /// <param name="aColor">
        /// Цвет сообщения.
        /// </param>
        private void LogPriority(TaskPriority aTaskPriority, ConsoleColor aColor)
        {
            if (PriorityLog == null)
            {
                return;
            }

            string lPriority = aTaskPriority == TaskPriority.High
                                   ? "H"
                                   : aTaskPriority == TaskPriority.Normal ? "N" : "L";
            PriorityLog.WriteMessage(lPriority, aColor);
        }

        #endregion

        #region Private data

        /// <summary>
        /// Количество задач с высоким приоритетом, которое должно быть поставлено в очередь до
        /// того, как можно будет поставить задачу с обычным приоритетом, если в очереди имеются
        /// задачи с высоким приоритетом.
        /// </summary>
        private const int HighPriorityTaskFactor = 3;

        /// <summary>
        /// Объект синхронизации доступа к свойству <see cref="IsStopped"/>.
        /// </summary>
        private readonly object mIsStoppedLock = new object();

        /// <summary>
        /// Барьер остановки работы пула. Поднимается, когда запрошена остановка и все задачи
        /// завершили своё выполнение.
        /// </summary>
        private readonly ManualResetEvent mPoolStoppedGate = new ManualResetEvent(false);

        /// <summary>
        /// Список задач, поставленных в очередь на выполнение.
        /// </summary>
        /// <remarks>
        /// Объект синхронизации доступа <see cref="mQueuedTasksLock"/>.
        /// </remarks>
        private readonly IList<TaskListEntry> mQueuedTasks = new List<TaskListEntry>();

        /// <summary>
        /// Объект синхронизации доступа к списку задач <see cref="mQueuedTasks"/>.
        /// </summary>
        private readonly object mQueuedTasksLock = new object();

        /// <summary>
        /// Объект синхронизации, используемый для блокировки/запуска потока планировщика.
        /// </summary>
        private readonly object mTaskSchedulerLock = new object();

        /// <summary>
        /// Счётчик задач с высоким приоритетом, запущенных на выполнение. Каждая запущенная задача
        /// с высоким приоритетом увеличивает это значение на единицу, каждая запущенная задача с
        /// обычным приоритетом уменьшает это значение на <see cref="HighPriorityTaskFactor"/>.
        /// </summary>
        /// <remarks>
        /// Синхронизация доступа должна выполняться по средствам использования методов класса 
        /// <see cref="Interlocked"/>.
        /// </remarks>
        private int mQueuedHighPriorityTaskCounter;

        #endregion

        #region Nested type: TaskListEntry

        /// <summary>
        /// Элемент списка задач. 
        /// </summary>
        /// <remarks>
        /// Объекты после создания не изменяемы.
        /// </remarks>
        private struct TaskListEntry
        {
            #region Constructors

            /// <summary>
            /// Инициализирует новый экземпляр элемента списка задач задачей и её приоритетом.
            /// </summary>
            /// <param name="aTask">
            /// Задача.
            /// </param>
            /// <param name="aTaskPriority">
            /// Приоритет задачи.
            /// </param>
            public TaskListEntry(Task aTask, TaskPriority aTaskPriority)
            {
                mTask = aTask;
                mTaskPriority = aTaskPriority;
            }

            #endregion

            #region Public properties

            /// <summary>
            /// Задача.
            /// </summary>
            public Task Task
            {
                get { return mTask; }
            }

            /// <summary>
            /// Приоритет задачи.
            /// </summary>
            public TaskPriority TaskPriority
            {
                get { return mTaskPriority; }
            }

            #endregion

            #region Private data

            private readonly Task mTask;
            private readonly TaskPriority mTaskPriority;

            #endregion
        }

        #endregion
    }
}


Task
// Проект: Eshva.Threading
// Имя файла: Task.cs
// GUID файла: 292467E7-4816-4407-BB9B-3309D13C8614
// Автор: Mike Eshva (mike@eshva.ru)
// Дата создания: 04.06.2012

using System;


namespace Eshva.Threading.Framework
{
    /// <summary>
    /// Задача для выполнения в <see cref="FixedThreadPool"/>.
    /// </summary>
    public class Task
    {
        #region Constructors

        /// <summary>
        /// Инициализирует новый экземпляр задачи для выполнения в <see cref="FixedThreadPool"/>
        /// делегатом тела задачи.
        /// </summary>
        /// <param name="aTaskBody">
        /// Делегат тела задачи.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// Делегат тела задачи не задан.
        /// </exception>
        public Task(Action aTaskBody)
        {
            if (aTaskBody == null)
            {
                throw new ArgumentNullException("aTaskBody", "Делегат тела задачи не задан.");
            }

            TaskBody = aTaskBody;
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Начинает выполнение задачи.
        /// </summary>
        public void Execute()
        {
            TaskBody();
        }

        #endregion

        #region Private properties

        /// <summary>
        /// Получает/устанавливает делегат тела задачи.
        /// </summary>
        private Action TaskBody { get; set; }

        #endregion
    }
}

Tags:
Hubs:
+13
Comments 51
Comments Comments 51

Articles