Pull to refresh

Миллион записей для змеи

Reading time14 min
Views6.8K

Загрузить миллион записей в питон за секунду?
Нет. Получилось еще быстрее!

У меня есть небольшое хобби - я экспериментирую с машинным обучением применительно к торговле на бирже, в частности, с криптовалютами. После различных наколенных экспериментов я захотел создать удобный инструмент - базу торговых котировок. В процессе работы необходима быстрая загрузка достаточно большого количества данных. Это необходимо для расчетов, генерации данных для обучения, бэк-тестинга и других задач. Количество записей, которые нужно загрузить в питон довольно велико - речь может идти о миллионах и более записей.

Сами котировки представляют собой простую табличку с шестью колонками:

               time     open     high      low    close       volume
2022-07-01 00:00:00  1071.02  1117.00  1050.46  1054.52  430646.8720
2022-07-01 04:00:00  1054.52  1076.43  1045.41  1066.81  275557.9328
2022-07-01 08:00:00  1066.81  1086.44  1033.44  1050.22  252105.5665
2022-07-01 12:00:00  1050.21  1074.23  1043.00  1056.86  298465.0695
2022-07-01 16:00:00  1056.86  1083.10  1054.82  1067.91  158796.2248

Сами записи по размеру небольшие. Хранятся они в 64-разрядных значениях, одна запись получается 6*8 = 48 байт.

Но их много...

Масштабы бедствия

Записи хранятся с определенной периодичностью во времени, которая называется таймфрейм. Таймфреймы могут быть очень разные. Наиболее часто используются от одного дня до одной минуты. Соответственно, число записей в день - от одной записи до 1440 (60*24). Для расчета могут потребоваться данные и за год, и за два. За год на минутках получается 525600 записей (60*24*365). За пару лет уже больше миллиона.
А еще ведь есть и таймфрейм 1 секунда - это более 30 миллионов записей за год. Но и это не все, так как есть еще тики, которых в одних сутках может быть 5 миллионов!

Постановка задачи

Ладно, на тики смотреть пока рано. Надо начать с чего-то попроще. Пусть это будет минутный таймфрейм. И пусть записей будет миллион - это чуть меньше двух лет. Эти записи должны быть загружены в память достаточно быстро. В память - это массивы numpy. Минута, в принципе приемлемо.

Но если думать еще хотя бы о секундном таймфрейме, то минутки должны грузится не более нескольких секунд. Значит, надо загрузить миллион записей за несколько секунд. Железо, конечно, обычное, не серверное.

Формат хранения - float64. Возможно, кто то возразит, что цены надо хранить в decimal. Нет, в данном случае - именно float. Потому что нужно хранить именно в том формате, в котором это будет использовано. Конвертация туда-сюда - непозволительная роскошь.

Предмет торговли на бирже называется символ или торговый инструмент. Выборка осуществляется по символу и периоду. Получается, задаются три параметра - символ, начало периода, и конец. Никаких других выборок делать не нужно.

Теперь про запись в базу. Записи о котировках прилетают и сохраняются только один раз, обновление и удаление не нужно. Записывать лучше всего блоками по одному дню. В общем случае порядок прихода блоков (дней) произвольный.

Наверное, что бы было понятнее, стоит привести пример.

Допустим, база пустая, а я написал некий расчет и он затребовал данные по биткойну за октябрь этого года. Делаем загрузку этих данных из API биржи, отдаем в расчет и сохраняем в базу котировок.
Следующий запрос на данные может быть с 1 сентября по 30 ноября. В этом случае надо загрузить сентябрь и ноябрь, а октябрь прочитать из базы, он уже у нас есть.
Ну и так далее. Чего нет - запрашиваем и сохраняем, а что есть - извлекаем из базы.
И последнее. Данные могут понадобится по большому количеству торговых инструментов, следовательно они должны храниться на диске максимально компактно, в сжатом виде.

Варианты решения

Думаю, первая мысль, когда видишь табличку с данными - это взять какую-нибудь реляционную базу данных, загрузить туда - и вперед. Но увы, нет. Этот метод я уже попробовал давно, много лет назад. Это слишком медленно. СУБД предоставляет огромный (и ненужный) функционал, за который придется неизбежно платить. Платить ресурсами процессора, а это значит, все будет медленнее, чем может быть. Блокировки не нужны, индекс нужен только по символу и дате, изоляции транзакций не нужны, многопользовательская работа не нужна. И еще СУБД почти всегда имеет много бинарного кода, который может быть зависимым от типа ОС и часто требует отдельной установки. Хотя, если кто-то знает простую платформонезависимую реляционную СУБД, которая сама ставится по зависимости в PYPI, и которая может загрузить миллион записей в питон за несколько секунд - пишите в комментариях, это интересно!

Идем дальше. Не реляционные СУБД. Признаюсь, с современными не реляционными СУБД я дело не имел. Если есть такая, которая подходит под указанные требования - тоже обязательно напишите. Но у меня так же есть подозрения о их избыточности по отношению к задаче, поэтому я оставил этот вариант на потом.

CSV и подобные форматы. Отпадает, очень большие расходы на конвертацию текста в машинный формат. Как уже писал - хранить надо в том формате, в котором будет использоваться.

Что еще остается? Я пошел от железа, то есть, от хранения данных на диске и загрузке их в память. Тут возможны самые быстрые варианты. Что может быть быстрее, чем просто взять и прочитать кусок файла прямо в память, сразу в numpy array? Если бы я программировал на C/C++ или Rust, я бы так и сделал. Открыл файл, получил дескриптор - чтение, и вуаля, данные прилетели сразу куда нужно. Ах, да, еще же распаковка нужна. Но это тоже решаемо. Но у нас питон, тут надо искать что-то другое.

Кстати, а почему питон?

Чуть выше я упоминал про машинное обучение, и это более чем достаточная причина.

Но и без машинного обучения я бы выбрал питон. Вообще, у программистов C++, Java, C# часто есть снисходительное отношение к интерпретируемым языкам. Каюсь, я сам был в этом лагере. Но именно питон изменил это отношение. Меня поразила его какое-то дружелюбие и огромная скорость разработки. То, что на C++ надо писать неделю, на питоне часто можно сделать за день.

Да, скорость исполнения кода на нем уступает компилируемым языкам, но это в значительной степени компенсируется огромным числом очень быстрых оптимизированных библиотек. А есть еще PyPy, Jython, IronPython, главное, numba - я использую именно ее. Все эти варианты - уже не интерпретаторы и сравниваются со скоростью с компилируемыми языками и JIT.

Вообще, я считаю, что язык нужно выбирать исходя из решаемой задачи. И для для задач, связанных с машинным обучением и экспериментами с торговлей на бирже (не HFT) питон подходит как нельзя лучше.

В общем, я выбрал последний путь - свой бинарный формат файла. Расскажу, как создавалось хранилище, его структуру. Если все это не интересно, а интересен только обещанный миллион - то перемотайте ближе к концу, он там есть, и даже больше :)

А мы пройдемся по версиям.

Первая версия

Первую версию хранилища котировок я написал с использованием construct. Раньше я уже пользовался этой библиотекой. Это достаточно популярная библиотека и я был уверен, что она работает быстро и хорошо. Она делала то, что нужно - читала последовательность байт и записывала в массивы. Это должно было быть быстро, но я работал с большими таймфреймами и не проверял на большом количестве записей.

Структура файла была проста - сигнатура, версия формата, затем количество записей (баров) и далее шесть серий данных. Вот как описывается это с помощью construct:

import construct as cs

# сигнатура и версия
def get_file_signature_struct():
  return cs.Struct('signature' / cs.Const(BLOCK_FILE_SIGNATURE), 'file_version' / cs.Int16ub)

# заголовок
def get_header_struct_v1():
  return cs.Struct(
    'n_bars' / cs.Int
  )

# данные
def get_file_data_struct(n_bars):
  return cs.Struct(
  
      'time' / cs.Long[n_bars],
      'open' / cs.Double[n_bars],
      'high' / cs.Double[n_bars],
      'low' / cs.Double[n_bars],
      'close' / cs.Double[n_bars],
      'volume' / cs.Double[n_bars]
  )   


Почему это разбито на три части? Потому что приходится так частями читать. В начале нужно прочитать сигнатуру и версию типа файла. Потом читаем сжатый блок - в начале заголовок, потом данные.

