Сегодня я бы хотел рассказать о появившемся в версии 1.2 новом пакете, получившем название spark.ml. Он создан, чтобы обеспечить единый высокоуровневый API для алгоритмов машинного обучения, который поможет упростить создание и настройку, а также объединение нескольких алгоритмов в один конвейер или рабочий процесс. Сейчас на дворе у нас версия 1.4.1, и разработчики заявляют, что пакет вышел из альфы, хотя многие компоненты до сих пор помечены как Experimental или DeveloperApi.

Ну что же, давайте проверим, что может новый пакет и насколько он хорош. Первым делом нам нужно познакомиться с основными понятиями, введёнными в spark.ml.

1. ML Dataset — spark.ml использует для работы с данными DataFrame из пакета spark.sql. DataFrame представляет собой распределённую коллекцию, в которой данные хранятся как именованные колонки. Концептуально DataFrame эквивалентны таблице в реляционной базе данных или такому типу данных как frame в R или Python, но с более богатой оптимизацией под капотом. (Примеры и способы работы будут приведены ниже в статье).

2. Transformer (модификатор) — это просто любой алгоритм, который может преобразовать один DataFrame в другой. Для примера: любая обученная модель является модификатором, поскольку преобразует набор характеристик (features) в предсказание (prediction)

3. Estimator (алгоритм оценки) — это алгоритм, который может выполнить преобразование из DataFrame в Transformer. К примеру, любой алгоритм обучения является также и алгоритмом оценки, т. к. он принимает набор данных для обучения и создаёт на выходе обученную модель.

4. Pipeline — конвейер, объединяющий любое количество модификаторов и алгоритмов оценки для создания рабочего процесса машинного обучения.

5. Param — общий тип, который используют модификаторы и алгоритмы оценки для задания параметров.

Согласно описанному интерфейсу, каждый Estimator должен иметь метод fit, принимающий DataFrame и возвращающий Transformer. В свою очередь, Transformer должен иметь метод transform, который преобразует одну DataFrame в другую.

В курсе Scalable Machine Learning в одной из лабораторных работ преподаватели, рассказывая о линейной регрессии, решали задачу «об определении года создания песни по набору аудио-характеристик». В ней было реализовано довольно много методов как для обработки данных, так и для оценки и нахождения лучшей модели. Сделано это было, чтобы более детально ознакомить студентов с основными процессами в машинном обучении, но давайте проверим, насколько облегчит нам жизнь пакет spark.ml.

В лабораторной работе нам были предоставлены уже подготовленные и немного обрезанные данные. Но так как нам интересно пройти весь путь, то предлагаю взять сырой набор данных. Каждая строка вида:
2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719
где первым идёт год, далее 12 чисел это средние тембры, и последние 78 это ковариации тембров.

Первым делом нам нужно подтянуть эти данные в DataFrame, но сперва немного преобразуем формат данных:
  val sc = new SparkContext("local[*]", "YearPrediction")
  val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt")
    .map(_.split(','))
    .map(x => (
      x.head.toDouble,
      Vectors.dense(x.tail.take(12).map(_.toDouble)),
      Vectors.dense(x.takeRight(78).map(_.toDouble))
    ))

Теперь каждый элемент RDD это кортеж содержащий год и два вектора характеристик, чтобы получить DataFrame нужно выполнить ещё одно преобразование:
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val rawDF: DataFrame = labeledPointsRDD.toDF("label", "avg", "cov")

Обратите внимание, что мы создали sqlContext и подтянули методы неявного преобразования (в данном случае можно было написать import sqlContext.implicits.rddToDataFrameHolder
) что-бы использовать метод toDF. Также мы указали имена колонок, и теперь структура данных будет выглядеть так:
  label | avg                                     | cov
 -------|-----------------------------------------|---------------------------------------------
  2001  | [49.94357 21.47114 73.07750 8.74861...  | [10.20556 611.10913 951.08960 698.11428...
 -------|-----------------------------------------|---------------------------------------------
  2007  | [50.57546 33.17843 50.53517 11.5521...  | [44.38997 2056.93836 605.40696 457.4117...

Градиентный метод, который используется в линейной регрессии, чувствителен к разбросу значений характеристик, поэтому данные перед обучением нужно нормировать или стандартизировать. Для этих целей в пакете spark.ml.feature есть два класса: StandardScaler и Normalizer.
  import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler}
  val scalerAvg: StandardScalerModel = new StandardScaler()
    .setWithMean(true)
    .setWithStd(true)
    .setInputCol("avg")
    .setOutputCol("features")
    // скармливаем наши сырые данные, чтобы алгоритм смог
    // посчитать статистику (среднее значение и стандартное отклонение)
    .fit(rawDF)

  val normAvg: Normalizer = new Normalizer()
    .setP(2.0)
    .setInputCol("avg")
    .setOutputCol("features")

Обратите внимание, что StandardScaler- это Estimator, а значит нам нужно вызвать метод fit, чтобы получить Transformer, в данном случае — StandardScalerModel. У всех классов, работающих с DataFrame, есть два общих метода:
setInputCol — задаём наименование колонки, с которой нужно считать данные
setOutputCol — указываем наименование колонки, в которую нужно записать преобразованные данные.

Отличия в результате работы этих классов в данном случае будет в том, что scaler вернёт данные в диапазоне от -1 до 1, а Normalizer в диапазоне от 0 до 1. Подробнее об алгоритмах работы можно почитать здесь и здесь.

Обучающую выборку мы подготовили (вернее получили модификаторы, которые мы будем применять для обработки ��анных), теперь нужно создать алгоритм оценки (Estimator), который на выходе даст нам обученную модель. Задаём почти стандартные настройки, на данном этапе они не особо интересны.
  import org.apache.spark.ml.regression.LinearRegression

  val linReg = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("label")
    .setElasticNetParam(0.5)
    .setMaxIter(500)
    .setRegParam(1e-10)
    .setTol(1e-6)

Вот теперь у нас есть всё необходимое, чтобы собрать простенький конвейер:
  import org.apache.spark.ml.Pipeline

  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    linReg
  ))

У Pipeline есть метод setStages, принимающий массив шагов, которые будут выполнены в указанном порядке при поступлении обучающей выборки. Теперь, всё что нам осталось- это не забыть разделить данные на обучающую и тестовую выборку:
  val splitedData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache())
  val trainData = splitedData(0)
  val testData = splitedData(1)

