Высокоскоростной Apache Parquet на Python с Apache Arrow

Автор оригинала: Wes McKinney
  • Перевод
Всем салют. Уже на следующей неделе стартуют занятия в новой группе курса «Data Engineer», в связи с этим делимся с вами еще одним интересным переводом.




На протяжении всего прошлого года я работал с сообществом Apache Parquet над созданием parquet-cpp — первоклассной C++ Parquet реализации для чтения/записи файлов, подходящей для использования в Python и других приложениях для работы с данными. Уве Корн и я разработали Python интерфейс и интеграцию с pandas в рамках кодовой базы Python (pyarrow) в Apache Arrow.

Эта статья является продолжением моего стратегического плана на 2017 год.


Дизайн: высокопроизводительные колоночные данные в Python.


C++ библиотеки Apache Arrow и Parquet являются вспомогательными технологиями, которые изначально проектировались нами для согласованной совместной работы.

  • Библиотеки C++ Arrow обеспечивают управление памятью, эффективный ввод/вывод (файлы, memory map, HDFS), контейнеры колоночных массивов в памяти и чрезвычайно быстрый обмен сообщениями (IPC/RPC). Я подробнее коснусь слоя обмена сообщениями Arrow в другой статье.
  • Библиотеки C++ Parquet отвечают за кодирование и декодирование файлового формата Parquet. Мы реализовали libparquet_arrow — библиотеку, которая обрабатывает транзит между данными в памяти Arrow и низкоуровневыми инструментами чтения/записи Parquet.
  • PyArrow предоставляет Python интерфейс для всего этого и обрабатывает быстрые преобразования в pandas.DataFrame.

Одной из основных целей Apache Arrow является создание эффективного межоперационного уровня транспортировки колоночной памяти.

Вы можете почитать о пользовательском API Parquet в кодовой базе PyArrow. Библиотеки доступны в conda-forge по адресу:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Бенчмарки: PyArrow и fastparquet


Чтобы получить представление о производительности PyArrow, я сгенерировал набор данных объемом 512 мегабайт с числовыми данными, которые демонстрируют различные варианты использования Parquet. Я сгенерировал два варианта наборов данных:

  • С высокой энтропией: все значения данных в файле (за исключением нулевых значений) различны. Этот набор данных весит 469 МБ.
  • С низкой энтропией: данные демонстрируют высокую степень повторения. Эти данные кодируются и сжимаются до весьма небольшого размера: всего 23 МБ посредством сжатия Snappy. Если вы создадите файл со словарной кодировкой, он получится еще меньше. Поскольку декодирование таких файлов имеет ограничение по процессору, нежели по операциям ввода-вывода, обычно можно ожидать более высокую пропускную способность для файлов данных с низкой энтропией.

Я создал эти файлы в трех основных используемых стилях сжатия: несжатые, snappy и gzip. Затем я вычисляю физическое время, необходимое для получения pandas DataFrame с диска.

fastparquet — это более новая реализация программы чтения/записи файлов Parquet для пользователей Python, созданная для использования в проекте Dask. Она реализована на Python и использует компилятор Numba Python-to-LLVM для ускорения процедур декодирования Parquet. Я также установил ее, чтобы сравнить с альтернативными реализациями.

Код для чтения файла в качестве pandas.DataFrame аналогичен:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Зеленые столбцы соответствуют времени PyArrow: более длинные столбцы указывают на более высокую производительность/более высокую пропускную способность данных. Аппаратное обеспечение — ноутбук Xeon E3-1505.

Я обновлял эти бенчмарки 1 февраля 2017 года в соответствии с последними кодовыми базами.



Состояние разработки


Нам нужна помощь с виндовыми сборками и упаковкой. Кроме того, поддержание пакетов conda-forge в актуальном состоянии занимает очень много времени. И конечно, мы ищем разработчиков как на C++, так и на Python, для контрибуций в кодовую базу вообще в целом.

До сих пор мы уделяли особое внимание качественной реализации файлового формата с высокой производительностью чтения и записи простых наборов данных. Мы начинаем переходить к обработке вложенных JSON-подобных данных в parquet-cpp, используя Arrow в качестве контейнера для вложенных колоночных данных.

Недавно Уве Корн реализовал поддержку List Arrow в преобразованиях в pandas:

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Код бенчмарка


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Успеть на курс.


OTUS. Онлайн-образование
Цифровые навыки от ведущих экспертов

Комментарии 0

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое