Pull to refresh

Распараллеливание задач с зависимостями —  пример на .NET

Reading time 8 min
Views 11K
Original author: Riccardo Terrell
Здравствуйте, коллеги!

На этой неделе мы отдали в перевод амбициозную по своей сложности книгу "Concurrency in .NET" издательства Manning:



Автор любезно выложил на сайте Medium отрывок из 13-й главы, который мы и предлагаем оценить задолго до премьеры.
Приятного чтения!

Допустим, вам нужно написать инструмент, позволяющий выполнять ряд асинхронных задач, у каждой из которых – свой набор зависимостей, влияющих на порядок выполнения операций. Такие проблемы можно решать при помощи последовательного и императивного выполнения, но, если вы хотите добиться максимальной производительности, то последовательные операции вам не подойдут. Вместо этого нужно организовать параллельное выполнение задач. Многие конкурентные проблемы можно трактовать как статические коллекции атомарных операций с зависимостями между их входными и выходными данными. По завершении операции ее вывод используется в качестве ввода для других, зависимых операций. Для оптимизации производительности задачи нужно назначать, исходя из их зависимостей, и алгоритм должен быть настроен так, чтобы зависимые задачи выполнялись настолько последовательно, насколько это необходимо, и настолько параллельно, насколько это возможно.

Вы хотите сделать многоразовый компонент, параллельно выполняющий серию задач, и гарантировать, что будут учтены все зависимости, которые могли бы повлиять на порядок выполняемых операций. Как построить такую модель программирования, которая бы обеспечивала базовый параллелизм коллекции операций, выполняемых эффективно, либо параллельно, либо последовательно, смотря какие зависимости возникают между данной операцией и другими?

Решение: Реализуем граф зависимостей при помощи класса MailboxProcessor из F# и предоставляем методы как стандартные задачи (Task), чтобы их можно было потреблять и из C#

Такое решение называется «Ориентированный ациклический граф» (DAG) и призвано сформировать граф, дробя операции на последовательности атомарных задач с четко определенными зависимостями. В данном случае важна ациклическая суть графа, поскольку она устраняет возможность взаимных блокировок между задачами, при условии, что задачи на самом деле полностью атомарны. Задавая граф, важно понимать все зависимости между задачами, особенно неявные зависимости, которые могут приводить к взаимным блокировкам или условиям гонки. Ниже приведен типичный пример графовидной структуры данных, с помощью которой можно представить ограничения, возникающие при планировании взаимодействий между операциями в данном графе.

Граф – исключительно мощная структура данных, и на ее основе можно писать сильные алгоритмы.



Рис. 1 Граф – это совокупность вершин, соединенных ребрами. В этом представлении ориентированного графа узел 1 зависит от узлов 4 и 5, узел 2 зависит от узла 5, узел 3 зависит от узлов 5 и 6 и т.д.

Структура DAG применима в качестве стратегии для параллельного выполнения задач с учетом порядка зависимостей, что позволяет повысить производительность. Структуру такого графа можно определить при помощи класса MailboxProcessor из языка F#; в данном классе сохраняется внутреннее состояние для задач, зарегистрированных для выполнения в форме зависимостей ребер.

Валидация ориентированного ациклического графа

При работе с любой графовой структурой данных, например DAG, приходится заботиться о правильной регистрации ребер. Например, возвращаясь к рисунку 1: что будет, если у нас зарегистрирован узел 2 с зависимостями от узла 5, а узла 5 не существует? Также может случиться, что некоторые ребра зависят друг от друга, из-за чего возникает ориентированный цикл. При наличии ориентированного цикла критически важно выполнять некоторые задачи параллельно; в противном случае некоторые задачи могут вечно дожидаться выполнения других, и возникнет взаимная блокировка.

Задача решается при помощи топологической сортировки: это означает, что мы можем упорядочить все вершины графа таким образом, чтобы любое ребро вело от вершины с меньшим номером к вершине с большим номером. Так, если задача A должна завершиться до задачи B, а задача B – до задачи C, которая, в свою очередь, должна завершиться до задачи A, то возникает циклическая ссылка, и система уведомит вас об этой ошибке, выбросив исключение. Если при управлении очередностью возникает ориентированный цикл, то решения нет. Проверка такого рода называется «обнаружение цикла в ориентированном графе». Если ориентированный граф удовлетворяет описанным правилам, то является ориентированным ациклическим графом, отлично подходящим для параллельного запуска нескольких задач, между которыми существуют зависимости.

Полная версия листинга 2, содержащая код валидации DAG, есть в исходном коде, выложенном онлайн.

