Pull to refresh
110.59
Open Data Science
Крупнейшее русскоязычное Data Science сообщество

Введение в MLflow

Level of difficultyMedium
Reading time19 min
Views5K

MLflow - это инструмент для управления жизненным циклом машинного обучения: отслеживание экспериментов, управление и деплой моделей и проектов. В этом руководстве мы посмотрим, как организовать эксперименты и запуски, оптимизировать гиперпараметры с помощью optuna, сравнивать модели и выбирать лучшие параметры. Также рассмотрим логирование моделей, использование их в разных форматах, упаковку проекта в MLproject и установку удаленного Tracking Server MLflow.

Вы можете просто прочитать статью или повторить все шаги локально. Также по ссылке репозиторий, который является расширенной версией этого гайда на английском в README.

Подготовка env

Сначала нам нужно установить conda и создать новоет окружение:

name: mlflow-example
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.11.4
  - pip=24.0
  - mlflow=2.14.2
  - xgboost=2.0.3
  - jupyter=1.0.0
  - loguru=0.7.2
  - shap=0.45.1
  - pandas=2.2.2
  - scikit-learn=1.5.1
  - scipy=1.14.0
  - numpy=1.26.4
  - jupytext=1.16.3
  - psutil=6.0.0
  - boto3=1.34.148
  - psycopg2-binary=2.9.9
  - pip:
    - optuna==3.6.1
    - mlserver==1.3.5
    - mlserver-mlflow==1.3.5
    - mlserver-xgboost==1.3.5

Можно установить вручную или сохранить этот файл локально и вызвать:

conda env create -f conda.yaml

Теперь можно использовать это окружение в IDE или в терминале:

conda activate mlflow-example

MLflow UI

Выполните в терминале mlflow ui, чтобы запустить MLflow на localhost:5000.

Mlflow будет запущен в локальном режиме, по умолчанию он создаст папку mlruns для хранения артефактов и метаинформации.

MLflow UI
MLflow UI

Mlflow Projects

MLflow Project - это подход к структурированию проекта, следуя конвенциям, которые упрощают запуск проекта для других разработчиков (или автоматических инструментов). Каждый проект — это директория файлов или git-репозиторий, содержащий ваш код. MLflow может запускать эти проекты, основываясь на определенных правилах организации файлов в директории.

Файл MLproject помогает MLflow и другим пользователям понять и запустить ваш проект, указывая окружение, точки входа и возможные параметры для настройки:

name: Cancer_Modeling

conda_env: conda.yaml

entry_points:
  data-preprocessing:
    parameters:
      test-size: {type: float, default: 0.33}
    command: "python data_preprocessing.py --test-size {test-size}"
  hyperparameters-tuning:
    parameters:
      n-trials: {type: int, default: 10}
    command: "python hyperparameters_tuning.py --n-trials {n-trials}"
  model-training:
    command: "python model_training.py"
  data-evaluation:
    parameters:
      eval-dataset: {type: str}
    command: "python data_evaluation.py --eval-dataset {eval-dataset}"

Мы можем запускать эндпоинты проекта, используя интерфейс командной строки (CLI), либо API на Python.

Mlflow Experiments and Mlflow Runs

MLflow Experiments и MLflow Runs - это основные абстракции для структурирования проекта. Давайте рассмотрим пример разработки модели на открытом датасете с данными по забалеванию раком.

Data preprocessing

Этот шаг обычно более сложный, но здесь мы просто загрузим набор данных, разделим его на обучающую и тестовую выборки, залогируем несколько метрик в MLflow (количество образцов и признаков) и передадим сами наборы данных в артефакты MLflow.

import sys
import argparse
import mlflow
import warnings
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import datasets
from loguru import logger

from config import config

# set up logging
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
warnings.filterwarnings('ignore')

def get_cancer_df():
    cancer = datasets.load_breast_cancer()
    X = pd.DataFrame(cancer.data, columns=cancer.feature_names)
    y = pd.Series(cancer.target)
    logger.info(f'Cancer data downloaded')
    return X, y


