Как стать автором
Обновить
763.03
Сбер
Технологии, меняющие мир

Изменить сохранения Spark Часть вторая: реализация партишенера

Время на прочтение44 мин
Количество просмотров2.6K

Автор: Иван Калининский, участник профессионального сообщества Сбера SberProfi DWH/BigData.

Профессиональное сообщество SberProfi DWH/BigData отвечает за развитие компетенций в таких направлениях, как экосистема Hadoop, Teradata, Oracle DB, GreenPlum, а также BI инструментах Qlik, SAP BO, Tableau и др.

Начну описание партишенера с UML-диаграммы:

Рисунок 1. UML-диаграмма классов OrderBucketsPartitioner
Рисунок 1. UML-диаграмма классов OrderBucketsPartitioner

Наверняка вы видели, как Spark разбирает запрос, строит план и выполняет его. Сопоставим эту схему с нашей конкретной реализацией (когда будете знакомиться с реализацией классов, возвращайтесь к этой схеме, чтобы увидеть соответствие):

Рисунок 2. Сопоставление схемы обработки запроса и конкретной реализации.
Рисунок 2. Сопоставление схемы обработки запроса и конкретной реализации.

Нужен метод, который позволил бы использовать партишенер для любого датафрейма. Пока не будем связываться с SQL (разве кто-то делает dataframe.repartition(...) с помощью SQL?).

Добавим такой метод в Scala Dataframe API, используя паттерн «Pimp my library».
Для этого создадим новый объект ru.kalininskii.orderbucketing.OrderBucketing. Он будет содержать implicit class с пока единственным методом repartitionWithOrderAndSort:

OrderBucketing
package ru.kalininskii.orderbucketing
 
import org.apache.spark.sql.{Column, DataFrame, PlanHelper}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}
import ru.kalininskii.orderbucketing.plans.logical.RepartitionWithOrderAndSort
 
object OrderBucketing {
 
  implicit class DataFramePartOps(ds: DataFrame) {
    def repartitionWithOrderAndSort(numLines: Int,
                                     numPartitions: Int,
                                     orderColumn: Column,
                                     partitionColumns: Seq[Column],
                                     sortColumns: Seq[Column]): DataFrame = {
      def toSortOrder(col: Column): SortOrder = {
        col.expr match {
          case order: SortOrder => order.copy(direction = Ascending)
          case expr: Expression => SortOrder(expr, Ascending)
          case _ => throw new Exception(s"Can not get order from $col")
        }
      }
 
      val orderExpression = toSortOrder(orderColumn)
      val partitionExpressions = partitionColumns.map(_.expr)
      val sortExpressions = sortColumns.map(toSortOrder)
 
      val logicalPlan = RepartitionWithOrderAndSort(
        orderExpression,
        partitionExpressions,
        sortExpressions,
        numLines,
        numPartitions,
        None,
        ds.queryExecution.logical)
 
      PlanHelper.planToDF(ds.sparkSession, logicalPlan)
    }
  }
 
}

Как можно видеть из кода выше, метод принимает аргумент partitionColumns: Seq[Column], в котором поля явного и неявного секционирования объединены. Так и должно быть, партишенер разделяет записи по секциям независимо от их внешнего представления. 

Объект PlanHelper пока что также обойдётся одним методом, planToDF. Этот метод вызывает приватный метод для пакета org.apache.spark.sql, поэтому, чтобы он мог выполняться, объект тоже должен находиться в этом пакете:

PlanHelper
package org.apache.spark.sql
 
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
object PlanHelper {
 
  def planToDF(spark: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    Dataset.ofRows(spark, logicalPlan)
  }
}

Тип для описания одной секции RDD, а также ключа RDD, который понадобится нам в определённый момент, поместим в package object, чтобы он был доступен всему нашему пакету и не только

package object rangebucketing
package ru.kalininskii
 
import org.apache.spark.sql.catalyst.InternalRow
 
package object rangebucketing {
  type BucketsDistribution = (Int, InternalRow, Either[(InternalRow, Seq[String]), (InternalRow, InternalRow, Int)])
  type OrderAndSortKey = (InternalRow, InternalRow, InternalRow)
}

Теперь нужно определить планы, начиная с логического. И в них переопределить метод simpleString, так как иначе он выведет в план весь переданный объект distribution. Логический план для этого репартиционирования будет виден при использовании метода explain, и можно будет убедиться, что он действительно применяется. Кроме того, он встроен в формирование планов выполнения запросов и будет предоставлять информацию о физическом секционировании набора данных в переменной partitioning.

RepartitionWithOrderAndSort
package ru.kalininskii.orderbucketing.plans.logical
 
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionOperation}
import ru.kalininskii.orderbucketing.BucketsDistribution
import ru.kalininskii.orderbucketing.plans.physical.OrderBucketsPartitioning
 
/**
 * Этот класс разделяет данные по условю уникальных точных значений [[Expression]]
 *
 * @param orderExpression      выражение, по интервалам значений которого будет производиться разделение
 *                             в пределах одного значения [[partitionExpressions]]
 * @param partitionExpressions выражения, по значениям которых будет производиться разделение.
 * @param sortExpressions      выражения, по значениям которых будет производиться локальная сортировка.
 * @param numLines             количество записей в одной секции RDD (и в записанном файле)
 * @param numPartitions        предполагаемое количество секций, может отличаться
 * @param distribution         информация об имеющемся распределении, которое надо воспроизвести
 * @param child                план получения набора данных, который нужно секционировать
 */
case class RepartitionWithOrderAndSort(
                                        orderExpression: SortOrder,
                                        partitionExpressions: Seq[Expression],
                                        sortExpressions: Seq[SortOrder],
                                        numLines: Int,
                                        numPartitions: Int,
                                        distribution: Option[Seq[BucketsDistribution]],
                                        child: LogicalPlan
                                      ) extends RepartitionOperation {
 
  override def nodeName: String = s"Repartition plan with " +
    s"${distribution.map(_ => "predefined").getOrElse("unknown")} distribution:"
 
  override def simpleString: String = {
    s"$nodeName ${partitionExpressions.mkString("partition by [", ", ", "], ")}" +
      s"global order by ${orderExpression.child}, " +
      s"${sortExpressions.map(_.child).mkString("local sort by [", ", ", "], ")}" +
      numLines + " rows per task, " + numPartitions + " initial tasks"
  }
 
  val partitioning: OrderBucketsPartitioning = OrderBucketsPartitioning(
    orderExpression, partitionExpressions, sortExpressions, numLines, numPartitions, distribution)
 
  override def maxRows: Option[Long] = child.maxRows
 
  override def shuffle: Boolean = true
}

Класс Partitioning и его расширения в структуре пакетов Spark относятся к физическим планам (org.apache.spark.sql.catalyst.plans.physical), но скорее представляют собой контейнеры для характеристик конкретного датафрейма.

Все встроенные реализации создаются кейс классом (см. Термины и определения) org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression, получая значения полей, переданных в указанный кейс класс.

И в нашем случае, OrderBucketsPartitioning можно считать описанием операции над таблицей, прочитанной из файловой системы или объектного хранилища. Не будет ошибкой отнести класс к логическому плану, ведь метод dataframe.explain(true) именно его выводит в каждом дереве логического плана).

В реализации есть неприятный момент: трейт (см. Термины и определения) Distribution   в исходном коде Spark объявлен как sealed, а значит не может быть расширен. Поэтому в методе satisfies0 я буду использовать OrderedDistribution, хотя она не лучшим образом подходит для описания полученного распределения.

OrderBucketsPartitioning
package ru.kalininskii.orderbucketing.plans.physical
 
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder, Unevaluable}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, Partitioning}
import org.apache.spark.sql.types.{DataType, IntegerType}
import ru.kalininskii.orderbucketing.BucketsDistribution
 
/**
 * Этот класс передаёт данные для партиционирования по диапазонам в пределах партиций
 *
 * @param orderExpression      выражение, по интервалам значений которого будет производиться разделение
 *                             в пределах одного значения [[partitionExpressions]]
 * @param partitionExpressions выражения, по значениям которых будет производиться разделение
 * @param sortExpressions      выражения, по значениям которых будет производиться локальная сортировка
 * @param numLines             количество записей в одной секции RDD (и в записанном файле)
 * @param numPartitions        предполагаемое количество секций, может отличаться
 * @param distribution         информация об имеющемся распределении, которое надо воспроизвести
 */
