Ремарка

Данная работа носит подготовительный и исследовательский характер и служит вводной частью к следующей статье, в которой будут рассмотрены реализации уже конкретных алгоритмов поверх описанного подхода. Автор не утверждает, что представленная архитектура является идеальной, что именно так следует строить микрофреймворк или что предложенный код полностью соответствует академическим стандартам MapReduce. Более того, реализация может опускать некоторые детали, свойственные промышленным системам, и не претендует на абсолютную корректность передачи всех концепций оригинальной парадигмы. и так же автор не утверждает что корректно обьеснет что емсь MapReduce

Главная цель статьи — поделиться экспериментами, продемонстрировать минималистичный подход к построению абстракций, показать, как можно выразить MapReduce-модель в максимально компактном и понятном виде, а также сформировать основу, на которой в дальнейшем можно обсуждать уже сами алгоритмы, их свойства и особенности распределённой обработки данных.

Такая форма изложения позволяет сосредоточиться не на «правильности» инфраструктуры, а на идеях, которые за ней стоят, и использовать предложенный микрофреймворк как инструмент для дальнейших экспериментов, тестов и обучающих примеров. На данном этапе.

Вкратце MapReduce

MapReduce — это парадигма обработки данных, подход к работе с большими объёмами информации. Постерегусь называть «классом алгоритмов» (вопрос определений), скорее, речь о конкретной организации вычислений: где данные проходят через три этапа - Map --> Shuffle --> Reduce.

  • Mapи Reduceсодержат основную бизнес- или алгоритмическую логику.

  • Shuffle связывает эти этапы, группируя данные по ключам.

MapReduce предполагает логическую независимость стадий: каждую из них — Map, Shuffle или Reduce — можно запускать отдельно. При этом стадии выполняются в строгом порядке: сначала Map, Shuffle, Reduce соответственно. разберем все 3 этапа на 3 мною подготовленных схемах соответственном порядке:

Особенность подхода — горизонтальная масштабируемость на этапах Map и Reduce. Данные можно разделить на несколько или множество партиций, каждая из которых обрабатывается на отдельном узле. Количество партиций и узлов может варьироваться в зависимости от инфраструктуры и объёма данных, что позволяет гибко распределять нагрузку и ускорять обработку больших наборов данных.

На этапе Map исходные данные преобразуются в пары «ключ–значение» и записываются в ROM, а на этапе Reduce данные собираются по ключам, каждая партиция обрабатывается строго по ключу, и выполняется операция редьюсинга после чего итоговый результат снова записывается в ROM. Суть этого процесса я попыталась отобразить на след. схеме:

Map

схема стадии Map
схема стадии Map

На этапе Map есть некоторое множество W^{map} = \{W_i^{map}\}_{i=1}^{N_W} воркеров (может быть динамичным). Помимо воркеров существует Executor — необязательный компонент, но с его использованием реализация становится проще. Executor выполняет роль мастер-узла/процесса: он контролирует распределение данных/файлов F = \{f_i\}_{i=1}^{N_F} по воркерам, указывая каждому, какой фрагмент данных читать. В рамках ленивых вычислений и итераторов такую функциональность реализовать куда приятнее, то есть Executor фактически отвечает за распределение данных. E:W^{map}\rightrightarrows P(F),∀i \neq jj,E(w_i​)∩E(w_j​)=∅, w_i,w_j \in  W^{map}  (1)

Каждый воркер считывает некоторое подмножество множество файлов F определенные ей мультиотображением E о котором речь была выше, после посредством мультиотображения in: F \rightrightarrows I , преобразует их в объекты некоторого множества I. Затем к этому набору данных применяется второе мультитображение map: I \rightrightarrows KV  , которое переводит объекты в пары ключ-значение KV=K×V , представляющего собой декартово произведение множества ключей K и значений V Наконец, результат записывается в выходные файлы уже просто отображением out:  KV \rightrightarrows F' чтобы записать результат в выходные файлы F' в ROM. и так мы получаем мультиотображение из F \rightrightarrows F' где каждый воркер выполняет мультиотображение только для части данных P(F), что в совокупности со свойством один гарантирует что каждый файл данных обрабатывается только 1 раз.

Shuffle

схема этапа Shuffle
схема этапа Shuffle

Этап Shuffle, как уже отмечалось, носит преимущественно технический характер и сам по себе не содержит бизнес-логики (хотя при желании туда можно встроить дополнительные преобразования). Его задача — подготовить данные к Reduce. Поскольку Map записывает результаты в набор файлов, заранее неизвестно, какие ключи окажутся в каком из них. Shuffle выполняет группировку данных по ключам и тем самым передаёт Reduce чёткую структуру вида «ключ → перечень значений», необходимую для корректной обработки на следующем этапе. можно более кратко изобразить как Shuf:KV⟶(K ⟶ P(V))