Давайте запустим созданный нами конвейер и оценим результат его работы:
  val pipelineModel = pipeline.fit(trainData)
  val fullPredictions = pipelineModel.transform(testData)
  val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
  val labels = fullPredictions.select("label").map(_.getDouble(0))
  val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError

  > (2003.0,1999.6153819348176)
    (1997.0,2000.9207184703566)
    (1996.0,2000.4171327880172)
    (1997.0,2002.022142263423)
    (2000.0,1997.6327888556184)
  RMSE: 10,552024

На этом этапе всё должно быть уже понятно, обратите внимание, что для оценки модели мы использовали готовый класс RegressionMetrics в котором, наряду с уже знакомой нам оценкой RMSE, реализованы также и другие базовые оценки.

Движемся дальше: в курсе Scalable Machine Learning мы создавали новые характеристики путём преобразования исходных в полином со степенью 2. Разработчики spark.ml позаботился и об этом: теперь нам достаточно создать ещё один модификатор и добавить его в конвейер; главное- в этом процессе не запутаться и правильно указать наименование колонок.
  import org.apache.spark.ml.feature.PolynomialExpansion

  // Создаём модификатор, который возьмёт данные из колонки "features" и созданный полином добавит в колонку "polyFeatures"
  val polynomAvg = new PolynomialExpansion()
    .setInputCol("features")
    .setOutputCol("polyFeatures")
    .setDegree(2)

  // Указываем алгоритму оценки из какой колонки брать характеристики
  linReg.setFeaturesCol("polyFeatures")

  // И добавляем новый модификатор в конвейер
  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    polynomAvg,
    linReg
  ))


До сих пор мы использовали для обучения только 12 характеристик, но помнится, в сырых данных были ещё 78, может, попробуем объединить их? И на этот случай у spark.ml есть решение VectorAssembler. Раз решили, давайте сделаем:
  import org.apache.spark.ml.feature.VectorAssembler

  val assembler = new VectorAssembler()
    .setInputCols(Array("avg", "cov"))
    .setOutputCol("united")

  normAvg.setInputCol("united")

  val pipeline = new Pipeline().setStages(Array(
    assembler,
    normAvg,
    polynomAvg,
    linReg
  ))

С подготовкой данных мы немного разобрались, но остался вопрос подбора оптимальных параметров для алгоритма, уж очень не хочется делать это вручную, и не надо! Для этой цели в spark.ml реализован класс CrossValidator. CrossValidator принимает алгоритм оценки (в нашем случае это linReg), набор параметров которые мы хотели бы испытать и средство оценки (когда мы оценивали модель вручную, то использовали RMSE). CrossValidator начинает свою работу с того, что разбивает набор данных на несколько образцов (k по умолчанию 3), случайным образом выбирая обучающую и валидационную выборку (валидационная выборка будет составлять по размеру 1/k от исходной). Затем, для каждого набора параметров на каждом из образцов будет произведено обучение модели, оценка её эффективности и выбор лучшей модели. Надо отметить, что выбор модели через CrossValidator достаточно затратная по времени операция, но является статистически более обоснованным, чем эвристический ручной подбор.

Для удобства создания набора параметров в spark.ml есть класс-утилита ParamGridBuilder, его мы и используем:
  import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

  val paramGrid: Array[ParamMap] = new ParamGridBuilder()
    .addGrid(linReg.maxIter, Array(5, 500))
    .addGrid(linReg.regParam, Array(1e-15, 1e-10))
    .addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3))
    .addGrid(linReg.elasticNetParam, Array(0, 0.5, 1))
    .build()

  val crossVal = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)  

  val bestModel = crossVal.fit(trainData) 

  > Best set of parameters:
    {
	  linReg_3a964d0300fd-elasticNetParam: 0.5,
	  linReg_3a964d0300fd-maxIter: 500,
	  linReg_3a964d0300fd-regParam: 1.0E-15,
	  linReg_3a964d0300fd-tol: 1.0E-9
    }
    Best cross-validation metric: -10.47433119891316 

Ну вот наверное и всё, что касается линейной регрессии, для алгоритмов классификации и кластеризации в spark.ml также есть набор решений, готовых помочь удобно организовать рабочий процесс.

Используемые материалы:
Официальная документация
UCI Machine Learning Repository
Scalable Machine Learning