Сегодня я бы хотел рассказать о появившемся в версии 1.2 новом пакете, получившем название spark.ml. Он создан, чтобы обеспечить единый высокоуровневый API для алгоритмов машинного обучения, который поможет упростить создание и настройку, а также объединение нескольких алгоритмов в один конвейер или рабочий процесс. Сейчас на дворе у нас версия 1.4.1, и разработчики заявляют, что пакет вышел из альфы, хотя многие компоненты до сих пор помечены как Experimental или DeveloperApi.Ну что же, давайте проверим, что может новый пакет и насколько он хорош. Первым делом нам нужно познакомиться с основными понятиями, введёнными в spark.ml.
1. ML Dataset — spark.ml использует для работы с данными DataFrame из пакета spark.sql. DataFrame представляет собой распределённую коллекцию, в которой данные хранятся как именованные колонки. Концептуально DataFrame эквивалентны таблице в реляционной базе данных или такому типу данных как frame в R или Python, но с более богатой оптимизацией под капотом. (Примеры и способы работы будут приведены ниже в статье).
2. Transformer (модификатор) — это просто любой алгоритм, который может преобразовать один DataFrame в другой. Для примера: любая обученная модель является модификатором, поскольку преобразует набор характеристик (features) в предсказание (prediction)
3. Estimator (алгоритм оценки) — это алгоритм, который может выполнить преобразование из DataFrame в Transformer. К примеру, любой алгоритм обучения является также и алгоритмом оценки, т. к. он принимает набор данных для обучения и создаёт на выходе обученную модель.
4. Pipeline — конвейер, объединяющий любое количество модификаторов и алгоритмов оценки для создания рабочего процесса машинного обучения.
5. Param — общий тип, который используют модификаторы и алгоритмы оценки для задания параметров.
Согласно описанному интерфейсу, каждый Estimator должен иметь метод fit, принимающий DataFrame и возвращающий Transformer. В свою очередь, Transformer должен иметь метод transform, который преобразует одну DataFrame в другую.
В курсе Scalable Machine Learning в одной из лабораторных работ преподаватели, рассказывая о линейной регрессии, решали задачу «об определении года создания песни по набору аудио-характеристик». В ней было реализовано довольно много методов как для обработки данных, так и для оценки и нахождения лучшей модели. Сделано это было, чтобы более детально ознакомить студентов с основными процессами в машинном обучении, но давайте проверим, насколько облегчит нам жизнь пакет spark.ml.
В лабораторной работе нам были предоставлены уже подготовленные и немного обрезанные данные. Но так как нам интересно пройти весь путь, то предлагаю взять сырой набор данных. Каждая строка вида:
2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719
где первым идёт год, далее 12 чисел это средние тембры, и последние 78 это ковариации тембров.Первым делом нам нужно подтянуть эти данные в DataFrame, но сперва немного преобразуем формат данных:
val sc = new SparkContext("local[*]", "YearPrediction")
val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt")
.map(_.split(','))
.map(x => (
x.head.toDouble,
Vectors.dense(x.tail.take(12).map(_.toDouble)),
Vectors.dense(x.takeRight(78).map(_.toDouble))
))
Теперь каждый элемент RDD это кортеж содержащий год и два вектора характеристик, чтобы получить DataFrame нужно выполнить ещё одно преобразование:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rawDF: DataFrame = labeledPointsRDD.toDF("label", "avg", "cov")
Обратите внимание, что мы создали sqlContext и подтянули методы неявного преобразования (в данном случае можно было написать
import sqlContext.implicits.rddToDataFrameHolder
) что-бы использовать метод toDF. Также мы указали имена колонок, и теперь структура данных будет выглядеть так: label | avg | cov
-------|-----------------------------------------|---------------------------------------------
2001 | [49.94357 21.47114 73.07750 8.74861... | [10.20556 611.10913 951.08960 698.11428...
-------|-----------------------------------------|---------------------------------------------
2007 | [50.57546 33.17843 50.53517 11.5521... | [44.38997 2056.93836 605.40696 457.4117...
Градиентный метод, который используется в линейной регрессии, чувствителен к разбросу значений характеристик, поэтому данные перед обучением нужно нормировать или стандартизировать. Для этих целей в пакете spark.ml.feature есть два класса: StandardScaler и Normalizer.
import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler}
val scalerAvg: StandardScalerModel = new StandardScaler()
.setWithMean(true)
.setWithStd(true)
.setInputCol("avg")
.setOutputCol("features")
// скармливаем наши сырые данные, чтобы алгоритм смог
// посчитать статистику (среднее значение и стандартное отклонение)
.fit(rawDF)
val normAvg: Normalizer = new Normalizer()
.setP(2.0)
.setInputCol("avg")
.setOutputCol("features")
Обратите внимание, что StandardScaler- это Estimator, а значит нам нужно вызвать метод fit, чтобы получить Transformer, в данном случае — StandardScalerModel. У всех классов, работающих с DataFrame, есть два общих метода:
setInputCol — задаём наименование колонки, с которой нужно считать данные
setOutputCol — указываем наименование колонки, в которую нужно записать преобразованные данные.
Отличия в результате работы этих классов в данном случае будет в том, что scaler вернёт данные в диапазоне от -1 до 1, а Normalizer в диапазоне от 0 до 1. Подробнее об алгоритмах работы можно почитать здесь и здесь.
Обучающую выборку мы подготовили (вернее получили модификаторы, которые мы будем применять для обработки ��анных), теперь нужно создать алгоритм оценки (Estimator), который на выходе даст нам обученную модель. Задаём почти стандартные настройки, на данном этапе они не особо интересны.
import org.apache.spark.ml.regression.LinearRegression
val linReg = new LinearRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.setElasticNetParam(0.5)
.setMaxIter(500)
.setRegParam(1e-10)
.setTol(1e-6)
Вот теперь у нас есть всё необходимое, чтобы собрать простенький конвейер:
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(
normAvg,
linReg
))
У Pipeline есть метод setStages, принимающий массив шагов, которые будут выполнены в указанном порядке при поступлении обучающей выборки. Теперь, всё что нам осталось- это не забыть разделить данные на обучающую и тестовую выборку:
val splitedData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache())
val trainData = splitedData(0)
val testData = splitedData(1)
Давайте запустим созданный нами конвейер и оценим результат его работы:
val pipelineModel = pipeline.fit(trainData)
val fullPredictions = pipelineModel.transform(testData)
val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
val labels = fullPredictions.select("label").map(_.getDouble(0))
val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
> (2003.0,1999.6153819348176)
(1997.0,2000.9207184703566)
(1996.0,2000.4171327880172)
(1997.0,2002.022142263423)
(2000.0,1997.6327888556184)
RMSE: 10,552024
На этом этапе всё должно быть уже понятно, обратите внимание, что для оценки модели мы использовали готовый класс RegressionMetrics в котором, наряду с уже знакомой нам оценкой RMSE, реализованы также и другие базовые оценки.
Движемся дальше: в курсе Scalable Machine Learning мы создавали новые характеристики путём преобразования исходных в полином со степенью 2. Разработчики spark.ml позаботился и об этом: теперь нам достаточно создать ещё один модификатор и добавить его в конвейер; главное- в этом процессе не запутаться и правильно указать наименование колонок.
import org.apache.spark.ml.feature.PolynomialExpansion
// Создаём модификатор, который возьмёт данные из колонки "features" и созданный полином добавит в колонку "polyFeatures"
val polynomAvg = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(2)
// Указываем алгоритму оценки из какой колонки брать характеристики
linReg.setFeaturesCol("polyFeatures")
// И добавляем новый модификатор в конвейер
val pipeline = new Pipeline().setStages(Array(
normAvg,
polynomAvg,
linReg
))
До сих пор мы использовали для обучения только 12 характеристик, но помнится, в сырых данных были ещё 78, может, попробуем объединить их? И на этот случай у spark.ml есть решение VectorAssembler. Раз решили, давайте сделаем:
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("avg", "cov"))
.setOutputCol("united")
normAvg.setInputCol("united")
val pipeline = new Pipeline().setStages(Array(
assembler,
normAvg,
polynomAvg,
linReg
))
С подготовкой данных мы немного разобрались, но остался вопрос подбора оптимальных параметров для алгоритма, уж очень не хочется делать это вручную, и не надо! Для этой цели в spark.ml реализован класс CrossValidator. CrossValidator принимает алгоритм оценки (в нашем случае это linReg), набор параметров которые мы хотели бы испытать и средство оценки (когда мы оценивали модель вручную, то использовали RMSE). CrossValidator начинает свою работу с того, что разбивает набор данных на несколько образцов (k по умолчанию 3), случайным образом выбирая обучающую и валидационную выборку (валидационная выборка будет составлять по размеру 1/k от исходной). Затем, для каждого набора параметров на каждом из образцов будет произведено обучение модели, оценка её эффективности и выбор лучшей модели. Надо отметить, что выбор модели через CrossValidator достаточно затратная по времени операция, но является статистически более обоснованным, чем эвристический ручной подбор.
Для удобства создания набора параметров в spark.ml есть класс-утилита ParamGridBuilder, его мы и используем:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val paramGrid: Array[ParamMap] = new ParamGridBuilder()
.addGrid(linReg.maxIter, Array(5, 500))
.addGrid(linReg.regParam, Array(1e-15, 1e-10))
.addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3))
.addGrid(linReg.elasticNetParam, Array(0, 0.5, 1))
.build()
val crossVal = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
val bestModel = crossVal.fit(trainData)
> Best set of parameters:
{
linReg_3a964d0300fd-elasticNetParam: 0.5,
linReg_3a964d0300fd-maxIter: 500,
linReg_3a964d0300fd-regParam: 1.0E-15,
linReg_3a964d0300fd-tol: 1.0E-9
}
Best cross-validation metric: -10.47433119891316
Ну вот наверное и всё, что касается линейной регрессии, для алгоритмов классификации и кластеризации в spark.ml также есть набор решений, готовых помочь удобно организовать рабочий процесс.
Используемые материалы:
Официальная документация
UCI Machine Learning Repository
Scalable Machine Learning