if __name__ == '__main__':
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--test-size", default=config.default_test_size, type=float)
    TEST_SIZE = parser.parse_args().test_size
        
    logger.info(f'Data preprocessing started with test size: {TEST_SIZE}')
        
    # download cancer dataset
    X, y = get_cancer_df()

    # add additional features
    X['additional_feature'] = X['mean symmetry'] / X['mean texture']
    logger.info('Additional features added')

    # log dataset size and features count
    mlflow.log_metric('full_data_size', X.shape[0])
    mlflow.log_metric('features_count', X.shape[1])

    # split dataset to train and test part and log sizes to mlflow
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE)
    mlflow.log_metric('train_size', X_train.shape[0])
    mlflow.log_metric('test_size', X_test.shape[0])
    
    # log and register datasets
    train = X_train.assign(target=y_train)
    mlflow.log_text(train.to_csv(index=False),'datasets/train.csv')
    dataset_source_link = mlflow.get_artifact_uri('datasets/train.csv')
    dataset = mlflow.data.from_pandas(train, name='train', targets="target", source=dataset_source_link)
    mlflow.log_input(dataset)

    test = X_test.assign(target=y_test)
    mlflow.log_text(test.to_csv(index=False),'datasets/test.csv')
    dataset_source_link = mlflow.get_artifact_uri('datasets/test.csv')
    dataset = mlflow.data.from_pandas(train, name='test', targets="target", source=dataset_source_link)
    mlflow.log_input(dataset)
    
    logger.info('Data preprocessing finished')

Чтобы выполнить этот код сохраните его в файл data-preprocessing.py рядом с файлом MLproject и в командной строке вызовете:

mlflow run . --entry-point data-preprocessing --env-manager local --experiment-name Cancer_Classification --run-name Data_Preprocessing -P test-size=0.33

Теперь этот запуск должен быть доступен через ui. Секция datasets used заполнена благодаря mlflow.data api. Однако важно понимать, что это метаданные о даных, а не сами данные.

Модуль mlflow.data отслеживает информацию о наборах данных во время обучения и оценки модели: признаки, целевые значения, предсказания, название, схема и источник. Эти метаданные логируются с помощью mlflow.log_input().

Hyperparameters tuning

В этой части мы используем optuna для нахождения лучших гиперпараметров для XGBoost, используя встроенную кросс-валидацию для обучения и оценки модели. Кроме того, мы посмотрим, как собирать метрики во время процесса обучения.

import tempfile
import sys
import psutil
import os
import argparse
import logging
import warnings
import mlflow
import optuna
import pandas as pd
import xgboost as xgb
from xgboost.callback import TrainingCallback
from loguru import logger


# set up logging
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
warnings.filterwarnings('ignore')
logging.getLogger('mlflow').setLevel(logging.ERROR)
optuna.logging.set_verbosity(optuna.logging.ERROR)


# Custom callback for logging metrics
class LoggingCallback(TrainingCallback):
    def after_iteration(self, model, epoch, evals_log):
        for metric_name, metric_vals in evals_log['test'].items():
            mlflow.log_metric(f"{metric_name}", metric_vals[-1][0], step=epoch)
        return False


# Define an objective function for Optuna
def objective(trial):
    global dtrain

    # hyperparameters
    params = {
        "objective": trial.suggest_categorical('objective', ['binary:logistic']),
        "max_depth": trial.suggest_int("max_depth", 2, 8),
        "alpha": trial.suggest_float("alpha", 0.001, 0.05),
        "learning_rate": trial.suggest_float("learning_rate", 0.005, 0.5),
        "num_boost_round": trial.suggest_int("num_boost_round", 30, 300),
    }

    with mlflow.start_run(nested=True):

        mlflow.log_params(params)
        params.update(eval_metric=['auc', 'error'])
        num_boost_round = params["num_boost_round"]
        cv_results = xgb.cv(
            params,
            dtrain,
            num_boost_round=num_boost_round,
            nfold=3,
            callbacks=[LoggingCallback()],
            verbose_eval=False,
        )

        error = cv_results['test-error-mean'].iloc[-1]
        mlflow.log_metric("accuracy", (1 - error))
        logger.info(f"Attempt: {trial.number}, Accuracy: {1 - error}")

        return error