Вот код чтения файла котировок первой версии. Полный исходный код библиотеки (live_trading_indicators) можно найти на гитхабе, ссылка есть в конце статьи.

    def load_from_cache(self, file_name, symbol, timeframe):

        with open(file_name, 'rb') as file:

            signature_and_version = self.parse_signature_and_version(file)
            if signature_and_version.signature != BLOCK_FILE_SIGNATURE:
                raise LTIException('Bad data cash file')

            buf = zlib.decompress(file.read())
            header_len, header = self.parse_header(buf, signature_and_version.file_version)
            data_struct = self.get_file_data_struct(header.n_bars)
            file_data = data_struct.parse(buf[header_len:])

        return OHLCV_day({
            'symbol': symbol,
            'timeframe': timeframe,
            'is_incomplete_day': False,
            'time': np.array(file_data.time, dtype=np.int64).astype(TIME_TYPE),
            'open': np.array(file_data.open, dtype=PRICE_TYPE),
            'high': np.array(file_data.high, dtype=PRICE_TYPE),
            'low': np.array(file_data.low, dtype=PRICE_TYPE),
            'close': np.array(file_data.close, dtype=PRICE_TYPE),
            'volume': np.array(file_data.volume, dtype=VOLUME_TYPE)
        })

Как видно по коду, после construct данные упаковываются zlib. Каждый день записывался в отдельный файл. Файлы получались достаточно компактными.

Вот картинка с дампом одного файла с таймфреймом 6 часов. У него можно прочитать только первые 5 несжатых байт - это сигнатура LTI и два байта - номер версии формата. Далее идет блок сжатых данных. Но и в сжатых данных все просто - там лежит количество записей и далее 6 массивов чисел - время, открытие, максимум, закрытие и объем.

Вообще, насчет упаковки были разные идеи

Кончено, самое простое - это использовать какой-то универсальный упаковщик, типа zlib.

Но самая эффективная упаковка обычно та, которая адаптирована под тип сжимаемых данных.

Самое первое, что приходит в голову при сжатии ценовых котировок - это хранить не значение цены, а дельту - изменение с прошлого значения. Диапазон этих значений намного меньше.

Теперь подумаем, как их можно хранить. Можно, например, перевести цены из float в int. Это несложно, число знаков после запятой определено заранее. Тогда на хранение дельты можно отвести гораздо меньшее число бит.

Но есть нюанс, который все портит. Дельты в среднем невелики, однако могут быть сильные выбросы при резком движении цены. Из-за этого нужно на дельту отводить много бит, и это губит всю такую упаковку. Вот если бы на дельту можно было отводить разное число бит, причем частые значения кодировать короткими последовательностями.... стоп-стоп-стоп... все уже украдено придумано до нас! Мы же сейчас пытаемся изобрести кодирование Хаффмана!

Кодирование Хаффмана бывает статическим и динамическим. Здесь, конечно, больше подойдет динамическое.

Вообще, классические алгоритмы сжатия обычно как раз и устроены на комбинации двух алгоритмов - коде Хаффмана и Лемпеля-Зива-Велча (LZW). Код Хаффмана будет точно эффективен для сжатия дельты цены, а вот насчет LZW - вопрос. Все зависит от статистики повторяющихся последовательностей по дельтам. Так что, есть шансы, что сжатие дельт динамическим Хаффманом окажется более эффективным, чем обычные библиотеки сжатия. Попробовать, конечно интересно, но победила лень и желание быстрого результата, и я просто написал "import zlib" :)

Имя файла формировалось по шаблону. Проверка наличия данных и их загрузка были просты - ищем файлы с соответствующими именами за период и читаем их. Что не нашли - грузим с биржи, попутно записывая в файлы.

Идея такой организации хранилища была навеяна хранением файлов на FTP binance. Помимо API у биржи есть еще FTP хранилище файлов, в формате csv. Но в это хранилище лучше не лезть, мне кажется, там работают стажеры :)

Про стажеров в binance