подробно на чтении и записи на данном этапе останавливаться не буду — они происходят аналогично предыдущему этапу. Однако здесь важно подчеркнуть, что чтение входных данных на этапе Reduce является обычным отображением, а не мультитображением.

На данном этапе о целесообразности использования нескольких воркеров говорить рано: архитектурно процесс можно реализовать как в многопоточном/многопроцессном режиме, так и в виде одного компактного мини-процесса. При этом, чтобы избежать переписывания файлов, сейчас вполне достаточно формировать один мета файл, содержащий указатели на фрагменты F' на каждый ключ и место, где хранятся его данные.

Reduce

схема: Reduce
схема: Reduce

Подобно этапу Map, на стадии Reduce также существует множество воркеров {W^{reduce}}. Каждый воркер получает данные, считывая их из файлов, сформированных на предыдущем этапе. Однако, в отличие от Map- стадии, где входные данные обрабатываются в произвольном порядке, на этапе Reduce каждый воркер читает данные строго, сгруппированные по одному конкретному ключу reduce: P(V) \rightarrow V .

Задача воркера W_i^{reduce} — выполнить операцию reduce: P(V) \rightarrow V над всеми значениями, соответствующими его ключу P(V), и записать итоговый результат в ROM В виде некоторых файлов F' .

Назначение ключей воркерам распределяет Экзекьютор Eexecutor . Теоретически можно реализовать распределение и без центрального мастера, но такая схема уже не будет полностью соответствовать классической архитектуре MapReduce, где именно мастер обеспечивает детерминированное распределение ключей между Reduce-воркерами и согласованность всей вычислительной фазы.

Принципы API

В рамках данной статьи я хочу достигнуть 2 целей: сделать универсальное API и сделать простую реализацию (ну и походу простую задачу с почетом слов)

требования к API следующие:

  1. API должно чётко отражать стадии классической MapReduce (InputMapShuffle ReduceOutput), представляя каждую как независимый модуль с отдельным интерфейсом и единственной ответственностью.

  2. Универсальность и отсутствие привязки к конкретным технологиям
    API не должно зависеть: от инфраструктуры, форматов хранения, от конкретных источников данных, от сетевых библиотек. Только абстракции и стандартные типы.

  3. API должно быть универсальным: одинаково позволять работать локально, поверх распределённых фреймворков и встраиваться в любые Scala-проекты. Пользователь должен свободно задавать реализации всех стадий (Map, Shuffle, Reduce, IO) через единые абстрактные интерфейсы.

  4. Использовать только стандартную библиотеку
    Никаких внешних зависимостей.
    Только коллекции, итераторы, типы, исключения, файловые API, concurrency primitives при необходимости.

  5. Соответствие принципу PECS для стадий ввода/вывода (IO)
    В абстракциях ввода/вывода правильно использовать ковариантность/контравариантность:

  6. Поддержка ленивых вычислений на всех стадиях
    Все операции должны работать над Iterator, LazyList или аналогами.
    API не должно требовать загрузки данных в память.
    Стадии Map, Shuffle, Reduce должны поддерживать потоковую обработку.

касательно моей мини реализации - следующие:

  • работать строго локально

  • использовать виртуальные потоки как симуляцию воркеров

  • ограничить Сериализуемыми/десериализуемыми объектами (у меня за это отвечает circe)

  • конечному пользователь должно быть достаточно определить операцию map и reduce в виде лямбд

Пишем API

первое что определю это абстракцию ключ значения и операции map/shuffle/reduce

package pico.map.reduce.framework.api

//просто минималистичный клаас ключ значение
case class KV[k, v](key: k, value: v)

// задаем класс который считывается из исходных файлов I
// и типы для ключа и значения K,O
// возвращает итератор (), так как map — это мультиотображение
trait PMap[I, K, O]:
  // соответствует принципам API:
  // 1) описывает только мультиотображени Map.
  // 2) вход I, ключ K и значение O не привязаны к формату или инфраструктуре.
  // 3) интерфейс не ограничивает исполнение.
  // 4) только стандартная либа.
  // 5) Не содержит IO.
  // 6) Ленивость обеспечивается через Iterator.
  def pMap(input: I): Iterator[KV[K, O]]

// стадия Shuffle: группировка значений по ключу
trait PShuffle[K, O]:
  // соответствует принципам API:
  // 1) Описывает только отображение Shuffle.
  // 2) K и O абстрактны, интерфейс не привязан к формату или инфраструктуре.
  // 3) определяет лишь форму группировки.
  // 4) Использует только стандартную библиотеку.
  // 5) Не содержит IO.
  // 6) Ленивость обеспечивается через Iterator.
  def pShuffle(iterator: Iterator[KV[K, O]]): Iterator[KV[K, Iterator[O]]]