case class OrderBucketsPartitioning(
                                     orderExpression: SortOrder,
                                     partitionExpressions: Seq[Expression],
                                     sortExpressions: Seq[SortOrder],
                                     numLines: Int,
                                     numPartitions: Int,
                                     distribution: Option[Seq[BucketsDistribution]])
 
  extends Expression with Partitioning with Unevaluable {
 
  override def nodeName: String = s"Repartition with " +
    s"${distribution.map(_ => "predefined").getOrElse("unknown")} distribution:"
 
  override def simpleString: String = {
    s"$nodeName ${partitionExpressions.mkString("partition by [", ", ", "], ")}" +
      s"global order by ${orderExpression.child}, " +
      s"${sortExpressions.map(_.child).mkString("local sort by [", ", ", "], ")}" +
      numLines + " rows per task, " + numPartitions + " initial tasks"
  }
 
  override def children: Seq[Expression] = partitionExpressions :+ orderExpression
 
  override def nullable: Boolean = false
 
  override def dataType: DataType = IntegerType
 
  override def satisfies0(required: Distribution): Boolean = {
    super.satisfies0(required) || {
      required match {
        case o: OrderedDistribution =>
          orderExpression.semanticEquals(o.ordering.head)
        case _ => false
      }
    }
  }
}

Далее два основных класса, org.apache.spark.sql.execution.exchange.ShuffleExchangeOrderExec и org.apache.spark.sql.partitioning.OrderBucketsPartitioner. Они используют методы с ограниченной видимостью, поэтому должны находиться в пределах пакета org.apache.spark.sql. Эти классы выполняют трансформации RDD, относятся к планам выполнения.

Основное отличие org.apache.spark.sql.execution.exchange.ShuffleExchangeOrderExec от оригинала заключается в том, что для семплирования используется RDD, в котором MutablePair использует обе части:

  • первую - для значений полей секционирования ([P]);

  • вторую - для значения поля упорядочивания ([O]).

С моей точки зрения, это уменьшит объём данных для дальнейшей обработки и позволит использовать полученные структуры для других наборов данных. Полученные структуры в любом случае должны быть переданы на драйвер, а затем переданы исполнителям (так работает broadcast), поэтому разумно с самого начала уменьшать их, насколько это возможно.

ShuffleExchangeOrderExec
package org.apache.spark.sql.execution.exchange
 
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.partitioning.OrderBucketsPartitioner
import org.apache.spark.util.MutablePair
import ru.kalininskii.orderbucketing.OrderAndSortKey
import ru.kalininskii.orderbucketing.plans.physical.OrderBucketsPartitioning
 
import scala.reflect.ClassTag
 
/**
 * Выполняет shuffle, который описывается newPartitioning
 * Код частично заимствован из [[ShuffleExchangeExec]]
 */
case class ShuffleExchangeOrderExec(newPartitioning: OrderBucketsPartitioning,
                                    child: SparkPlan,
                                    partitioner: Option[Partitioner] = None) extends Exchange {
 
  override lazy val metrics: Map[String, SQLMetric] = Map(
    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
  override def nodeName: String = "ExchangeWithOrder"
 
  override def outputPartitioning: Partitioning = newPartitioning
 
  private val serializer: Serializer = SparkEnv.get
    .serializerManager.getSerializer(implicitly[ClassTag[OrderAndSortKey]],
    implicitly[ClassTag[InternalRow]])
 
  override protected def doPrepare(): Unit = {}
 
  /**
   * Возвращает зависимость RDD [[ShuffleDependency]], которая перемешает записи
   * по схеме, определенной в `newPartitioning`. Партиции RDD, свзязанные с
   * возвращенной ShuffleDependency будут входными данными для shuffle.
   */
  private[exchange] def prepareShuffleDependency()
  : ShuffleDependency[OrderAndSortKey, InternalRow, InternalRow] = {
    val rdd = child.execute()
 
    val part: Partitioner = partitioner
      .getOrElse(ShuffleExchangeOrderExec.preparePartitioner(rdd, child.output, newPartitioning))
 
    ShuffleExchangeOrderExec.prepareShuffleDependency(rdd, child.output, newPartitioning, serializer, part)
  }
 
  /**
   * Возвращает [[ShuffledOrderRDD]], а это и есть набор данных после перемешивания.
   * [[ShuffledOrderRDD]], как и прочие наследники ShuffledRDD, основан на переданной [[ShuffleDependency]]
   */
  private[exchange] def preparePostShuffleRDD(
                                               shuffleDependency
                                               : ShuffleDependency[OrderAndSortKey, InternalRow, InternalRow]
                                             ): ShuffledOrderRDD = {
    new ShuffledOrderRDD(shuffleDependency)
  }
 
  /**
   * ShuffledOrderRDD может быть кеширован для переиспользования
   */
  private var cachedShuffleRDD: ShuffledOrderRDD = _
 
  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
    // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
    if (cachedShuffleRDD == null) {
      cachedShuffleRDD = preparePostShuffleRDD(prepareShuffleDependency())
    }
    cachedShuffleRDD
  }
 
}
 
 
object ShuffleExchangeOrderExec {
 
  def preparePartitioner(rdd: RDD[InternalRow],
                         outputAttributes: Seq[Attribute],
                         partitioning: OrderBucketsPartitioning): Partitioner = {
    // Internally, ValuesAndRangePartitioner runs a job on the RDD that samples keys to compute
    // partition bounds. To get accurate samples, we need to copy the mutable keys.
    val rddForSampling = rdd.mapPartitionsInternal { iter =>
      val projectionPart = UnsafeProjection.create(partitioning.partitionExpressions, outputAttributes)
      val projectionOrder = UnsafeProjection.create(partitioning.orderExpression.child :: Nil, outputAttributes)
      val mutablePair = new MutablePair[InternalRow, InternalRow]()
      iter.map(row => mutablePair.update(projectionPart(row).copy(), projectionOrder(row).copy()))
    }
    implicit val ordering: LazilyGeneratedOrdering =
      new LazilyGeneratedOrdering(Seq(partitioning.orderExpression),
        outputAttributes.filter(_.references equals partitioning.orderExpression.child.references))
 
    new OrderBucketsPartitioner(
      rddForSampling,
      partitioning.numLines,
      partitioning.numPartitions,
      partitioning.distribution)
  }
 
  private def getPartitionKeyExtractor(outputAttributes: Seq[Attribute],
                                       partitioning: OrderBucketsPartitioning,
                                       i: Int): InternalRow => OrderAndSortKey = {
    val projectionPart = UnsafeProjection.create(partitioning.partitionExpressions, outputAttributes)
    val projectionOrder = UnsafeProjection.create(partitioning.orderExpression.child :: Nil, outputAttributes)
    val projectionSort = UnsafeProjection.create(partitioning.sortExpressions.map(_.child), outputAttributes)
    row => (projectionPart(row), projectionOrder(row), projectionSort(row).copy())
  }
 
  /**
   * Возвращает зависимость RDD [[ShuffleDependency]], которая перемешает записи
   * по схеме, определенной в `newPartitioning`. Партиции RDD, свзязанные с
   * возвращенной ShuffleDependency будут входными данными для shuffle.
   */
  private def prepareShuffleDependency(rdd: RDD[InternalRow],
                                       outputAttributes: Seq[Attribute],
                                       partitioning: OrderBucketsPartitioning,
                                       serializer: Serializer,
                                       part: Partitioner
                                      )
  : ShuffleDependency[OrderAndSortKey, InternalRow, InternalRow] = {
    val rddWithKeys: RDD[Product2[OrderAndSortKey, InternalRow]] = {
      rdd.mapPartitionsWithIndexInternal((i, iter) => {
        val mutablePair = new MutablePair[OrderAndSortKey, InternalRow]()
        val keyGen = getPartitionKeyExtractor(outputAttributes, partitioning, i)
        iter.map { row => mutablePair.update(keyGen(row), row.copy) }
      })
    }
 
    def keyOrdering[A <: Product3[InternalRow, InternalRow, InternalRow]]: Ordering[A] = {
      val allExprs = outputAttributes
        .filter(attr => partitioning.sortExpressions.map(_.child.references).contains(attr.references))
      implicit val sortOrdering: Ordering[InternalRow] =
        new LazilyGeneratedOrdering(partitioning.sortExpressions, allExprs)
      Ordering.by(_._3)
    }
 
    implicit val order: Ordering[OrderAndSortKey] = keyOrdering
 
    // Now, we manually create a ShuffleDependency.
    new ShuffleDependency[OrderAndSortKey, InternalRow, InternalRow](
      rddWithKeys,
      part,
      serializer,
      Some(order)
    )
  }
}

