Повышаем производительность ваших запросов чтения на больших датасетах
Один из современных способов хранения большущего объема данных для платформ обработки и анализа данных - это распределение каждого датасета между несколькими узлами в кластере. Если мы используем облако, то весь датасет разделяется на множество объектов. Это может привести к появлению “слишком большого количества небольших файлов” что является хорошо известной проблемой в области Big Data. Формирование небольших файлов происходит по нескольким причинам, например, при сохранении входящих потоковых данных, сообщение за сообщением, при партиционировании по ключу с перекосом данных и т.д. Драйвер должен следить за изменениями метаданных всех файлов, чтобы планировать распределенную обработку данных при сохранении или чтении данных датасета используя Namenode, MapReduce или задачи Spark. Когда файлов слишком много, для хранения их метаданных требуется дополнительная память, а при их перечислении этих данных требуется гораздо больше времени на сетевое взаимодействие.
Во время работы в Datalake вы могли заметить, что при выполнении задачи Spark затрачивается слишком много времени на чтение датасета из s3/HDFS, где нужно подождать, даже чтобы увидеть запущенные экзекьюторы. Или вы могли заметить, что вашему Hive запросу может понадобиться несколько минут, чтобы инициировать задачи. Скорее всего, причина в том, что изначально драйвер большую часть времени тратит на просмотр всех метаданных файлов/объектов датасета в s3, особенно когда небольших файлов слишком много. Это связано с тем, что именно драйвер выполняет перечисление файлов в датасете, оценивает размер/партиции, а затем распределяет работу между экзекьюторами. Таким образом, слишком большое количество небольших файлов может привести к снижению производительности, а в худшем случае драйвер может поймать исключение из-за нехватки памяти.
Рассмотрим простую задачу Spark, которая может принимать датасет и предполагаемый размер отдельного выходного файла и объединять входной датасет в файлы большего размера, что в конечном итоге сокращает количество файлов. Не рекомендуется также использовать файлы очень большого размера. Нормой являются файлы размером 1 ГБ или 512 МБ.
Полный скрипт доступен здесь. А сейчас давайте вместе пройдемся по нему….
...
Драйвер:
val (inputBucket, prefix) = getBucketNameAndPrefix(args(1))
val targetDirectory = args(2)
val maxIndividualMergedFileSize = args(3).toLong
val inputDirs = listDirectoriesInS3(inputBucket, prefix).map(prefix => "s3://" + inputBucket + "/" + prefix)
logger.info(s"Total directories found : ${inputDirs.size}")
val startedAt = System.currentTimeMillis()
//You may want to tweak the following to set how many input directories to process in parallel
val forkJoinPool = new ForkJoinPool(inputDirs.size)
val parallelBatches = inputDirs.par
parallelBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
parallelBatches.foreach(dir => {
val (srcBkt, prefix) = getBucketNameAndPrefix(dir)
logger.info(s"Working on ::=> SourceBkt : $srcBkt,Prefix:$prefix")
//Step 1 : Get file sizes
val fileSizesMap = getFileSizes(inputBucket, prefix)
//Step 2 : Group them based on target file size
val grouped = makeMergeBatches(fileSizesMap, maxIndividualMergedFileSize)
val sizedStripped = stripSizesFromFileNames(grouped)
val finalSourceFileNames = addS3BucketNameToFileNames(sizedStripped, bucketName = inputBucket)
//Step 3 : Merge files and write them out
mergeFiles(spark, finalSourceFileNames, targetDirectory)
})
Это главный метод, который принимает три параметра: 1) Исходный путь s3, где находятся небольшие файлы 2) Целевой путь s3, в который задача записывает объединенные файлы и 3) Максимальный целевой размер одного объединенного файла.
Этот скрипт предполагает, что входной каталог (строка #5) содержит подкаталоги, в которых фактически находятся конечные файлы. Как правило, это свойственно партиционным датасетам. Однако, если у нас есть только один каталог, в котором содержатся все файлы, вы можете поправить строку #5, чтобы передавать в список всего один элемент.
Также стоит взглянуть на строки #9 - #11, в которых используются параллельные коллекции Scala для параллельного сабмита нескольких задач spark. Да, вы не ошиблись. Мы можем запускать параллельные экшены Spark по сабмиту из каждого потока. Как видите, вся многопоточность абстрагируется за счет использования параллельных коллекций. Строка #9 задает степень параллелизма, то есть сколько каталогов обрабатывается параллельно. Здесь я просто установил его равным количеству подкаталогов. Если их больше сотни, я бы посоветовал запускать их батчами, поскольку можно перегрузить кластер (конечно, это зависит от размера вашего кластера).
Начиная со строки #12, работа начинается с обхода каждого входного каталога и их параллельной обработки. Весь процесс для каждого каталога можно разбить на три этапа:
Шаг 1: Определите размеры файлов
def getFileSizes(bucketName: String, prefix: String): scala.collection.immutable.Map[String, Long] = {
val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build()
val listing = s3.listObjectsV2(bucketName, prefix)
val files = listing.getObjectSummaries.asScala.map(_.getKey).filter(!_.split("/").last.startsWith("_"))
val filesSizeMap = collection.mutable.Map[String, Long]()
files.foreach(obj => {
val meta = s3.getObjectMetadata(new GetObjectMetadataRequest(bucketName, obj))
filesSizeMap += (obj -> meta.getContentLength)
})
filesSizeMap.toMap
}
Это просто. Я просто перечисляю файлы, присутствующие в каталоге, используя библиотеку “com.amazonaws:aws-java-sdk”. При составлении списка я также определяю размер каждого объекта, и затем возвращаю вызывающей стороне Map <file-prefix, size>.
Шаг 2: Определите группы файлов для объединения
def makeMergeBatches(fileSizesMap: scala.collection.immutable.Map[String, Long], maxTargetFileSize: Long): ListBuffer[ListBuffer[String]] = {
val sortedFileSizes = fileSizesMap.toSeq.sortBy(_._2)
val groupedFiles = ListBuffer[ListBuffer[String]]()
groupedFiles += ListBuffer[String]()
for (aFile <- smallerFiles) {
val lastBatch = groupedFiles.last
if ((sizeOfThisBatch(lastBatch) + aFile._2) < maxTargetFileSize) {
lastBatch += aFile._1 + "|" + aFile._2.toString
} else {
val newBatch = ListBuffer[String]()
newBatch += aFile._1 + "|" + aFile._2.toString
groupedFiles += newBatch
}
}
groupedFiles
}
На этом этапе мы определяем группу файлов небольшого размера, которые можно объединить. Как видите, метод принимает Map<file,size> и размер целевого файла. После сортировки карты по размеру файлов она группирует несколько файлов в батч, так что общий размер батча меньше или равен целевому размеру файла. Конечно, результат кластеризации может быть неудачным, поскольку в итоге мы можем получить файлы, размер которых меньше целевого файла, но их невозможно объединить с другими такими файлами, поскольку общий размер батча может превышать целевой размер файла. К счастью, поскольку мы сортируем карту по размеру файлов в возрастающем порядке, файлы наименьшего размера будут сгруппированы вместе, и останутся только те файлы, которые по размеру близки к целевому размеру файла. Наконец, функция возвращает List [List [file]], который содержит список батчей файлов, которые должны быть объединены в один файл.
Шаг 3. Объедините файлы и запишите их
def mergeFiles(spark: SparkSession, grouped: ListBuffer[ListBuffer[String]], targetDirectory: String): Unit = {
val startedAt = System.currentTimeMillis()
val forkJoinPool = new ForkJoinPool(grouped.size)
val parllelBatches = grouped.par
parllelBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
parllelBatches foreach (batch => {
logger.debug(s"Merging ${batch.size} files into one")
try {
spark.read.parquet(batch.toList: _*).coalesce(1).write.mode("append").parquet(targetDirectory.stripSuffix("/") + "/")
} catch {
case e: Exception => logger.error(s"Error while processing batch $batch : ${e.getMessage}")
}
})
logger.debug(s"Total Time taken to merge this directory: ${(System.currentTimeMillis() - startedAt) / (1000 * 60)} mins")
}
Последний шаг - это этап объединения сгруппированных файлов (подготовленных на 2-ом шаге) в один файл. Как вы можете догадаться - это несложно. Прочтите файлы (в примере кода выше - файл Parquet, но это может быть файл любого формата) с помощью функции spark.read()
, передав список файлов этой группе, а затем используйте coalesce(1)
, чтобы объединить их в один. Вот еще одна интересная деталь. Мы снова используем параллельные коллекции для выполнения объединения, поэтому объединение каждой группы в одном каталоге происходит параллельно. Итак, если есть m каталогов и n групп в каждом каталоге, будет запущено m x n параллельных задач.
Режим записи - “append”, так как во время записи объединенного файла , как я уже упоминал ранее, несколько каталогов обрабатываются параллельно, и поскольку целевой каталог одинаков для всех, режим “overwrite” завершится ошибкой с сообщением “Directory already exists”. Необходимо убедиться, что целевой каталог новый.
...
На этом все. Теперь даже KiB файлы будут объединены в МБ или ГБ. Вы заметите повышение производительности ваших запросов на чтение при сканировании меньшего количества файлов.
Что дальше?
1. Одно из возможных улучшений этого скрипта состоит в том, чтобы вместо объединения уже существующих файлов оптимально объединять датасет перед выполнением операций IO с помощью задачи записи. Я поделюсь в другой публикации, как это можно сделать. Между тем, этот скрипт можно использовать для объединения исторических датасетов.
2. Как я уже упоминал, шаг#2, который определяет группу файлов для объединения, не всегда удачен. Если вы знаете алгоритм кластеризации, который может сделать это оптимальным образом, чтобы после объединения было минимальное количество файлов, добро пожаловать в PR :-)
Материал подготовлен в рамках курса «Data Engineer». Всех желающих приглашаем на открытый урок «Введение в оркестрацию». На занятии подробно разберем, что такое платформы оркестрации, какие решения есть сегодня на рынке и даже углубимся в практический пример использования одной из самых распространенных платформ на сегодня: Apache Airflow.