// стадия Reduce: объединяет два значения одного типа в одно
trait PReduce[O] {
  // соответствует принципам API:
  // 1) Описывает только отображение reduce.
  // 2) интерфейс не привязан к формату или инфраструктуре.
  // 3) определяет лишь операцию слияния.
  // 4) только стандартную библиотека.
  // 5) Не содержит IO.
  // 6) Ленивость обеспечивается через Iterator.
  def pReduce(o1: O, o2: O): O
}

//примечание, хоть тут все и написано в одном блоке
//в репозитории оно разнесено на несколько файлов (4 если быть точной)

Согласно принятой схеме, каждая из стадий предполагает наличие IO. Чтобы не перегружать код большим количеством отдельных трейтов или интерфейсов, я планирую сделать общую абстракцию для всех стадий, объединяя их функциональность в одном месте. Это позволит сохранить чистоту и гибкость API, не усложняя структуру типов и интерфейсов.

package pico.map.reduce.framework.api

// PIN — это абстракция источника данных (Input).
trait PIN[+I]:
  // 1) Описывает только источник данных в виде мультиотображения.
  // 2) не привязан к конкретному формату данных или инфраструктуре.
  // 3) не накладывая ограничений на исполнение.
  // 4) Использует только стандартную библиотеку.
  // 5) соответсвует PECS как коньсюмер: реализуется через ковариантность [+I].
  // 6) Ленивость обеспечивается через Iterator.
  def pRead(): Iterator[I]

object PIN:
  // фабричный метод.
  def apply[I](f: => Iterator[I]): PIN[I] = () => f


//POUT — это абстракция для «выходного потока» (Output).
trait POUT[-O]:
  // 1) описывает только стадию вывода (out)
  // 2) Тип O абстрактен и не привязан к конкретному формату данных или инфраструктуре
  // 3) Интерфейс определяет только операцию записи, не накладывая ограничений на реализацию
  // 4) Использует только стандартную библиотеку Scala
  // 5) соответсвует PECS как продюсер: реализуется через контравариантность [-I].
  // 6) В плане ленивости запускает pipeline. 
  def pWrite(o: O): Unit

object POUT:
  //фабричный метод
  def apply[O](f: O => Unit): POUT[O] = f(_)

// тут так же код разкидан на несколько файлов в репозитории (точнее по 2м)

MapReduce предполагает многоузловую архитектуру, каждому воркеру необходимо как-то определить свою область выгрузки, чтобы избежать конфликтов при параллельной записи. Я решила реализовать это посредством введения POUTGenerator — фабрики, которая порождает для каждого воркера отдельный экземпляр POUT.

Каждый экземпляр POUT, как вариант, создаваемый генератором, захватывает в замыкании свою уникальную конфигурацию: путь к файлу, каталог или namespace. Таким образом, при обработке нескольких воркеров, работающих одновременно, каждый пишет в свою «область», не пересекающуюся с другими.

Кроме того, генератор интегрируется с ленивой обработкой: POUT создается только в момент, когда воркер начинает обработку, что обеспечивает pull-based исполнение пайплайна. Это позволяет строить локальную экспериментальную MapReduce-систему, где данные обрабатываются и записываются по мере их поступления, без необходимости загружать все данные в память.

Такой подход сохраняет принципы MapReduce:

  • каждый воркер обрабатывает свою часть данных;

  • порядок записи и разбиение на файлы детерминированы;

  • ленивость и потоковая обработка данных сохраняются;

  • структура полностью совместима с параллельным исполнением на нескольких узлах.

package pico.map.reduce.framework.api

//генератор
trait POUTGenerator[-O]:
  //1) Описывает только аспект генерации Output-ресурса.
  // 2) Тип O абстрактен.
  // 3) Интерфейс минимален.
  // 4) ��спользует только стандартную библиотеку Scala.
  // 5) Соответствует PECS (Consumer Super) концептуально.
  // 6) Полностью совместим с ленивым исполнением:
  //    позволяя строить pull-based pipeline.
  def generate(): POUT[O]

//фабрика
object POUTGenerator:
  def apply[O](f: () => POUT[O]): POUTGenerator[O] = () => f()

можно заметить что согласно класической схеме MapReduce IO должно быть строго в ROM, я решила так не ограничиваться.

Однако для корректной работы конвейера одних абстракций для стадий и IO недостаточно: необходимо обеспечить соответствие типов между всеми стадиями (MapShuffleReduce).

Для этого вводём спецификации (Spec) для каждой стадии. Спецификация описывает, какие типы данных принимает и возвращает стадия, какие абстракции источника (PIN) и консьюмера (POUT) используются, и какие операции выполняются:

package pico.map.reduce.framework.api

// нужно ограничить стадии и их количество
sealed trait PSpec