if __name__ == '__main__':

    N_TRIALS = 10
    # get arguments if running not in ipykernel
    parser = argparse.ArgumentParser()
    parser.add_argument("--n-trials", default=N_TRIALS, type=float)
    N_TRIALS = parser.parse_args().n_trials

    logger.info(f'Hyperparameters tuning started with: {N_TRIALS} trials')

    with mlflow.start_run() as run:

        # get experiment id
        experiment_id = run.info.experiment_id

        # get last finished run for data preprocessing
        last_run_id = mlflow.search_runs(
            experiment_ids=[experiment_id],
            filter_string=f"tags.mlflow.runName = 'Data_Preprocessing' and status = 'FINISHED'",
            order_by=["start_time DESC"]
        ).loc[0, 'run_id']

        # download train data from last run
        with tempfile.TemporaryDirectory() as tmpdir:
            mlflow.artifacts.download_artifacts(run_id=last_run_id, artifact_path='datasets/train.csv', dst_path=tmpdir)
            train = pd.read_csv(os.path.join(tmpdir, 'datasets/train.csv'))

        # convert to DMatrix format
        features = [i for i in train.columns if i != 'target']
        dtrain = xgb.DMatrix(data=train.loc[:, features], label=train['target'])

        logger.info('Starting optuna study')

        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=N_TRIALS)
        best_trial = study.best_trial

        logger.info(f'Optimization finished, best params: {best_trial.params}')
        mlflow.log_params(best_trial.params)

        logger.info(f'Best trial Accuracy: {1 - best_trial.value}')
        mlflow.log_metric('accuracy', 1 - study.best_value)

Здесь мы используем conda в качестве env менджера: mlflow автоматически создаст для нас новый env из файла, указанного в конфигурации MLproject.

mlflow run . --entry-point hyperparameters-tuning --env-manager conda --experiment-name Cancer_Classification --run-name Hyperparameters_Search -P n-trials=10

Посмотрим на результаты в MLflow UI. Для структурирования проекта можно использовать вложенные запуски. Есть один основной запуск для настройки гиперпараметров, а все испытания собираются как вложенные запуски. MLflow также предоставляет возможность настраивать столбцы и порядок строк в этом представлении:

В сhart view можно сравнить запуски и настроить различные диаграммы. Использование XGBoost callbacks для логирования метрик в процессе обучения модели позволяет создавать графики с количеством деревьев на оси x.

Выберите несколько запусков, нажмите кнопку сравнения и выберите наиболее полезное представление. Функция mlflow compare может быть полезной при оптимизации гиперпараметров, так как помогает уточнить и корректировать границы возможных интервалов на основе результатов сравнения.

Системные метрики также могут отслеживаться на протяжении всего запуска. Хотя это и не дает точной оценки реальных требований, в некоторых случаях все же может быть полезным.

Log and register model

Возможно, но не обязательно, сохранять модель для каждого эксперимента и запуска. В большинстве случаев лучше сохранять параметры, а затем, выбрав лучшие, выполнить дополнительный запуск для сохранения модели. Здесь мы следуем той же логике: используем параметры из лучшего запуска для сохранения финальной модели и регистриреум ее для версионирования и использования через короткую ссылку.

import os
import sys
import tempfile
import mlflow
import warnings
import logging
import xgboost as xgb
import pandas as pd
from loguru import logger


# set up logging
warnings.filterwarnings('ignore')
logging.getLogger('mlflow').setLevel(logging.ERROR)
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")