В следующем листинге класс MailboxProccessor из F# используется как идеальный кандидат для реализации DAG, обеспечивающего параллельное выполнение операций, связанных зависимостями. Сначала давайте определим размеченное объединение, при помощи которого будем управлять задачами и выполнять их зависимости.

Листинг 1 Тип сообщения и структура данных для координации выполнения задач в соответствии с их зависимостями

type TaskMessage = // #A
| AddTask of int * TaskInfo
| QueueTask of TaskInfo
| ExecuteTasks
and TaskInfo = // #B
{ Context : System.Threading.ExecutionContext
  Edges : int array; Id : int; Task : Func<Task>
  EdgesLeft : int option; Start : DateTimeOffset option
  End : DateTimeOffset option }


#A посылает к ParallelTasksDAG базовый агент dagAgent, отвечающий за координацию выполнения задач

#B Обертывает детали каждой задачи для выполнения

Тип TaskMessage представляет оболочки сообщений, отправляемых базовому агенту типа ParallelTasksDAG. Эти сообщения используются для координации задач и синхронизации зависимостей. Тип TaskInfo содержит и отслеживает подробности зарегистрированных задач в процессе выполнения DAG, в том числе, ребра зависимостей. Контекст выполнения (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) захватывается с целью обращения к информации при отложенном выполнении, например, такой информации: текущий пользователь, любое состояние, ассоциированное с логическим потоком выполнения, информация о безопасном доступе к коду и т.п. После срабатывания события публикуется время начала и завершения выполнения.

Листинг 2 Агент DAG на F# для распараллеливания выполнения операций, связанных зависимостями

type ParallelTasksDAG() =
   let onTaskCompleted = new Event<TaskInfo>() // #A
   let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox ->
   let rec loop (tasks : Dictionary<int, TaskInfo>) // #B
                (edges : Dictionary<int, int list>) = async { // #B
       let! msg = inbox.Receive() // #C
       match msg with
      | ExecuteTasks -> // #D
         let fromTo = new Dictionary<int, int list>()
         let ops = new Dictionary<int, TaskInfo>() // #E
         for KeyValue(key, value) in tasks do // #F
         let operation =
            { value with EdgesLeft = Some(value.Edges.Length) }
         for from in operation.Edges do
           let exists, lstDependencies = fromTo.TryGetValue(from)
           if not <| exists then
              fromTo.Add(from, [ operation.Id ])
           else fromTo.[from] <- (operation.Id :: lstDependencies)
         ops.Add(key, operation)
         ops |> Seq.iter (fun kv -> // #F
             match kv.Value.EdgesLeft with
            | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value))
            | _ -> ())
        return! loop ops fromTo
     | QueueTask(op) -> // #G
          Async.Start <| async { // #G
           let start = DateTimeOffset.Now
           match op.Context with // #H
           | null -> op.Task.Invoke() |> Async.AwaitATsk
           | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I
                    (fun op -> let opCtx = (op :?> TaskInfo)
                    opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo)
                   let end’ = DateTimeOffset.Now
                   onTaskCompleted.Trigger 
                    { op with Start = Some(start)
                      End = Some(end’) } // #L
                  let exists, deps = edges.TryGetValue(op.Id)
                  if exists && deps.Length > 0 then
                    let depOps = getDependentOperation deps tasks []
                    edges.Remove(op.Id) |> ignore
                   depOps |> Seq.iter (fun nestedOp ->
                        inbox.Post(QueueTask(nestedOp))) }
                   return! loop tasks edges
    | AddTask(id, op) -> tasks.Add(id, op) // #M
                         return! loop tasks edges }
  loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural))
       (new Dictionary<int, int list>(HashIdentity.Structural)))

[<CLIEventAttribute>]
member this.OnTaskCompleted = onTaskCompleted.Publish // #L
member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N
member this.AddTask(id, task, [<ParamArray>] edges : int array) =
  let data = { Context = ExecutionContext.Capture()
               Edges = edges; Id = id; Task = task
               NumRemainingEdges = None; Start = None; End = None }
  dagAgent.Post(AddTask(id, data)) // #O


#A Экземпляр класса onTaskCompletedEvent, используется для уведомления о завершении задачи

#B Внутреннее состояние агента для отслеживания регистров задач и их зависимостей. Коллекции изменяемы, поскольку в ходе выполнения ParallelTasksDAG состояние меняется, и поскольку они унаследовали потокобезопасность, так как находятся в Agent

#C Асинхронно ожидаем выполнения

#D Оболочка сообщения, запускающего ParallelTasksDAG

#E Коллекция, отображаемая на монотонно увеличиваемый индекс с задачей к запуску

