Ремарка
Данная работа носит подготовительный и исследовательский характер и служит вводной частью к следующей статье, в которой будут рассмотрены реализации уже конкретных алгоритмов поверх описанного подхода. Автор не утверждает, что представленная архитектура является идеальной, что именно так следует строить микрофреймворк или что предложенный код полностью соответствует академическим стандартам . Более того, реализация может опускать некоторые детали, свойственные промышленным системам, и не претендует на абсолютную корректность передачи всех концепций оригинальной парадигмы. и так же автор не утверждает что корректно обьеснет что емсь MapReduce
Главная цель статьи — поделиться экспериментами, продемонстрировать минималистичный подход к построению абстракций, показать, как можно выразить в максимально компактном и понятном виде, а также сформировать основу, на которой в дальнейшем можно обсуждать уже сами алгоритмы, их свойства и особенности распределённой обработки данных.
Такая форма изложения позволяет сосредоточиться не на «правильности» инфраструктуры, а на идеях, которые за ней стоят, и использовать предложенный микрофреймворк как инструмент для дальнейших экспериментов, тестов и обучающих примеров. На данном этапе.
Вкратце MapReduce
MapReduce — это парадигма обработки данных, подход к работе с большими объёмами информации. Постерегусь называть «классом алгоритмов» (вопрос определений), скорее, речь о конкретной организации вычислений: где данные проходят через три этапа - Map --> Shuffle --> Reduce.
MapиReduceсодержат основную бизнес- или алгоритмическую логику.Shuffleсвязывает эти этапы, группируя данные по ключам.
MapReduce предполагает логическую независимость стадий: каждую из них — , или — можно запускать отдельно. При этом стадии выполняются в строгом порядке: сначала соответственно. разберем все 3 этапа на 3 мною подготовленных схемах соответственном порядке:
Особенность подхода — горизонтальная масштабируемость на этапах и Данные можно разделить на несколько или множество партиций, каждая из которых обрабатывается на отдельном узле. Количество партиций и узлов может варьироваться в зависимости от инфраструктуры и объёма данных, что позволяет гибко распределять нагрузку и ускорять обработку больших наборов данных.
На этапе исходные данные преобразуются в пары «ключ–значение» и записываются в , а на этапе Reduce данные собираются по ключам, каждая партиция обрабатывается строго по ключу, и выполняется операция редьюсинга после чего итоговый результат снова записывается в . Суть этого процесса я попыталась отобразить на след. схеме:
Map

На этапе Map есть некоторое множество воркеров (может быть динамичным). Помимо воркеров существует Executor — необязательный компонент, но с его использованием реализация становится проще. Executor выполняет роль мастер-узла/процесса: он контролирует распределение данных/файлов по воркерам, указывая каждому, какой фрагмент данных читать. В рамках ленивых вычислений и итераторов такую функциональность реализовать куда приятнее, то есть Executor фактически отвечает за распределение данных.
Каждый воркер считывает некоторое подмножество множество файлов определенные ей мультиотображением E о котором речь была выше, после посредством мультиотображения , преобразует их в объекты некоторого множества . Затем к этому набору данных применяется второе мультитображение , которое переводит объекты в пары ключ-значение , представляющего собой декартово произведение множества ключей и значений Наконец, результат записывается в выходные файлы уже просто отображением чтобы записать результат в выходные файлы F' в ROM. и так мы получаем мультиотображение из где каждый воркер выполняет мультиотображение только для части данных что в совокупности со свойством один гарантирует что каждый файл данных обрабатывается только 1 раз.
Shuffle

Этап Shuffle, как уже отмечалось, носит преимущественно технический характер и сам по себе не содержит бизнес-логики (хотя при желании туда можно встроить дополнительные преобразования). Его задача — подготовить данные к Reduce. Поскольку Map записывает результаты в набор файлов, заранее неизвестно, какие ключи окажутся в каком из них. Shuffle выполняет группировку данных по ключам и тем самым передаёт Reduce чёткую структуру вида «ключ → перечень значений», необходимую для корректной обработки на следующем этапе. можно более кратко изобразить как
подробно на чтении и записи на данном этапе останавливаться не буду — они происходят аналогично предыдущему этапу. Однако здесь важно подчеркнуть, что чтение входных данных на этапе Reduce является обычным отображением, а не мультитображением.
На данном этапе о целесообразности использования нескольких воркеров говорить рано: архитектурно процесс можно реализовать как в многопоточном/многопроцессном режиме, так и в виде одного компактного мини-процесса. При этом, чтобы избежать переписывания файлов, сейчас вполне достаточно формировать один мета файл, содержащий указатели на фрагменты F' на каждый ключ и место, где хранятся его данные.
Reduce