Для того, чтобы не менять схему датафрейма и скрыть подробности реализации секционирования и сортировки, используется класс org.apache.spark.sql.execution.exchange.ShuffledOrderRDD

ShuffledOrderRDD
package org.apache.spark.sql.execution.exchange
 
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{CoalescedPartitioner, ShuffledRowRDDPartition}
import org.apache.spark._
import ru.kalininskii.orderbucketing.OrderAndSortKey
 
/**
 * Специализированный класс-наследник [[org.apache.spark.rdd.ShuffledRDD]]
 * Поддерживает локальную сортировку по указанным полям
 */
class ShuffledOrderRDD(var dependency: ShuffleDependency[OrderAndSortKey, InternalRow, InternalRow])
  extends RDD[InternalRow](dependency.rdd.context, Nil) {
 
  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
 
  private[this] val partitionStartIndices: Array[Int] = (0 until numPreShufflePartitions).toArray
 
  private[this] val part: Partitioner =
    new CoalescedPartitioner(dependency.partitioner, partitionStartIndices)
 
  override def getDependencies: Seq[Dependency[_]] = List(dependency)
 
  override val partitioner: Option[Partitioner] = Some(part)
 
  override def getPartitions: Array[Partition] = {
    assert(partitionStartIndices.length == part.numPartitions)
    Array.tabulate[Partition](partitionStartIndices.length) { i =>
      val startIndex = partitionStartIndices(i)
      val endIndex =
        if (i < partitionStartIndices.length - 1) {
          partitionStartIndices(i + 1)
        } else {
          numPreShufflePartitions
        }
      new ShuffledRowRDDPartition(i, startIndex, endIndex)
    }
  }
 
  override def getPreferredLocations(partition: Partition): Seq[String] = {
    val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
    tracker.getPreferredLocationsForShuffle(dep, partition.index)
  }
 
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
    // The range of pre-shuffle partitions that we are fetching at here is
    // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
    val reader =
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      shuffledRowPartition.startPreShufflePartitionIndex,
      shuffledRowPartition.endPreShufflePartitionIndex,
      context)
    reader.read().asInstanceOf[Iterator[Product2[OrderAndSortKey, InternalRow]]].map(_._2)
  }
 
  override def clearDependencies() {
    super.clearDependencies()
    dependency = null
  }
}

И вот, долгожданный класс, который будет определять границы и отыскивать номер партиции RDD.

Основным полем в нём является private var rangeBounds: Map[P, (Array[O], Int)].
Это карта, ключом которой является InternalRow со значениями полей секционирования, а значением – кортеж из массива верхних границ партиций RDD (тоже InternalRow, но с другим содержимым) и стартового номера партиции для секции (в дальнейшем может называться "смещение").

Сэмплинг производится стандартным способом (RDD.sample), поэтому может упускать значения из маленьких партиций. Напомню, что класс создавался для обработки больших объемов данных, поэтому может неэффективно работать на нескольких десятках или сотнях строк.

Отбор границ также производится в RDD, при этом количество партиций RDD для каждой секции, явной или неявной, определяется динамически на основании собранной информации о количестве строк в сэмпле.

