В статье описывается использование формата сериализации AVRO в языке python, дается краткое описание AVRO-схемы с поя��нениями наиболее неочевидных моментов, приводятся конкретные примеры кода на python. Намеренно исключены из рассмотрения вопросы эволюции схем (schema evolution), RPC и AVRO-IDL.


Все примеры приводятся с использованием библиотеки fastavro, которую автору пришлось заметно доработать для соответствия спецификации и совместимости с java реализацией.


Историческая справка


Почему AVRO, а не json, bson, msgpack, protobuf, ASN.1, thrift, yaml?


При декомпозиции монолитной системы возникла необходимость описать процедуру взаимодействия между микросервисами. Не долго выбирая между RabbitMQ и Kafka остановились на последней. Но вот над следующей задачей — выборе системы сериализации пришлось попотеть.


При выборе системы сериализации учитывались следующие требования:


  1. Поддержка нескольких языков программирования


    Основа нашей кодовой базы — python 2.7. При чем в дальнейшем хотелось бы чувствительные к производительности процессы перевести на другие языки.


  2. Валидация данных при сериализации


    В динамическом интерпретируемом питоне слишком просто случайно отправить "не те" данные. А в выбранной нами pub-sub модели кафки нам было очень важно обеспечить корректность входных данных в топике. Нужна была система позволяющая типизировать топики кафки.


  3. Поддержка типов


    Наша система активно оперирует типами Decimal, UUID и Datetime. Увы — известные форматы сериализации начиная ASN.1 и заканчивая msgpack в основном описывают сериализацию низкоуровневых типов (int\float\string\bytes) и не предлагают законченных решений для интересующих нас.



Исходя из этих соображений выбор пал на AVRO. Но внезапно оказалось (весна 2017) что не смотря на наличие поддержки логических типов в спецификации и JAVA билиотеки — ни в официальной реализации AVRO для python, ни в конкурирующей fastavto их просто проигнорировали. Пришлось добавлять самостоятельно.


Наиболее адекватный (а также самый быстрый) код оказался у fastavro, в результате было принято решение доработать эту библиотеку. Это стало первым моим опытом участия в opensource.


Что такое AVRO


AVRO это система сериализации данных, созданная в рамках проекта Hadoop. Данные сериализуются бинарный формат с помощью предварительно созданной json-схемы при чем для десериализации также требуется схема (возможно — другая).


Также AVRO позволят в рамках одного контейнера упаковать множество записей заданных единственной схемой, что позволяет эффективно передавать большие объемы данных, избегая свойственные другим форматам накладные расходы.


AVRO-схема


Я не стану подробно описывать правила построения схем, т.к. они изложены в официальной документации
Остановлюсь лишь на базовых вещах и совсем уж не очевидных моментах.


AVRO-схема представляет из себя JSON, описывающий сериализуемую\десериализуемую структуру данных. Типы данных могут быть:


  1. Примитивными
    • null
    • boolean
    • int — знаковое целое 32 бита
    • long — знаковое целое 64 бита
    • float
    • double
    • string — unicode строка
    • bytes — последовательность байт
    • fixed — те же байты, но с длинной заданной в схеме
  2. Составными
    • union — тип-сумма
    • recod — тип-произведение
    • enum — перечисление
    • array — массив/список
    • map — ассоциативный массив
  3. Логическими (в терминологии AVRO)
    • decimal — число с фиксированной точкой
    • date — дата
    • time-millis — время с миллисекундной точностью
    • time-micros — время с микросекундной точностью
    • timestamp-millis — дата-время с миллисекундной точностью
    • timestamp-micros — дата-время с микросекундной точностью
    • uuid — universally unique identifier

Хотя вышеперечисленные логические типы уже давно являются стандартом в реляционных БД и современных языках программирования — библиотеки сериализации обходили их стороной, вынуждая сводить их к примитивным типам, к счастью в AVRO эта проблема решена.


Рассмотрим простую схему:


{
  "type": "record",
  "namespace": "notificator",
  "name": "out",
  "doc": "HTTP нотификация",
  "fields": [
    {
      "doc": "id задания",
      "name": "id",
      "type": "long"
    },
    {
      "name": "datetime",
      "doc": "время постановки задания",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "doc": "источник нотификации",
      "name": "source",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "doc": "Метод",
      "name": "method",
      "default": "POST",
      "type": {
        "type": "enum",
        "name": "methods",
        "symbols": [
          "POST",
          "GET",
        ]
      }
    },
    {
      "name": "url",
      "type": "string"
    },
    {
      "name": "headers",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "doc": "body",
      "name": "body",
      "type": "bytes"
    }
  ]
}

