Source code for mlrun.model_monitoring.applications.base

# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any, Union, cast

import numpy as np
import pandas as pd

import mlrun
import mlrun.model_monitoring.applications.context as mm_context
import mlrun.model_monitoring.applications.results as mm_results
from mlrun.serving.utils import StepToDict


class ModelMonitoringApplicationBaseV2(StepToDict, ABC):
    """
    Base class for model monitoring applications that work directly with a
    `MonitoringApplicationContext`.

    Subclass it and implement :py:meth:`do_tracking` to build a custom model
    monitoring application.

    Example of a very simple custom application::

        class MyApp(ModelMonitoringApplicationBaseV2):
            def do_tracking(
                self,
                monitoring_context: mm_context.MonitoringApplicationContext,
            ) -> ModelMonitoringApplicationResult:
                monitoring_context.log_artifact(
                    TableArtifact("sample_df", df=monitoring_context.sample_df)
                )
                return ModelMonitoringApplicationResult(
                    name="data_drift_test",
                    value=0.5,
                    kind=mm_constant.ResultKindApp.data_drift,
                    status=mm_constant.ResultStatusApp.detected,
                )
    """

    kind = "monitoring_application"

    def do(
        self, monitoring_context: mm_context.MonitoringApplicationContext
    ) -> tuple[
        list[
            Union[
                mm_results.ModelMonitoringApplicationResult,
                mm_results.ModelMonitoringApplicationMetric,
            ]
        ],
        mm_context.MonitoringApplicationContext,
    ]:
        """
        Run :py:meth:`do_tracking` on the monitoring event and normalize its
        output into a list of application results & metrics.

        :param monitoring_context: (MonitoringApplicationContext) The monitoring
                                   application context.
        :returns: A tuple of:
                  [0] = list of application outputs, each item being either a
                  `ModelMonitoringApplicationResult` or a
                  `ModelMonitoringApplicationMetric`.
                  [1] = the `monitoring_context` object that was passed in.
        """
        outcome = self.do_tracking(monitoring_context=monitoring_context)
        if isinstance(outcome, dict):
            # A dict is the metrics-only shorthand: every item becomes one
            # metric named after its key.
            outcome = [
                mm_results.ModelMonitoringApplicationMetric(
                    name=metric_name, value=metric_value
                )
                for metric_name, metric_value in outcome.items()
            ]
        elif not isinstance(outcome, list):
            # A single returned object is wrapped so callers always get a list.
            outcome = [outcome]
        return outcome, monitoring_context

    @abstractmethod
    def do_tracking(
        self,
        monitoring_context: mm_context.MonitoringApplicationContext,
    ) -> Union[
        mm_results.ModelMonitoringApplicationResult,
        list[
            Union[
                mm_results.ModelMonitoringApplicationResult,
                mm_results.ModelMonitoringApplicationMetric,
            ]
        ],
        dict[str, Any],
    ]:
        """
        Implement this method with your custom monitoring logic.

        :param monitoring_context: (MonitoringApplicationContext) The monitoring
                                   context to process.
        :returns: One of:
                  a single (ModelMonitoringApplicationResult);
                  a (list[Union[ModelMonitoringApplicationResult,
                  ModelMonitoringApplicationMetric]]);
                  a dict holding application metrics only, where each key is a
                  metric name and each value is that metric's value.
        """
        raise NotImplementedError