if __name__ == '__main__':

    logger.info('Model training started')
 
    mlflow.xgboost.autolog()

    with mlflow.start_run() as run:

        experiment_id = run.info.experiment_id
        
        run_id = run.info.run_id
        logger.info(f'Start mlflow run: {run_id}')
        
        # get last finished run for data preprocessing
        last_data_run_id = mlflow.search_runs(
            experiment_ids=[experiment_id],
            filter_string=f"tags.mlflow.runName = 'Data_Preprocessing' and status = 'FINISHED'",
            order_by=["start_time DESC"]
        ).loc[0, 'run_id']
    
        # download train and test data from last run
        with tempfile.TemporaryDirectory() as tmpdir:
            mlflow.artifacts.download_artifacts(run_id=last_data_run_id, artifact_path='datasets/train.csv', dst_path=tmpdir)
            mlflow.artifacts.download_artifacts(run_id=last_data_run_id, artifact_path='datasets/test.csv', dst_path=tmpdir)
            train = pd.read_csv(os.path.join(tmpdir, 'datasets/train.csv'))
            test = pd.read_csv(os.path.join(tmpdir, 'datasets/test.csv'))

        # convert to DMatrix format
        features = [i for i in train.columns if i != 'target']
        dtrain = xgb.DMatrix(data=train.loc[:, features], label=train['target'])
        dtest = xgb.DMatrix(data=test.loc[:, features], label=test['target'])

        # get last finished run for hyperparameters tuning
        last_tuning_run = mlflow.search_runs(
            experiment_ids=[experiment_id],
            filter_string=f"tags.mlflow.runName = 'Hyperparameters_Search' and status = 'FINISHED'",
            order_by=["start_time DESC"]
        ).loc[0, :]
        
        # get best params
        params = {col.split('.')[1]: last_tuning_run[col] for col in last_tuning_run.index if 'params' in col}
        params.update(eval_metric=['auc', 'error'])

        mlflow.log_params(params)
        
        model = xgb.train(
            dtrain=dtrain,
            num_boost_round=int(params["num_boost_round"]),
            params=params,
            evals=[(dtest, 'test')],
            verbose_eval=False,
            early_stopping_rounds=10
        )

        mlflow.log_metric("accuracy", 1 - model.best_score)
        
        # Log model as Booster
        input_example = test.loc[0:10, features]
        predictions_example = pd.DataFrame(model.predict(xgb.DMatrix(input_example)), columns=['predictions'])
        mlflow.xgboost.log_model(model, "booster", input_example=input_example)
        mlflow.log_text(predictions_example.to_json(orient='split', index=False), 'booster/predictions_example.json')

        # Register model
        model_uri = f"runs:/{run.info.run_id}/booster"
        mlflow.register_model(model_uri, 'CancerModelBooster')
        
        # Log model as sklearn completable XGBClassifier
        params.update(num_boost_round=model.best_iteration)
        model = xgb.XGBClassifier(**params)
        model.fit(train.loc[:, features], train['target'])
        mlflow.xgboost.log_model(model, "model", input_example=input_example)

        # log datasets
        mlflow.log_text(train.to_csv(index=False), 'datasets/train.csv')
        mlflow.log_text(test.to_csv(index=False),'datasets/test.csv')

        logger.info('Model training finished')

        # Register the model
        model_uri = f"runs:/{run.info.run_id}/model"
        mlflow.register_model(model_uri, 'CancerModel')
        
        logger.info('Model registered')
mlflow run . --entry-point model-training --env-manager conda --experiment-name Cancer_Classification --run-name Model_Training -P n-trials=10 

Благодаря функции mlflow.xgboost.autolog(), все метрики автоматически логируются, в том числе в процессе обучения:

После сохранения модели, мы можем получить доступ к странице артефактов внутри запуска. В артефактах можно хранить любые типы файлов, такие как пользовательские графики, текстовые файлы, изображения, наборы данных, скрипты Python и другие. Например, я конвертировал рабочий ноутбук в HTML и сохранил его как артефакт:

Для каждой модели MLflow автоматически создаёт YAML конфигурационный файл, называемый MLmodel. Этот файл можно просмотреть в интерфейсе MLflow или скачать и изучить.

