Source code for mlrun.platforms

# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import warnings
from pprint import pprint
from time import sleep
from typing import Optional

from .iguazio import (
    V3ioStreamClient,
    add_or_refresh_credentials,
    is_iguazio_session_cookie,
)


class _DeprecationHelper:
    """A helper class to deprecate old schemas"""

    def __init__(self, new_target: str, version="1.8.0"):
        self._new_target = new_target
        self._version = version

    def __call__(self, *args, **kwargs):
        self._warn()
        return self._lazy_load()(*args, **kwargs)

    def __getattr__(self, attr):
        self._warn()
        return getattr(self._lazy_load(), attr)

    def _lazy_load(self, *args, **kwargs):
        import mlrun.runtimes.mounts as mlrun_mounts

        return getattr(mlrun_mounts, self._new_target)

    def _warn(self):
        warnings.warn(
            f"mlrun.platforms.{self._new_target} is deprecated since version {self._version}, "
            f"and will be removed in 1.10. Use mlrun.runtimes.mounts.{self._new_target} instead.",
            FutureWarning,
        )


# TODO: Remove in 1.10
# For backwards compatibility
VolumeMount = _DeprecationHelper("VolumeMount")
auto_mount = _DeprecationHelper("auto_mount")
mount_configmap = _DeprecationHelper("mount_configmap")
mount_hostpath = _DeprecationHelper("mount_hostpath")
mount_pvc = _DeprecationHelper("mount_pvc")
mount_s3 = _DeprecationHelper("mount_s3")
mount_secret = _DeprecationHelper("mount_secret")
mount_v3io = _DeprecationHelper("mount_v3io")
set_env_variables = _DeprecationHelper("set_env_variables")
v3io_cred = _DeprecationHelper("v3io_cred")
# eof 'For backwards compatibility'


[docs] def watch_stream( url, shard_ids: Optional[list] = None, seek_to: Optional[str] = None, interval=None, is_json=False, **kwargs, ): """watch on a v3io stream and print data every interval example:: watch_stream("v3io:///users/admin/mystream") :param url: stream url :param shard_ids: range or list of shard IDs :param seek_to: where to start/seek ('EARLIEST', 'LATEST', 'TIME', 'SEQUENCE') :param interval: watch interval time in seconds, 0 to run once and return :param is_json: indicate the payload is json (will be deserialized) """ interval = 3 if interval is None else interval shard_ids = shard_ids or [0] if isinstance(shard_ids, int): shard_ids = [shard_ids] watchers = [ V3ioStreamClient(url, shard_id, seek_to, **kwargs) for shard_id in list(shard_ids) ] while True: for watcher in watchers: records = watcher.get_records() for record in records: print( f"{watcher.url}:{watcher.shard_id} (#{record.sequence_number}) >> " ) data = json.loads(record.data) if is_json else record.data.decode() pprint(data) if interval <= 0: break sleep(interval)