
Команда VK Cloud перевела статью о правилах оформления кода в PySpark. Они не обязательны для исполнения, но помогут сделать ваш код более читабельным и удобным для последующих проверок и изменений.
Импортируйте в переменную
Это помогает не загрязнять глобальное пространство имен и избегать конфликтов методов из разных пакетов.
from pyspark.sql import functions as F from pyspark.sql import types as T
Рефракторьте сложные логические операции
В сложных запросах трудно разобраться. Лучше разбить их на логические группы и потом объединить.
# Bad F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service') # Good has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')) delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0) has_registration = (F.col('currentRegistration').rlike('.+')) is_delivered = (F.col('prod_status') == 'Delivered') is_active = (has_registration | has_operator) F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
В примере выше мы сгруппировали условие оператора в единое целое. Здесь же мы сначала отделили друг от друга и потом объединили статус доставки, регистрацию, активность и другие условия. В этом случае выражение
F.when получается лаконичным.Откажитесь от withColumnRenamed
Если возможно, используйте alias, а не
withColumnRenamed.#bad df.select('key', 'comments').withColumnRenamed('comments', 'num_comments') # good df.select('key', F.col('comments').alias('num_comments'))
Откажитесь от withColumn
В некоторых случаях для преобразования типов можно отказаться от метода
withColumn и использовать select, а потом cast.# bad df.select('comments').withColumn('comments', F.col('comments').cast('double')) # good df.select(F.col('comments').cast('double'))
Используйте lit(None)
Лучше добавить
None, чем оставить поле пустым или указать NA. Тогда мы можем использовать утилиты вроде isNull вместо проверки пустых строк, нулевых значений, 'NA' и т. п.# bad df = df.withColumn('foo', F.lit('')) df = df.withColumn('foo', F.lit('NA')) # good df = df.withColumn('foo', F.lit(None))
Используйте явные операции join
# bad flights = flights.join(aircraft, 'aircraft_id') flights = flights.join(aircraft, 'aircraft_id', 'inner') # good flights = flights.join(aircraft, 'aircraft_id', how='inner')
Избегайте right-операций join
# bad flights = aircraft.join(flights, 'aircraft_id', how='right') # good flights = flights.join(aircraft, 'aircraft_id', how='left')
Разберитесь с похожими столбцами
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]}) psdf.columns = ["a", "a"] Reference 'a' is ambiguous, could be: a, a.
- При соединении таблиц заранее избавляйтесь от похожих столбцов, если в DF есть столбцы с одинаковыми именами.
- Если нам нужны оба столбца, лучше всего переименовать один из них до соединения.
- Разбираться с проблемными столбцами всегда нужно до формирования датасета. После окончания преобразования вы уже не сможете их различить.
- А еще не стоит задавать имена столбцов, чувствительные к регистру.
Используйте круглые скобки, чтобы избежать \ (явных разрывов строки)
# bad df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # good df = ( df .filter(F.col('event') == 'executing') .filter(F.col('has_tests') == True) .drop('has_tests') )
Не соединяйте преобразования разных типов в цепочку
# bad df = ( df .select('a', 'b', 'c', 'key') .filter(F.col('a') == 'truthiness') .withColumn('boverc', F.col('b') / F.col('c')) .join(df2, 'key', how='inner') .join(df3, 'key', how='left') .drop('c') ) # better (separating into steps) # first: we select and trim down the data that we need # second: we create the columns that we need to have # third: joining with other dataframes df = ( df .select('a', 'b', 'c', 'key') .filter(F.col('a') == 'truthiness') ) df = df.withColumn('boverc', F.col('b') / F.col('c')) df = ( df .join(df2, 'key', how='inner') .join(df3, 'key', how='left') .drop('c') )
Задавайте конкретные условия при обработке и упорядочивании значений Null
ignorenulls=True F.asc_nulls_last('num')
Избегайте пустой функции PartitionBy
Не должно быть пустых окон, поскольку из-за них Spark объединяет все данные в одну партицию и это может чрезвычайно снизить производительность.
# bad w = W.partitionBy() df = df.select(F.sum('num').over(w).alias('sum')) # good df = df.agg(F.sum('num').alias('sum')) <h2>Оконная функция |. Используйте либо фреймы row, либо фреймы range</h2> # bad w1 = W.partitionBy('key') w2 = W.partitionBy('key').orderBy('num') # good w1 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, 0) w2 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
Select для указания схемы
Используйте Select как операцию очистки для подготовки датафрейма, который будет использован:
- в начале преобразования,
- перед выводом результата.
Другие рекомендации
- Не включайте избыточные или ненужные столбцы в инструкцию
select.
- Не используйте литеральные строки или целые числа в условиях фильтров, в новых значениях столбцов и т. п. Вместо этого присвойте их информативной переменной.
- Каждый раз, когда сценарием использования предусмотрено кэширование всех данных, рекомендуется использовать
df.cache.count(). После завершения использования объекта выполнитеunpersist.
- По возможности не используйте циклы для датасетов.
- Отделяйте файл конфигурации Spark от кода.
- Отделяйте конфигурацию Spark Job с помощью файла .ini.
- Используйте класс
JobContext, чтобы определять аккумуляторы, счетчики, общие данные и т. п.
- Отделяйте загрузку и сохранение данных от любой тематической или бизнес-логики.
- Не используйте функцию
printдля отладки, используйте логи и логирование.
Сейчас наша команда готовит к релизу облачный сервис Spark в K8s, подписывайтесь на телеграм-канал «Данные на стероидах», чтобы не пропустить анонс!
