Если вы работаете с данными, вы конечно же встречались с форматом CSV. Этот формат — настоящая «лингва франка» в мире данных. В нём хранятся выгрузки из баз данных, логи работы приложений, отчёты за произвольные периоды и данные для обмена между системами. CSV настолько вездесущ, что его поддержка есть буквально в каждом инструменте — от Excel до Hadoop.
Но за кажущейся простотой скрывается множество подводных камней. Сколько раз вы сталкивались с ситуацией, когда простая загрузка CSV превращалась в часы отладки? Неправильная кодировка, запятые внутри полей, экранированные кавычки, пропущенные значения — всё это может сломать пайплайн обработки данных.
В этой статье мы разберём, как правильно загружать CSV в Apache Spark — распределённую вычислительную систему, ставшую стандартом для обработки больших данных. Это первый и самый важный шаг в знакомстве с API Spark и основа для любой последующей обработки.
Базовое чтение CSV
Для того, чтобы разобраться с CSV, давайте начнём с самого простого примера. У нас есть файл people.csv со следующим содержимым:
name,age,city Alice,30,Moscow Bob,25,St. Petersburg Charlie,35,Kazan |
Теперь просто загрузим его в Spark:
from pyspark.sql import SparkSession # Создаём Spark-сессию spark = SparkSession.builder \ .appName("CSV Introduction") \ .getOrCreate() # Читаем CSV-файл df = spark.read.csv("people.csv", header=True, inferSchema=True) # Показываем содержимое df.show()
Получаем следующий вывод:
+-------+---+-------------+ | name|age| city| +-------+---+-------------+ | Alice| 30 | Moscow| | Bob| 25 |St. Petersburg| |Charlie| 35 | Kazan| +-------+---+-------------+
Что здесь произошло?
header=True — первая строка файла используется как имена столбцов.
inferSchema=True — Spark автоматически определяет типы данных столбцов.
Посмотрим на схему:
df.printSchema()
Вывод:
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- city: string (nullable = true)
Spark правильно определил, что age — это целое число. Похоже на магию, не правда ли? Но за этой магией скрывается важный нюанс: вывод схемы требует прохода по данным. Если файл огромен, этот проход может занять время.
Теперь давайте все немного усложним и рассмотрим ситуацию, когда CSV пытается вас обмануть. Вот реальный пример из жизни:
'name','age','city' "Alice","30","Moscow, Russia" "Bob","25","St. Petersburg" "Charlie","35","Kazan" |
Обратите внимание: в первой строке значения в кавычках, а в поле city у Alice есть запятая. Если мы прочитаем этот файл стандартным способом, получим ошибку. Нужно настроить парсер.
Что предлагает Spark
Spark предоставляет десятки опций для тонкой настройки парсинга CSV. Вот самые важные из них:
df = spark.read \ .format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .option("delimiter", ",") \ .option("quote", "\"") \ .option("escape", "\"") \ .option("multiLine", "true") \ .option("ignoreLeadingWhiteSpace", "true") \ .option("ignoreTrailingWhiteSpace", "true") \ .option("nullValue", "null") \ .option("emptyValue", "") \ .option("mode", "PERMISSIVE") \ .load("people_complex.csv")
Разберём каждую опцию:
Опция | Значение по умолчанию | Описание |
delimiter | , | Разделитель полей. Может быть табуляцией (\t), точкой с запятой (;) и т.д. |
quote | " | Символ для обрамления полей, содержащих спецсимволы. |
escape | \ | Символ экранирования внутри кавычек. |
multiLine | false | Разрешить перенос строк внутри полей, обрамлённых кавычками. |
ignoreLeadingWhiteSpace | false | Игнорировать пробелы в начале полей. |
ignoreTrailingWhiteSpace | false | Игнорировать пробелы в конце полей. |
nullValue | пустая строка | Значение, которое интерпретируется как null |
emptyValue | пустая строка | Значение для пустых полей |
mode | PERMISSIVE | Режим обработки ошибок парсинга |
Режим mode определяет, что делать с "битыми" строками. В режиме PERMISSIVE, используемом по умолчанию Spark записывает битые строки в специальное поле corruptrecord и продолжает работу. Это позволяет не терять данные, но требует последующей очистки. При использовании DROPMALFORMED строки, которые не удалось распарсить, просто игнорируются.
И в режиме FAILFAST при первой же ошибке парсинга Spark выбрасывает исключение и останавливается.
Выбор режима зависит от задачи. Если данные критически важны, используйте PERMISSIVE и разбирайтесь с битыми строками отдельно. Если вы уверены в качестве данных и хотите жёсткого контроля — FAILFAST.
Вот пример использования FAILFAST:
df = spark.read \ .option("mode", "FAILFAST") \ .csv("people.csv")
Давайте вернёмся к нашему примеру с запятой внутри поля. Правильное чтение CSV будет выглядеть так:
df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .option("quote", "\"") \ .option("escape", "\"") \ .csv("people_with_commas.csv") df.show(truncate=False)
Вывод:
+-------+---+-----------------+ |name |age|city | +-------+---+-----------------+ |Alice |30 |Moscow, Russia | |Bob |25 |St. Petersburg | |Charlie|35 |Kazan | +-------+---+-----------------+
Как мы видим, Spark правильно распарсил поле city, сохранив запятую внутри.
Задаем схему
Использование inferSchema=True конечно удобно, но у данной опции есть ряд недостатков. Так, как Spark делает дополнительный проход по данным для определения типов на больших файлах это может создавать дополнительные задержки в работе.
А еще Spark может ошибиться. Например, если в столбце age первые 1000 строк — числа, а 1001-я строка — строка "unknown", Spark может определить тип как string, хотя большую часть времени вы работали с числами.
Ну и кроме того, явное задание схемы делает код самодокументированным и понятным.
В приведенном ниже примере показано как можно задать схему вручную.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType # Определяем схему schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), StructField("city", StringType(), True) ]) # Читаем CSV с явной схемой df = spark.read \ .option("header", "true") \ .schema(schema) \ .csv("people.csv") df.printSchema()
Здесь параметр True в StructField означает, что поле может содержать null (nullable).
Когда мы работаем в продуктивной среде, необходимо обязательно использовать явную схему. Продакшен-код не должен полагаться на автоматическое определение типов. Также явная схема нужна при работе с большими данными. Экономия одного прохода по данным может дать существенный прирост производительности. И в случае, если типы данных известны заранее. Например, вы знаете, что age — всегда целое число.
А если файлов много
В реальной жизни данные редко лежат в одном файле. Чаще это целые директории с партициями подобно приведенным ниже:
/logs/ /date=2023-01-01/ part-00000.csv part-00001.csv /date=2023-01-02/ part-00000.csv part-00001.csv |
Для того, чтобы прочесть содержимое в подкаталогах /logs достаточно выполнить:
df = spark.read.csv("/logs/*/*.csv", header=True)
Spark прочитает все CSV-файлы, соответствующие паттерну, и объединит их в один DataFrame. Важное условие: у всех файлов должна быть одинаковая схема.
Ну а если нам требуется рекурсивный обход подкаталогов, необходимо выполнить следующий код:
df = spark.read \ .option("recursiveFileLookup", "true") \ .csv("/logs/")
Опция recursiveFileLookup заставляет Spark рекурсивно обходить все поддиректории.
Также, если ваши данные организованы в стиле Hive-партиционирования (директории вида key=value), Spark может автоматически распознать партиции:
df = spark.read \ .option("basePath", "/logs/") \ .csv("/logs/date=2023-01-01/*.csv")
Этот подход особенно полезен, если вы планируете потом писать данные после обработки обратно в партиционированном виде.
Оптимизация чтения
Когда речь идёт о терабайтах данных, даже чтение CSV может стать узким местом. Вот несколько советов по оптимизации.
Прежде всего, важно понимать, что CSV — это не самый эффективный формат для хранения. Он удобный для чтения человеком, но не эффективен для хранения. Если вы часто читаете одни и те же данные, конвертируйте их в Parquet — колоночный формат, который Spark читает в разы быстрее.
# Читаем CSV один раз df = spark.read.csv("huge_data.csv", header=True) # Сохраняем в Parquet df.write.parquet("huge_data.parquet") # Теперь читаем Parquet (быстро!) df_fast = spark.read.parquet("huge_data.parquet")
Parquet не только быстрее читается, но и занимает меньше места благодаря сжатию. По умолчанию Spark создаёт столько партиций, сколько блоков в файловой системе (обычно 128 МБ на партицию). Но при необходимости это можно настроить:
df = spark.read \ .option("maxPartitionBytes", 256 1024 1024) \ # 256 МБ на партицию .csv("large_file.csv")
Увеличение размера партиции уменьшает их количество, что может ускорить обработку, если у вас мало ресурсов.
Выборочное чтение столбцов
В Spark 3.0+ появилась возможность читать только нужные столбцы напрямую из CSV (раньше это работало только для колоночных форматов):
df = spark.read \ .option("header", "true") \ .csv("people.csv") \ .select("name", "city") # Spark оптимизирует чтение, прочитает только эти столбцы
Эта оптимизация работает не всегда, но в большинстве случаев Spark достаточно умён, чтобы не читать лишнего.
Ошибки и грязные данные
В реальном мире данные никогда не бывают идеальными. Рассмотрим типичные проблемы и способы их решения.
Начнем с битых строк и использования режима PERMISSIVE. В примере ниже мы сначала открываем CSV в режиме PERMESSIVE, затем отделяем хорошие строки от битых. Битые строки предлагается далее подвергнуть дополнительному анализу.
df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .option("columnNameOfCorruptRecord", "_corrupt_record") \ .csv("dirty_data.csv") # Отделяем хорошие строки от битых good_df = df.filter(df["_corrupt_record"].isNull()) bad_df = df.filter(df["_corrupt_record"].isNotNull()) # Анализируем битые строки bad_df.select("_corrupt_record").show(truncate=False)
Этот подход позволяет не терять данные, но требует дополнительной обработки для тех данных, которые Spark признал плохими.
Когда у нас данных слишком много, мы можем позволить себе просто пропускать битые строки, как это показано в примере ниже.
df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ # Просто пропускаем битые строки .option("dateFormat", "yyyy-MM-dd") \ # Формат даты .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \ .csv("data.csv")
Для более сложных случаев можно прочитать CSV как текст и обработать построчно:
# Читаем как обычный текст raw_df = spark.read.text("problematic_data.csv") # Применяем пользовательскую функцию для парсинга from pyspark.sql.functions import udf from pyspark.sql.types import StructType, StructField, StringType, IntegerType def parse_line(line): parts = line.split(',') if len(parts) >= 3: return (parts[0].strip('"'), int(parts[1]), parts[2].strip('"')) return None parse_udf = udf(parse_line, schema) parsed_df = raw_df.select(parse_udf("value").alias("data")) \ .select("data.*") \ .filter("data.name IS NOT NULL")
Этот подход даёт максимальную гибкость, но требует написания кода и может быть медленнее встроенного парсера.
Заключение
Мы рассмотрели различные аспекты работы с CSV документами: от простого чтения CSV до выявления некорректных данных. Подведем некоторые итоги. Итак, CSV — это просто, но не тривиально. За кажущейся простотой скрывается множество нюансов: кавычки, экранирование, переносы строк, кодировки.
Явная схема — признак профессионализма. Автоинференс удобен для экспериментов, но в продуктиве всегда задавайте схему явно.
Настройки парсера упрощают жизнь. Опции quote, escape, multiLine и mode позволяют обрабатывать самые «грязные» данные. Parquet — ваш друг. Если данные нужно читать часто, конвертируйте их в Parquet. Это окупится скоростью и местом.
Обрабатывайте ошибки осознанно. Режим PERMISSIVE и анализ битых строк помогут не терять данные и понимать проблемы в источниках.

Ошибки при чтении CSV редко бывают локальной проблемой: чаще это первый сигнал, что в обработке данных не хватает системного подхода. Курс по Spark как раз закрывает этот разрыв: работа со схемами, API Spark, потоками данных, тестированием, наблюдением за приложениями и интеграцией с разными источниками. Будет полезен тем, кто хочет увереннее работать с большими данными в реальных задачах.
Если хотите понять формат обучения и закрыть пробелы в знаниях — записывайтесь на бесплатные уроки от преподавателей курса:
30 марта в 20:00. «Чтение CSV — вход в Spark». Записаться
9 апреля в 20:00. «DataFrame — таблица с оптимизатором». Записаться
23 апреля в 20:00. «Что нового в Spark 4.0». Записаться