Подобно этапу Map, на стадии Reduce также существует множество воркеров . Каждый воркер получает данные, считывая их из файлов, сформированных на предыдущем этапе. Однако, в отличие от Map- стадии, где входные данные обрабатываются в произвольном порядке, на этапе Reduce каждый воркер читает данные строго, сгруппированные по одному конкретному ключу .
Задача воркера — выполнить операцию над всеми значениями, соответствующими его ключу , и записать итоговый результат в ROM В виде некоторых файлов .
Назначение ключей воркерам распределяет Экзекьютор — . Теоретически можно реализовать распределение и без центрального мастера, но такая схема уже не будет полностью соответствовать классической архитектуре , где именно мастер обеспечивает детерминированное распределение ключей между -воркерами и согласованность всей вычислительной фазы.
Принципы API
В рамках данной статьи я хочу достигнуть 2 целей: сделать универсальное и сделать простую реализацию (ну и походу простую задачу с почетом слов)
требования к следующие:
API должно чётко отражать стадии классической MapReduce (
→→→→), представляя каждую как независимый модуль с отдельным интерфейсом и единственной ответственностью.Универсальность и отсутствие привязки к конкретным технологиям
API не должно зависеть: от инфраструктуры, форматов хранения, от конкретных источников данных, от сетевых библиотек. Только абстракции и стандартные типы.API должно быть универсальным: одинаково позволять работать локально, поверх распределённых фреймворков и встраиваться в любые Scala-проекты. Пользователь должен свободно задавать реализации всех стадий (
,,,) через единые абстрактные интерфейсы.Использовать только стандартную библиотеку
Никаких внешних зависимостей.
Только коллекции, итераторы, типы, исключения, файловые, concurrency primitives при необходимости.Соответствие принципу
для стадий ввода/вывода ()
В абстракциях ввода/вывода правильно использовать ковариантность/контравариантность:Поддержка ленивых вычислений на всех стадиях
Все операции должны работать надIterator,LazyListили аналогами.
API не должно требовать загрузки данных в память.
Стадии,,должны поддерживать потоковую обработку.
касательно моей мини реализации - следующие:
работать строго локально
использовать виртуальные потоки как симуляцию воркеров
ограничить Сериализуемыми/десериализуемыми объектами (у меня за это отвечает circe)
конечному пользователь должно быть достаточно определить операцию map и reduce в виде лямбд
Пишем API
первое что определю это абстракцию ключ значения и операции
package pico.map.reduce.framework.api
//просто минималистичный клаас ключ значение
case class KV[k, v](key: k, value: v)
// задаем класс который считывается из исходных файлов I
// и типы для ключа и значения K,O
// возвращает итератор (), так как map — это мультиотображение
trait PMap[I, K, O]:
// соответствует принципам API:
// 1) описывает только мультиотображени Map.
// 2) вход I, ключ K и значение O не привязаны к формату или инфраструктуре.
// 3) интерфейс не ограничивает исполнение.
// 4) только стандартная либа.
// 5) Не содержит IO.
// 6) Ленивость обеспечивается через Iterator.
def pMap(input: I): Iterator[KV[K, O]]
// стадия Shuffle: группировка значений по ключу
trait PShuffle[K, O]:
// соответствует принципам API:
// 1) Описывает только отображение Shuffle.
// 2) K и O абстрактны, интерфейс не привязан к формату или инфраструктуре.
// 3) определяет лишь форму группировки.
// 4) Использует только стандартную библиотеку.
// 5) Не содержит IO.
// 6) Ленивость обеспечивается через Iterator.
def pShuffle(iterator: Iterator[KV[K, O]]): Iterator[KV[K, Iterator[O]]]
// стадия Reduce: объединяет два значения одного типа в одно
trait PReduce[O] {
// соответствует принципам API:
// 1) Описывает только отображение reduce.
// 2) интерфейс не привязан к формату или инфраструктуре.
// 3) определяет лишь операцию слияния.
// 4) только стандартную библиотека.
// 5) Не содержит IO.
// 6) Ленивость обеспечивается через Iterator.
def pReduce(o1: O, o2: O): O
}
//примечание, хоть тут все и написано в одном блоке
//в репозитории оно разнесено на несколько файлов (4 если быть точной)
Согласно принятой схеме, каждая из стадий предполагает наличие . Чтобы не перегружать код большим количеством отдельных трейтов или интерфейсов, я планирую сделать общую абстракцию для всех стадий, объединяя их функциональность в одном месте. Это позволит сохранить чистоту и гибкость , не усложняя структуру типов и интерфейсов.
package pico.map.reduce.framework.api
// PIN — это абстракция источника данных (Input).
trait PIN[+I]:
// 1) Описывает только источник данных в виде мультиотображения.
// 2) не привязан к конкретному формату данных или инфраструктуре.
// 3) не накладывая ограничений на исполнение.
// 4) Использует только стандартную библиотеку.
// 5) соответсвует PECS как коньсюмер: реализуется через ковариантность [+I].
// 6) Ленивость обеспечивается через Iterator.
def pRead(): Iterator[I]
object PIN:
// фабричный метод.
def apply[I](f: => Iterator[I]): PIN[I] = () => f
//POUT — это абстракция для «выходного потока» (Output).
trait POUT[-O]:
// 1) описывает только стадию вывода (out)
// 2) Тип O абстрактен и не привязан к конкретному формату данных или инфраструктуре
// 3) Интерфейс определяет только операцию записи, не накладывая ограничений на реализацию
// 4) Использует только стандартную библиотеку Scala
// 5) соответсвует PECS как продюсер: реализуется через контравариантность [-I].
// 6) В плане ленивости запускает pipeline.
def pWrite(o: O): Unit
object POUT:
//фабричный метод
def apply[O](f: O => Unit): POUT[O] = f(_)
// тут так же код разкидан на несколько файлов в репозитории (точнее по 2м)
MapReduce предполагает многоузловую архитектуру, каждому воркеру необходимо как-то определить свою область выгрузки, чтобы избежать конфликтов при параллельной записи. Я решила реализовать это посредством введения — фабрики, которая порождает для каждого воркера отдельный экземпляр .
Каждый экземпляр , как вариант, создаваемый генератором, захватывает в замыкании свою уникальную конфигурацию: путь к файлу, каталог или namespace. Таким образом, при обработке нескольких воркеров, работающих одновременно, каждый пишет в свою «область», не пересекающуюся с другими.
Кроме того, генератор интегрируется с ленивой обработкой: создается только в момент, когда воркер начинает обработку, что обеспечивает pull-based исполнение пайплайна. Это позволяет строить локальную экспериментальную -систему, где данные обрабатываются и записываются по мере их поступления, без необходимости загружать все данные в память.
Такой подход сохраняет принципы :
каждый воркер обрабатывает свою часть данных;
порядок записи и разбиение на файлы детерминированы;
ленивость и потоковая обработка данных сохраняются;
структура полностью совместима с параллельным исполнением на нескольких узлах.
package pico.map.reduce.framework.api
//генератор
trait POUTGenerator[-O]:
//1) Описывает только аспект генерации Output-ресурса.
// 2) Тип O абстрактен.
// 3) Интерфейс минимален.
// 4) ��спользует только стандартную библиотеку Scala.
// 5) Соответствует PECS (Consumer Super) концептуально.
// 6) Полностью совместим с ленивым исполнением:
// позволяя строить pull-based pipeline.
def generate(): POUT[O]
//фабрика
object POUTGenerator:
def apply[O](f: () => POUT[O]): POUTGenerator[O] = () => f()можно заметить что согласно класической схеме MapReduce должно быть строго в , я решила так не ограничиваться.
Однако для корректной работы конвейера одних абстракций для стадий и IO недостаточно: необходимо обеспечить соответствие типов между всеми стадиями ( → → ).
Для этого вводём спецификации (Spec) для каждой стадии. Спецификация описывает, какие типы данных принимает и возвращает стадия, какие абстракции источника (PIN) и консьюмера (POUT) используются, и какие операции выполняются:
package pico.map.reduce.framework.api
// нужно ограничить стадии и их количество
sealed trait PSpec
// Спецификация для Map
case class PMapSpec[I, K, O](
in: PIN[I], // Источник данных
mapper: PMap[I, K, O], // Функция преобразования
outGen: POUTGenerator[KV[K, O]] // Генератор выходных данных (POUT)
) extends PSpec
/*
1) Этап Map — только преобразование данных из I в KV[K, O].
2) Типы I, K, O абстрактны.
3) Спека задаёт только связь входа, функции map и выхода.
4) Используется только stdlib.
5) PECS обеспечено IO.
6) Ленивость обеспечена Iterator.
*/
// Спецификация для Shuffle
case class PShuffleSpec[K, O](
in: PIN[KV[K, O]], // Источник данных из Map
shuffle: PShuffle[K, O], // Функция группировки по ключу
out: POUTGenerator[O] // Генератор выходных данных
) extends PSpec
/*
1) Этап Shuffle — только группировка KV по ключу.
2) K, O абстрактны.
3) Спека задаёт форму связи входа, функции shuffle и выхода.
4) Используется только стандартная библиотека.
5) PECS обеспечено IO.
6) Ленивость обеспечена Iterator.
*/
// Спецификация для Reduce
case class PReduceSpec[O](
in: PIN[Iterator[O]], // Источник данных — сгруппированные значения для Reduce
reduce: PReduce[O], // Функция объединения значений
out: POUTGenerator[O] // Генератор выходных данных
) extends PSpec
/*
1) Этап Reduce — только объединение значений O в итоговое V
2) Тип O абстрактен
3) Спека задаёт только связь входа, функции reduce и выхода
4) Используется только стандартная библиотека.
5) PECS обеспечено IO.
6) Ленивость обеспечена Iterator.
*/
Остается только определиться с экзекьюторами, которые будут управлять исполнением на основе своих спецификаций. Именно в них будет реализована логика распределения задач по воркерам. При этом API не накладывает ограничений: при желании всю обработку можно выполнять и в однопоточном режиме.
package pico.map.reduce.framework.api
// Экзекьюторы отвечают за выполнение конкретного этапа пайплайна.
// Они получают на вход соответствующую спецификацию (Spec), которая обеспечивает:
// - соответствие типов между стадиями,
// - правильное отображение / мультиотображение данных,
// - ленивость и потоковую обработку через Iterator, а также связывает PIN и POUT.
trait PMapExecutor[I, K, O]:
def pExecute(spec: PMapSpec[I, K, O]): Unit
trait PShuffleExecutor[K, O]:
def pExecute(spec: PShuffleSpec[K, O]): Unit
trait PReduceExecutor[O]:
def pReduce(spec: PReduceSpec[O]): Unitи последнее оставлю без коментариев это абстракция самой MapReduce джобы:
package pico.map.reduce.framework.api
trait MapReduceJob:
def pExecute(): UnitВ демонстрационных кодовых блоках из статьи намеренно опущены фабричные методы и синтаксические расширения (apply, toSpec и т.д.). Они используются исключительно для удобства при написании кода и не несут самостоятельного логического смысла для работы -конвейера. Основная цель статьи — показать архитектуру, абстракции, спецификации и экзекьюторы, а не удобство синтаксиса. Но можете зайти на Репозиторий и увидеть их.
моя схема реализации и ее код.
Для экспериментальной реализации я выбрала упрощённый подход, ориентированный на локальное выполнение:
Строго локальное выполнение
Весь пайплайн работает на одной машине, без необходимости развертывания на кластере. Это позволяет легко тестировать и отлаживать алгоритмы без сложной инфраструктуры.Виртуальные потоки вместо воркеров
Для симуляции многопоточности используются виртуальные потоки, которые выполняют функции воркеров. Каждый поток получает свою часть данных, обрабатывает её и записывает результат, при этом сохраняется логика распределения и изолированность, аналогичная настоящей многоузловой архитектуре.Сериализация и десериализация данных
Для хранения и передачи промежуточных результатов используется Circe, что позволяет работать только с сериализуемыми/десериализуемыми объектами. Это упрощает запись ви чтение с диска между этапами Map,и Reduce.Простота для пользователя
Конечному пользователю достаточно определить операции map и reduce в виде лямбд-функций. Весь остальной конвейер, включая чтение данных, группировку и запись, управляется черези спецификации, что делает использование простым и удобным.
На стадии мы сознательно избегаем накладывать заранее фиксированную структуру на сырые данные, поэтому считываем их построчно из произвольных текстовых файлов (например .txt). Входом может быть либо путь к конкретному файлу, либо к директории: в первом случае обрабатываются все строки указанного файла, во втором — строки всех файлов, найденных рекурсивным обходом каталога по принципу ls -R.
Считанные строки преобразуются функцией map: String -> KV[K, V]. Полученные пары ключ–значение сериализуются в JSON и сохраняются на диск. Чтобы избежать ситуации, когда для большого числа входных файлов образуется такой же объём выходных JSON-файлов, результаты предварительно агрегируются: в один выходной файл записывается сразу несколько JSON-объектов (пакетами по N штук).
Все выходные файлы помещаются в одну директорию — это упрощает следующую стадию, поскольку -воркеры заранее знают, где искать данные. В роли воркеров на этапе Map выступают виртуальные потоки из пула.
Один воркер может обрабатывать несколько партиций, но делает это последовательно, завершая работу с одной партицией перед переходом к следующей. Такой подход позволяет эффективно использовать пул потоков, избегая лишнего количества конкурирующих задач и упрощая управление ресурсами.

