В рамках данной статьи будет рассмотрено построение конвейера машинного обучения для классификации рукописных цифр из базы данных MNIST с использованием фреймворка TensorFlow и TFX, а также мониторинг процесса обучения с помощью TensorBoard и выпуск модели с автоматической генерацией к ней API с помощью TensorFlow Model Server.
Основная цель данной статьи заключается в восполнении информационного пробела по теме использования TFX в контексте решения задачи классификации рукописных цифр MNIST, которая уже была представлена TensorFlow в виде одноименного репозитория tfx/tfx/examples/mnist.

Содержание
Введение
Написать данную статью меня мотивировало отсутствие хорошей документации по примеру использования конвейера машинного обучения в контексте задачи классификации рукописных цифр из набора данных MNIST. Большинство примеров предоставляют общую информацию об использовании TensorFlow без конвейерного механизма. Официальной же документации по данному примеру, кроме файла README.md в репозитории, нет. Официальный пример (на который можно найти ссылку в официальной документации), в контексте данной статьи, будет именоваться MNIST TFX.
Во время воспроизведения элементов MNIST TFX в виде программного кода, я столкнулся с большим числом неопределённостей и проблем, которые были связаны с недостатками примера из репозитория и отсутствием поясняющей информации по ключевым элементам его элементам.
Примечание: весь программный код данной статьи следует выполнять в операционной системе Linux, поскольку поддержка ОС Windows для TensorFlow имеет ограничения. Если у вас стоит ОС Windows, то удобнее всего использовать WSL2, поскольку так можно использовать ядро Linux и иметь доступ к видеокарте без проблем, которые встречаются в VirtualBox.
Также предполагается, что читатель понимает как исправить ошибки, связанные с отсутствующими модулями TensorFlow или Python (т.е. на сообщение "package not found" читатель воспользуется пакетным менеджером pip или apt самостоятельно).
Рекомендация: прежде чем продолжать писать код после подключаемых библиотек, следует запустить через интерпретатор данный файл чтобы понять какие библиотеки не установлены или с какими зависимостями есть проблемы. Это значительно облегчит поиск ошибок, если таковые будут.
Формат данных TFRecord
Прежде чем построить конвейер машинного обучения, необходимо определиться с тем, каким образом данные для обучения и валидации будут переданы в этот самый конвейер и в каком формате будут эти данные.
В описании официального примера MNIST TFX присутствует следующее пояснение:
The dataset included in this example consists of a selection of 1000 records from the MNIST dataset, converted to tfrecord format. Each record is a tf.Example with 2 columns of data: 'image_floats' representing the 28x28 image as 784 float values in the range [-0.5, 0.5], and 'image_class' representing the label with values [0-9] corresponding to decimal digit in the image.
Т.е. данный пример использует заранее подготовленный набор данных /data/mnist.tfrecord, который включает в себя 1000 записей и содержит 2 колонки - image_floats (данные каждого изображения) и image_class (класс цифры - от 0 до 9). Причём значения в колонке image_floats нормализованы определённым образом - все значения находятся в диапазоне от -0.5 до 0.5 включительно.
Однако авторы не указывают на то, каким образом (с помощью какого алгоритма) был получен данный файл. Ведь 1000 записей из набора данных MNIST для обучения нейронной сети распознаванию рукописных цифр это очень мало. В MNIST TFX присутствует только ссылка на туториал по использованию TFRecord без привязки к конкретно данному примеру.
Невозможно хорошо обучить нейронную сеть такой классификации с таким количеством изображений, если только это не мультимодальный предобученный трансформер, но там и обучать уже ничего не нужно - всё из под коробки уже в готовом виде.
Появляется задача сформировать свой tfrecord-файл, который будет содержать значительно больше данных для обучения, чем представленный в примере. Как это сделать?
Для начала разберёмся что же такое формат TFRecord.
TFRecord - это формат, который предназначен для хранения последовательностей двоичных записей. Более подробно об этом формате можно почитать в официальной документации.
Данный формат является необязательным, однако именно его использует MNIST TFX и не вижу смысла его не использовать, поскольку он удобен и позволяет компактно упаковать используемый набор данных и переносить его между различными платформами.
Теперь разберёмся с тем, каким образом можно собрать свой tfrecord-файл, который будет содержать набор данных MNIST состоящий из 60000 записей (тренировочную выборку без тестовой).
Для начала импортируем все необходимые библиотеки и включим выполнение операций в режиме "eager execution" (выполнение операций без ожидания построения графа):
# Импорт необходимых библиотек
import tensorflow as tf
import numpy as np
# Включаем eager execution
tf.executing_eagerly()
tf.config.run_functions_eagerly(True)
Теперь импортируем набор данных MNIST с приведением значений к типу float32, а меток к типу int64:
# Загрузка набора данных MNIST и его нормализация
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 28, 28, 1).astype('float32')
# x_test = x_test.reshape(10000, 28, 28, 1).astype('float32')
y_train = y_train.reshape(60000, ).astype('int64')
# y_test = y_test.reshape(10000, ).astype('int64')
В исходном наборе данных (выборка train) содержится 60000 записей в формате 28x28:

Такой набор данных уже выглядит более убедительным, чем 1000 записей, пусть и нормализованных.
Примечание: я специально не нормализовал загруженный набор данных чтобы частично "оправдать" использование компонента Transform в будущем конвейере машинного обучения, однако его использование необходимо и по другим соображениям, которые будут раскрыты в разделе про непосредственную разработку конвейера.
Теперь необходимо определить 2 очень важные функции, с помощью которых можно определить схему для хранения в двоичном формате какой-либо колонки в tfrecord-файле:
# Схема для хранения меток из набора данных
def wrap_int64(val):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[val]))
# Схема для хранения float-значений признаков
def wrap_float(val):
return tf.train.Feature(float_list=tf.train.FloatList(value=val))
После того, как эти функции были определены, необходимо определить одну функцию для формирования tfrecord-файла и функцию для чтения tfrecord-файла для тестирования.
# Преобразование набора данных в tfrecord-файл
@tf.function
def convert_tfrecord(images, labels, out_path):
# Открываем поток для записи в tfrecord-файл и обращаемся к нему через writer
with tf.io.TFRecordWriter(out_path) as writer:
# Упаковываем элементы набора данных и проходим по каждому элементу
for image, label in zip(images, labels):
# Изменяем размер изображения (осуществляем процедуру flatten)
img = image.reshape((784, ))
# Определение элемента одной записи
mnist = {
'image_class': wrap_int64(int(label)),
'image_floats': wrap_float(img)
}
# Определяем данные и формируем элемент прототипа (Example)
feature = tf.train.Features(feature=mnist)
example = tf.train.Example(features=feature)
# Сериализация прототипа в строку
serialized = example.SerializeToString()
# Запись элемента в tfrecord-файл (построчно)
writer.write(serialized)
Вызов данной функции для формирования tfrecord-файла будет выглядеть следующим образом:
out_path='/content/train.tfrecord'
convert_tfrecord(x_train, y_train, out_path)
Теперь необходимо проверить содержание tfrecord-файла и соответствие его структуре уже существующего tfrecord-файла из примера MNIST TFX. Для этого напишем другую функцию, которая будет парсить только один элемент из tfrecord-файла:
# Парсинг одного элемента из tfrecord-файла
@tf.function
def convert_back(serialized):
# Форма для парсинга входных данных
feature = {
'image_class' : tf.io.VarLenFeature(tf.int64),
'image_floats' : tf.io.FixedLenFeature((784, ), tf.float32)
}
# Парсинг одного прототипа из входных данных (одной строки tfrecord-файла)
parsed_example = tf.io.parse_single_example(serialized=serialized, features=feature)
# Получение признака и метки из одной строки tfrecord-файла
image = parsed_example['image_floats']
# Для тестов можно также изменить размер image
# image = tf.reshape(image, shape=[28, 28, 1])
label = parsed_example['image_class']
return image, label
Важно заметить, что у определённых функций convert_tfrecord и convert_back присутствуют декораторы tf.function. Это необходимо для того, чтобы в процессе выполнения программного кода не было ошибок связанных с "eager execution".
Для сравнения /data/mnist.tfrecord из MNIST TFX и полученного файла train.tfrecord будем использовать следующую функцию:
@tf.function
def data_gen_output(filename):
# Создание датасета из tfrecord-файлов по определённому пути
raw_dataset = tf.data.TFRecordDataset(filenames=[filename])
# Получение из TFRecordDataset одного элемента
for raw_record in raw_dataset.take(1):
# Парсим одну строку (или запись) из tfrecord-файла
item = convert_back(raw_record)
# Возвращаем размер тензора признаков и меток
return (item[0].shape, item[1].shape)
Теперь выполняем сравнение двух записей из разных tfrecord-файлов:

Как видно из рисунка 2 сформированный файл train.tfrecord содержит ту же форму признаков и меток, что и файл mnist.tfrecord из MNIST TFX. Таким образом получилось расширить набор данных с 1000 записей до 60000, что должно хватить для обучения нейронной сети для классификации рукописных цифр.
Теперь можно приступать к разработке конвейера машинного обучения.
Определение базовых функций
Начнём с создания файла mnist_utils_native_keras_base.py, который будет содержать основные (базовые) элементы нашего конвейера (например, нейросетевую модель).
Начнём с импорта необходимых библиотек:
from typing import List
import absl
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
Можно сразу отметить, что подключается модуль DataAccessor, который служит важным компонентом для создания т.н. фабрик наборов данных, которые можно использовать для обучения модели.
Определим теперь константы, которые будут содержать имена полей для признаков и меток, а также функцию, которая будет возвращать эти имена с определённым префиксом, чтобы можно было различать имена полей обработанного набора данных (нормализованного) и исходного:
# Имя для поля признаков
IMAGE_KEY = 'image_floats'
# Имя для поля меток
LABEL_KEY = 'image_class'
# Функция для получения имени поля с постфиксом
def transformed_name(key):
return key + '_xf'
Теперь определим источник данных, который будет сформирован из файлов tfrecord, которые будут расположены в определённой папке (например, в папке data):
# Определение источника входных данных для обучения модели
def input_fn(file_pattern: List[str],
data_accessor: DataAccessor,
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
# Создание фабрики для доступа к набору данных для обучения или валидации
return data_accessor.tf_dataset_factory(
file_pattern,
dataset_options.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=transformed_name(LABEL_KEY)),
tf_transform_output.transformed_metadata.schema).repeat()
Сразу отмечаем, что здесь используется label_key=transformed_name(LABEL_KEY)
, что означает использование в построенном наборе данных уже нормализованных данных (это важно).
В этом же файле определяем функцию для сборки модели:
# Сборка модели нейронной сети для классификации цифр из MNIST
def build_keras_model() -> tf.keras.Model:
model = tf.keras.Sequential()
model.add(tf.keras.layers.InputLayer(shape=(784, ), dtype=tf.float32, name=transformed_name(IMAGE_KEY)))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(10))
# Компиляция модели
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.0015),
metrics=['sparse_categorical_accuracy'])
# Суммаризация
model.summary(print_fn=absl.logging.info)
return model
Само содержание модели, в данном случае, не очень важно. Это классический пример, который может быть изменён. Стоит обратить внимание на слой InputLayer и то, как он определён. Входными данными выступают уже нормализованные признаки с формой тензора (784, ).
Последней функцией в данном файле будет обработка ненормализованных данных:
# Обработка входных данных (используется в Transform)
def preprocessing_fn(inputs):
outputs = {}
# Нормализация значений признаков
outputs[transformed_name(IMAGE_KEY)] = inputs[IMAGE_KEY] / 255.0
outputs[transformed_name(LABEL_KEY)] = inputs[LABEL_KEY]
return outputs
Здесь всё довольно просто - переданные в функцию признаки просто нормализуются и имена соответствующих полей изменяются (им добавляется постфикс "xf"). На выходе получаем словарь outputs, который содержит признаки и метки.
Теперь перейдём к созданию следующего файла mnist_utils_native_keras.py.
Определение функций для элементов конвейера TFX
В файле mnist_utils_native_keras.py будут находится функции, которые будут явным образом вызываться различными элементами будущего конвейера машинного обучения.
Начнём с подключение необходимых библиотек:
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import FnArgs
import mnist_utils_native_keras_base as base
Здесь стоит обратить внимание на компонент FnArgs, поскольку взаимодействия с данным компонентом будет много в программном коде данного файла.
Теперь определим функцию, которая будет вызываться компонентом Transform конвейера машинного обучения:
# TFX Transform будет вызывать эту функцию
def preprocessing_fn(inputs):
return base.preprocessing_fn(inputs)
Данная функция просто делегирует выполнение обработки входных данных другой функции из файла mnist_utils_native_keras.py.
Наибольший интерес вызывает следующая функция, которая вызывается компонентом Trainer, основной задачей которого является обучение модели:
# TFX Trainer будет вызывать эту функцию
def run_fn(fn_args: FnArgs):
# Определяем размер пакета
batch_size = 32
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
# Определяем набор данных для обучения и валидации
train_dataset = base.input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output, batch_size)
eval_dataset = base.input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output, batch_size)
# Определяем стратегию распределённого обучения
mirrored_strategy = tf.distribute.MirroredStrategy()
# Генерируем модель в контексте выбранной стратегии
with mirrored_strategy.scope():
model = base.build_keras_model()
# Пишем логи по пути для tensorboard
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='epoch')
print("TensorBoard logs write to: ", fn_args.model_run_dir)
#print(train_dataset)
#for raw_record in train_dataset.take(1):
#print(raw_record)
#print(raw_record[0]["image_floats_xf"].numpy())
#print(raw_record[1].numpy())
#serving_model_dir = "/".join(list(fn_args.serving_model_dir.split('/')[0:-1]))
#serving_model_dir = fn_args.serving_model_dir
# Запускаем процесс обучения модели
model.fit(
train_dataset,
epochs=32,
batch_size=batch_size,
steps_per_epoch=fn_args.train_steps // batch_size,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps // batch_size,
callbacks=[tensorboard_callback])
#signatures = {
# 'serving_default':
# _get_serve_tf_examples_fn(
# model, tf_transform_output).get_concrete_function(
# tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
# )
#}
# Экспорт модели
model.export(fn_args.serving_model_dir)
#tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures)
Обратите внимание на закомментированный код - это участки, которые значительно отличаются от примера MNIST TFX, поскольку содержат "рабочий код". О проблеме в MNIST TFX я расскажу чуть позже, когда доберёмся до работы с TensorFlow Serving.
Теперь перейдём к самому главному файлу конвейера - mnist_pipeline_native_keras.py.
Определение конвейера машинного обучения TFX
Начнём как обычно - с определения подключаемых библиотек:
import os
from typing import List
import absl
import tensorflow_model_analysis as tfma
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import ImportExampleGen
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
На этот раз их значительно больше. Стоит обратить внимание на компоненты конвейера МО - Evaluator, ExampleValidator, ImportExampleGen, Pusher, SchemaGen, StatisticsGen, Trainer и Transform.
Краткое описание элементов конвейера МО будет дано чуть позже, когда они будут выстроены в определённой последовательности пайплайна конвейера.
Теперь нужно определить константы для путей и аргументов оркестратора (BeamDagRunner):
_pipeline_name = 'mnist_native_keras'
# Директория проекта
_dir_root = os.path.join('/home', 'tensorflow-mnist-conveyor')
# Директория для MNIST данных
_data_root = os.path.join(_dir_root, 'data')
# Модуль для выполнения определённых пользовательских функций Transform и Trainer
_module_file = os.path.join(_dir_root, 'mnist_utils_native_keras.py')
# Путь, который будет прослушиваться сервером моделей. Pusher выведет сюда обученную модель
_serving_model_dir = os.path.join(_dir_root, 'serving_model', _pipeline_name)
# Директория для хранения информации о конвейере МО
_tfx_root = os.path.join('/home', 'tfx')
# Директория для хранения результатов выполнения каждого элемента пайплайна
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
# Путь до хранения ML-метаданных SQLite
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db')
# Аргументы конвейера для Beam
_beam_pipeline_args = [
'--direct_running_mode=multi_processing',
# 0 означает автоматическое определение в зависимости от количества
# доступных процессоров во время выполнения
'--direct_num_workers=0',
]
Теперь приступим к определению самой большой функции, в которой происходит сборка всех элементов конвейера, их конфигурация и определение их последовательности для пайплайна.
Начнём с определения функции и конфигурации первого элемента пайплайна - ImportExampleGen:
# Создание пайплайна с определёнными параметрами для классификации рукописных цифр MNIST
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str, metadata_path: str,
beam_pipeline_args: List[str], accuracy_threshold: float = 0.8) -> pipeline.Pipeline:
# Импорт данных в конвейер
example_gen = ImportExampleGen(input_base=data_root)
Если вывести example_gen с помощью функции print, то получим следующий результат:

Как видно из рисунка 3 в консоль была выведена основная информация, которая касается данного компонента. Сразу можно обратить внимание на поле component_id, которое равняется ImportExampleGen и поле examples, которое в дальнейшем часто будет использоваться. По сути поле examples содержит "выходной канал" (OutputChannel), который позволяет обращаться к прототипам (Examples) загруженных tfrecord-файлов.
Разумеется, все загруженные из tfrecord-файлов записи будут парсится согласно схеме, которая будет получена после прохождения двух следующих компонентов пайплайна - StatisticsGen и SchemaGen:
# Вычисляет статистику по данным
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# Генерация схема на основе файлов статистики
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
Компонент StatisticsGen вычисляет различную статистику по полученным данным. В данном случае, данными выступают необработанные записи из tfrecord-файлов. В компоненте SchemaGen происходит уже формирование схемы на основе статистики, полученной компонентом StatisticsGen. В дальнейшем данная схема будет часто использоваться в других компонентах, в которых требуется работа с исходными необработанными данными (tfrecord-записями).
Следующие компоненты - ExampleValidator и Transform:
# Выполняет обнаружение аномалий на основе статистики и схемы данных
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
# Преобразование данных
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=module_file)
Компонент ExampleValidator проверяет насколько хорошо подходит сгенерированная схема данных текущему распределению набора данных по статистике, полученной от компонента StatisticsGen.
В компонент Transform мы передаём уже исходный загруженный набор данных (состоящий из tfrecord-файлов или файла) и схему, по которой нужно этот набор данных парсить.
В этом же компоненте уже идёт взаимодействие с файлом, где определены вызываемые конвейером TFX функции - mnist_utils_keras.py (передаётся через параметр module_file).
После компонента Transform идёт центральный компонент Trainer:
# Создание компонента Trainer
def _create_trainer(module_file, component_id):
return Trainer(
module_file=module_file,
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=50000),
eval_args=trainer_pb2.EvalArgs(num_steps=10000)).with_id(component_id)
trainer = _create_trainer(module_file, 'Trainer.mnist')
Здесь важно отметить, что в качестве examples передаются уже не исходные необработанные данные (example_gen.outputs['examples']
), а обработанные данные из компонента Transform (transform.outputs['transformed_examples']
).
Тут же определяется количество тренировочных и валидационных записей из набора данных (50000 - на обучение, 10000 - на валидацию). Это сделано через аргумент train_args и eval_args.
Следующим шагом определим компонент Evaluator, который будет оценивать работу нашей модели:
# Конфигурация для оценки качества модели-кандидата
eval_config = tfma.EvalConfig(
model_specs=[tfma.ModelSpec(label_key='image_class_xf')],
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(metrics=[
tfma.MetricConfig(
class_name='SparseCategoricalAccuracy',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': accuracy_threshold})))
])
])
# Использует TFMA для вычисления статистики оценки характеристик модели.
evaluator = Evaluator(
#examples=example_gen.outputs['examples'],
examples=transform.outputs['transformed_examples'],
model=trainer.outputs['model'],
eval_config=eval_config).with_id('Evaluator.mnist')
Сперва определяется конфиг, который будет использоваться для оценки (в данном случае - TFMA). Затем идёт уже определение компонента Evaluator.
Закомментированный код я оставил специально, поскольку к нему мы ещё вернёмся чуть позже, чтобы продемонстрировать ошибку, из-за которой исходный пример MNIST TFX не позволяет нормально запустить модель через TensorFlow Serving и отправлять запрос на классификацию изображения.
Пока что отмечу, что в Evaluator важно отправлять те данные, на которых модель уже обучена - transform.outputs['transformed_examples']
.
Цепочку пайплайна завершает компонент Pusher, который проверяет модель и сохраняет её по определённому пути:
# Проверяет, прошла ли модель этапы проверки, и отправляет модель
# в пункт назначения файла, если проверка пройдена.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir))).with_id('Pusher.mnist')
Теперь определим сам пайплайн (он является результатом вызова функции создания пайплайна):
return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[
example_gen,
statistics_gen,
schema_gen,
example_validator,
transform,
trainer,
evaluator,
pusher,
],
enable_cache=True,
metadata_connection_config=metadata.sqlite_metadata_connection_config(
metadata_path),
beam_pipeline_args=beam_pipeline_args)
Собственно, теперь обобщим последовательность компонентов нашего пайплайна.
Сначала в пайплайн загружается необработанный набор данных на основе tfrecord-файлов с помощью ImportExampleGen. Затем, чтобы определить схему по которой необработанный набор данных нужно парсить, мы вычисляем общую статистику в этом наборе данных с помощью StatisticsGen компонента и генерируем саму схему на основе этой статистики через SchemaGen компонент. После чего передаём эту схему и необработанные данные ExampleValidator, чтобы он проверил корректность сгенерированной схемы SchemaGen.
Следующим шагом является нормализация загруженных данных с помощью компонента Transform и запуск компонента Trainer, для обучения модели. Далее мы оцениваем качество обучения нашей модели с помощью Evaluator и загружаем нашу завершённую, проверенную модель по определённому пути с помощью компонента Pusher. На этом всё, наш пайплайн готов.
Полная реализация функции _create_pipeline
# Создание пайплайна с определёнными параметрами для классификации рукописных цифр MNIST
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str, metadata_path: str,
beam_pipeline_args: List[str], accuracy_threshold: float = 0.8) -> pipeline.Pipeline:
# Импорт данных в конвейер
example_gen = ImportExampleGen(input_base=data_root)
#print(example_gen.outputs['examples'])
# Вычисляет статистику по данным для визуализации и проверки на примере
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# Генерация схема на основе файлов статистики
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
# Выполняет обнаружение аномалий на основе статистики и схемы данных
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
#print(schema_gen.outputs["schema"])
# Преобразование данных
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=module_file)
# Создание компонента Trainer
def _create_trainer(module_file, component_id):
return Trainer(
module_file=module_file,
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=50000),
eval_args=trainer_pb2.EvalArgs(num_steps=10000)).with_id(component_id)
trainer = _create_trainer(module_file, 'Trainer.mnist')
# Конфигурация для оценки качества модели-кандидата
eval_config = tfma.EvalConfig(
model_specs=[tfma.ModelSpec(label_key='image_class_xf')],
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(metrics=[
tfma.MetricConfig(
class_name='SparseCategoricalAccuracy',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': accuracy_threshold})))
])
])
# Использует TFMA для вычисления статистики оценки характеристик модели.
evaluator = Evaluator(
#examples=example_gen.outputs['examples'],
examples=transform.outputs['transformed_examples'],
model=trainer.outputs['model'],
eval_config=eval_config).with_id('Evaluator.mnist')
# Проверяет, прошла ли модель этапы проверки, и отправляет модель
# в пункт назначения файла, если проверка пройдена.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir))).with_id('Pusher.mnist')
return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[
example_gen,
statistics_gen,
schema_gen,
example_validator,
transform,
trainer,
evaluator,
pusher,
],
enable_cache=True,
metadata_connection_config=metadata.sqlite_metadata_connection_config(
metadata_path),
beam_pipeline_args=beam_pipeline_args)
Заключительным шагом определим поведение файла mnist_pipeline_native_keras.py если он явно запущен интерпретатором python:
if __name__ == '__main__':
absl.logging.set_verbosity(absl.logging.INFO)
# Передача пайплайна в оркестратор
BeamDagRunner().run(
_create_pipeline(
pipeline_name=_pipeline_name,
pipeline_root=_pipeline_root,
data_root=_data_root,
module_file=_module_file,
serving_model_dir=_serving_model_dir,
metadata_path=_metadata_path,
beam_pipeline_args=_beam_pipeline_args))
На данном заключительном шаге мы передаём созданный пайплайн в оркестратор, который запускает отдельные элементы пайплайна и передаёт между ними необработанные и обработанные данные, в зависимости от конфигураций, которые были определены для компонентов пайплайна.
Запуск конвейера машинного обучения TFX
Чтобы запустить конвейер МО достаточно запустить следующую команду в консоли:
python3 mnist_pipeline_native_keras.py
Обучение нейронной сети займёт 32 эпохи, каждая из которых будет обрабатывать 1562 записи из нормализованного набора данных:


На рисунке 5 представлен результат завершения работы конвейера МО, в котором оповещается о успешном завершении выполнения компонента Pusher. Это значит, что в директории проекта находится папка serving_model (указанная в виде констант, в файле mnist_native_pipeline_keras.py), в которой и находится результат выполнения конвейера.
Посмотрим содержимое проекта:

Действительно, папка есть. Посмотрим на содержимое этой папки:

В папке находится модель, её подпись и веса модели.
Теперь рассмотрим содержимое папки /home/tfx/pipelines/mnist_native_keras (путь может быть другим):

В этой папке находятся результаты выполнения каждого элемента пайплайна конвейера.
Напомню, что в программном коде при запуске обучения модели мы использовали callback, с помощью которого логировали информацию об обучении в специальную папку для визуализации через TensorBoard:
# ...
# Пишем логи по пути для tensorboard
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='epoch')
print("TensorBoard logs write to: ", fn_args.model_run_dir)
# Запускаем процесс обучения модели
model.fit(
train_dataset,
epochs=32,
batch_size=batch_size,
steps_per_epoch=fn_args.train_steps // batch_size,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps // batch_size,
callbacks=[tensorboard_callback])
# ...
Куда сохранены эти логи? В моём случае - в папке /home/tfx/pipelines/mnist_native_keras/Trainer.mnist/model_run/11.
Чтобы запустить TensorBoard для анализа процесса обучения достаточно выполнить следующую команду в консоли:
tensorboard --logdir /home/tfx/pipelines/mnist_native_keras/Trainer.mnist/model_run/11