artifact_path: model
flavors:
  python_function:
    data: model.xgb
    env:
      conda: conda.yaml
      virtualenv: python_env.yaml
    loader_module: mlflow.xgboost
    python_version: 3.11.4
  xgboost:
    code: null
    data: model.xgb
    model_class: xgboost.sklearn.XGBClassifier
    model_format: xgb
    xgb_version: 2.0.3
mlflow_version: 2.14.2
model_size_bytes: 35040
model_uuid: 516954aae7c94e91adeed9df76cb4052
run_id: bf212703d1874eee9dcb37c8a92a6de6
saved_input_example_info:
  artifact_path: input_example.json
  pandas_orient: split
  type: dataframe
signature:
  inputs: '[{"type": "double", "name": "mean radius", "required": true}, {"type":
    "double", "name": "mean texture", "required": true}, {"type": "double", "name":
    "mean perimeter", "required": true}, {"type": "double", "name": "mean area", "required":
    true}, {"type": "double", "name": "mean smoothness", "required": true}, {"type":
    "double", "name": "mean compactness", "required": true}, {"type": "double", "name":
    "mean concavity", "required": true}, {"type": "double", "name": "mean concave
    points", "required": true}, {"type": "double", "name": "mean symmetry", "required":
    true}, {"type": "double", "name": "mean fractal dimension", "required": true},
    {"type": "double", "name": "radius error", "required": true}, {"type": "double",
    "name": "texture error", "required": true}, {"type": "double", "name": "perimeter
    error", "required": true}, {"type": "double", "name": "area error", "required":
    true}, {"type": "double", "name": "smoothness error", "required": true}, {"type":
    "double", "name": "compactness error", "required": true}, {"type": "double", "name":
    "concavity error", "required": true}, {"type": "double", "name": "concave points
    error", "required": true}, {"type": "double", "name": "symmetry error", "required":
    true}, {"type": "double", "name": "fractal dimension error", "required": true},
    {"type": "double", "name": "worst radius", "required": true}, {"type": "double",
    "name": "worst texture", "required": true}, {"type": "double", "name": "worst
    perimeter", "required": true}, {"type": "double", "name": "worst area", "required":
    true}, {"type": "double", "name": "worst smoothness", "required": true}, {"type":
    "double", "name": "worst compactness", "required": true}, {"type": "double", "name":
    "worst concavity", "required": true}, {"type": "double", "name": "worst concave
    points", "required": true}, {"type": "double", "name": "worst symmetry", "required":
    true}, {"type": "double", "name": "worst fractal dimension", "required": true},
    {"type": "double", "name": "additional_feature", "required": true}]'
  outputs: '[{"type": "long", "required": true}]'
  params: null
utc_time_created: '2024-07-30 08:38:11.745511'

Файл MLmodel содержит информациб по оберткам/вкусам (flavours), здесь это pyfunc и xgboost. Также он включает настройки окружения для conda (conda.yaml) и virtualenv (python_env.yaml). Модель является классификатором XGBoost, совместимым с API sklearn, сохранённым в формате XGBoost версии 2.0.3. Файл отслеживает такие детали, как размер модели, UUID, run ID и время создания. Также есть пример входных данных, для модели и её сигнатуроа - спецификация входа и выхода. Сигнатура может быть создана и сохранена вручную вместе с моделью, но MLflow автоматически генерирует сигнатуру при предоставлении примера входа.

В экосистеме MLflow, flavors (вкусы) — это обертки для конкретных библиотек машинного обучения, которые позволяют сохранять, логировать и извлекать модели в едином формате. Это обеспечивает единообразное поведение метода предсказания (predict) для различных фреймворков, упрощая управление и развертывание моделей.

Добавление предсказаний для примера входных данных может быть полезным, так как позволяет сразу протестировать модель после её загрузки и убедиться в её корректной работе в различных окружениях.