После получения всех отобранных границ, на драйвере выполняется алгоритм со следующими инвариантами:

  1. Нулевая партиция резервируется для не вошедших в статистику ключей (секций Hive и неявных секций). В случаях, когда нулевая партиция получает слишком большой объём данных, измените поля явного и неявного секционирования или откажитесь от них полностью;

  2. Значения явных и неявных секций могут следовать в неотсортированном порядке, но этот порядок должен был зафиксирован с использованием переменной "смещения". Смещение - это значение типа Int, оно входит в кортежMap[P, (Array[O], Int)];

  3. Стартовый номер секции – смещение, начинается с одного (см. п.1 – нулевая партиция зарезервирована, и для каждого следующего ключа прирастает на количество границ + 1, что соответствует реальному количеству партиций для секции (см. п.4);

  4. Массив значений поля упорядочивания не содержит самой последней верхней границы, это логично, чтобы не создавать всегда пустую дополнительную секцию.

Поиск партиции осуществляется сначала по ключу карты (конкретное значение всех полей секционирования), затем в массиве значений, последовательным или двоичным поиском, в зависимости от длины массива. Все записи, для которых ключ отсутствует в карте, отправляются в нулевую партицию RDD, что может привести к неприемлемо большому объёму этой партиции, если поля явного или неявного партиционирования выбраны неудачно и имеют слишком высокую селективность. В этом случае лучше не секционировать таблицу вообще, оставив только разделение на бакеты по полю упорядочивания.

Выбор границы одной партиции RDD, начиная с прототипа, осуществляется так: значения меньше либо равные верхней границе (она, как мы помним, определена заранее и известна), и больше, чем предыдущая верхняя граница, или предыдущая верхняя граница отсутствует (это первый файл в явной или неявной секции).

Я знаком с концепцией интервального секционирования в Oracle, и между этими двумя реализациями есть отличие в строгости неравенств. В Oracle секции определяются по условию «меньше конкретного значения» и неявному условию «больше или равно предыдущего конкретного значения».

Это связано с тем, что секционирование Oracle работает с конкретными интервалами: для времени это чаще всего сутки, для числовых значений – миллионы или миллиарды и так далее.

Поэтому очень удобно, к примеру, что можно указать для разделения по суткам: values less than ‘2021-12-01 00:00:00’ – при этом видно, что в новую партицию попадёт начало суток (и месяца), а сколь угодно близкое, но меньшее значение будет в партиции, относящейся к ‘2021-11-30’.

В нашем случае дело совсем в другом: пользователь ничего не должен знать об интервалах и файлах, его интересуют только конкретные данные и это задача явного секционирования (первый уровень).

Таким образом, задача разделения интервалов значений – обеспечить примерно равный размер файлов, лёгкость нахождения нужных файлов и совершенно прозрачное взаимодействие, если пользователь обращается напрямую к традиционным структурам. Поэтому границы интервалов определяются «на лету», хранятся в отдельной структуре и сопоставлены с физическими файлами. Каждая секция имеет свой индивидуальный набор границ. Кроме того, укажем на достаточно важный момент: если одну из границ, в нашем случае, верхнюю, мы определяем явно, то другая, нижняя граница не очень важна, но лучше, чтобы её можно было легко найти, если в дальнейшем она будет отсутствовать в карте данных. Для нижней границы это действительно так, потому что значение нужно прочитать из начала файла или из начала сегмента данных, в случае колоночного формата хранения.

OrderBucketsPartitioner
package org.apache.spark.sql.partitioning
 
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
import org.apache.spark.{Partitioner, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.{CollectionsUtils, Utils}
 
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.hashing.byteswap32
 
/**
 * Наследник [[org.apache.spark.Partitioner]] который разделяет по точным значениям некоторых выражений (полей).
 * И интервалов значений одного выражения (поля)
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 *       as the `partitions` parameter, in the case where the number of sampled records is less than
 *       the value of `partitions`.
 */
class OrderBucketsPartitioner[P: ClassTag, O: Ordering : ClassTag
](rdd: RDD[_ <: Product2[P, O]],
  val numLines: Int,
  val partitions: Int,
  val distribution: Option[Seq[(Int, P, Either[(O, Seq[String]), (O, O, Int)])]])
  extends Partitioner {
 
  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
 
  private var ordering = implicitly[Ordering[O]]
 
  private def getDistribution: Option[Map[P, (Array[O], Int)]] = distribution flatMap {
    case Seq() => None
    case distr =>
      val (inner, outer) = distr.partition(_._3.isLeft)
 
      //внешние границы из распределения.
      val (singleFileBounds, manyFileBounds): (Seq[(P, (Int, (O, O, Int)))], Seq[(P, (Int, (O, O, Int)))]) =
        outer
          .map { case (index, p, right) => (p, (index, right.right.get)) }
          .partition { case (_, (_, (_, _, filesNum))) => filesNum == 1 }
      //Нужно определить внутренние границы, только для тех, где больше одного файла
      val outerBounds: Map[P, Seq[(Int, (O, O, Int))]] = manyFileBounds
        .groupBy(_._1)
        .map { case (p, values) => p -> values.map(bounds => bounds._2) }
      // границы единичных файлов уже определены
      val singleBounds: Array[(P, Array[(Int, Array[O])])] = singleFileBounds
        .groupBy(_._1)
        .map { case (p, values) =>
          (p, values.map(bounds => (bounds._2._1, Array(bounds._2._2._2))).toArray) }
        .toArray
 
      //внутренние границы получаются из rdd фильтром, сэмплированием, обработкой
      val innerBounds: Array[(P, Array[(Int, Array[O])])] =
        OrderBucketsPartitioner.getInnerBounds(rdd, numLines, partitions, outerBounds)
 
      //заранее определённые внутренние границы
      val givenBounds: Array[(P, Array[(Int, Array[O])])] = inner
        .map { case (index, p, left) => (p, (index, Array(left.left.get._1))) }
        .groupBy(_._1)
        .map { case (p, values) => p -> values.map(bounds => bounds._2).toArray }
        .toArray
 
      //теперь оба массива склеиваются, группируются, получаем все границы,
      // отсортированные по возрастанию, без самой верхней,
      val fixedBounds: Map[P, (Array[O], Int)] = (givenBounds ++ innerBounds ++ singleBounds)
        .groupBy(_._1)
        .map { case (p, values) =>
          val (indexes, bounds) = values.flatMap(_._2).unzip
          p -> (bounds.flatten.sorted.init,
            indexes.min)
        }
 
      Some(fixedBounds)
  }
 
  // Карта с массивами верхних границ для первых (partitions - 1) партиций
  private var rangeBounds: Map[P, (Array[O], Int)] = getDistribution.getOrElse {
    if (partitions <= 1) {
      Map.empty[P, (Array[O], Int)]
    } else {
      val groupedBounds = OrderBucketsPartitioner.getBounds(rdd, numLines, partitions)
 
      //индексы смещений, чтобы обеспечить уникальный номер партиции, если массив границ пустой, то это одна партиция
      val overallIndex = groupedBounds.scanLeft(1) { case (sum, (_, orders)) => sum + math.max(orders.length + 1, 1) }
 
      groupedBounds
        .zip(overallIndex)
        .map { case ((part, orders), index) => (part, (orders, index)) }
        .toMap
    }
  }
 
  def getRangeBounds: Map[P, (Array[O], Int)] = rangeBounds
 
  //весь массив плюс одна нулевая партиция для не попавших в сэмпл тасков
  // партиции, не попавшие в сэмплы должны быть небольшими, поэтому ничего страшного не будет
  // даже если они попадут в один таск, так будет даже лучше - меньше файлов
  def numPartitions: Int = rangeBounds.values.map(_._1.length + 1).sum + 1
 
  private var binarySearch: (Array[O], O) => Int = CollectionsUtils.makeBinarySearch[O]
 
  def getPartition(key: Any): Int = {
    val (p, k, _) = key.asInstanceOf[(P, O, InternalRow)]
    val (bounds, shift) = rangeBounds.getOrElse(p, (Array.empty[O], 0))
    var partition = 0
    if (bounds.length < 16) {
      // If we have less than 16 partitions naive search
      while (partition < bounds.length && ordering.gt(k, bounds(partition))) {
        partition += 1
      }
    } else {
      // метод бинарного поиска определён только один раз
      partition = binarySearch(bounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > bounds.length) {
        partition = bounds.length
      }
    }
    shift + partition
  }
 
  override def equals(other: Any): Boolean = other match {
    case r: OrderBucketsPartitioner[_, _] =>
      r.rangeBounds == rangeBounds
    case _ =>
      false
  }
 
  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    val arr = rangeBounds.values.map(_._1).toArray.flatten
    while (i < arr.length) {
      result = prime * result + arr(i).hashCode()
      i += 1
    }
    result = prime * result
    result
  }
 
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case _: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeObject(ordering)
        out.writeObject(binarySearch)
 
        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Map[P, (Array[O], Int)]])
          stream.writeObject(rangeBounds)
        }
    }
  }
 
  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case _: JavaSerializer => in.defaultReadObject()
      case _ =>
        ordering = in.readObject().asInstanceOf[Ordering[O]]
        binarySearch = in.readObject().asInstanceOf[(Array[O], O) => Int]
 
        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag: ClassTag[Map[P, (Array[O], Int)]] = ds.readObject[ClassTag[Map[P, (Array[O], Int)]]]()
          rangeBounds = ds.readObject[Map[P, (Array[O], Int)]]()
        }
    }
  }
}
 
object OrderBucketsPartitioner {
 
  /**
   * Рассчитывает коэффициент сэмлирования используя обратную логарифмическую пропорцию,
   * чем больше набор данных, тем меньше часть, которая будет взята из него
   *
   * @param numLines - количество записей в файле - опасное предположение, но сделать его можно
   * @param numParts - приблизительно оцененное количество партиций
   */
  private def getSampleCoefficient(numLines: Int, numParts: Int): Double = {
    val numLinesD = numLines.toDouble
    (1e4 / (numLinesD * math.log(numLinesD * numParts))) min 1.0
  }
 
  /**
   * возвращаяет сэмпл RDD
   *
   * @param rdd               - основной набор данных
   * @param sampleCoefficient - 0 < коэффициент <= 1.0
   */
  private def getSampleRDD[O: Ordering : ClassTag,
    P: ClassTag](rdd: RDD[_ <: Product2[P, O]],
                 sampleCoefficient: Double): RDD[_ <: Product2[P, O]] = {
    val sampledRdd: RDD[_ <: Product2[P, O]] = if (sampleCoefficient < 1.0) {
      val seed = byteswap32(-rdd.id - 1)
      rdd.sample(withReplacement = false, sampleCoefficient, seed)
    } else {
      rdd
    }
    sampledRdd
  }
 
  /**
   * выполняется в каждой партиции сэмлпа RDD
   * получает верхние границы для каждого файла (таска, партиции RDD)
   *
   * @param iter     неотсортированный итератор со значениями партиций и поля упорядочивания
   * @param initStep шаг, равный желаемому количеству записей в одном файле
   * @param weight   количество реальных записей на строку сэмпла
   * @return границы
   */
  private def determineBoundsWithinPartition[P: ClassTag, O: Ordering : ClassTag
  ](iter: Iterator[(P, O)],
    weight: Double,
    initStep: Int): Iterator[(P, Array[O])] = {
    prepareMap(iter)
      .map { case (p, ordered) =>
        val partitions: Int = math.ceil(ordered.length * weight / initStep).intValue()
        val bounds: Array[O] = extractBounds(ordered, partitions, weight)
        (p, bounds)
      }.iterator
  }
 
  private def prepareMap[O: Ordering : ClassTag, P: ClassTag](iter: Iterator[(P, O)]): Map[P, Array[O]] = {
    val mappedParts = iter.toArray
      .groupBy(_._1)
      .map { case (p, opArray) => (p, opArray.map(_._2).sorted) }
    mappedParts
  }
 
  //отбор самих значений
  private def extractBounds[O: Ordering : ClassTag](values: Array[O],
                                                    partitions: Int,
                                                    weight: Double)(implicit ordering: Ordering[O]): Array[O] = {
    val numCandidates = values.length
    val partCount = numCandidates * weight
    var cumWeight = 0.0
    val step: Int = (partCount / partitions).intValue()
    var target = step
    val innerBounds = ArrayBuffer.empty[O]
    var i = 0
    var j = 0
    var prevInnerBound = Option.empty[O]
 
    while ((i < numCandidates) && (j < partitions - 1)) {
      val key = values(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (prevInnerBound.isEmpty || ordering.gt(key, prevInnerBound.get)) {
          innerBounds += key
          target += step
          j += 1
          prevInnerBound = Some(key)
        }
      }
      i += 1
    }
    innerBounds.toArray
  }
 
