# Source code for mlrun.model_monitoring.applications.context

# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
from typing import Any, Optional, Protocol, cast

import nuclio.request
import numpy as np
import pandas as pd

import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.errors
import mlrun.feature_store as fstore
import mlrun.feature_store.feature_set as fs
import mlrun.features
import mlrun.serving
import mlrun.utils
from mlrun.artifacts import Artifact, DatasetArtifact, ModelArtifact, get_model
from mlrun.common.model_monitoring.helpers import FeatureStats
from mlrun.common.schemas import ModelEndpoint
from mlrun.model_monitoring.helpers import (
    calculate_inputs_statistics,
)


class _ArtifactsLogger(Protocol):
    """
    Structural type for objects that can log artifacts and datasets.

    Classes that implement this protocol are :code:`MlrunProject` and :code:`MLClientCtx`.
    """

    def log_artifact(self, *args, **kwargs) -> Artifact:
        """Log an artifact and return the resulting artifact object."""
        ...

    def log_dataset(self, *args, **kwargs) -> DatasetArtifact:
        """Log a dataset and return the resulting dataset artifact object."""
        ...


class MonitoringApplicationContext:
    _logger_name = "monitoring-application"

    def __init__(
        self,
        *,
        application_name: str,
        event: dict[str, Any],
        project: "mlrun.MlrunProject",
        artifacts_logger: _ArtifactsLogger,
        logger: mlrun.utils.Logger,
        nuclio_logger: nuclio.request.Logger,
        model_endpoint_dict: Optional[dict[str, ModelEndpoint]] = None,
        sample_df: Optional[pd.DataFrame] = None,
        feature_stats: Optional[FeatureStats] = None,
        feature_sets_dict: Optional[dict[str, fs.FeatureSet]] = None,
    ) -> None:
        """
        The :code:`MonitoringApplicationContext` object holds all the relevant information for the
        model monitoring application, and can be used for logging artifacts and messages.
        The monitoring context has the following attributes:

        :param application_name:   (str) The model monitoring application name.
        :param project:            (:py:class:`~mlrun.projects.MlrunProject`) The current MLRun project object.
        :param project_name:       (str) The project name.
        :param logger:             (:py:class:`~mlrun.utils.Logger`) MLRun logger.
        :param nuclio_logger:      (nuclio.request.Logger) Nuclio logger.
        :param sample_df_stats:    (FeatureStats) The new sample distribution dictionary.
        :param feature_stats:      (FeatureStats) The train sample distribution dictionary.
        :param sample_df:          (pd.DataFrame) The new sample DataFrame.
        :param start_infer_time:   (pd.Timestamp) Start time of the monitoring schedule.
        :param end_infer_time:     (pd.Timestamp) End time of the monitoring schedule.
        :param endpoint_id:        (str) ID of the monitored model endpoint
        :param feature_set:        (FeatureSet) the model endpoint feature set
        :param endpoint_name:      (str) Name of the monitored model endpoint
        :param output_stream_uri:  (str) URI of the output stream for results
        :param model_endpoint:     (ModelEndpoint) The model endpoint object.
        :param feature_names:      (list[str]) List of the model's feature names.
        :param label_names:        (list[str]) List of the model's label names.
        :param model:              (tuple[str, ModelArtifact, dict]) The model file, model spec object,
                                   and a dictionary of extra data items.
        :param storage_options:    The storage options of the data store resolved from the configured
                                   artifact path.
        """
        self.application_name = application_name

        self.project = project
        self.project_name = project.name

        # The object that actually stores artifacts - see `log_artifact` and `log_dataset`
        self._artifacts_logger = artifacts_logger

        # MLRun Logger
        self.logger = logger
        # Nuclio logger - `nuclio.request.Logger`.
        # Note: this logger accepts keyword arguments only in its `_with` methods, e.g. `info_with`.
        self.nuclio_logger = nuclio_logger

        # event data
        # Note: a missing time key yields `pd.NaT`, and a missing endpoint key yields `None`.
        # The lazy properties below validate these values before using them.
        self.start_infer_time = pd.Timestamp(
            cast(str, event.get(mm_constants.ApplicationEvent.START_INFER_TIME))
        )
        self.end_infer_time = pd.Timestamp(
            cast(str, event.get(mm_constants.ApplicationEvent.END_INFER_TIME))
        )
        self.endpoint_id = cast(
            str, event.get(mm_constants.ApplicationEvent.ENDPOINT_ID)
        )
        self.endpoint_name = cast(
            str, event.get(mm_constants.ApplicationEvent.ENDPOINT_NAME)
        )

        self._feature_stats: Optional[FeatureStats] = feature_stats
        self._sample_df_stats: Optional[FeatureStats] = None

        # Default labels for the artifacts
        # (must be computed after the application name and the endpoint attributes are set)
        self._default_labels = self._get_default_labels()

        # Persistent data - fetched when needed
        self._sample_df: Optional[pd.DataFrame] = sample_df
        # Both dictionaries are looked up by the endpoint ID taken from the event
        self._model_endpoint: Optional[ModelEndpoint] = (
            model_endpoint_dict.get(self.endpoint_id) if model_endpoint_dict else None
        )
        self._feature_set: Optional[fs.FeatureSet] = (
            feature_sets_dict.get(self.endpoint_id) if feature_sets_dict else None
        )

        # Storage options of the artifact-path store, used when reading the sample dataframe
        store, _, _ = mlrun.store_manager.get_or_create_store(
            mlrun.mlconf.artifact_path
        )
        self.storage_options = store.get_storage_options()

    @classmethod
    def _from_ml_ctx(
        cls,
        context: "mlrun.MLClientCtx",
        *,
        project: Optional["mlrun.MlrunProject"] = None,
        application_name: str,
        event: dict[str, Any],
        model_endpoint_dict: Optional[dict[str, ModelEndpoint]] = None,
        sample_df: Optional[pd.DataFrame] = None,
        feature_stats: Optional[FeatureStats] = None,
        feature_sets_dict: Optional[dict[str, fs.FeatureSet]] = None,
    ) -> "MonitoringApplicationContext":
        """
        Build the monitoring context from an MLRun client context.

        The client context serves as both the MLRun logger source and the artifacts logger,
        and a new Nuclio logger is created for the context.

        :param context:             The MLRun client context.
        :param project:             The project object. When not given, it is loaded from ``context``.
        :param application_name:    The model monitoring application name.
        :param event:               The monitoring event dictionary.
        :param model_endpoint_dict: Optional mapping of endpoint ID to model endpoint object.
        :param sample_df:           Optional sample dataframe.
        :param feature_stats:       Optional reference feature statistics.
        :param feature_sets_dict:   Optional mapping of endpoint ID to feature set. Forwarded as is to the
                                    constructor, in the same way as in ``_from_graph_ctx``.

        :returns: The monitoring application context.

        :raise MLRunValueError: If no project was given and none could be loaded from ``context``.
        """
        project = project or context.get_project_object()
        if not project:
            raise mlrun.errors.MLRunValueError("Could not load project from context")
        logger = context.logger
        artifacts_logger = context
        nuclio_logger = nuclio.request.Logger(
            level=mlrun.mlconf.log_level, name=cls._logger_name
        )
        return cls(
            application_name=application_name,
            event=event,
            model_endpoint_dict=model_endpoint_dict,
            project=project,
            logger=logger,
            nuclio_logger=nuclio_logger,
            artifacts_logger=artifacts_logger,
            sample_df=sample_df,
            feature_stats=feature_stats,
            feature_sets_dict=feature_sets_dict,
        )

    @classmethod
    def _from_graph_ctx(
        cls,
        graph_context: mlrun.serving.GraphContext,
        *,
        application_name: str,
        event: dict[str, Any],
        model_endpoint_dict: Optional[dict[str, ModelEndpoint]] = None,
        sample_df: Optional[pd.DataFrame] = None,
        feature_stats: Optional[FeatureStats] = None,
        feature_sets_dict: Optional[dict[str, fs.FeatureSet]] = None,
    ) -> "MonitoringApplicationContext":
        """
        Build the monitoring context from a serving graph context.

        The graph context's logger is used as the Nuclio logger, its project object is used both as
        the project and as the artifacts logger, and a new MLRun logger is created for the context.

        :param graph_context:       The serving graph context.
        :param application_name:    The model monitoring application name.
        :param event:               The monitoring event dictionary.
        :param model_endpoint_dict: Optional mapping of endpoint ID to model endpoint object.
        :param sample_df:           Optional sample dataframe.
        :param feature_stats:       Optional reference feature statistics.
        :param feature_sets_dict:   Optional mapping of endpoint ID to feature set.

        :returns: The monitoring application context.
        """
        nuclio_logger = graph_context.logger
        artifacts_logger = graph_context.project_obj
        logger = mlrun.utils.create_logger(
            level=mlrun.mlconf.log_level,
            formatter_kind=mlrun.mlconf.log_formatter,
            name=cls._logger_name,
        )
        return cls(
            application_name=application_name,
            event=event,
            project=graph_context.project_obj,
            model_endpoint_dict=model_endpoint_dict,
            logger=logger,
            nuclio_logger=nuclio_logger,
            artifacts_logger=artifacts_logger,
            sample_df=sample_df,
            feature_stats=feature_stats,
            feature_sets_dict=feature_sets_dict,
        )

    def _get_default_labels(self) -> dict[str, str]:
        """Build the labels attached to every logged artifact, leaving out labels without a value"""
        labels = {
            mlrun_constants.MLRunInternalLabels.runner_pod: socket.gethostname(),
            mlrun_constants.MLRunInternalLabels.producer_type: "model-monitoring-app",
            mlrun_constants.MLRunInternalLabels.app_name: self.application_name,
            mlrun_constants.MLRunInternalLabels.endpoint_id: self.endpoint_id,
            mlrun_constants.MLRunInternalLabels.endpoint_name: self.endpoint_name,
        }
        return {key: value for key, value in labels.items() if value is not None}

    def _add_default_labels(self, labels: Optional[dict[str, str]]) -> dict[str, str]:
        """
        Add the default labels to logged artifacts labels

        A new dictionary is returned. On a key collision, the default label takes precedence
        over the given one.
        """
        return (labels or {}) | self._default_labels

    @property
    def sample_df(self) -> pd.DataFrame:
        """
        The sample dataframe of the monitoring window.

        When it was not provided directly, it is read on first access from the model endpoint's
        feature set, between the start and end infer times, and kept for later accesses.

        :raise MLRunValueError: If the dataframe was not provided and the endpoint name, the endpoint ID,
                                the start time, or the end time is missing.
        """
        if self._sample_df is None:
            if (
                self.endpoint_name is None
                or self.endpoint_id is None
                or pd.isnull(self.start_infer_time)
                or pd.isnull(self.end_infer_time)
            ):
                raise mlrun.errors.MLRunValueError(
                    "You have tried to access `monitoring_context.sample_df`, but have not provided it directly "
                    "through `sample_data`, nor have you provided the model endpoint's name, ID, and the start and "
                    f"end times: `endpoint_name`={self.endpoint_name}, `endpoint_uid`={self.endpoint_id}, "
                    f"`start`={self.start_infer_time}, and `end`={self.end_infer_time}. "
                    "You can either provide the sample dataframe directly, the model endpoint's details and times, "
                    "or adapt the application's logic to not access the sample dataframe."
                )
            df = self.feature_set.to_dataframe(
                start_time=self.start_infer_time,
                end_time=self.end_infer_time,
                time_column=mm_constants.EventFieldType.TIMESTAMP,
                storage_options=self.storage_options,
            )
            self._sample_df = df.reset_index(drop=True)
        return self._sample_df

    @property
    def model_endpoint(self) -> ModelEndpoint:
        """
        The monitored model endpoint object.

        When it was not provided, it is fetched from the run DB on first access and kept for
        later accesses.

        :raise MLRunValueError: If the endpoint was not provided and the endpoint name or ID is missing.
        """
        if not self._model_endpoint:
            if self.endpoint_name is None or self.endpoint_id is None:
                raise mlrun.errors.MLRunValueError(
                    "You have NOT provided the model endpoint's name and ID: "
                    f"`endpoint_name`={self.endpoint_name} and `endpoint_id`={self.endpoint_id}, "
                    "but you have tried to access `monitoring_context.model_endpoint` "
                    "directly or indirectly in your application. You can either provide them, "
                    "or adapt the application's logic to not access the model endpoint."
                )
            self._model_endpoint = mlrun.db.get_run_db().get_model_endpoint(
                name=self.endpoint_name,
                project=self.project_name,
                endpoint_id=self.endpoint_id,
                feature_analysis=True,
            )
        return self._model_endpoint

    @property
    def feature_set(self) -> fs.FeatureSet:
        """
        The feature set of the monitored model endpoint.

        When it was not provided, it is loaded on first access from the monitoring feature set URI
        in the model endpoint spec.
        """
        if not self._feature_set and self.model_endpoint:
            self._feature_set = fstore.get_feature_set(
                self.model_endpoint.spec.monitoring_feature_set_uri
            )
        return self._feature_set

    @property
    def feature_stats(self) -> FeatureStats:
        """
        The reference feature statistics.

        When they were not provided (or are empty), they are taken from the model endpoint spec.
        """
        if not self._feature_stats:
            self._feature_stats = self.model_endpoint.spec.feature_stats
        return self._feature_stats

    @property
    def sample_df_stats(self) -> FeatureStats:
        """statistics of the sample dataframe"""
        if not self._sample_df_stats:
            self._sample_df_stats = calculate_inputs_statistics(
                self.feature_stats, self.sample_df
            )
        return self._sample_df_stats

    @property
    def feature_names(self) -> list[str]:
        """The feature names of the model"""
        return self.model_endpoint.spec.feature_names

    @property
    def label_names(self) -> list[str]:
        """The label names of the model"""
        return self.model_endpoint.spec.label_names

    @property
    def model(self) -> tuple[str, ModelArtifact, dict]:
        """The model file, model spec object, and a dictionary of extra data items"""
        return get_model(self.model_endpoint.spec.model_uri)

    @staticmethod
    def dict_to_histogram(histogram_dict: FeatureStats) -> pd.DataFrame:
        """
        Convert histogram dictionary to pandas DataFrame with feature histograms as columns

        Features without a ``"hist"`` entry are left out of the result.

        :param histogram_dict: Histogram dictionary

        :returns: Histogram dataframe
        """
        # Normalize each feature's histogram counts to a probability distribution.
        # `stats["hist"][0]` is used as the counts of the histogram.
        histograms = {
            feature: np.array(stats["hist"][0]) / stats["count"]
            for feature, stats in histogram_dict.items()
            if "hist" in stats
        }
        return pd.DataFrame(histograms)

    def log_artifact(
        self,
        item,
        body=None,
        tag: str = "",
        local_path: str = "",
        artifact_path: Optional[str] = None,
        format: Optional[str] = None,
        upload: Optional[bool] = None,
        labels: Optional[dict[str, str]] = None,
        target_path: Optional[str] = None,
        unique_per_endpoint: bool = True,
        **kwargs,
    ) -> Artifact:
        """
        Log an artifact.

        .. caution::

            Logging artifacts in every model monitoring window may cause scale issues.
            This method should be called on special occasions only.

        See :func:`~mlrun.projects.MlrunProject.log_artifact` for the full documentation, except for one
        new argument:

        :param unique_per_endpoint: by default ``True``, we will log different artifact for each model endpoint,
                                    set to ``False`` without changing item key will cause artifact override.

        .. note::

            When ``item`` is an artifact object rather than a string key and ``unique_per_endpoint`` is
            ``True``, the endpoint ID suffix is written to the ``key`` of the given object in place.
        """
        labels = self._add_default_labels(labels)
        # By default, we want to log different artifact for each model endpoint
        endpoint_id = labels.get(mlrun_constants.MLRunInternalLabels.endpoint_id, "")
        if unique_per_endpoint and isinstance(item, str):
            item = f"{item}-{endpoint_id}" if endpoint_id else item
        elif unique_per_endpoint:  # isinstance(item, Artifact) is True
            item.key = f"{item.key}-{endpoint_id}" if endpoint_id else item.key

        return self._artifacts_logger.log_artifact(
            item,
            body=body,
            tag=tag,
            local_path=local_path,
            artifact_path=artifact_path,
            format=format,
            upload=upload,
            labels=labels,
            target_path=target_path,
            **kwargs,
        )

    def log_dataset(
        self,
        key,
        df,
        tag="",
        local_path=None,
        artifact_path=None,
        upload=None,
        labels=None,
        format="",
        preview=None,
        stats=None,
        target_path="",
        extra_data=None,
        label_column: Optional[str] = None,
        unique_per_endpoint: bool = True,
        **kwargs,
    ) -> DatasetArtifact:
        """
        Log a dataset artifact.

        .. caution::

            Logging datasets in every model monitoring window may cause scale issues.
            This method should be called on special occasions only.

        See :func:`~mlrun.projects.MlrunProject.log_dataset` for the full documentation, except for one
        new argument:

        :param unique_per_endpoint: by default ``True``, we will log different artifact for each model endpoint,
                                    set to ``False`` without changing item key will cause artifact override.
        """
        labels = self._add_default_labels(labels)
        # By default, we want to log different artifact for each model endpoint
        endpoint_id = labels.get(mlrun_constants.MLRunInternalLabels.endpoint_id, "")
        if unique_per_endpoint and isinstance(key, str):
            key = f"{key}-{endpoint_id}" if endpoint_id else key

        return self._artifacts_logger.log_dataset(
            key,
            df,
            tag=tag,
            local_path=local_path,
            artifact_path=artifact_path,
            upload=upload,
            labels=labels,
            format=format,
            preview=preview,
            stats=stats,
            target_path=target_path,
            extra_data=extra_data,
            label_column=label_column,
            **kwargs,
        )