// Спецификация для Map
case class PMapSpec[I, K, O](
  in: PIN[I],                           // Источник данных
  mapper: PMap[I, K, O],                // Функция преобразования
  outGen: POUTGenerator[KV[K, O]]       // Генератор выходных данных (POUT)
) extends PSpec
/*
  1) Этап Map — только преобразование данных из I в KV[K, O].
  2) Типы I, K, O абстрактны.
  3) Спека задаёт только связь входа, функции map и выхода.
  4) Используется только stdlib.
  5) PECS обеспечено IO.
  6) Ленивость обеспечена Iterator.
*/

// Спецификация для Shuffle
case class PShuffleSpec[K, O](
  in: PIN[KV[K, O]],                     // Источник данных из Map
  shuffle: PShuffle[K, O],               // Функция группировки по ключу
  out: POUTGenerator[O]                  // Генератор выходных данных
) extends PSpec
/*
  1) Этап Shuffle — только группировка KV по ключу.
  2) K, O абстрактны.
  3) Спека задаёт форму связи входа, функции shuffle и выхода.
  4) Используется только стандартная библиотека.
  5) PECS обеспечено IO.
  6) Ленивость обеспечена Iterator.
*/

// Спецификация для Reduce
case class PReduceSpec[O](
  in: PIN[Iterator[O]],                  // Источник данных — сгруппированные значения для Reduce
  reduce: PReduce[O],                    // Функция объединения значений
  out: POUTGenerator[O]                  // Генератор выходных данных
) extends PSpec
/*
  1) Этап Reduce — только объединение значений O в итоговое V
  2) Тип O абстрактен
  3) Спека задаёт только связь входа, функции reduce и выхода
  4) Используется только стандартная библиотека.
  5) PECS обеспечено IO.
  6) Ленивость обеспечена Iterator.
*/

Остается только определиться с экзекьюторами, которые будут управлять исполнением на основе своих спецификаций. Именно в них будет реализована логика распределения задач по воркерам. При этом API не накладывает ограничений: при желании всю обработку можно выполнять и в однопоточном режиме.

package pico.map.reduce.framework.api

// Экзекьюторы отвечают за выполнение конкретного этапа пайплайна.
// Они получают на вход соответствующую спецификацию (Spec), которая обеспечивает:
// - соответствие типов между стадиями,
// - правильное отображение / мультиотображение данных,
// - ленивость и потоковую обработку через Iterator, а также связывает PIN и POUT.
trait PMapExecutor[I, K, O]:
  def pExecute(spec: PMapSpec[I, K, O]): Unit

trait PShuffleExecutor[K, O]:
  def pExecute(spec: PShuffleSpec[K, O]): Unit

trait PReduceExecutor[O]:
  def pReduce(spec: PReduceSpec[O]): Unit

и последнее оставлю без коментариев это абстракция самой MapReduce джобы:

package pico.map.reduce.framework.api

trait MapReduceJob:
  def pExecute(): Unit

В демонстрационных кодовых блоках из статьи намеренно опущены фабричные методы и синтаксические расширения (apply, toSpec и т.д.). Они используются исключительно для удобства при написании кода и не несут самостоятельного логического смысла для работы MapReduce-конвейера. Основная цель статьи — показать архитектуру, абстракции, спецификации и экзекьюторы, а не удобство синтаксиса. Но можете зайти на Репозиторий и увидеть их.

моя схема реализации и ее код.

Для экспериментальной реализации MapReduce я выбрала упрощённый подход, ориентированный на локальное выполнение:

  1. Строго локальное выполнение
    Весь пайплайн работает на одной машине, без необходимости развертывания на кластере. Это позволяет легко тестировать и отлаживать алгоритмы без сложной инфраструктуры.

  2. Виртуальные потоки вместо воркеров
    Для симуляции многопоточности используются виртуальные потоки, которые выполняют функции воркеров. Каждый поток получает свою часть данных, обрабатывает её и записывает результат, при этом сохраняется логика распределения и изолированность, аналогичная настоящей многоузловой архитектуре.

  3. Сериализация и десериализация данных
    Для хранения и передачи промежуточных результатов используется Circe, что позволяет работать только с сериализуемыми/десериализуемыми объектами. Это упрощает запись в JSON-файлы и чтение с диска между этапами Map, Shuffle и Reduce.

  4. Простота для пользователя
    Конечному пользователю достаточно определить операции map и reduce в виде лямбд-функций. Весь остальной конвейер, включая чтение данных, группировку и запись, управляется через API и спецификации, что делает использование простым и удобным.

На стадии Map мы сознательно избегаем накладывать заранее фиксированную структуру на сырые данные, поэтому считываем их построчно String из произвольных текстовых файлов (например .txt). Входом может быть либо путь к конкретному файлу, либо к директории: в первом случае обрабатываются все строки указанного файла, во втором — строки всех файлов, найденных рекурсивным обходом каталога по принципу ls -R.