package pico.map.reduce.framework.engine.local
//import ...
object LocalFilePIN {
def apply(url: String): PIN[String] =
() => {
val path = Paths.get(url)
// возращаем просто скакатенированый итератор со всеми строками
if (Files.isDirectory(path)) {
listFilesRecursive(path)
.filter(Files.isRegularFile(_))
.flatMap(safeRead)
} else if (Files.isRegularFile(path)) {
safeRead(path)
} else {
Iterator.empty
}
}
// получает полный рекурсивынй список итераторов построчно с каждого файла
private def listFilesRecursive(path: Path): Iterator[Path] =
Iterator.single(path).flatMap {
case p if Files.isRegularFile(p) => Iterator(p)
case p if Files.isDirectory(p) => Files.list(p).toScala(Iterator).flatMap(listFilesRecursive)
case _ => Iterator.empty
}
//для безопастного чтении, столкнулась изначально что не все было в UTF8
private def safeRead(path: Path): Iterator[String] = {
implicit val codec: Codec = Codec.UTF8
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
try Source.fromFile(path.toFile)(codec).getLines()
catch {
case _: Throwable => Iterator.empty
}
}
}
//POUT чтоб выгрузить в ROM Json-файлы
object LocalFileOUT {
//говорить тут не о чем собо
//O: Encoder ограничевает типами которые можно в Json преобразовать
def apply[O: Encoder](path: String): POUT[O] = {
val p = Paths.get(path)
val writer = Files.newBufferedWriter(p, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
(o: O) => this.synchronized {
try {
val json = o.asJson.noSpaces
writer.write(json) // преобразуем в JSOn
writer.newLine()
writer.flush()
} catch {
case e: Throwable => e.printStackTrace()
}
}
}
}
// LocalMapExecutor — локальная реализация Map-этапа на виртуальных потоках.
// Позволяет запускать MapReduce пайплайн **строго локально**, с симуляцией воркеров
// через виртуальные потоки, используя ленивость и спецификации.
object LocalMapExecutor {
// Создаёт исполнителя Map-этапа.
// I — тип входных данных, K — тип ключа (Encoder нужен для сериализации через Circe),
// O — тип значения.
// chunkSize — количество KV-объектов, которые будут записаны одним блоком (для группировки в один JSON-файл)
def apply[I, K: Encoder, O: Encoder](chunkSize: Int = 10000): PMapExecutor[I, K, O] = {
case PMapSpec(in, mapper, gen) =>
// 1) Создание виртуального пула потоков для имитации воркеров
val pool = Executors.newVirtualThreadPerTaskExecutor()
given ExecutionContext = ExecutionContext.fromExecutor(pool)
// 2) Чтение данных лениво через PIN
in.pRead()
// 3) Применение Map-операции, ленивое преобразование I -> KV[K,O]
.flatMap(mapper.pMap)
// 4) Группировка результатов по chunkSize для записи в JSON-файлы
.grouped(chunkSize)
.map(_.iterator)
.foreach { chunk =>
// 5) Для каждого блока создаём отдельный POUT через POUTGenerator,
// чтобы каждому воркеру была назначена своя область вывода
val pout = gen.generate()
// 6) Асинхронное выполнение записи через Future (виртуальный поток)
Future {
chunk.foreach(pout.pWrite)
}
}
// 7) Ожидание завершения всех воркеров
pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}
}
На этапе Shuffle необходимо считать все пары KV[K, V] из , сформированных стадией Map. Поскольку все выходные файлы Map мы заранее сохранили в одну директорию, Shuffle-процесс может просто последовательно пройтись по ней и загрузить все объекты.
Задача Shuffle состоит в том, чтобы сгруппировать данные по ключу: все значения V, относящиеся к одному и тому же ключу K, собираются вместе и записываются в один выходной . В результате после Shuffle формируется структура вида один ключ — один файл, содержащий все значения {V}, соответствующие этому ключу.
Такой подход обеспечивает корректность последующей Reduce-стадии: воркер Reduce-этапа может однозначно и без дополнительного поиска взять все значения для своего ключа, обработать их и выдать итоговый результат.
Следует отметить, что реализация этапа является полностью статичной: логика чтения, группировки и распределения пар KV[K, V] жёстко зашита в систему и не подлежит настройке пользователем. Таким образом обеспечивается единообразное и детерминированное поведение всего конвейера обработки данных.

package pico.map.reduce.framework.engine.local
object LocalFilePINJson {
//ограничиваем тем что может быть обработано circe
def apply[I: Decoder](url: String): PIN[I] =
() => {
//все тоже самое
}
//единственое что поменялось это десереализация из json в обьекты типа I
private def safeReadJson[I: Decoder](path: Path): Iterator[I] = {
implicit val codec: Codec = Codec.UTF8
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
try {
Source.fromFile(path.toFile)(codec)
.getLines()
.flatMap { line =>
//декодер в действии
decode[I](line).toOption
}
} catch {
case _: Throwable =>
Iterator.empty
}
}
}
//OUT как у прошлой стадии
//сделал только одну реализацию совсем простую
//на самом деле хотела полностью ленивую
//и в коде кто посмотрит может обнаружить попытку эту
//но я забила, по итогу мне все это надо для следующей статьи ток как не посмотри
//и я для реализации не использую настолько большое количество данных чтоб вызвать OutOfMemory
object SimplyShuffle {
def apply[K, O](): PShuffle[K, O] =
iter =>
iter.toSeq
.groupBy(_.key)
.iterator
.map { case (k, kvs) => KV(k, kvs.iterator.map(_.value)) }
И наконец, на этапе Reduce каждый воркер просто берет свой файл, содержащий все значения для одного ключа, и выполняет пользовательскую операцию reduce: Iterator[V] -> V. Полученное итоговое значение V сериализуется в JSON и записывается в ROM. Как и на Map-стадии, несколько результирующих значений могут быть объединены пакетами по штук в один выходной файл, чтобы не создавать избыточное количество маленьких файлов.
Стоит уточнить, что вся описанная реализация разработана исключительно для экспериментальных целей. Она не предназначена для реальных продакшен-нагрузок или распределённых бизнес-задач. Для этого существуют более гибкие подходы и , позволяющие реализовать воркеры в или , распределять вычисления по независимым машинам, использовать инфраструктуру Hadoop , запускаться поверх и т.д. Данная локальная реализация ориентирована на небольшие объёмы данных и нужна прежде всего как базовая платформа для следующей статьи — уже не про фреймворк, а про сами алгоритмы.

object LocalReduceExecutor {
def apply[O](chunkSize: Int = 1024): PReduceExecutor[O] =
case PReduceSpec(in, reduce, gen) =>
// создаём пул виртуальных потоков
val pool = Executors.newVirtualThreadPerTaskExecutor()
given ExecutionContext = ExecutionContext.fromExecutor(pool)
in.pRead()
// каждая запись — это Iterator[O], который пришёл из Shuffle
// здесь выполняется reduce: O1, O2, ... -> single O
.map(iter => iter.reduce(reduce.pReduce))
// результаты группируются батчами, чтобы не создавать лишних задач
.grouped(chunkSize)
// для каждого чанка создаём отдельный POUT (через POUTGenerator)
// и параллельно пишем результат
.foreach { chunk =>
val pout = gen.generate()
Future {
chunk.foreach(pout.pWrite)
}
}
// ждём завершения всех виртуальных потоков
pool.shutdown()
pool.awaitTermination(Long.MaxValue, java.util.concurrent.TimeUnit.NANOSECONDS)
}
//IO точно такое же как и в первом шаге
сборка полной Job из спецификаций и локальных executors
Остаётся последний шаг: собрать полноценную джобу, объединяющую три стадии — → → — в единый конвейер.
Как было оговорено ранее, я хочу, чтобы пользователю было достаточно передать:
только директорию с исходными файлами,
функцию
,функцию
,
Поскольку локальные реализации всех трёх стадий (LocalMapExecutor, SimplyShuffle, LocalReduceExecutor) уже существуют, объединение сводится к тому, чтобы:
Сгенерировать цепочку PSpec (
→→) на основе входных параметров.Последовательно вызвать соответствующие
, передав им их спецификации.Обеспечить совместимость типов и единый формат
, что уже гарантируется самимчерез типизированные.
Итоговое «склеивание» получается тонким слоем над
джоба создаёт спецификации на основе переданных map/reduce-функций и структуры входных файлов, затем локальные экзекьюторы исполняют каждую стадию — строго последовательно, но с внутренним параллелизмом на .
То есть конечному пользователю остаётся описать только логику обработки, а детали исполнения (конкурентность, файловые области, сериализация, чанки, порядок стадий) полностью берёт на себя фреймворк.
object LocalJob:
// Основной метод для создания MapReduce задачи на локальной машине.
// Параметры:
// K — тип ключа после этапа map
// O — тип значения после этапа map
// sourcePath — путь к исходным файлам
// map — функция маппинга (разбивает входные данные на ключ-значение)
// reduce — функция редьюса (сводит значения с одинаковыми ключами)
//
// Дополнение о сериализации:
// Для локальной реализации фреймворка используется JSON-формат, а сериализацию/
// десериализацию обеспечивает библиотека Circe. Контекстные параметры
// Encoder и Decoder (K: Encoder, K: Decoder, O: Encoder, O: Decoder)
// автоматически подставляются компилятором через implicit/given-механизм Scala
// и позволяют:
// • записывать пары KV[K, O] в JSON-файлы между стадиями,
// • читать значения K и O обратно при переходе к следующей стадии.
//
// Таким образом, ограничения "K и O должны быть сериализуемыми" реализованы
// через требование наличия Encoder/Decoder, а Circe выступает универсальным
// backend-ом для JSON-IO на всех локальных стадиях.
def apply[K: Encoder : Decoder, O: Encoder : Decoder](
sourcePath: String,
map: PMap[String, K, O],
reduce: PReduce[O]
): MapReduceJob = () => {
// Определяем базовый путь проекта, чтобы все этапы писали/читали файлы относительно него
val basePath: Path = Paths.get(System.getProperty("user.dir"))
val pathMap = basePath.resolve("map/").toString // папка для хранения промежуточного вывода map
val pathShuffle = basePath.resolve("shuffle/").toString // папка для хранения пром��жуточного вывода shuffle
val pathOut = basePath.resolve("out/").toString // папка для финального результата reduce
// Создаём "исполнителей" для каждого этапа
// Они управляют конкретной реализацией Map, Shuffle и Reduce
given mapExecutor: PMapExecutor[String, K, O] = LocalMapExecutor()
given shuffleExecutor: PShuffleExecutor[K, O] = LocalShuffleExecutor()
given reduceExecutor: PReduceExecutor[O] = LocalReduceExecutor[O]()
// Имплиситы для кодирования и декодирования ключ-значение через Circe JSON
// Это нужно для сериализации промежуточных данных в файлы
implicit def kvEncoder[K: Encoder, O: Encoder]: Encoder[KV[K, O]] = deriveEncoder[KV[K, O]]
implicit def kvDecoder[K: Decoder, O: Decoder]: Decoder[KV[K, O]] = deriveDecoder[KV[K, O]]
// Универсальный генератор файлов для записи данных любого типа T
// Функция возвращает замыкание, которое каждый раз создаёт новый файл с уникальным именем
def makeGenerator[T: Encoder](dir: String): POUTGenerator[T] = {
val counter = new AtomicLong(0) // счётчик для уникальных имён файлов
() => {
val partNum = counter.getAndIncrement()
val filePath = Paths.get(dir).resolve(s"part-$partNum")
LocalFileOUT[T](filePath.toString) // создаём объект для записи данных типа T в файл
}
}
// ----------------------------
// Этап Map
// ----------------------------
val mapIN: PIN[String] = LocalFilePIN(sourcePath) // читатель исходных файлов
val mapGen: POUTGenerator[KV[K, O]] = makeGenerator[KV[K, O]](pathMap) // генератор выходных файлов map
(mapIN, map, mapGen).toSpec.pMapExecute // выполнение Map
// ----------------------------
// Этап Shuffle
// ----------------------------
val shuffleIN: PIN[KV[K, O]] = LocalFilePINJson[KV[K, O]](pathMap) // чтение промежуточного вывода Map
val shuffle = SimplyShuffle[K, O]() // простой shuffle: группировка значений по ключу
val shuffleGen: POUTGenerator[O] = makeGenerator[O](pathShuffle) // генератор файлов shuffle
(shuffleIN, shuffle, shuffleGen).toSpec.pShuffleExecute // выполнение Shuffle
// ----------------------------
// Этап Reduce
// ----------------------------
// чтение сгруппированных данных
val reduceIN: PIN[Iterator[O]] = LocalFilePINJsonIterPerFile[O](pathShuffle)
// генератор файлов Reduce
val reduceGen: POUTGenerator[O] = makeGenerator[O](pathOut)
(reduceIN, reduce, reduceGen).toSpec.pReduceExecute // выполнение Reduce
}
Решим мини задачу на посчет строк
чтоб убедиться что все работает решим классическую тестовую задачу - подсчет слов.
package pico.map.reduce.framework.api.demo
import io.circe.{Decoder, Encoder}
import pico.map.reduce.framework.engine.local.LocalJob
import pico.map.reduce.framework.api.{KV, PMap, PReduce}
object example1 {
def main(args: Array[String]): Unit = {
// Первый аргумент — путь к директории с входными текстовыми файлами.
val path = args(0)
// Этап Map:
// Берём строку, разбиваем на слова по непробельным символам (\W+),
// фильтруем пустые токены и для каждого слова создаём пару KV(word, (word, 1)).
// Значение (word, 1) — классический формат для подсчёта слов.
val map: PMap[String, String, (String, Int)] =
input => input
.split("""\W+""")
.iterator
.filter(_.trim.nonEmpty)
.map(word => KV(word, (word, 1)))
// Этап Reduce:
// Складываем частоты: (word, count1) + (word, count2) = (word, count1 + count2).
// Формат строго соответствует структуре Map-значений.
val reduce: PReduce[(String, Int)] =
(kv1, kv2) => (kv1._1, kv1._2 + kv2._2)
// Собираем локальную MapReduce-задачу.
// Здесь LocalJob автоматически:
// • создаст PIN для чтения строк из файлов,
// • выполнит этап Map,
// • сериализует результаты в JSON через Circe,
// • выполнит Shuffle,
// • выполнит Reduce,
// • и запишет итоговые результаты в выходную директорию.
val job = LocalJob[String, (String, Int)](
path,
map,
reduce
)
// Запуск полного конвейера Map → Shuffle → Reduce.
job.pExecute()
}
}
решила запустить код над директрорией где у меня лежал python код кагого то старого проекта и вот такой мини срез вышел:
...
["unit164",2]
["removeSearchFromUrl",2]
["26108968",16]
["august",6]
["ginput_counter",36]
["prop_map_20",2]
["y",13]
["prop_map_38",2]
["base_image",6]
["block94",2]
["03T10",15]
["unit84",2]
["add",12]
["listing",247]
["Souk",12]
["instant_video_call_dom",6]
["67rem",6]
["PROJECTS_URL",7]
["beach",1]
["immediate",72]
["P301",12]
["map_search_form",3]
["amentis",5]
....как видим посчет слов удался, так же попробовала запустить просто на 1ом воркере везде, получилось мнего медленее.
В результате эта небольшая демонстрация подтверждает корректность всей локальной реализации -конвейера: от абстрактных -спецификаций и исполнителей до реальных файловых PIN/POUT-механизмов, сериализации Circe и параллельной обработки через виртуальные потоки. Классический пример подсчёта слов работает “как из учебника”, что показывает: разработанные абстракции действительно позволяют выразить -логику естественно, минималистично и без привязки к инфраструктуре. Именно такую основу удобно использовать в следующей статье — уже для обсуждения алгоритмов и экспериментальных задач поверх этого фреймворка.
P.S. Код проекта доступен здесь на GitHub.
P.P.S. Если проект понравился, буду рада ⭐ на репозитории!
P.P.S. Планирую развивать фреймворк дальше:
проработать иерархию ошибок в API;
добавить множество реализаций, например:
создание Job поверх
,,;черезисполнение воркеров в
.
P.P.P.S. Если хотите помочь с вышеописанным планом, буду рада вашим Pull Request’ам.
P.P.P.P.S. В следующей статье будет подробно про алгоритмы. MapReduce позволяет решать множество интересных задач.