class ModelMonitoringApplicationBase(StepToDict, ABC):
    """
    Base class for model monitoring applications whose :py:meth:`do_tracking`
    receives the monitoring event as separate, already unpacked arguments.

    Subclass it and implement :py:meth:`do_tracking` to build a custom model
    monitoring application.

    Example of a very simple custom application::

        class MyApp(ModelMonitoringApplicationBase):
            def do_tracking(
                self,
                application_name: str,
                sample_df_stats: pd.DataFrame,
                feature_stats: pd.DataFrame,
                sample_df: pd.DataFrame,
                start_infer_time: pd.Timestamp,
                end_infer_time: pd.Timestamp,
                latest_request: pd.Timestamp,
                endpoint_id: str,
                output_stream_uri: str,
            ) -> ModelMonitoringApplicationResult:
                self.context.log_artifact(
                    TableArtifact("sample_df_stats", df=sample_df_stats)
                )
                return ModelMonitoringApplicationResult(
                    name="data_drift_test",
                    value=0.5,
                    kind=mm_constant.ResultKindApp.data_drift,
                    status=mm_constant.ResultStatusApp.detected,
                )
    """

    kind = "monitoring_application"

    def do(
        self, monitoring_context: mm_context.MonitoringApplicationContext
    ) -> tuple[
        list[mm_results.ModelMonitoringApplicationResult],
        mm_context.MonitoringApplicationContext,
    ]:
        """
        Unpack the monitoring event, run :py:meth:`do_tracking` on it and
        return the application results as a list.

        :param monitoring_context: (MonitoringApplicationContext) The monitoring
                                   context to process.
        :returns: A tuple of:
                  [0] = list of `ModelMonitoringApplicationResult` objects (a
                  single result returned by `do_tracking` is wrapped in a list).
                  [1] = the `monitoring_context` object that was passed in.
        """
        tracking_args = self._resolve_event(monitoring_context)
        # `self.context` is assigned only while the instance does not already
        # hold an `mlrun.MLClientCtx` under that attribute.
        if not isinstance(getattr(self, "context", None), mlrun.MLClientCtx):
            self._lazy_init(monitoring_context)
        outcome = self.do_tracking(*tracking_args)
        if not isinstance(outcome, list):
            outcome = [outcome]
        return outcome, monitoring_context

    def _lazy_init(self, monitoring_context: mm_context.MonitoringApplicationContext):
        """
        Store the given monitoring context on the instance as `self.context`.

        :param monitoring_context: (MonitoringApplicationContext) The context to keep.
        """
        # `cast` only informs the type checker; the object is stored as is.
        self.context = cast(mlrun.MLClientCtx, monitoring_context)

    @abstractmethod
    def do_tracking(
        self,
        application_name: str,
        sample_df_stats: pd.DataFrame,
        feature_stats: pd.DataFrame,
        sample_df: pd.DataFrame,
        start_infer_time: pd.Timestamp,
        end_infer_time: pd.Timestamp,
        latest_request: pd.Timestamp,
        endpoint_id: str,
        output_stream_uri: str,
    ) -> Union[
        mm_results.ModelMonitoringApplicationResult,
        list[mm_results.ModelMonitoringApplicationResult],
    ]:
        """
        Implement this method with your custom monitoring logic.

        The arguments are passed positionally by :py:meth:`do`, in the order
        produced by :py:meth:`_resolve_event`.

        :param application_name: (str) The application name.
        :param sample_df_stats:  (pd.DataFrame) The new sample distribution, as a
                                 histogram DataFrame.
        :param feature_stats:    (pd.DataFrame) The train sample distribution, as a
                                 histogram DataFrame.
        :param sample_df:        (pd.DataFrame) The new sample DataFrame.
        :param start_infer_time: (pd.Timestamp) Start time of the monitoring schedule.
        :param end_infer_time:   (pd.Timestamp) End time of the monitoring schedule.
        :param latest_request:   (pd.Timestamp) Timestamp of the latest request on
                                 this endpoint_id.
        :param endpoint_id:      (str) ID of the monitored model endpoint.
        :param output_stream_uri: (str) URI of the output stream for results.

        :returns: (ModelMonitoringApplicationResult) or
                  (list[ModelMonitoringApplicationResult]) of the application results.
        """
        raise NotImplementedError

    @classmethod
    def _resolve_event(
        cls,
        monitoring_context: mm_context.MonitoringApplicationContext,
    ) -> tuple[
        str,
        pd.DataFrame,
        pd.DataFrame,
        pd.DataFrame,
        pd.Timestamp,
        pd.Timestamp,
        pd.Timestamp,
        str,
        str,
    ]:
        """
        Flatten the monitoring context into the positional argument tuple that
        :py:meth:`do_tracking` expects.

        :param monitoring_context: (MonitoringApplicationContext) The monitoring
                                   context to process.

        :return: A tuple of:
                 [0] = (str) application name
                 [1] = (pd.DataFrame) current input statistics
                 [2] = (pd.DataFrame) train statistics
                 [3] = (pd.DataFrame) current input data
                 [4] = (pd.Timestamp) start time of the monitoring schedule
                 [5] = (pd.Timestamp) end time of the monitoring schedule
                 [6] = (pd.Timestamp) timestamp of the latest request
                 [7] = (str) endpoint id
                 [8] = (str) output stream uri
        """
        app_name = monitoring_context.application_name
        # Both statistics dictionaries are converted to histogram DataFrames.
        current_stats = cls.dict_to_histogram(monitoring_context.sample_df_stats)
        reference_stats = cls.dict_to_histogram(monitoring_context.feature_stats)
        return (
            app_name,
            current_stats,
            reference_stats,
            monitoring_context.sample_df,
            monitoring_context.start_infer_time,
            monitoring_context.end_infer_time,
            monitoring_context.latest_request,
            monitoring_context.endpoint_id,
            monitoring_context.output_stream_uri,
        )

    @staticmethod
    def dict_to_histogram(
        histogram_dict: mlrun.common.model_monitoring.helpers.FeatureStats,
    ) -> pd.DataFrame:
        """
        Convert a histogram dictionary into a pandas DataFrame with one column
        per feature.

        Features whose statistics have no "hist" entry are left out. For the
        others, the column is the first element of "hist" divided by "count"
        (presumably the per-bin counts normalized to a probability
        distribution - confirm against the producer of these statistics).

        :param histogram_dict: Histogram dictionary, keyed by feature name.

        :returns: Histogram dataframe.
        """
        normalized = {
            feature_name: np.array(feature_stats["hist"][0]) / feature_stats["count"]
            for feature_name, feature_stats in histogram_dict.items()
            if "hist" in feature_stats
        }
        return pd.DataFrame(normalized)