Считанные строки преобразуются функцией map: String -> KV[K, V]. Полученные пары ключ–значение сериализуются в JSON и сохраняются на диск. Чтобы избежать ситуации, когда для большого числа входных файлов образуется такой же объём выходных JSON-файлов, результаты предварительно агрегируются: в один выходной файл записывается сразу несколько JSON-объектов (пакетами по N штук).

Все выходные файлы помещаются в одну директорию — это упрощает следующую стадию, поскольку Reduce-воркеры заранее знают, где искать данные. В роли воркеров на этапе Map выступают виртуальные потоки из пула.

Один воркер может обрабатывать несколько партиций, но делает это последовательно, завершая работу с одной партицией перед переходом к следующей. Такой подход позволяет эффективно использовать пул потоков, избегая лишнего количества конкурирующих задач и упрощая управление ресурсами.

схема реализации Map
схема реализации Map
package pico.map.reduce.framework.engine.local

//import ...

object LocalFilePIN {
  
  
  def apply(url: String): PIN[String] =
    () => {
      
      val path = Paths.get(url)
      // возращаем просто скакатенированый итератор со всеми строками
      if (Files.isDirectory(path)) {
        listFilesRecursive(path)
          .filter(Files.isRegularFile(_))
          .flatMap(safeRead)
      } else if (Files.isRegularFile(path)) {
        safeRead(path) 
      } else {
        Iterator.empty
      }
    }

 
  // получает полный рекурсивынй список итераторов построчно с каждого файла
  private def listFilesRecursive(path: Path): Iterator[Path] =
    Iterator.single(path).flatMap {
      case p if Files.isRegularFile(p) => Iterator(p)
      case p if Files.isDirectory(p) => Files.list(p).toScala(Iterator).flatMap(listFilesRecursive)
      case _ => Iterator.empty
    }

  //для безопастного чтении, столкнулась изначально что не все было в UTF8
  private def safeRead(path: Path): Iterator[String] = {
    implicit val codec: Codec = Codec.UTF8
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE)

    try Source.fromFile(path.toFile)(codec).getLines()
    catch {
      case _: Throwable => Iterator.empty
    }
  }
}

//POUT чтоб выгрузить в ROM Json-файлы
object LocalFileOUT {
  //говорить тут не о чем собо
  //O: Encoder ограничевает типами которые можно в Json преобразовать
  def apply[O: Encoder](path: String): POUT[O] = {
    val p = Paths.get(path)
    val writer = Files.newBufferedWriter(p, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
      
    (o: O) => this.synchronized {
      try {
        val json = o.asJson.noSpaces
        writer.write(json) // преобразуем в JSOn
        writer.newLine()
        writer.flush()
      } catch {
        case e: Throwable => e.printStackTrace()
      }
    }
  }
}


// LocalMapExecutor — локальная реализация Map-этапа на виртуальных потоках.
// Позволяет запускать MapReduce пайплайн **строго локально**, с симуляцией воркеров
// через виртуальные потоки, используя ленивость и спецификации.
object LocalMapExecutor {

  // Создаёт исполнителя Map-этапа.
  // I — тип входных данных, K — тип ключа (Encoder нужен для сериализации через Circe),
  // O — тип значения.
  // chunkSize — количество KV-объектов, которые будут записаны одним блоком (для группировки в один JSON-файл)
  def apply[I, K: Encoder, O: Encoder](chunkSize: Int = 10000): PMapExecutor[I, K, O] = {
    case PMapSpec(in, mapper, gen) =>

      // 1) Создание виртуального пула потоков для имитации воркеров
      val pool = Executors.newVirtualThreadPerTaskExecutor()
      given ExecutionContext = ExecutionContext.fromExecutor(pool)

      // 2) Чтение данных лениво через PIN
      in.pRead()
        // 3) Применение Map-операции, ленивое преобразование I -> KV[K,O]
        .flatMap(mapper.pMap)
        // 4) Группировка результатов по chunkSize для записи в JSON-файлы
        .grouped(chunkSize)
        .map(_.iterator)
        .foreach { chunk =>
          // 5) Для каждого блока создаём отдельный POUT через POUTGenerator,
          //    чтобы каждому воркеру была назначена своя область вывода
          val pout = gen.generate()
          // 6) Асинхронное выполнение записи через Future (виртуальный поток)
          Future {
            chunk.foreach(pout.pWrite)
          }
        }

      // 7) Ожидание завершения всех воркеров
      pool.shutdown()
      pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
  }
}

На этапе Shuffle необходимо считать все пары KV[K, V] из JSON-файлов, сформированных стадией Map. Поскольку все выходные файлы Map мы заранее сохранили в одну директорию, Shuffle-процесс может просто последовательно пройтись по ней и загрузить все объекты.

Задача Shuffle состоит в том, чтобы сгруппировать данные по ключу: все значения V, относящиеся к одному и тому же ключу K, собираются вместе и записываются в один выходной JSON-файл. В результате после Shuffle формируется структура вида один ключ — один файл, содержащий все значения {V}, соответствующие этому ключу.

Такой подход обеспечивает корректность последующей Reduce-стадии: воркер Reduce-этапа может однозначно и без дополнительного поиска взять все значения для своего ключа, обработать их и выдать итоговый результат.

Следует отметить, что реализация этапа Shuffle является полностью статичной: логика чтения, группировки и распределения пар KV[K, V] жёстко зашита в систему и не подлежит настройке пользователем. Таким образом обеспечивается единообразное и детерминированное поведение всего конвейера обработки данных.

схема реализации Shuffle
схема реализации Shuffle
package pico.map.reduce.framework.engine.local

object LocalFilePINJson {
  //ограничиваем тем что может быть обработано circe
  def apply[I: Decoder](url: String): PIN[I] =
    () => {
              //все тоже самое
    }