Теперь, если перейти в браузере по пути "http://localhost:6006/" в адресной строке браузера, то можно увидеть базовый интерфейс TensorBoard со статистикой обучения модели:

На данный момент выводится только основная информация - потери на тренировочном и валидационном за каждую эпоху. Причём не очень хорошие, можно было бы остановиться уже на 8-ой эпохе, чтобы избежать переобучения, однако для примера сойдёт.
TensorFlow Serving
Когда мы научились запускать конвейер машинного обучения и оценивать результат работы модели в TensorBoard самое время научится развёртывать модель через TensorFlow Model Server для доступа к ней по API.
Саму концепцию serving'а модели отлично демонстрирует следующее изображение:

Как видно из рисунка, сначала модель обучается, затем сохраняется в своём репозитории (им может выступать и просто папка на жёстком диске) и после оборачивается сервером, который предоставляет клиенту API для взаимодействия с этой моделью.
Эту серверную обёртку можно сделать и вручную. Например, с помощью каких-нибудь фреймворков для Python по типу Flask или Django. Однако TensorFlow предоставляет такой механизм уже из под коробки. Данному механизму требуется только правильная сигнатура модели, по которой он может определить входные и выходные данные для API и, соответственно, самой модели.
Сигнатура модели (или signatures) - это определённая спецификация входных и выходных данных модели. Она служит контрактом, который определяет формы и типы входных и выходных данных модели (что она ожидает и что будет возвращать).
Механизм TensorFlow Model Server использует сигнатуры для генерации клиентского API. Очень важно следить за сохраняемой сигнатурой модели, поскольку её некорректное содержание может сделать взаимодействие с клиентом невозможным.
Приведу пример такого некорректного содержания сигнатуры. В MNIST TFX, при сохранении модели выполняется следующая инструкция:
# ...
signatures = {
'serving_default':
_get_serve_tf_examples_fn(
model, tf_transform_output).get_concrete_function(
tf.TensorSpec(shape=[None], dtype=tf.string, name='examples'))
}
tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures)
# ...
А в компонент Evaluator передаются необработанные исходные данные:
# ...
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
eval_config=eval_config).with_id('Evaluator.mnist')
# ...
Именно благодаря этой цепочке сигнатура модели будет выглядеть следующим образом:
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:
signature_def['__saved_model_init_op']:
The given SavedModel SignatureDef contains the following input(s):
The given SavedModel SignatureDef contains the following output(s):
outputs['__saved_model_init_op'] tensor_info:
dtype: DT_INVALID
shape: unknown_rank
name: NoOp
Method name is:
signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['examples'] tensor_info:
dtype: DT_STRING
shape: (-1)
name: serving_default_examples:0
The given SavedModel SignatureDef contains the following output(s):
outputs['output_0'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict
The MetaGraph with tag set ['serve'] contains the following ops: {'NoOp', 'Const', 'AssignVariableOp', 'RestoreV2', 'StaticRegexFullMatch', 'Shape', 'DisableCopyOnRead', 'PlaceholderWithDefault', 'Placeholder', 'PartitionedCall', 'MergeV2Checkpoints', 'ShardedFilename', 'VarIsInitializedOp', 'StatefulPartitionedCall', 'VarHandleOp', 'Identity', 'Select', 'ParseExampleV2', 'StridedSlice', 'Relu', 'SaveV2', 'Fill', 'MatMul', 'StringJoin', 'BiasAdd', 'ReadVariableOp', 'RealDiv', 'Pack'}
Concrete Functions:
Function Name: '_default_save_signature'
Option #1
Callable with:
Argument #1
inputs: TensorSpec(shape=(None, 784), dtype=tf.float32, name='inputs')
Сама сигнатура, которая используется для определения API TensorFlow Model Serving определена здесь: signature_def['serving_default'].
То есть, в примере MNIST TFX входными данными для API должна быть строка. Что это за строка? Нигде не указано, но это бинарная строка, которую при всём желании в JSON добавить у меня не получилось. По крайней мере таким образом, чтобы TensorFlow Model Serving тело запроса корректно спарсил, передал эти данные в модель и вернул результат.
Причём любое изменение сигнатуры на более корректную в примере MNIST TFX недопустимо, поскольку даже сама функция генерации сигнатуры:
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""Returns a function that parses a serialized tf.Example."""
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(base.LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
return model(transformed_features)
return serve_tf_examples_fn
В контексте данного примера попросту бессмысленна. Да, она хорошо справляется с задачей определения выходного формата результата, но с ней и обычный экспорт модели справляется отлично.
В общем-то, сигнатура, которая генерируется примером MNIST TFX по умолчанию, совершенно не подходит для того, чтобы модель можно было обернуть сервером с помощью TensorFlow Model Server. До модели попросту не получится "достучаться" по API. Какое может быть решение? Я нашёл только одно: явный экспорт модели с автоматической генерацией сигнатуры модели и использование обработанных исходных данных в Evaluator, чтобы он корректно работал и не выкидывал исключения. В MNIST TFX об этом ни сказано ни слова.
Решение:
# ...
# Экспорт модели
model.export(fn_args.serving_model_dir)
# ...
# ...
evaluator = Evaluator(
examples=transform.outputs['transformed_examples'],
model=trainer.outputs['model'],
eval_config=eval_config).with_id('Evaluator.mnist')
# ...
Благодаря данному подходу сигнатура модели сохраняется в корректном формате и позволяет оборачивать модель вокруг сервера для взаимодействия с клиентом по API:
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:
signature_def['__saved_model_init_op']:
The given SavedModel SignatureDef contains the following input(s):
The given SavedModel SignatureDef contains the following output(s):
outputs['__saved_model_init_op'] tensor_info:
dtype: DT_INVALID
shape: unknown_rank
name: NoOp
Method name is:
signature_def['serve']:
The given SavedModel SignatureDef contains the following input(s):
inputs['image_floats_xf'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: serve_image_floats_xf:0
The given SavedModel SignatureDef contains the following output(s):
outputs['output_0'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict
signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['image_floats_xf'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: serving_default_image_floats_xf:0
The given SavedModel SignatureDef contains the following output(s):
outputs['output_0'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: StatefulPartitionedCall_1:0
Method name is: tensorflow/serving/predict
The MetaGraph with tag set ['serve'] contains the following ops: {'DisableCopyOnRead', 'ReadVariableOp', 'MatMul', 'VarHandleOp', 'Pack', 'Identity', 'StatefulPartitionedCall', 'Placeholder', 'VarIsInitializedOp', 'MergeV2Checkpoints', 'RestoreV2', 'Const', 'AssignVariableOp', 'Select', 'SaveV2', 'Relu', 'BiasAdd', 'StringJoin', 'NoOp', 'ShardedFilename', 'StaticRegexFullMatch'}
Concrete Functions:
Function Name: 'serve'
Option #1
Callable with:
Argument #1
image_floats_xf: TensorSpec(shape=(None, 784), dtype=tf.float32, name='image_floats_xf')
Теперь у модели на входе не строка, а массив типа данных tf.float32, а на выходе одно значение - предсказание модели к какому классу относится число на изображении. Всё как и должно быть.
Собственно, а как посмотреть на сигнатуру модели и убедится, что она корректна? Для этого достаточно использовать следующую команду:
saved_model_cli show --dir <путь до сохранённой модели> --all
Теперь нужно запустить TensorFlow Model Server и передать в качестве его параметра путь до модели, которую нужно обернуть серверным API по её сигнатуре.
tensorflow_model_server \
--rest_api_port=8501 \
--model_name=saved_model \
--model_base_path="/home/tensorflow-mnist-conveyor/serving_model/mnist_native_keras"

Теперь модель обёрнута в API, с помощью которого можно с ней взаимодействовать. В частности - предсказывать какое число представлено на изображении.
Для тестирования API можно использовать следующий программный код:
import requests
import json
import tensorflow as tf
import numpy as np
tf.executing_eagerly()
tf.config.run_functions_eagerly(True)
tfrecord_path = "...tensorflow-tfx\\client-api\\train.tfrecord"
# Парсинг из tfrecord
@tf.function
def convert_back(serialized):
# Форма для парсинга входных данных
feature = {
'image_class' : tf.io.VarLenFeature(tf.int64),
'image_floats' : tf.io.FixedLenFeature((784, ), tf.float32)
}
# Парсинг одного экземпляра данных
parsed_example = tf.io.parse_single_example(serialized=serialized, features=feature)
image = parsed_example['image_floats']
label = parsed_example['image_class']
return image, label
# Получение 10-ти спарсенных записей из tfrecord-файла
@tf.function
def data_gen_output(filename):
raw_dataset = tf.data.TFRecordDataset(filenames=[filename])
data = []
for raw_record in raw_dataset.take(10):
data.append(convert_back(raw_record))
return data
items = data_gen_output(tfrecord_path)
# Заголовок для HTTP-запроса
headers = {"content-type": "application/json"}
for i in range(len(items)):
img, label = items[i]
img = img.numpy()
data = {
# Определение сигнатуры
"signature_name": "serving_default",
# Определение входных данных для API запроса
"instances": [img.tolist()]
}
# Отправка запроса серверу
json_response = requests.post('http://localhost:8501/v1/models/saved_model:predict', data=json.dumps(data), headers=headers)
# Получение массива предсказанных значений
predictions = json.loads(json_response.text)['predictions']
# Определение класса, к которому относится цифра на изображении
defClass = int(np.argmax(predictions))
print("Predict: ", defClass)
print("Fact: ", label.values.numpy()[0])
print()
Данный программный код я запускал из ОС Windows, а сервер с моделью запущен в WSL 2.

В общем-то, полный цикл конвейера (настройка, запуск, обучение, выпуск, мониторинг, serving) был успешно выполнен.
Заключение
В рамках данной статьи был описан процесс создания конвейера машинного обучения с помощью TensorFlow TFX, а также мониторинг процесса обучения модели через TensorBoard и обёртку модели API для доступа к ней с помощью TensorFlow Server Model. Представлен вариант программного кода для тестирования API запущенного сервера.
Список использованных источников
Ссылка на исходный код: tensorflow-mnist-conveyor