Мы сохранили две версии модели, обе с использованием xgboost, каждая из которых имеет два "flavors": python_function и xgboost. Разница заключается в классе модели: для бустера это xgboost.core.Booster, а для модели это xgboost.sklearn.XGBClassifier, который поддерживает API, совместимый с scikit-learn. Эти различия влияют на работу метода predict, поэтому важно просмотреть файл MLmodel и проверить сигнатуру модели перед использованием. Также могут быть небольшие различия в производительности python_function обычно немного медленее.

При загрузке модели бустера с использованием xgboost, модель ожидает, что входные данные будут в форме объекта DMatrix, и метод predict в нашем случае будет выдавать оценки (а не классы).

Built-in evaluation

Встроенная функция mlflow.evaluate позволяет оценивать модели на дополнительных наборах данных:

import sys
import os
import argparse
import warnings
import logging
import mlflow
import pandas as pd
from loguru import logger


logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
warnings.filterwarnings('ignore')
logging.getLogger('mlflow').setLevel(logging.ERROR)


if __name__ == '__main__':

    logger.info('Evaluation started')

    parser = argparse.ArgumentParser()
    parser.add_argument("--eval-dataset", type=str)
    eval_dataset = pd.read_csv(parser.parse_args().eval_dataset)
        
    with mlflow.start_run() as run:
        
        eval_dataset = mlflow.data.from_pandas(
            eval_dataset, targets="target"
        )
        last_version = mlflow.MlflowClient().get_registered_model('CancerModel').latest_versions[0].version
        mlflow.evaluate(
            data=eval_dataset, model_type="classifier", model=f'models:/CancerModel/{last_version}'
        )
        logger.success('Evaluation finished')
mlflow run . --entry-point data-evaluation --env-manager conda --experiment-name Cancer_Classification --run-name Data_Evaluation -P eval-dataset='test.csv'

Результаты можно просмотреть в интерфейсе MLflow, где представлены различные метрики и графики, включая ROC-AUC, матрицы ошибок и графики SHAP (если SHAP установлен).

Model Serving

MLflow имеет встроенные возможности для деплоя моделей. Деплой модели с использованием Flask довольно просто сделать: mlflow models serve -m models:/CancerModel/1 --env-manager local. Но мы также можем использовать mlserver.

Пакет mlserver облегчает эффективное развертывание и обслуживание моделей машинного обучения с поддержкой множества фреймворков, используя интерфейсы REST и gRPC. Он интегрируется с Seldon Core для масштабируемого и надёжного управления моделями и мониторинга.

Возможно, вам потребуется установить следующие пакеты: mlserver, mlserver-mlflow, mlserver-xgboost, если вы используете собственное окружение. После этого мы можем настроить конфигурационный файл (model-settings.json) для MLServer. Это позволяет нам изменить работу API; здесь мы просто настраиваем алиас для модели:

{
    "name": "cancer-model",
    "implementation": "mlserver_mlflow.MLflowRuntime",
    "parameters": {
        "uri": "models:/CancerModel/1"
    }
}

Чтобы запустить MLServer в локальном окружении, можно использовать команду mlserver start .

Теперь у нас есть рабочий API с документацией OpenAPI, валидацией запросов, HTTP и gRPC серверами и эндпоинтом с метриками для prometheus. И всё это без написания кода, все что нужноя - простая конфигурацию в формате JSON.

Мы можем проверить документацию для нашей модели и изучить ожидаемую структуру данных через Swagger по адресу /v2/models/cancer/model/docs

Мы можем получить доступ к эндпоинту с метриками или настроить prometheus для их сбора

Теперь можно отправлять запросы к модели:

import requests
import json

url = "http://127.0.0.1:8080/invocations"

# convert df do split format and then to json
input_data = json.dumps({
    "params": {
      'method': 'proba',  
    },
    'dataframe_split': {
        "columns": test.columns.tolist(),
        "data": test.values.tolist()
    }
})

# Send a POST request to the MLflow model server
response = requests.post(url, data=input_data, headers={"Content-Type": "application/json"})