В этом файловом хранилище какая-то разруха. Во первых, данные там появляются с запозданием, причем разным для фьючерсов и спота. И эти данные довольно низкого качества. Часть из них с заголовками, часть - без. И в них есть пробелы. Иногда может не доставать половины дня. Оно не заброшено, данные выкладываются, и ошибки в старых файлах иногда исправляются. Ведется даже файл с историей этих исправлений.

Самое интересное, эта же binance одновременно предоставляет эти же данные через HTTP API, который работает замечательно, и ошибок там гораздо меньше.

Любой мало-мальски опытный программист на питоне может написать за полдня скриптик, которые из этого API достанет постепенно все данные, и сформирует хранилище FTP гораздо лучшего качества. Но этого не делается. Поэтому мне все время кажется, что эти файлы формируют стажеры из какого-то особого отдела binance, которые учатся записывать в CSV формат, и у них это плохо получается. По другому все это я объяснить никак не могу :)

Через какое-то время файлов стало огромное количество. Допустим, 10 инструментов, 10 таймфреймов, 365 дней - уже 36500 файлов. Причем, например, на таймфрейме 1 день, в файле хранится всего одна строка. А на диске занимает, скорее всего целый кластер. Да и открытие и закрытие файлов наверняка имеет накладные расходы.
Стало ясно, что нужно группировать в один файл много дней. И должна быть возможность частичного заполнения файла, так как неизвестно, когда какие дни придут.

Вторая версия

Я решил сохранить осмысленную для человека группировку дней в файлах. Файлы, в котором либо день, либо месяц, либо год.
Для всех таймфреймов от одного часа и выше - один файл на год, до часа (от минуты) - один файл на месяц. Для таймфреймов меньше минуты - по прежнему один день в файле.

Что бы выбирать нужный день из файла, необходим какой-то индекс, по которому этот день можно найти в файле.
Максимальное количество дней в файле известно и относительно невелико. Поэтому я сделал просто таблицу указателей на сжатый блок данных серий. В этой таблице храниться смещение в файле и длина блока.

При такой структуре алгоритм записи дня в файл довольно прост. Берем размер файла, это будет будущий указатель на данные. Дописываем сжатый запакованный день в конец. Сбрасываем буферы файла на диск. Записываем указатель в таблицу, записываем таблицу, снова сбрасываем буферы на диск. Двойной сброс необходим для осуществления транзакционной целостности - в начале записываются данные, потом указатель на них.

Структуру упакованных данных дня приводить в коде construct не буду - она предельно проста. В начале идет количество записей (int32, 4 байта), затем 6 серий данных так же, как в первой версии.

Ниже привожу описание в construct таблицы указателей. Для каждого блока есть смещение в файле и длина. Я сохранил это в двух разных массивах, полагая, что их чтение будет быстрее. (Позже станет ясно, что я ошибался.)

def get_allocate_table_struct(n_blocks):
    assert type(n_blocks) == int
    return cs.Struct(
        'block_offset' / cs.Int32sb[n_blocks],
        'block_length' / cs.Int32sb[n_blocks]
    )

Вот и весь формат, на текущий момент это актуальная версия.

Самое интересное - тестирование


Для тестирования загрузил цены на эфириум на таймфрейме 1 минута с 1 января 2021 по 15 декабря 2022. Получилось чуть больше миллиона записей - 1026720.
Используемое железо - 32 Гб памяти со стареньким, но довольно бодрым 4-х ядерным процессором i7 4.5 ГГц. Диск SSD по SATA, но тут диск не так важен, так как на диске этот упакованный миллион минуток занял всего около 19 мегабайт. В памяти после распаковки размер составляет 49 мегабайт. Код бенчмарка:

import timeit
import src.live_trading_indicators as lti


def bench_test():
    indicators = lti.Indicators('binance')
    ohlcv = indicators.OHLCV('um/ethusdt', '1m', '2021-01-01', '2022-12-14')

number = 1
time = timeit.timeit('bench_test()', setup='from __main__ import bench_test', number=number) / number
print(f'{time} seconds')

И результат.

5.341277683968656 seconds

Ну что сказать... На минутках работать с таким быстродействием вполне можно. Но на секундном таймфрейме это будет уже более 5 минут, что довольно печально. О тиках с таким быстродействием можно забыть.

Запускаем профилирование. Вот что получается.

Все очень печально...
Все очень печально...