#F Процесс перебирает список задач, анализируя зависимости с другими задачами для создания топологической структуры, в которой представлен порядок выполнения задач

#G Оболочка сообщения для постановки задачи в очередь, ее выполнения и, в конечном итоге, для удаления этой задачи из состояния агента в качестве активной зависимости после того как задача завершится

#H Если подхваченный ExecutionContext равен null, то запускаем задачу в текущем контексте, в противном случае переходим к #I

#I Запускаем задачу в перехваченном ExecutionContext

#L Инициируем и публикуем событие onTaskCompleted, чтобы дать уведомление о завершении задачи. В событии содержится информация о задаче

#M Оболочка сообщения для добавления задачи на выполнение в соответствии с ее зависимостями, если таковые имеются

#N Запускает выполнение зарегистрированных задач

#O Добавление задачи, ее зависимостей и текущего ExecutionContext для выполнения DAG.

Цель функции AddTask – зарегистрировать задачу с произвольными ребрами зависимостей. Эта функция принимает уникальный ID, задачу, которая должна быть выполнена, и множество ребер, представляющих ID всех других зарегистрированных задач, которые должны быть выполнены прежде, чем можно будет приступать к выполнению данной задачи. Если массив пуст, это означает, что зависимостей нет. Экземпляр MailboxProcessor под названием dagAgent хранит зарегистрированные задачи в актуальном состоянии “tasks,” представляющим собой словарь (tasks : Dictionary<int, TaskInfo>), соотносящий ID каждой задачи и ее подробности. Более того, в Agent также хранится состояние зависимостей ребер по ID каждой задачи (edges : Dictionary<int, int list>). Когда агент получает уведомление о необходимости приступить к выполнению, в рамках этого процесса проверяется, чтобы все зависимости ребер были зарегистрированы, и чтобы в графе не было циклов. Данный этап верификации доступен а полной реализации ParallelTasksDAG, приведенной в онлайновом коде. Далее предлагаю пример на C#, где ссылаюсь на библиотеку that F# для запуска ParallelTasksDAG (и потребляю ее). Зарегистированные задачи отражают зависимости, представленные выше на рис. 1.

Func<int, int, Func<Task>> action = (id, delay) => async () => {
  Console.WriteLine($”Starting operation{id} in Thread Id
  {Thread.CurrentThread.ManagedThreadId}…”);
  await Task.Delay(delay);
};
var dagAsync = new DAG.ParallelTasksDAG();
dagAsync.OnTaskCompleted.Subscribe(op =>   
     Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”));
dagAsync.AddTask(1, action(1, 600), 4, 5);
dagAsync.AddTask(2, action(2, 200), 5);
dagAsync.AddTask(3, action(3, 800), 6, 5);
dagAsync.AddTask(4, action(4, 500), 6);
dagAsync.AddTask(5, action(5, 450), 7, 8);
dagAsync.AddTask(6, action(6, 100), 7);
dagAsync.AddTask(7, action(7, 900));
dagAsync.AddTask(8, action(8, 700));
dagAsync.ExecuteTasks();

Цель вспомогательной функции – вывести сообщение о том, что началось выполнение задачи, сославшись при этом на Id актуального потока для подтверждения многопоточности. С другой стороны, событие OnTaskCompleted регистрируется для выдачи уведомления о завершении каждой задачи с выводом в консоль ID задачи и Id актуального потока. Вот вывод, который мы получим при вызове метода ExecuteTasks.

Starting operation 8 in Thread Id 23…
Starting operation 7 in Thread Id 24…
Operation 8 Completed in Thread Id 23
Operation 7 Completed in Thread Id 24
Starting operation 5 in Thread Id 23…
Starting operation 6 in Thread Id 25…
Operation 6 Completed in Thread Id 25
Starting operation 4 in Thread Id 24…
Operation 5 Completed in Thread Id 23
Starting operation 2 in Thread Id 27…
Starting operation 3 in Thread Id 30…
Operation 4 Completed in Thread Id 24
Starting operation 1 in Thread Id 28…
Operation 2 Completed in Thread Id 27
Operation 1 Completed in Thread Id 28
Operation 3 Completed in Thread Id 30

Как видите, задачи выполняются параллельно в разных потоках (ID потока у них отличаются), и порядок зависимостей у них сохраняется.

В сущности, вот так и распараллеливаются задачи, имеющие зависимости. Подробнее читайте в книге Concurrency in .NET.
Tags:
Hubs:
+13
Comments 12
Comments Comments 12

Articles

Information

Website
piter.com
Registered
Founded
Employees
201–500 employees
Location
Россия