  //единственое что поменялось это десереализация из json в обьекты типа I 
  private def safeReadJson[I: Decoder](path: Path): Iterator[I] = {
    implicit val codec: Codec = Codec.UTF8
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE)

    try {
      Source.fromFile(path.toFile)(codec)
        .getLines()
        .flatMap { line =>
          //декодер в действии 
          decode[I](line).toOption
        }
    } catch {
      case _: Throwable =>
        Iterator.empty
    }
  }
}

//OUT как у прошлой стадии

//сделал только одну реализацию совсем простую
//на самом деле хотела полностью ленивую 
//и в коде кто посмотрит может обнаружить попытку эту
//но я забила, по итогу мне все это надо для следующей статьи ток как не посмотри
//и я для реализации не использую настолько большое количество данных чтоб вызвать OutOfMemory
object SimplyShuffle {
  def apply[K, O](): PShuffle[K, O] =
    iter =>
      iter.toSeq
        .groupBy(_.key)
        .iterator
        .map { case (k, kvs) => KV(k, kvs.iterator.map(_.value)) }

И наконец, на этапе Reduce каждый воркер просто берет свой файл, содержащий все значения для одного ключа, и выполняет пользовательскую операцию reduce: Iterator[V] -> V. Полученное итоговое значение V сериализуется в JSON и записывается в ROM. Как и на Map-стадии, несколько результирующих значений могут быть объединены пакетами по N штук в один выходной файл, чтобы не создавать избыточное количество маленьких файлов.

Стоит уточнить, что вся описанная реализация разработана исключительно для экспериментальных целей. Она не предназначена для реальных продакшен-нагрузок или распределённых бизнес-задач. Для этого существуют более гибкие подходы и API, позволяющие реализовать воркеры в Docker или Kubernetes, распределять вычисления по независимым машинам, использовать инфраструктуру Hadoop MapReduce, запускаться поверх YARN и т.д. Данная локальная реализация ориентирована на небольшие объёмы данных и нужна прежде всего как базовая платформа для следующей статьи — уже не про фреймворк, а про сами алгоритмы.

схема Reduce
схема Reduce
object LocalReduceExecutor {
  def apply[O](chunkSize: Int = 1024): PReduceExecutor[O] =
    case PReduceSpec(in, reduce, gen) =>

      // создаём пул виртуальных потоков
      val pool = Executors.newVirtualThreadPerTaskExecutor()
      given ExecutionContext = ExecutionContext.fromExecutor(pool)

      in.pRead() 
        // каждая запись — это Iterator[O], который пришёл из Shuffle
        // здесь выполняется reduce: O1, O2, ... -> single O
        .map(iter => iter.reduce(reduce.pReduce))

        // результаты группируются батчами, чтобы не создавать лишних задач
        .grouped(chunkSize)

        // для каждого чанка создаём отдельный POUT (через POUTGenerator)
        // и параллельно пишем результат
        .foreach { chunk =>
          val pout = gen.generate()
          Future {
            chunk.foreach(pout.pWrite)
          }
        }

      // ждём завершения всех виртуальных потоков
      pool.shutdown()
      pool.awaitTermination(Long.MaxValue, java.util.concurrent.TimeUnit.NANOSECONDS)
}

//IO точно такое же как и в первом шаге

сборка полной Job из спецификаций и локальных executors

Остаётся последний шаг: собрать полноценную джобу, объединяющую три стадии — MapShuffleReduce — в единый конвейер.
Как было оговорено ранее, я хочу, чтобы пользователю было достаточно передать:

  • только директорию с исходными файлами,

  • функцию map: I ⇒ Iterator[KV[K, O]],

  • функцию reduce: (O, O) ⇒ O,

Поскольку локальные реализации всех трёх стадий (LocalMapExecutor, SimplyShuffle, LocalReduceExecutor) уже существуют, объединение сводится к тому, чтобы:

  1. Сгенерировать цепочку PSpec (MapSpecShuffleSpecReduceSpec) на основе входных параметров.

  2. Последовательно вызвать соответствующие executors, передав им их спецификации.

  3. Обеспечить совместимость типов и единый формат IO, что уже гарантируется самим API через типизированные PSpec.

Итоговое «склеивание» получается тонким слоем над API:
джоба создаёт спецификации на основе переданных map/reduce-функций и структуры входных файлов, затем локальные экзекьюторы исполняют каждую стадию — строго последовательно, но с внутренним параллелизмом на virtual threads.

То есть конечному пользователю остаётся описать только логику обработки, а детали исполнения (конкурентность, файловые области, сериализация, чанки, порядок стадий) полностью берёт на себя фреймворк.

object LocalJob:

  // Основной метод для создания MapReduce задачи на локальной машине.
  // Параметры:
  // K — тип ключа после этапа map
  // O — тип значения после этапа map
  // sourcePath — путь к исходным файлам
  // map — функция маппинга (разбивает входные данные на ключ-значение)
  // reduce — функция редьюса (сводит значения с одинаковыми ключами)
  //
  // Дополнение о сериализации:
  // Для локальной реализации фреймворка используется JSON-формат, а сериализацию/
  // десериализацию обеспечивает библиотека Circe. Контекстные параметры
  // Encoder и Decoder (K: Encoder, K: Decoder, O: Encoder, O: Decoder)
  // автоматически подставляются компилятором через implicit/given-механизм Scala
  // и позволяют:
  //   • записывать пары KV[K, O] в JSON-файлы между стадиями,
  //   • читать значения K и O обратно при переходе к следующей стадии.
  //
  // Таким образом, ограничения "K и O должны быть сериализуемыми" реализованы 
  // через требование наличия Encoder/Decoder, а Circe выступает универсальным
  // backend-ом для JSON-IO на всех локальных стадиях.
  def apply[K: Encoder : Decoder, O: Encoder : Decoder](
      sourcePath: String,
      map: PMap[String, K, O],
      reduce: PReduce[O]
  ): MapReduceJob = () => {

    // Определяем базовый путь проекта, чтобы все этапы писали/читали файлы относительно него
    val basePath: Path = Paths.get(System.getProperty("user.dir"))
    val pathMap = basePath.resolve("map/").toString       // папка для хранения промежуточного вывода map
    val pathShuffle = basePath.resolve("shuffle/").toString // папка для хранения пром��жуточного вывода shuffle
    val pathOut = basePath.resolve("out/").toString       // папка для финального результата reduce

    // Создаём "исполнителей" для каждого этапа
    // Они управляют конкретной реализацией Map, Shuffle и Reduce
    given mapExecutor: PMapExecutor[String, K, O] = LocalMapExecutor()
    given shuffleExecutor: PShuffleExecutor[K, O] = LocalShuffleExecutor()
    given reduceExecutor: PReduceExecutor[O] = LocalReduceExecutor[O]()

    // Имплиситы для кодирования и декодирования ключ-значение через Circe JSON
    // Это нужно для сериализации промежуточных данных в файлы
    implicit def kvEncoder[K: Encoder, O: Encoder]: Encoder[KV[K, O]] = deriveEncoder[KV[K, O]]
    implicit def kvDecoder[K: Decoder, O: Decoder]: Decoder[KV[K, O]] = deriveDecoder[KV[K, O]]

    // Универсальный генератор файлов для записи данных любого типа T
    // Функция возвращает замыкание, которое каждый раз создаёт новый файл с уникальным именем
    def makeGenerator[T: Encoder](dir: String): POUTGenerator[T] = {
      val counter = new AtomicLong(0) // счётчик для уникальных имён файлов
      () => {
        val partNum = counter.getAndIncrement()
        val filePath = Paths.get(dir).resolve(s"part-$partNum")
        LocalFileOUT[T](filePath.toString) // создаём объект для записи данных типа T в файл
      }
    }

    // ----------------------------
    // Этап Map
    // ----------------------------
    val mapIN: PIN[String] = LocalFilePIN(sourcePath) // читатель исходных файлов
    val mapGen: POUTGenerator[KV[K, O]] = makeGenerator[KV[K, O]](pathMap) // генератор выходных файлов map
    (mapIN, map, mapGen).toSpec.pMapExecute // выполнение Map

    // ----------------------------
    // Этап Shuffle
    // ----------------------------
    val shuffleIN: PIN[KV[K, O]] = LocalFilePINJson[KV[K, O]](pathMap) // чтение промежуточного вывода Map
    val shuffle = SimplyShuffle[K, O]() // простой shuffle: группировка значений по ключу
    val shuffleGen: POUTGenerator[O] = makeGenerator[O](pathShuffle) // генератор файлов shuffle
    (shuffleIN, shuffle, shuffleGen).toSpec.pShuffleExecute // выполнение Shuffle

    // ----------------------------
    // Этап Reduce
    // ----------------------------
    // чтение сгруппированных данных
    val reduceIN: PIN[Iterator[O]] = LocalFilePINJsonIterPerFile[O](pathShuffle)
    // генератор файлов Reduce
val reduceGen: POUTGenerator[O] = makeGenerator[O](pathOut)
    (reduceIN, reduce, reduceGen).toSpec.pReduceExecute // выполнение Reduce
  }

Решим мини задачу на посчет строк

чтоб убедиться что все работает решим классическую тестовую задачу MapReduce - подсчет слов.

package pico.map.reduce.framework.api.demo

import io.circe.{Decoder, Encoder}
import pico.map.reduce.framework.engine.local.LocalJob
import pico.map.reduce.framework.api.{KV, PMap, PReduce}

object example1 {