Ну и ну! Этого я совсем не ожидал. Тут видно, что construct парсила массивы поэлементно, кодом на питоне. Наш миллион с небольшим записей с 6-ю колонками превратился в 6 миллионов вызовов. Даже чтение выполнялось поэлементно. Грусть и печаль. Надо совсем не это, надо что-то, что бы просто взяло кусок памяти и положило в массив.

Отдельно отметил на скрине распаковку данных, она заняла всего около 1% времени.

В общем, я стал искать. Нашел интересные вещи. Например, для numpy есть прямое отображение массива в память - numpy.memmap. Причем используется тот же системный механизм, что и для свопинга. Это должно быть очень быстрым, но нам не подходит - сжатие сделать невозможно.

Есть еще такая штука, как Zarr. Это уже ближе к теме. Скорее всего, тут можно было бы как-то применить Zarr.

Но все оказалось проще! Прямо в numpy я нашел именно то, что нужно - встречаем numpy.frombuffer. Это именно заполнение массива из буфера в памяти. Там задается тип, он, например определяет размер, порядок следования байт (big-endian, little-endian), наличие знака. Можно попробовать взять тип из construct, он заявлен как совместимый.

Ставлю точку останова, где происходит парсинг, беру тип из construct, вызываю numpy.frombuffer, сравниваю с результатами costruct, и получаю полное совпадение! Не понадобится менять даже формат файла, просто оптимизация на уровне кода.

Код загрузки данных получился, правда, не такой изящный как с construct:

        n_bars = block_header.n_bars

        time_type = np.dtype('>u8')
        float_type = np.dtype('>f8')
        series_data_size = n_bars * float_type.itemsize

        point = block_header_struct.sizeof()
        time = np.frombuffer(bar_saved_data, time_type, n_bars, offset=point)

        point += n_bars * time_type.itemsize
        open = np.frombuffer(bar_saved_data, float_type, n_bars, offset=point)

        point += series_data_size
        high = np.frombuffer(bar_saved_data, float_type, n_bars, offset=point)

        point += series_data_size
        low = np.frombuffer(bar_saved_data, float_type, n_bars, offset=point)

        point += series_data_size
        close = np.frombuffer(bar_saved_data, float_type, n_bars, offset=point)

        point += series_data_size
        volume = np.frombuffer(bar_saved_data, float_type, n_bars, offset=point)

Запускаем бенчмарк. Результат:

0.2896657044999301 seconds

Треть секунды! Ускорение в 18 раз. В 18 раз, Карл! Я даже перепроверил - не сломалось ли чего, действительно ли приходят данные. Да, приходят, все в порядке.

Смотрим профайлинг.

Это уже радует
Это уже радует

Все намного лучше. Видно, что на первое место теперь вырвалась zlib, около 30% всего времени. В прошлый раз у нее был 1%. На втором месте - создание массива numpy с результатом 3.8 %.

По большому счету, мы уперлись в распаковку и скорость интерпретатора питона. Причем, если увеличить количество данных, то пропорционально возрастет первая строка, может быть вторая. Остальные не должны расти пропорционально данным, количество выполняемого кода останется прежним. Это радует. Значит, надо попробовать секундный таймфрейм!

Пришлось подождать. Секунды за год у меня загружались с binance часов 10. На диске они заняли 447 мегабайт. Если кто-то захочет повторить этот эксперимент, то надо загружать спот, на фьючерсах секундный таймфрейм не доступен.

Посмотрим результат и количество записей.

<OHLCV data> symbol: ethusdt, timeframe: 1s
date: 2021-01-01T00:00 - 2021-12-31T23:59 (length: 31536000) 
Values: time, open, high, low, close, volume
4.66771782503929 seconds

31.5 миллион записей, меньше 5 секунд. Количество записей возросло в 30 раз, а время - примерно в 15 раз. С такими результатами вполне можно работать с секундным таймфреймом. И в перспективе это открывает возможность работать с тиками.

В памяти это должно занять 31536000*48 = 1.514 Гб. Реально же потребление памяти возрастает примерно на 2.7 Гб. Откуда разница - не знаю, не исследовал этот вопрос. Насколько я знаю, массивы numpy - это просто кусок памяти, заполненный данными определенного типа, ничего лишнего в них нет. Возможно, мусорщик не успевает убрать все данные после загрузки.

