Legacy model monitoring#

Note

This legacy mode of model monitoring is currently supported only for the CE version of MLRun.

Architecture#

The model monitoring flow starts with collecting operational data. The operational data is converted to vectors, which are posted to the model server. The model server wraps a machine learning model and uses a function to calculate predictions from the incoming vectors. The model server then logs the inputs and outputs of each request, and the log entries are written to the production data stream (a v3io stream).

While the model server processes the vectors, a Nuclio function monitors the data stream and is triggered when a new log entry is detected. The Nuclio function examines the log entry and processes it into statistics, which are written to the statistics databases (parquet file, time series database, and key-value database). The parquet files are written as a feature set under the model monitoring project, and can be read with either pandas.read_parquet or feature_set.get_offline_features, like any other feature set. In parallel, a scheduled MLRun job reads the parquet files and performs drift analysis. The drift analysis data is stored so that the user can retrieve it in the Iguazio UI or in a Grafana dashboard.

Monitoring is supported by Iguazio's streaming technology and by an open-source integration with Kafka.
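The drift analysis step described above compares the distribution of production inputs against the training data. The text does not specify which metric the batch job uses, so the following is only a minimal sketch that assumes one common choice, the Hellinger distance between two normalized histograms. The helper names (`histogram`, `hellinger`), the bin edges, and the sample values are all hypothetical and are not part of MLRun.

```python
import math


def histogram(values, edges):
    """Return the normalized histogram of `values` over the bins defined by `edges`."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(counts)):
            is_last = i == len(counts) - 1
            # Bins are half-open [low, high), except the last bin, which includes its upper edge
            if edges[i] <= v < edges[i + 1] or (is_last and v == edges[-1]):
                counts[i] += 1
                break
    total = sum(counts)
    if total == 0:
        raise ValueError("no values fall inside the bin edges")
    return [c / total for c in counts]


def hellinger(p, q):
    """Hellinger distance between two discrete distributions (0 = identical, 1 = disjoint)."""
    return math.sqrt(sum((math.sqrt(a) - math.sqrt(b)) ** 2 for a, b in zip(p, q)) / 2)


# Hypothetical values for a single feature (for example, sepal length in cm)
edges = [4.0, 5.0, 6.0, 7.0, 8.0]
training = [4.9, 5.1, 5.8, 6.3, 6.7, 7.1]
production = [6.9, 7.2, 7.4, 7.7, 7.9, 7.6]

drift = hellinger(histogram(training, edges), histogram(production, edges))
print(f"Hellinger distance: {drift:.3f}")
```

A value near 0 means the production data resembles the training data for that feature, while a value near 1 means the two distributions barely overlap.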

[Figure: Architecture]

Enabling model monitoring#

To see tracking results, model monitoring must be enabled for each model.

To use drift measurement, supply the training set when logging the model in the training step.

Model activities can be tracked into a real-time stream and time-series DB. The monitoring data is used to create real-time dashboards, detect drift, and analyze performance.

To monitor a deployed model, apply set_tracking() on your serving function and specify the function spec attributes:

fn.set_tracking(stream_path, batch, sample, tracking_policy)

  • stream_path

    • Enterprise: the v3io stream path (e.g. v3io:///users/..)

    • CE: a valid Kafka stream (e.g. kafka://kafka.default.svc.cluster.local:9092)

  • sample — optional, sample every N requests

  • batch — optional, send micro-batches every N requests

  • tracking_policy — optional, model tracking configurations, such as setting the scheduling policy of the model monitoring batch job
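As a rough illustration of how the parameters above fit together, the sketch below collects only the arguments that were explicitly provided and checks them before they are passed to set_tracking(). The helper `build_tracking_kwargs` is hypothetical (it is not part of MLRun), and the scheme check reflects only the two stream types listed above.

```python
def build_tracking_kwargs(stream_path=None, sample=None, batch=None, tracking_policy=None):
    """Collect only the set_tracking() arguments that were explicitly provided."""
    # Only the two stream types named in this section are accepted here (an assumption of this sketch)
    if stream_path is not None and not stream_path.startswith(("v3io://", "kafka://")):
        raise ValueError(f"unsupported stream path: {stream_path}")

    # sample and batch are both counted in requests, so they must be positive integers
    for name, value in (("sample", sample), ("batch", batch)):
        if value is not None and (not isinstance(value, int) or value < 1):
            raise ValueError(f"{name} must be a positive integer, got {value!r}")

    candidates = {
        "stream_path": stream_path,
        "sample": sample,
        "batch": batch,
        "tracking_policy": tracking_policy,
    }
    return {key: value for key, value in candidates.items() if value is not None}


kwargs = build_tracking_kwargs(
    stream_path="kafka://kafka.default.svc.cluster.local:9092",  # CE example from the list above
    sample=10,  # sample every 10 requests
    batch=5,  # send micro-batches every 5 requests
)
print(kwargs)

# With a real serving function, the result would be applied as:
# fn.set_tracking(**kwargs)
```

Omitting an argument leaves the corresponding set_tracking() default in place, which is why the helper drops unset values instead of passing None explicitly.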

If a serving function is configured for model-monitoring tracking, use this procedure to change the parameters of the tracking (for example changing the default_batch_intervals of the tracking_policy):

  1. Delete the "model-monitoring-batch" job function (can be found under ML functions).

  2. Delete the "model-monitoring-batch" scheduled job (can be found under Jobs and Workflows -> Schedule).

  3. Redeploy the serving function with new model-monitoring tracking parameters.
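The same procedure can be sketched in code instead of through the UI. This is an assumption-laden sketch, not a documented recipe: it assumes the run DB object returned by mlrun.get_run_db() exposes delete_function and delete_schedule with the keyword arguments shown, and that tracking_policy can be passed as a dictionary with a cron-style default_batch_intervals value. The helper `reset_monitoring_batch` is hypothetical.

```python
BATCH_NAME = "model-monitoring-batch"


def reset_monitoring_batch(db, project_name):
    """Delete the batch job function and its schedule so that a redeploy recreates them."""
    # Step 1: delete the "model-monitoring-batch" job function
    db.delete_function(name=BATCH_NAME, project=project_name)
    # Step 2: delete the "model-monitoring-batch" scheduled job
    db.delete_schedule(project=project_name, name=BATCH_NAME)


# Usage against a live MLRun cluster (assumed API, left commented out on purpose):
# import mlrun
# reset_monitoring_batch(mlrun.get_run_db(), "demo-project")
#
# Step 3: redeploy with the new tracking parameters (the cron string is an assumed example)
# serving_fn.set_tracking(tracking_policy={"default_batch_intervals": "0 */3 * * *"})
# serving_fn.deploy()
```

Passing the DB client in as an argument keeps the helper easy to try against a stub before running it on a real project.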

Model monitoring demo#

Use the following code to test and explore model monitoring.

# Set project name
project_name = "demo-project"

Deploy model servers#

Use the following code to deploy a model server in the Iguazio instance.

import os
import pandas as pd
from sklearn.datasets import load_iris
import sys

import mlrun
from mlrun import import_function, get_dataitem, get_or_create_project
from mlrun.platforms import auto_mount

project = get_or_create_project(project_name, context="./")
project.set_model_monitoring_credentials(os.environ.get("V3IO_ACCESS_KEY"))

# Download the pre-trained Iris model
# Pick the model file that matches the runtime (MLRun version or Python 3.9) to avoid pickle warnings
suffix = (
    mlrun.__version__.split("-")[0].replace(".", "_")
    if sys.version_info[1] > 9
    else "3.9"
)
model_path = mlrun.get_sample_path(f"models/model-monitoring/model-{suffix}.pkl")

get_dataitem(model_path).download("model.pkl")

iris = load_iris()
train_set = pd.DataFrame(
    iris["data"],
    columns=["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"],
)

# Import the serving function from the Function Hub
serving_fn = import_function("hub://v2_model_server", project=project_name).apply(
    auto_mount()
)

model_name = "RandomForestClassifier"

# Log the model through the projects API so that it is available through the feature store API
project.log_model(model_name, model_file="model.pkl", training_set=train_set)

# Add the model to the serving function's routing spec
serving_fn.add_model(
    model_name, model_path=f"store://models/{project_name}/{model_name}:latest"
)

# Enable model monitoring
serving_fn.set_tracking()

# Deploy the function
serving_fn.deploy()

Simulating requests#

Use the following code to simulate production data.

import json
from time import sleep
from random import choice, uniform

iris_data = iris["data"].tolist()

# Send one random sample per request until the loop is interrupted
while True:
    data_point = choice(iris_data)
    serving_fn.invoke(
        f"v2/models/{model_name}/infer", json.dumps({"inputs": [data_point]})
    )
    sleep(uniform(0.2, 1.7))
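The loop above runs until it is interrupted. For scripted tests, a bounded variant may be more convenient. The sketch below is a hypothetical helper (`simulate_requests` is not part of MLRun) that takes the invoke callable as an argument, sends a fixed number of requests to the same endpoint path, and returns the responses.

```python
import json
import random
import time


def simulate_requests(invoke, model_name, data, n_requests,
                      min_delay=0.2, max_delay=1.7, seed=None):
    """Send `n_requests` randomly chosen data points through `invoke` and return the responses."""
    rng = random.Random(seed)  # a seed makes the sampled sequence reproducible
    path = f"v2/models/{model_name}/infer"
    responses = []
    for i in range(n_requests):
        body = json.dumps({"inputs": [rng.choice(data)]})
        responses.append(invoke(path, body))
        # Pause between requests, but not after the last one
        if i < n_requests - 1:
            time.sleep(rng.uniform(min_delay, max_delay))
    return responses


# Usage with the serving function deployed above (left commented out on purpose):
# responses = simulate_requests(serving_fn.invoke, model_name, iris_data, n_requests=100)
```

Because the helper accepts any callable with the same two positional arguments, it can be exercised with a stub before pointing it at a deployed function.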