if response.status_code == 200:
    prediction = response.json()
    print("Prediction:", prediction)
else:
    print("Error:", response.status_code, response.text)
Prediction: {'predictions': [1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1]}

Customize model

Мы можем настроить нашу модель для предоставления вероятностей или включения дополнительного логирования. Для этого сначала скачаем модель, а затем обернем её в пользовательскую обертку:

import mlflow
import mlflow.xgboost
import mlflow.pyfunc

# Step 1: Download the Existing Model from MLflow
model_uri = "models:/CancerModel/1"
model = mlflow.xgboost.load_model(model_uri)


# Step 2: Define the Custom PyFunc Model with `loguru` Setup in `load_context`
class CustomPyFuncModel(mlflow.pyfunc.PythonModel):
    
    def __init__(self, model):
        self.model = model
        
    def get_logger(self):
        from loguru import logger
        logger.remove()
        logger.add("mlserve/mlserver_logs.log", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
        return logger
        
    def load_context(self, context):
        self.logger = self.get_logger()

    def predict(self, context, model_input):
        
        self.logger.info(f"start request")
        self.logger.info(f"batch size: {len(model_input)}")
        
        predict =  self.model.predict_proba(model_input)[:,1]
        
        self.logger.success(f"Finish request")
        
        return predict
        

# Step 3: Save the Wrapped Model Back to MLflow
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="custom_model",
        python_model=CustomPyFuncModel(model),
        registered_model_name="CustomCancerModel",
    )

Теперь перепишем конфигурационный файл и заново запустим mlserver:

{
    "name": "cancer-model",
    "implementation": "mlserver_mlflow.MLflowRuntime",
    "parameters": {
        "uri": "models:/CustomCancerModel/1"
    }
}

Отправим запросы к новому API:

# Send a POST request to the MLflow model server
response = requests.post(url, data=input_data, headers={"Content-Type": "application/json"})

if response.status_code == 200:
    prediction = response.json()
    print("Prediction:", prediction['predictions'][:10])
else:
    print("Error:", response.status_code, response.text)
Prediction: [0.9406537413597107, 0.9998677968978882, 0.9991995692253113, 0.00043031785753555596, 0.9973010420799255, 0.9998010993003845, 0.9995433688163757, 0.9998323917388916, 0.0019207964651286602, 0.0004339178267400712]

Теперь мы получили, веротяности, также можно проверить что дополнительные логи записаны в файл:

2024-07-30 12:02:38 | INFO | start request
2024-07-30 12:02:38 | INFO | batch size: 188
2024-07-30 12:02:38 | SUCCESS | Finish request

Альтернативой этому подходу является написание нового API с нуля. Это может быть лучше, когда нам нужна большая гибкость, дополнительная функциональность или когда мы ограничены во времени, так как использование MLServer может быть немного медленнее.

В случае периодического переобучения модели нам нужно обновлять модель в сервисе или перезапускать деплой в открытой версии MLflow я не нашел такого функционала. Версия Databricks включает функцию вебхуков, которая позволяет MLflow уведомлять API о новых версиях. Другим вариантом является запуск деплоя при обновлении модели. Мы также можем открыть дополнительный эндпоинт в сервере и вызывать его в рамках DAG, или сконфигурирвовать сервер периодически запрашивать обновления в MLflow.

MLflow Tracking Server

MLflow Local Setup

До этого мы работали с MLflow локально: метаданные и артефакты хранятся в папке по умолчанию mlruns. Вы можете проверить эту папку у себя, если успешно выполнили все предыдущие шаги. Мы можем изменить место хранения метаданных MLflow, указав другой backend-store-uri при запуске команды MLflow UI. Например, чтобы использовать другую папку (mlruns_new), выполните следующие действия: mlflow ui --backend-store-uri ./mlruns_new И поменяйте tracking uri в проекте: mlflow.set_tracking_uri("file:./mlruns_new").

Remote Tracking

