Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copying mlmonitoring directory in mlflow-ubuntu20.04-py39-cpu-inference environment #3576

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ ENV AML_APP_ROOT="/var/mlflow_resources"
ENV AZUREML_ENTRY_SCRIPT="mlflow_score_script.py"

USER root
# Copying of mlmonitoring will be added once testing is completed.
# COPY mlmonitoring /var/mlflow_resources/mlmonitoring

COPY mlmonitoring /var/mlflow_resources/mlmonitoring

# We'll copy the HF scripts as well to enable better handling for v2 packaging. This will not require changes to the
# packages installed in the image, as the expectation is that these will all be brought along with the model.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from .collector import Collector

__all__ = ["Collector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""For collector."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from typing import Callable, Any

from .collector_json import JsonCollector
from .context import CorrelationContext


class Collector:
    """Public entry point for data collection.

    This class is a thin facade: every call is forwarded to an internal
    ``JsonCollector`` created with the same ``name`` and ``on_error``.
    """

    def __init__(self, *, name: str, on_error: Callable[[Exception], Any] = None):
        """Create the backing ``JsonCollector``.

        Args:
            name: Collection name, passed through to ``JsonCollector``.
            on_error: Optional callback receiving an ``Exception``; passed
                through unchanged (``None`` lets ``JsonCollector`` pick its
                own default handler).
        """
        backend = JsonCollector(name=name, on_error=on_error)
        self._impl = backend

    def collect(
            self,
            data,  # supported type: Union[pd.DataFrame]
            correlation_context: CorrelationContext = None,
    ) -> CorrelationContext:
        """Forward ``data`` to the backing collector and return its result.

        Args:
            data: Data to collect (a ``pandas.DataFrame`` per the inline note).
            correlation_context: Optional context forwarded as-is.

        Returns:
            Whatever the backing collector's ``collect`` returns.
        """
        outcome = self._impl.collect(data, correlation_context)
        return outcome
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""For Collector base."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging

from .init import init, is_sdk_ready
from .config import get_config
from .context import CorrelationContext, get_context_wrapper


class CollectorBase:
    """Shared setup and response handling for collectors.

    On construction the SDK is initialised if ``is_sdk_ready()`` reports it
    is not ready yet, then a logger and the current config are stored on
    the instance.
    """

    def __init__(self, model_version: str):
        """Ensure the SDK is initialised and cache logger and config.

        Args:
            model_version: Passed to ``init`` when the SDK is not ready yet.
        """
        sdk_ready = is_sdk_ready()
        if not sdk_ready:
            init(model_version)

        self.logger = logging.getLogger("mdc.collector")
        self.config = get_config()

    def _response(
            self,
            context: CorrelationContext,
            success: bool,
            message: str,
    ) -> CorrelationContext:
        """Build the value a collect call hands back to its caller.

        Outside debug mode the context is returned untouched. In debug mode
        (``self.config.is_debug()``) the result of
        ``get_context_wrapper(context, success, message)`` is returned instead.
        """
        if not self.config.is_debug():
            return context

        return get_context_wrapper(context, success, message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""For collector json."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import sys
import random
from typing import Callable, Any

from .payload import PandasFrameData
from .payload.payload import build_payload
from .queue import get_queue

from .collector_base import CollectorBase
from .context import CorrelationContext, get_context

try:
import pandas as pd
except ImportError:
pass


def _build_log_data_by_type(data):
"""For build log data by type."""
if 'pandas' in sys.modules and isinstance(data, pd.DataFrame):
return PandasFrameData(data)

raise TypeError("data type (%s) not supported, "
"supported types: pandas.DataFrame"
% type(data).__name__)


def _raise_if_exception(e: Exception):
"""For raise if exception."""
raise e


class JsonCollector(CollectorBase):
    """Collector that turns data into a payload and puts it on the queue.

    Configuration is validated when the instance is created and again on
    every ``collect`` call. Each failure is reported through ``on_error``;
    when no handler is supplied the default handler re-raises the error.
    """

    def __init__(self, *, name: str, on_error: Callable[[Exception], Any] = None):
        """Set up the collector and validate its configuration once.

        Args:
            name: Name of the collection this collector writes to.
            on_error: Optional callback invoked with an ``Exception`` on every
                unexpected drop. A falsy value selects ``_raise_if_exception``.
        """
        super().__init__("default")
        self.name = name
        self.on_error = on_error if on_error else _raise_if_exception

        # Surface configuration problems at construction time as well.
        self._validate_mdc_config()

    def _validate_mdc_config(self):
        """Check that collection is possible for ``self.name``.

        The checks run in order and stop at the first failure: name
        present, config present, custom logging enabled, collection
        enabled. A failure is passed to ``self.on_error`` wrapped in an
        ``Exception``.

        Returns:
            ``(True, None)`` when every check passes, otherwise
            ``(False, message)`` (only reached if ``on_error`` returns).
        """
        # Every branch below that sets ``problem`` is an unexpected drop.
        if not (self.name and len(self.name) > 0):
            problem = "collection name is not provided"
        elif self.config is None:
            problem = "data collector is not initialized"
        elif not self.config.enabled():
            problem = "custom logging is not enabled, drop the data"
        elif not self.config.collection_enabled(self.name):
            problem = "collection {} is not enabled, drop the data".format(self.name)
        else:
            return True, None

        self.on_error(Exception(problem))
        return False, problem

    def collect(
            self,
            data,  # supported type: Union[pd.DataFrame]
            correlation_context: CorrelationContext = None,
    ) -> CorrelationContext:
        """Sample, package and enqueue ``data`` for the collection.

        Args:
            data: Data to log; only ``pandas.DataFrame`` is supported.
            correlation_context: Context to attach; when ``None`` a new one
                is obtained from ``get_context()``.

        Returns:
            The result of ``self._response(...)`` for the outcome: failed
            validation, a sampling drop, unsupported data type, or the
            enqueue result.
        """
        context = get_context() if correlation_context is None else correlation_context

        valid, reason = self._validate_mdc_config()
        if not valid:
            return self._response(context, False, reason)

        cfg = self.config
        sample_rate = cfg.collection_sample_rate_percentage(self.name)

        # A random draw is only made when the rate is below 100 percent.
        if sample_rate < 100 and sample_rate <= random.random() * 100.0:
            # expected drop
            self.logger.debug("sampling not hit, drop the data")
            # TBD: send empty message to mdc to collect metrics of dropped messages?
            return self._response(context, False, "dropped_sampling")

        try:
            # build payload and put into payload queue
            wrapped = _build_log_data_by_type(data)
        except TypeError as err:
            # unexpected drop
            self.on_error(err)
            return self._response(context, False, err.args[0])

        payload = build_payload(
            self.name,
            data=wrapped,
            model_version=cfg.model_version(),
            context=context)

        enqueued, detail = get_queue().enqueue(payload)
        if not enqueued:
            # unexpected drop
            self.on_error(Exception(detail))

        return self._response(context, enqueued, detail)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

__version__ = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from .config import init_config, teardown_config, is_debug, get_config

__all__ = ["init_config", "teardown_config", "is_debug", "get_config"]
Loading
Loading