Схема начинается с объявления типа record с заданными name и namespace. Эти поля в первых строках будут использоваться в системах кодогенерации, не актуальных для питона, т.к. у нас схема будет обрабатываться динамически. Далее идет перечисление типов полей нашей записи.


Особый интерес представляет объявление поля datetime, т.к. оно содержи�� логический тип. Важно помнить, что логические типы следует задавать вложенными в описания типа поля.


неправильно:


{
    "name": "datetime",
    "doc": "время постановки задания",
    "type": "long",
    "logicalType": "timestamp-millis"
},

правильно:


{
    "name": "datetime",
    "doc": "время постановки задания",
    "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
    }
},

Далее идет поле source объявленное как union "type": ["null", "string"], эта запись означает что значение source может быть одного из двух типов null или string. Комбинировать таким образом можно не только примитивные типы, но и составные и логические. Примеры таких комбинаций, а также более сложных схем можно посмотреть тут
Ещё один неочевидный момент связан с default: значение по умолчанию должно быть задано для первого типа в перечислении.


неправильно:


{
      "name": "f",
      "type": ["long", "string"],
      "default": "asd"
    },

правильно:


{
      "name": "f",
      "type": ["string", "long"],
      "default": "asd"
    },

Отдельного упоминания заслуживают логические типы Decimal (число с фиксированной точкой) и UUID.


Decimal требует дополнительных параметров — числа знаков в числе и числа знаков после запятой:


 {
    "name": "money",
    "doc": "16 знаков, 4 после точки",
    "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 16,
        "scale": 4,
    }
}

А UUID интересен тем, что в спецификации его нет, а реализация его есть. При чём довольно странно сделанная — UUID кодируется строкой.


{
    "name": "uuid",
    "type": {
        "type": "string",
        "logicalType": "uuid"
    }
}

Пришлось добавить в fastavro и такую реализацию.


Примеры работы с fastavro


Как прочитать из контейнера


import fastavro as avro

with open('some-file.avro', 'rb') as fo:
    # можно подменить схему контейнера с помощью reader_schema
    reader = fastavro.reader(fo, reader_schema=None)
    schema = reader.schema

    for record in reader:
        process_record(record)

Как записать в контейнер


from fastavro import writer

schema = {
    'doc': 'A weather reading.',
    'name': 'Weather',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'station', 'type': 'string'},
        {'name': 'time', 'type': 'long'},
        {'name': 'temp', 'type': 'int'},
    ],
}

records = [
    {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
    {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
    {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
    {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
]

with open('weather.avro', 'wb') as out:
    writer(out, schema, records)

Как сериализовать и десериализовать данные вне контейнера


Используется при передачи данных как сообщений.


from io import BytesIO
import fastavro

def serialize(schema, data):
    bytes_writer = BytesIO()
    fastavro.schemaless_writer(bytes_writer, schema, data)
    return bytes_writer.getvalue()

def deserialize(schema, binary):
    bytes_writer = BytesIO()
    bytes_writer.write(binary)
    bytes_writer.seek(0)

    data = fastavro.schemaless_reader(bytes_writer, schema)
    return data

Как подавить чтение логических типов


import fastavro
fastavro._reader.LOGICAL_READERS['long-timestamp-millis'] = lambda d, w, r: d

Теперь логический тип timestamp-millis будет читаться не в Datetime питона, а в long.


Как заранее прочитать схему


В fastavro предусмотрена функция acquaint_schema, которая считывает схему во внутренний репозиторий (бывают ещё и внешение, но это отдельная история).


Имея схему


{
  "name": "decimal_18_6",
  "namespace": "types",
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 18,
  "scale": 6
}

и загрузив её с помощью acquaint_schema можно в дальнейшем использовать краткое описание типов :


"fields": [
    {
        "name": "money1",
        "type": "types.decimal_18_6"
    },
    {
        "name": "money2",
        "type": "types.decimal_18_6"
    },
]

Обратите внимание — имя типа при обращении включает его namespace types.decimal_18_6


Также это необходимо в отдельных не тривиальных случаях