В продакшене мы обычно настраиваем удалённый сервер отслеживания с хранилищем артефактов и базой данных для метаданных MLflow. Мы можем симулировать эту конфигурацию, используя MinIO для хранения артефактов и PostgreSQL для базы данных. Вот простой файл Docker Compose с 4 сервсиами:

  1. MinIO как s3 подобное хранилище

  2. Клиент MinIO (minio/mc) для создания бакета для MLflow

  3. PostgreSQL в качестве базы данных для метаинформации

  4. MLflow UI

services:

  s3:
    image: minio/minio
    container_name: mlflow_s3
    ports:
      - 9000:9000
      - 9001:9001
    command: server /data --console-address ':9001'
    environment:
      - MINIO_ROOT_USER=mlflow
      - MINIO_ROOT_PASSWORD=password
    volumes:
      - minio_data:/data

  init_s3:
    image: minio/mc
    depends_on:
      - s3
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc alias set minio http://s3:9000 mlflow password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb minio/mlflow;
      exit 0;
      "

  postgres:
    image: postgres:latest
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=mlflow
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  mlflow:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - 5050:5000
    environment:
      - MLFLOW_S3_ENDPOINT_URL=http://s3:9000
      - AWS_ACCESS_KEY_ID=mlflow
      - AWS_SECRET_ACCESS_KEY=password
    command: >
      mlflow server
        --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
        --default-artifact-root s3://mlflow/
        --artifacts-destination s3://mlflow/
        --host 0.0.0.0
    depends_on:
      - s3
      - postgres
      - init_s3

volumes:
  postgres_data:
  minio_data:

Вызовете эти команды для создания имэджа и запуска контенеров docker compose -f tracking_server/docker-compose.yml build and docker compose -f tracking_server/docker-compose.yml up.

Вместо обработки артефактов MLflow предоставляет ссылку клиенту, позволяя сохранять и загружать артефакты напрямую из хранилища артефактов. Поэтому вам нужно настроить ключи доступа и хост сервера отслеживания, чтобы правильно логировать артефакты.

import os
import mlflow

os.environ['AWS_ACCESS_KEY_ID'] = 'mlflow'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'password'
os.environ['MLFLOW_TRACKING_URI'] = 'http://localhost:5050'
os.environ['MLFLOW_S3_ENDPOINT_URL'] = 'http://localhost:9000'

# run data preprocessing step one more time
mlflow.run(
    uri = '.',
    entry_point = 'data-preprocessing',
    env_manager='local',
    experiment_name='Cancer_Classification',
    run_name='Data_Preprocessing',
    parameters={'test-size': 0.5},
)

После этого шага мы можем зайти в MLflow UI, чтобы убедиться, что запуск и артефакты были успешно зарегистрированы:

И убедится через интерфейс MinIO, что артефакты были успешно сохранены в бакете:

И также выполнить запрос к нашей базе данных PostgreSQL, чтобы убедиться, что она используется для хранения метаданных:

import psycopg2
import pandas as pd

conn = psycopg2.connect(dbname='mlflow', user='mlflow', password='password', host='localhost', port='5432')
try:
    query = "SELECT experiment_id, name FROM experiments"
    experiments_df = pd.read_sql(query, conn)
except Exception as e:
    print(e)
else:
    print(experiments_df)
finally:
    conn.close()

В продакшене может потребоваться аутентификация, разные уровни доступа пользователей и мониторинг для MLflow, бакета, базы данных и конкретных артефактов. Мы не будем охватывать эти аспекты здесь, и некоторые из них не имеют встроенной поддержки в MLflow.

Мы рассмотрели как можно использовать MLflow как локально для себя, так и для совасместной работы над проектами, кратко прошлись по основному функционалу и использовали его на примерах. Вы можете посмотреть полный код в этом репозитории, также можно посомтреть документацию Mlflow.

Tags:
Hubs:
Total votes 12: ↑12 and ↓0+18
Comments4

Articles

Information

Website
ods.ai
Registered
Founded
Employees
5,001–10,000 employees
Location
Россия