  /**
   * сэмплирует RDD и получает карту, где ключи - значения полей партиционирования,
   * значения - массивы с верхними границами поля упорядочивания
   *
   * @param rdd      основной набор данных
   * @param numLines желаемое количество записей в одном файле
   * @param numParts предполагаемое количество записей
   * @return границы
   */
  private def getBounds[P: ClassTag, O: Ordering : ClassTag](rdd: RDD[_ <: Product2[P, O]],
                                                             numLines: Int,
                                                             numParts: Int): Array[(P, Array[O])] = {
    val sampleCoefficient = getSampleCoefficient(numLines, numParts) //rdd.partitions.length
 
    val sampledRdd: RDD[_ <: Product2[P, O]] = getSampleRDD(rdd, sampleCoefficient)
 
    val weight = 1.0 / sampleCoefficient
    sampledRdd
      .map(row => (row._1, row._2))
      .partitionBy(new Partitioner {
        override def numPartitions: Int = numParts
 
        override def getPartition(key: Any): Int = Utils.nonNegativeMod(key.hashCode, numParts)
      })
      .mapPartitions(iter => determineBoundsWithinPartition(iter, weight, numLines))
      .collect()
  }
 
 
  /**
   * фильтрует RDD на равенство полей к нужным значениям и по нахождению в пределах внешних границ
   *
   * @param rdd    основной набор данных
   * @param bounds границы RDD
   * @return тщательно отфильтрованный RDD
   */
  private def filterRDDByOuterBounds[P: ClassTag, O: Ordering : ClassTag
  ](rdd: RDD[_ <: Product2[P, O]],
    bounds: Map[P, Seq[(Int, (O, O, Int))]]): RDD[_ <: Product2[P, O]] = {
    val ordering = implicitly[Ordering[O]]
    //получаем Option[Array[Tuple3]] и проверяем exists
    rdd
      .filter { item =>
        bounds
          .get(item._1)
          .exists(_.exists(b => ordering.lteq(b._2._1, item._2) && ordering.lteq(item._2, b._2._2)))
      }
  }
 
  /**
   * выполняется в каждой партиции сэмлпа RDD
   * получает верхние границы для каждого файла (таска, партиции RDD)
   *
   * @param iter     неотсортированный итератор со значениями партиций и поля упорядочивания
   * @param initStep шаг, равный желаемому количеству записей в одном файле
   * @param weight   количество реальных записей на строку сэмпла
   * @return границы
   */
  private def determineInnerBoundsWithinPartition[P: ClassTag, O: Ordering : ClassTag
  ](iter: Iterator[(P, O)],
    weight: Double,
    initStep: Int,
    outerBounds: Map[P, Seq[(Int, (O, O, Int))]]): Iterator[(P, Array[(Int, Array[O])])] = {
    val ordering = implicitly[Ordering[O]]
 
    prepareMap(iter)
      .map { case (p, orderedValues) =>
        (p, outerBounds(p).toArray.map { case (index, (lowerO, upperO, partitions)) =>
          val ordered = orderedValues
            .dropWhile(ordering.lt(_, lowerO))
            .takeWhile(ordering.lteq(_, upperO))
 
          val innerBounds: Array[O] = extractBounds(ordered, partitions, weight)
 
          (index, innerBounds :+ upperO)
        })
      }.iterator
  }
 
  /**
   * сэмплирует RDD и получает карту, где ключи - значения полей партиционирования,
   * значения - массивы с верхними границами поля упорядочивания
   * Важно, чтобы можно было использовать верхние границы в пределах партиции,
   * чтобы не допустить поглощения возможных внутренних партиций
   *
   * @param rdd      основной набор данных
   * @param numLines желаемое количество записей в одном файле
   * @param numParts предполагаемое количество записей
   * @param bounds   внешние границы RDD
   * @return границы
   */
  private def getInnerBounds[P: ClassTag, O: Ordering : ClassTag](rdd: RDD[_ <: Product2[P, O]],
                                                                  numLines: Int,
                                                                  numParts: Int,
                                                                  bounds: Map[P, Seq[(Int, (O, O, Int))]]
                                                                 ): Array[(P, Array[(Int, Array[O])])] = {
    val sampleCoefficient = getSampleCoefficient(numLines, numParts) //rdd.partitions.length
 
    val filterRdd: RDD[_ <: Product2[P, O]] = filterRDDByOuterBounds(rdd, bounds)
    val sampledRdd: RDD[_ <: Product2[P, O]] = getSampleRDD(filterRdd, sampleCoefficient)
 
    val weight = 1.0 / sampleCoefficient
    sampledRdd
      .map(row => (row._1, row._2))
      .partitionBy(new Partitioner {
        override def numPartitions: Int = numParts
 
        override def getPartition(key: Any): Int = Utils.nonNegativeMod(key.hashCode, bounds.size)
      })
      .mapPartitions(iter => determineInnerBoundsWithinPartition(iter, weight, numLines, bounds))
      .collect()
  }
}

Чтобы связать физический план и само выполнение, нам нужно будет создать объект, расширяющий SparkStrategy.

RepartitionStrategy
package ru.kalininskii.rangebucketing.strategy
 
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy, exchange}
import ru.kalininskii.rangebucketing.plans.logical.RepartitionByRangeBuckets
 
object RepartitionStrategy extends SparkStrategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case r: RepartitionByRangeBuckets =>
      exchange.ShuffleExchangeBucketsExec(r.partitioning, planLater(r.child)) :: Nil
    case _ => Nil
  }
}

Теперь можно сделать инъекцию расширения в SparkSession:

Extension injection
spark = SparkSession
  .builder()
  .appName("PARTITION_TEST")
  .master("local[*]")
  .config(sparkConf)
  .withExtensions(e => {
    e.injectPlannerStrategy(_ => RepartitionStrategy)
  })
  .getOrCreate()

Если инъекции или способа реализации партишенера не будет, то мы увидим такую ошибку:

Сообщение об ошибке
assertion failed: No plan for RepartitionWithOrderAndSort [ts_part#55], id#13 ASC NULLS FIRST, 4000, 24
+- Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, substring(cast(ts#18 as string), 0, 10) AS ts_part#55]
   +- InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan [id#13, name#14, amount#15, value#16, divider#17, ts#18]

java.lang.AssertionError: assertion failed: No plan for RepartitionWithOrderAndSort [ts_part#55], id#13 ASC NULLS FIRST, 4000, 24
+- Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, substring(cast(ts#18 as string), 0, 10) AS ts_part#55]
   +- InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan [id#13, name#14, amount#15, value#16, divider#17, ts#18]

         at scala.Predef$.assert(Predef.scala:170)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
         at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
         at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
         at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
         at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
         at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:207)
         at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:207)
         at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99)
         at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
         at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:167)
         at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
         at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
         at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
         at org.apache.spark.sql.Dataset.explain(Dataset.scala:485)
         at ru.kalininskii.rangebucketing.RangeBucketingPartitionTest$$anonfun$1.apply(RangeBucketingPartitionTest.scala:49)
         at ru.kalininskii.rangebucketing.RangeBucketingPartitionTest$$anonfun$1.apply(RangeBucketingPartitionTest.scala:42)

Сгенерируем набор данных с псевдослучайными значениями и применим к нему созданный партишенер

Датафрейм
    val dataFrameSource = getRandomDF(0, rangeLimit, 3, 0).persist()
      .withColumn("ts_part", fn.expr("substring(ts,0,10)"))
 
    dataFrameSource.printSchema()
 
    val dfRep = dataFrameSource
      .repartitionWithOrderAndSort(numLines, rangeLimit / numLines,
        fn.col("event_time"), List(fn.col("ts_part")), List(fn.col("id")))
      .persist()
 
    dfRep.explain(true)

В планах выполнения видны новые пункты, на всех этапах их можно отследить

План запроса с применением партишенера
== Parsed Logical Plan ==
'Repartition with unknown distribution: ['ts_part], 'id ASC NULLS FIRST, 4000, 24
+- 'Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, 'substring('ts, 0, 10) AS ts_part#55]
   +- Project [_1#6 AS id#13, _2#7 AS name#14, _3#8 AS amount#15, _4#9 AS value#16, _5#10 AS divider#17, _6#11 AS ts#18]
      +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

== Analyzed Logical Plan ==
id: int, name: string, amount: decimal(38,18), value: decimal(38,18), divider: string, ts: timestamp, ts_part: string
Repartition with unknown distribution: [ts_part#55], id#13 ASC NULLS FIRST, 4000, 24
+- Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, substring(cast(ts#18 as string), 0, 10) AS ts_part#55]
   +- Project [_1#6 AS id#13, _2#7 AS name#14, _3#8 AS amount#15, _4#9 AS value#16, _5#10 AS divider#17, _6#11 AS ts#18]
      +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

== Optimized Logical Plan ==
InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18, ts_part#55], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- ExchangeByRangeBuckets repartition with unknown distribution:(ts_part#55, id#13 ASC NULLS FIRST, 4000, 24, None)
      +- *(1) Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, substring(cast(ts#18 as string), 0, 10) AS ts_part#55]
         +- InMemoryTableScan [amount#15, divider#17, id#13, name#14, ts#18, value#16]
               +- InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- LocalTableScan [id#13, name#14, amount#15, value#16, divider#17, ts#18]

== Physical Plan ==
InMemoryTableScan [id#13, name#14, amount#15, value#16, divider#17, ts#18, ts_part#55]
   +- InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18, ts_part#55], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- ExchangeByRangeBuckets repartition with unknown distribution:(ts_part#55, id#13 ASC NULLS FIRST, 4000, 24, None)
            +- *(1) Project [id#13, name#14, amount#15, value#16, divider#17, ts#18, substring(cast(ts#18 as string), 0, 10) AS ts_part#55]
               +- InMemoryTableScan [amount#15, divider#17, id#13, name#14, ts#18, value#16]
                     +- InMemoryRelation [id#13, name#14, amount#15, value#16, divider#17, ts#18], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- LocalTableScan [id#13, name#14, amount#15, value#16, divider#17, ts#18]

Тесты должны показать, что классы планов на самом деле используются (простые юнит-тесты планов Spark):

Использование классов
    val logicalPlanString = "Repartition plan with unknown distribution: " +
      "partition by ['ts_part], " +
      "global order by 'event_time, " +
      "local sort by ['id], " +
      "4000 rows per task, 24 initial tasks"
 
    val analyzedPlanString = "Repartition plan with unknown distribution: " +
      "partition by [ts_part#64], " +
      "global order by event_time#21, " +
      "local sort by [id#15], " +
      "4000 rows per task, 24 initial tasks"
 
    val sparkPlanString = "ExchangeWithOrder Repartition with unknown distribution: " +
      "partition by [ts_part#64], " +
      "global order by event_time#21, " +
      "local sort by [id#15], " +
      "4000 rows per task, 24 initial tasks"
 
    assert(dfRep.queryExecution.logical.toString contains logicalPlanString)
    assert(dfRep.queryExecution.analyzed.toString contains analyzedPlanString)
    assert(dfRep.queryExecution.sparkPlan.toString contains sparkPlanString)

Количество секций всегда будет равно 28

Количество партиций RDD
println(dfRep.rdd.getNumPartitions)
28
dfRep.rdd.getNumPartitions should be(28)

Выведем датафрейм с номерами партиций, нулевая партиция должна отсутствовать, потому что не содержит ни одного значения. Количество – колонка «cnt_» - всегда немного меньше и количество достаточно равномерное, отклонение, как правило, остаётся в пределах пяти процентов.

Количество записей в партициях RDD
+----------+-----------+-----------------------+-----------------------+----+
|ts_part   |rdd_part_id|range_min_event_time   |range_max_event_time   |cnt_|
+----------+-----------+-----------------------+-----------------------+----+
|2021-09-05|10         |2021-09-05 21:57:56.186|2021-09-05 21:57:56.454|3672|
|2021-09-05|11         |2021-09-05 21:57:56.455|2021-09-05 21:57:56.547|3400|
|2021-09-05|12         |2021-09-05 21:57:56.548|2021-09-05 21:57:56.642|3761|
|2021-09-05|13         |2021-09-05 21:57:56.643|2021-09-06 21:57:56.447|3601|
|2021-09-05|14         |2021-09-06 21:57:56.448|2021-09-06 21:57:56.544|3601|
|2021-09-05|15         |2021-09-06 21:57:56.545|2021-09-06 21:57:56.641|3856|
|2021-09-05|16         |2021-09-06 21:57:56.642|2021-09-07 21:57:56.449|3711|
|2021-09-05|17         |2021-09-07 21:57:56.45 |2021-09-07 21:57:56.549|3703|
|2021-09-05|18         |2021-09-07 21:57:56.55 |2021-09-07 21:57:56.647|3776|
|2021-09-06|19         |2021-09-05 21:57:56.186|2021-09-05 21:57:56.454|3763|
|2021-09-06|20         |2021-09-05 21:57:56.455|2021-09-05 21:57:56.564|3916|
|2021-09-06|21         |2021-09-05 21:57:56.565|2021-09-06 21:57:56.241|3557|
|2021-09-06|22         |2021-09-06 21:57:56.242|2021-09-06 21:57:56.459|3721|
|2021-09-06|23         |2021-09-06 21:57:56.46 |2021-09-06 21:57:56.563|3714|
|2021-09-06|24         |2021-09-06 21:57:56.564|2021-09-07 21:57:56.246|3520|
|2021-09-06|25         |2021-09-07 21:57:56.247|2021-09-07 21:57:56.455|3671|
|2021-09-06|26         |2021-09-07 21:57:56.456|2021-09-07 21:57:56.556|3716|
|2021-09-06|27         |2021-09-07 21:57:56.557|2021-09-07 21:57:56.647|3670|
|2021-09-07|1          |2021-09-05 21:57:56.188|2021-09-05 21:57:56.454|3690|
|2021-09-07|2          |2021-09-05 21:57:56.455|2021-09-05 21:57:56.556|3704|
|2021-09-07|3          |2021-09-05 21:57:56.557|2021-09-05 21:57:56.644|3601|
|2021-09-07|4          |2021-09-05 21:57:56.645|2021-09-06 21:57:56.45 |3685|
|2021-09-07|5          |2021-09-06 21:57:56.451|2021-09-06 21:57:56.55 |3767|
|2021-09-07|6          |2021-09-06 21:57:56.551|2021-09-06 21:57:56.644|3846|
|2021-09-07|7          |2021-09-06 21:57:56.645|2021-09-07 21:57:56.452|3782|
|2021-09-07|8          |2021-09-07 21:57:56.453|2021-09-07 21:57:56.555|3775|
|2021-09-07|9          |2021-09-07 21:57:56.556|2021-09-07 21:57:56.647|3820|
+----------+-----------+-----------------------+-----------------------+----+

Сохраним и выведем, чтобы убедиться, что все файлы соответствуют исходными секциям

Проверка соответствия
dfRep
      .withColumn("rdd_part_id", fn.spark_partition_id())
      .groupBy(fn.col("ts_part") as "ts_part", fn.col("rdd_part_id"))
      .agg(fn.min("event_time") as "range_min_event_time",
        fn.max("event_time") as "range_max_event_time",
        fn.count(fn.lit(0)) as "cnt_")
      .orderBy("ts_part", "rdd_part_id")
      .show(100, false)
 
    dfRep
      .sortWithinPartitions(fn.col("id"))
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("ts_part")
      .format("parquet")
      .option("path", "/test/pa/sometable/snp")
      .saveAsTable("testschema.sometable")
 
    val dfT = spark.table("testschema.sometable")
 
    println(dfT.count)
 
    dfT
      .withColumn("file_name",
        fn.regexp_replace(fn.input_file_name(), ".*/test/pa/sometable/snp", ""))
      .groupBy(fn.col("ts_part") as "ts_part", fn.col("file_name"))
      .agg(fn.min("event_time") as "range_min_event_time",
        fn.max("event_time") as "range_max_event_time",
        fn.count(fn.lit(0)) as "cnt_")
      .orderBy("file_name")
      .show(100, false)
+----------+---------------------------------------------------------------------------------------+-----------------------+-----------------------+----+
|ts_part   |file_name                                                                              |range_min_event_time   |range_max_event_time   |cnt_|
+----------+---------------------------------------------------------------------------------------+-----------------------+-----------------------+----+
|2021-09-05|/ts_part=2021-09-05/part-00010-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.186|2021-09-05 21:57:56.454|3672|
|2021-09-05|/ts_part=2021-09-05/part-00011-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.455|2021-09-05 21:57:56.547|3400|
|2021-09-05|/ts_part=2021-09-05/part-00012-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.548|2021-09-05 21:57:56.642|3761|
|2021-09-05|/ts_part=2021-09-05/part-00013-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.643|2021-09-06 21:57:56.447|3601|
|2021-09-05|/ts_part=2021-09-05/part-00014-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.448|2021-09-06 21:57:56.544|3601|
|2021-09-05|/ts_part=2021-09-05/part-00015-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.545|2021-09-06 21:57:56.641|3856|
|2021-09-05|/ts_part=2021-09-05/part-00016-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.642|2021-09-07 21:57:56.449|3711|
|2021-09-05|/ts_part=2021-09-05/part-00017-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.45 |2021-09-07 21:57:56.549|3703|
|2021-09-05|/ts_part=2021-09-05/part-00018-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.55 |2021-09-07 21:57:56.647|3776|
|2021-09-06|/ts_part=2021-09-06/part-00019-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.186|2021-09-05 21:57:56.454|3763|
|2021-09-06|/ts_part=2021-09-06/part-00020-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.455|2021-09-05 21:57:56.564|3916|
|2021-09-06|/ts_part=2021-09-06/part-00021-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.565|2021-09-06 21:57:56.241|3557|
|2021-09-06|/ts_part=2021-09-06/part-00022-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.242|2021-09-06 21:57:56.459|3721|
|2021-09-06|/ts_part=2021-09-06/part-00023-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.46 |2021-09-06 21:57:56.563|3714|
|2021-09-06|/ts_part=2021-09-06/part-00024-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.564|2021-09-07 21:57:56.246|3520|
|2021-09-06|/ts_part=2021-09-06/part-00025-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.247|2021-09-07 21:57:56.455|3671|
|2021-09-06|/ts_part=2021-09-06/part-00026-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.456|2021-09-07 21:57:56.556|3716|
|2021-09-06|/ts_part=2021-09-06/part-00027-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.557|2021-09-07 21:57:56.647|3670|
|2021-09-07|/ts_part=2021-09-07/part-00001-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.188|2021-09-05 21:57:56.454|3690|
|2021-09-07|/ts_part=2021-09-07/part-00002-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.455|2021-09-05 21:57:56.556|3704|
|2021-09-07|/ts_part=2021-09-07/part-00003-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.557|2021-09-05 21:57:56.644|3601|
|2021-09-07|/ts_part=2021-09-07/part-00004-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-05 21:57:56.645|2021-09-06 21:57:56.45 |3685|
|2021-09-07|/ts_part=2021-09-07/part-00005-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.451|2021-09-06 21:57:56.55 |3767|
|2021-09-07|/ts_part=2021-09-07/part-00006-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.551|2021-09-06 21:57:56.644|3846|
|2021-09-07|/ts_part=2021-09-07/part-00007-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-06 21:57:56.645|2021-09-07 21:57:56.452|3782|
|2021-09-07|/ts_part=2021-09-07/part-00008-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.453|2021-09-07 21:57:56.555|3775|
|2021-09-07|/ts_part=2021-09-07/part-00009-28321a07-711d-4936-b305-680199f90649.c000.snappy.parquet|2021-09-07 21:57:56.556|2021-09-07 21:57:56.647|3820|
+----------+---------------------------------------------------------------------------------------+-----------------------+-----------------------+----+

 Как видим, интервалы сохранились, для этого не потребовалось никаких дополнительных действий. Количество файлов для этого случая всегда на один меньше, чем количество секций в RDD. Однако в общем случае это не так, ведь в нулевую секцию могут попасть данные из очень небольших физических партиций. dfT.inputFiles.length should be(dfRep.rdd.getNumPartitions - 1)

Количество записей осталось прежним:

Количество записей в сохраненных файлах
val savedCount = dfT.count
savedCount shouldEqual rangeLimit

И схема датафрейма не изменилась, партишенер не меняет порядок полей и их типы.

Схемы совпадают
dfT.printSchema()
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- amount: decimal(38,18) (nullable = true)
 |-- value: decimal(38,18) (nullable = true)
 |-- divider: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- ts_part: string (nullable = true)
dfT.schema should be (dataFrameSource.schema)

 Для того, чтобы убедиться, что сортировка работает, выведем все записи с "id" меньшим 32, без дополнительной сортировки. Мы увидим, что в пределах файла записи отсортированы по возрастанию поля "id", несмотря на то, что были специально перемешаны при создании датафрейма.

Проверка локальной сортировки
dfT
  .withColumn("file_name",
    fn.regexp_replace(fn.input_file_name(), ".*/test/pa/sometable/snp", ""))
  .select("id", "file_name")
  .where("id < 32")
  .show(32, false)
+---+---------------------------------------------------------------------------------------+
|id |file_name                                                                              |
+---+---------------------------------------------------------------------------------------+
|9  |/ts_part=2021-09-06/part-00022-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|15 |/ts_part=2021-09-06/part-00022-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|19 |/ts_part=2021-09-06/part-00022-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|25 |/ts_part=2021-09-07/part-00004-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|4  |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|13 |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|14 |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|16 |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|28 |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|31 |/ts_part=2021-09-07/part-00006-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|8  |/ts_part=2021-09-05/part-00012-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|12 |/ts_part=2021-09-05/part-00012-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|21 |/ts_part=2021-09-05/part-00012-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|30 |/ts_part=2021-09-05/part-00012-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|10 |/ts_part=2021-09-05/part-00015-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|27 |/ts_part=2021-09-05/part-00015-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|0  |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|1  |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|3  |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|20 |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|22 |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|26 |/ts_part=2021-09-06/part-00024-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|18 |/ts_part=2021-09-07/part-00001-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|5  |/ts_part=2021-09-05/part-00010-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|6  |/ts_part=2021-09-05/part-00010-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|23 |/ts_part=2021-09-05/part-00010-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|24 |/ts_part=2021-09-05/part-00010-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|29 |/ts_part=2021-09-05/part-00010-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|2  |/ts_part=2021-09-06/part-00019-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|7  |/ts_part=2021-09-06/part-00019-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|11 |/ts_part=2021-09-06/part-00019-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
|17 |/ts_part=2021-09-06/part-00019-ec8ffbb5-474a-4ce1-bcdb-cc5665800031.c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+

Добавлю полные тестовые классы и pom.xml для сборки Maven:

Тесты
package ru.kalininskii.orderbucketing

import java.sql.Timestamp
import java.time.LocalDateTime

import org.apache.spark.sql.{SaveMode, functions => fn}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers._
import ru.kalininskii.orderbucketing.OrderBucketing._

import scala.util.Random

class OrderPartitionTest extends AnyFlatSpec with SparkBuilder {

  val rangeLimit = 99999
  val seed = 111
  val numLines = 4000

  private def getRandomDF(start: Int, limit: Int, days: Int, daysMinus: Int) = {
    val rnd = new Random()
    rnd.setSeed(seed)

    val sparkSession = spark
    import sparkSession.implicits._

    val list = Range(start, limit)
      .toList
      .map(id => (Option(id), //id, Option для nullable = true
        rnd.alphanumeric.take(rnd.nextInt(32)).mkString, //name
        BigDecimal(rnd.nextInt().abs * rnd.nextDouble()), //amount
        BigDecimal(rnd.nextLong().abs.longValue()), //value
        rnd.alphanumeric.take(rnd.nextInt(3)).mkString, //divider
        Timestamp.valueOf(LocalDateTime.now().minusSeconds((rnd.nextInt(days).abs max daysMinus) * 24 * 60 * 60)), //ts
        Timestamp.valueOf(LocalDateTime.now().minusSeconds((rnd.nextInt(days).abs max daysMinus) * 24 * 60 * 60)) //event_time
      ))

    Random.shuffle(list).toDF("id", "name", "amount", "value", "divider", "ts", "event_time")

  }

  override def beforeAll() {
    super.beforeAll()
    spark.sql("create database if not exists testschema location '/test'")
  }

  "Range buckets partitioner" must "partition correctly" in {
    val dataFrameSource = getRandomDF(0, rangeLimit, 3, 0).persist()
      .withColumn("ts_part", fn.expr("substring(ts,0,10)"))

    dataFrameSource.printSchema()

    val dfRep = dataFrameSource
      .repartitionWithOrderAndSort(numLines, rangeLimit / numLines,
        fn.col("event_time"), List(fn.col("ts_part")), List(fn.col("id")))
      .persist()

    dfRep.explain(true)

    val logicalPlanString = "Repartition plan with unknown distribution: " +
      "partition by ['ts_part], " +
      "global order by 'event_time, " +
      "local sort by ['id], " +
      "4000 rows per task, 24 initial tasks"

    val analyzedPlanString = "Repartition plan with unknown distribution: " +
      "partition by [ts_part#64], " +
      "global order by event_time#21, " +
      "local sort by [id#15], " +
      "4000 rows per task, 24 initial tasks"

    val sparkPlanString = "ExchangeWithOrder Repartition with unknown distribution: " +
      "partition by [ts_part#64], " +
      "global order by event_time#21, " +
      "local sort by [id#15], " +
      "4000 rows per task, 24 initial tasks"

    assert(dfRep.queryExecution.logical.toString contains logicalPlanString)
    assert(dfRep.queryExecution.analyzed.toString contains analyzedPlanString)
    assert(dfRep.queryExecution.sparkPlan.toString contains sparkPlanString)

    println(dfRep.rdd.getNumPartitions)

    dfRep.rdd.getNumPartitions should be(28)

    dfRep
      .withColumn("rdd_part_id", fn.spark_partition_id())
      .groupBy(fn.col("ts_part") as "ts_part", fn.col("rdd_part_id"))
      .agg(fn.min("event_time") as "range_min_event_time",
        fn.max("event_time") as "range_max_event_time",
        fn.count(fn.lit(0)) as "cnt_")
      .orderBy("ts_part", "rdd_part_id")
      .show(100, false)

    dfRep
      .sortWithinPartitions(fn.col("id"))
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("ts_part")
      .format("parquet")
      .option("path", "/test/pa/sometable/snp")
      .saveAsTable("testschema.sometable")

    val dfT = spark.table("testschema.sometable")

    val savedCount = dfT.count
    savedCount shouldEqual rangeLimit

    dfT
      .withColumn("file_name",
        fn.regexp_replace(fn.input_file_name(), ".*/test/pa/sometable/snp", ""))
      .groupBy(fn.col("ts_part") as "ts_part", fn.col("file_name"))
      .agg(fn.min("event_time") as "range_min_event_time",
        fn.max("event_time") as "range_max_event_time",
        fn.count(fn.lit(0)) as "cnt_")
      .orderBy("file_name")
      .show(100, false)

    dfT.printSchema()

    dfT.inputFiles.length should be(dfRep.rdd.getNumPartitions - 1)

    dfT.schema should be(dataFrameSource.schema)

    dfT
      .withColumn("file_name",
        fn.regexp_replace(fn.input_file_name(), ".*/test/pa/sometable/snp", ""))
      .select("id", "file_name")
      .where("id < 32")
      .show(32, false)

  }
}
Создание Hadoop Minicluster для тестов
package ru.kalininskii.orderbucketing
 
import java.io.File
import java.nio.file.Files
 
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.log4j.{Level, Logger}
import org.scalatest.{BeforeAndAfterAll, TestSuite}
 
trait MiniHdfsBuilder extends BeforeAndAfterAll {
  this: TestSuite =>
 
  val baseDir: File = Files.createTempDirectory("test_hdfs").toFile.getAbsoluteFile
 
  val fsConf: Configuration = new Configuration()
  fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
 
  val builder: MiniDFSCluster.Builder = new MiniDFSCluster.Builder(fsConf)
  var hdfsCluster: MiniDFSCluster = _
 
  override def beforeAll() {
    super.beforeAll()
 
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project.jetty.server").setLevel(Level.WARN)
 
    hdfsCluster = builder.build()
  }
 
  override def afterAll() {
    try {
      super.afterAll()
    }
    finally {
      hdfsCluster.shutdown()
      FileUtil.fullyDelete(baseDir)
    }
  }
}
Создание SparkSession для тестов
package ru.kalininskii.orderbucketing
import java.io.File
 
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.TestSuite
import ru.kalininskii.orderbucketing.strategy.RepartitionStrategy
 
trait SparkBuilder extends MiniHdfsBuilder {
  this: TestSuite =>
  var fileSystem: DistributedFileSystem = _
 
  var spark: SparkSession = _
 
  override def beforeAll() {
    super.beforeAll()
    //сессия уже может быть создана, поэтому нужно попробовать ее найти и остановить
    spark = SparkSession
      .builder()
      .appName("DUMMY")
      .master("local[*]")
      .getOrCreate()
 
    spark.stop()
 
    fileSystem = hdfsCluster.getFileSystem()
 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RANGE BUCKET TEST")
    sparkConf.set("fs.defaultFS", fileSystem.getUri.toString)
 
    spark = SparkSession
      .builder()
      .appName("PARTITION_TEST")
      .master("local[*]")
      .config(sparkConf)
      .withExtensions(e => {
        e.injectPlannerStrategy(_ => RepartitionStrategy)
      })
      .getOrCreate()
 
    spark.sparkContext.setLogLevel("WARN")
  }
 
  override def afterAll(): Unit = {
    spark.stop()
    FileUtil.fullyDelete(new File("./metastore_db"))
    FileUtil.fullyDelete(new File("./spark-warehouse"))
    FileUtil.fullyDelete(new File("./derby.log"))
    FileUtil.fullyDelete(new File("./chk"))
 
    super.afterAll()
  }
}
pom.xml для Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>fine-grained</groupId>
    <artifactId>range-bucketing</artifactId>
    <version>0.1-SNAPSHOT</version>
 
    <properties>
        <spark.version>2.4.0</spark.version>
        <hadoop.version>3.1.1</hadoop.version>
        <scala.version>2.11.8</scala.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>${hadoop.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
            <version>3.1.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalactic</groupId>
            <artifactId>scalactic_2.11</artifactId>
            <version>3.1.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <!--   Java sources path:     -->
        <!--        <resources>-->
        <!--            <resource>-->
        <!--                <directory>src/main/scala</directory>-->
        <!--            </resource>-->
        <!--        </resources>-->
        <!--   For JaCoCo:     -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <!-- display active profile in compile phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-help-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>show-profiles</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>active-profiles</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- scala and java mix compilation -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.5</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <!-- attached to Maven test phase -->
                    <execution>
                        <id>report</id>
                        <phase>test</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
 
</project>

На этом статья закончена, а наша работа продолжается. Разработка расширения Spark требует множества решений, и не всегда они простые. Это видно и в исходном коде Spark, я думаю, разработчики сейчас реализовали бы многое иначе.

В любом случае, мы не обязаны пользоваться только существующими средствами, можно разработать что-то своё. Если вы хотите преобразовать данные, используя Spark, то можете выбрать готовое решение (HashPartitioner, RangePartitioner). И всё же, если если вы поняли, что нужно специфическое распределение данных, то не бойтесь сделать партишенер для своих нужд, и я уверен, что он будет лучше и полезнее!

Не пытайтесь заставить машину работать быстрее, постарайтесь сделать так, чтобы машина не выполняла ненужную работу, а нужной работы было как можно меньше. Тогда это будет не преждевременная, а осознанная и нужная оптимизация.

В следующей статье мы разберём сам DataSource, предназначенный для чтения данных, и средства работы с метаданными.

Теги:
Хабы:
Всего голосов 5: ↑4 и ↓1+6
Комментарии0

Информация

Сайт
www.sber.ru
Дата регистрации
Дата основания
Численность
свыше 10 000 человек
Местоположение
Россия