  def main(args: Array[String]): Unit = {
    // Первый аргумент — путь к директории с входными текстовыми файлами.
    val path = args(0)

    // Этап Map:
    // Берём строку, разбиваем на слова по непробельным символам (\W+),
    // фильтруем пустые токены и для каждого слова создаём пару KV(word, (word, 1)).
    // Значение (word, 1) — классический формат для подсчёта слов.
    val map: PMap[String, String, (String, Int)] =
      input => input
        .split("""\W+""")
        .iterator
        .filter(_.trim.nonEmpty)
        .map(word => KV(word, (word, 1)))

    // Этап Reduce:
    // Складываем частоты: (word, count1) + (word, count2) = (word, count1 + count2).
    // Формат строго соответствует структуре Map-значений.
    val reduce: PReduce[(String, Int)] =
      (kv1, kv2) => (kv1._1, kv1._2 + kv2._2)

    // Собираем локальную MapReduce-задачу.
    // Здесь LocalJob автоматически:
    //   • создаст PIN для чтения строк из файлов,
    //   • выполнит этап Map,
    //   • сериализует результаты в JSON через Circe,
    //   • выполнит Shuffle,
    //   • выполнит Reduce,
    //   • и запишет итоговые результаты в выходную директорию.
    val job = LocalJob[String, (String, Int)](
      path,
      map,
      reduce
    )

    // Запуск полного конвейера Map → Shuffle → Reduce.
    job.pExecute()
  }

}

решила запустить код над директрорией где у меня лежал python код кагого то старого проекта и вот такой мини срез вышел:

...
["unit164",2]
["removeSearchFromUrl",2]
["26108968",16]
["august",6]
["ginput_counter",36]
["prop_map_20",2]
["y",13]
["prop_map_38",2]
["base_image",6]
["block94",2]
["03T10",15]
["unit84",2]
["add",12]
["listing",247]
["Souk",12]
["instant_video_call_dom",6]
["67rem",6]
["PROJECTS_URL",7]
["beach",1]
["immediate",72]
["P301",12]
["map_search_form",3]
["amentis",5]
....

как видим посчет слов удался, так же попробовала запустить просто на 1ом воркере везде, получилось мнего медленее.

В результате эта небольшая демонстрация подтверждает корректность всей локальной реализации MapReduce-конвейера: от абстрактных API-спецификаций и исполнителей до реальных файловых PIN/POUT-механизмов, сериализации Circe и параллельной обработки через виртуальные потоки. Классический пример подсчёта слов работает “как из учебника”, что показывает: разработанные абстракции действительно позволяют выразить MapReduce-логику естественно, минималистично и без привязки к инфраструктуре. Именно такую основу удобно использовать в следующей статье — уже для обсуждения алгоритмов и экспериментальных задач поверх этого фреймворка.

P.S. Код проекта доступен здесь на GitHub.

P.P.S. Если проект понравился, буду рада ⭐ на репозитории!

P.P.S. Планирую развивать фреймворк дальше:

  • проработать иерархию ошибок в API;

  • добавить множество реализаций, например:

    • создание Job поверх Apache Hadoop MapReduce, Apache YARN, Apache Spark;

    • IO через JDBC;

    • исполнение воркеров в Docker/Kubernetes.

P.P.P.S. Если хотите помочь с вышеописанным планом, буду рада вашим Pull Request’ам.

P.P.P.P.S. В следующей статье будет подробно про алгоритмы. MapReduce позволяет решать множество интересных задач.