Снова посмотрим профайлинг.

4 операции, которые растут с ростом данных
4 операции, которые растут с ростом данных

zlib уже 75%. И на 4-е место вылезло чтение из файла с 1.3%. Собственно, это и есть те 4 операции, которые растут с ростом количества данных. Доля остальных, как видим, только падает.

Что еще можно сделать?

Первое, что приходит в голову - поискать более быструю библиотеку распаковки. Скорее всего, она будет упаковывать хуже. К сожалению, распаковка это такая операция, которая практически не распараллеливается по ядрам процессора. Запуск htop это подтвердил - в основном нагружается одно ядро процессора. Распаковщик обрабатывает поток данных последовательно и имеет контекст, который зависит от предыдущих операций. Нельзя разделить запакованные данные и отдать их разным ядрам, если только они не запакованы изначально в несколько частей.

Второе. Загружать дни параллельно, в несколько потоков. Вот это реально может сработать. Никаких препятствий для параллельной загрузки дней нет. Диск нас не тормозит. Скорее всего так и сделаю, если захочу поработать с тиками.

А питон сможет хоть что-то сделать с 31 лям записей?

И мне тоже стало интересно, как отработают алгоритмы, которые я написал до этого. Вообще, при их написании я уделял внимание производительности. Где-то использовал numpy, где нужно - numba. Но такого количества данных еще у меня не было. Значит, наступил момент истины! :)

Пишем бенч. Я взял три индикатора. Первый - это скользящая средняя (SMA). Она написана без нумбы, использует numpy.convolve.

Второй - это экспоненциальная скользящая средняя. Формула ее расчета очень простая, но нужно пробежать последовательно по каждому значению. Код ее расчета я написал на чистом питоне, но разогнал нумбой. Посмотрим, как она справится.

Третий - это Stochastic. Он чуть сложнее, там делается расчет двух средних, потом результат обрабатывается кодом на питоне. В цикле там так же необходимо пробежаться по каждому значению. Конечно, тоже используется numba.

Все исходные коды индикаторов есть на гитхабе.

Код бенча немного другой, что бы в расчет индикаторов не вошло время чтения котировок:

import timeit
import src.live_trading_indicators as lti

indicators = lti.Indicators('binance', 20210101, 20211231)
ohlcv = indicators.OHLCV('ethusdt', '1s')


time = timeit.timeit('print(indicators.MA("ethusdt", "1s", period=22))', number=1, globals={'indicators': indicators})
print(f'MA {time} seconds\n')

time = timeit.timeit('print(indicators.EMA("ethusdt", "1s", period=22))', number=1, globals={'indicators': indicators})
print(f'EMA {time} seconds\n')

time = timeit.timeit('print(indicators.Stochastic("ethusdt", "1s", period=15, period_d=22))', number=1, globals={'indicators': indicators})
print(f'Stochastic {time} seconds\n')

Результаты. Ну а выводы делайте сами.

<IndicatorData> name: SMA, symbol: ethusdt, timeframe: 1s, allowed nan
date: 2021-01-01T00:00 - 2021-12-31T23:59 (length: 31536000) 
Values: time, move_average
MA 0.5642972170026042 seconds

<IndicatorData> name: EMA, symbol: ethusdt, timeframe: 1s, allowed nan
date: 2021-01-01T00:00 - 2021-12-31T23:59 (length: 31536000) 
Values: time, ema
EMA 0.1669687769608572 seconds

<IndicatorData> name: Stochastic, symbol: ethusdt, timeframe: 1s, allowed nan
date: 2021-01-01T00:00 - 2021-12-31T23:59 (length: 31536000) 
Values: time, value_d, value_k, oscillator
Stochastic 1.9372139129554853 seconds

В общем, construct меня огорчил. А вот numpy порадовал. Если он что-то умеет делать, то обычно делает это хорошо.

Все описанные эксперименты можно повторить самостоятельно, взяв исходники тут: live_trading_indicators

И, конечно же, критика и идеи в комментариях приветствуются!

Tags:
Hubs:
Total votes 16: ↑15 and ↓1+14
Comments40

Articles