diff --git a/launch/client.py b/launch/client.py index 5cabe9e6..db1d2ac0 100644 --- a/launch/client.py +++ b/launch/client.py @@ -344,74 +344,18 @@ def create_model_bundle_from_dirs( bundle. This is used to validate the response for the model bundle's endpoint. Note: If request_schema is specified, then response_schema must also be specified. """ - with open(requirements_path, "r", encoding="utf-8") as req_f: - requirements = req_f.read().splitlines() - - tmpdir = tempfile.mkdtemp() - try: - zip_path = os.path.join(tmpdir, "bundle.zip") - _zip_directories(zip_path, base_paths) - with open(zip_path, "rb") as zip_f: - data = zip_f.read() - finally: - shutil.rmtree(tmpdir) - - raw_bundle_url = self._upload_data(data) - - schema_location = None - if bool(request_schema) ^ bool(response_schema): - raise ValueError( - "If request_schema is specified, then response_schema must also be specified." - ) - if request_schema is not None and response_schema is not None: - model_definitions = get_model_definitions( - request_schema=request_schema, - response_schema=response_schema, - ) - model_definitions_encoded = json.dumps(model_definitions).encode() - schema_location = self._upload_data(model_definitions_encoded) - - bundle_metadata = { - "load_predict_fn_module_path": load_predict_fn_module_path, - "load_model_fn_module_path": load_model_fn_module_path, - } - - logger.info( - "create_model_bundle_from_dirs: raw_bundle_url=%s", - raw_bundle_url, - ) - payload = dict( - packaging_type="zip", - bundle_name=model_bundle_name, - location=raw_bundle_url, - bundle_metadata=bundle_metadata, - requirements=requirements, + self.create_model_bundle_from_dirs_v1( + model_bundle_name=model_bundle_name, + base_paths=base_paths, + requirements_path=requirements_path, env_params=env_params, - schema_location=schema_location, + load_predict_fn_module_path=load_predict_fn_module_path, + load_model_fn_module_path=load_model_fn_module_path, + app_config=app_config, + request_schema=request_schema, + response_schema=response_schema, ) - _add_app_config_to_bundle_create_payload(payload, app_config) - with ApiClient(self.configuration) as api_client: - api_instance = DefaultApi(api_client) - framework = ModelBundleFramework(env_params["framework_type"]) - env_params_copy = env_params.copy() - env_params_copy["framework_type"] = framework # type: ignore - env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore - payload = dict_not_none( - env_params=env_params_obj, - location=raw_bundle_url, - name=model_bundle_name, - requirements=requirements, - packaging_type=ModelBundlePackagingType("zip"), - metadata=bundle_metadata, - app_config=payload.get("app_config"), - schema_location=schema_location, - ) - create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore - api_instance.create_model_bundle_v1_model_bundles_post( - body=create_model_bundle_request, - skip_deserialization=True, - ) return ModelBundle(model_bundle_name) def create_model_bundle( # pylint: disable=too-many-statements @@ -515,129 +459,32 @@ def create_model_bundle( # pylint: disable=too-many-statements bundle. This is used to validate the response for the model bundle's endpoint. Note: If request_schema is specified, then response_schema must also be specified. 
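+
+        A minimal usage sketch, assuming ``client`` is an instance of this client
+        (the bundle name and predict function below are purely illustrative):
+
+        .. code-block:: python
+
+            def my_predict_fn(request):
+                # Hypothetical end-to-end inference callable.
+                return {"result": str(request)}
+
+            client.create_model_bundle(
+                model_bundle_name="my-bundle",
+                env_params={
+                    "framework_type": "pytorch",
+                    "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime",
+                },
+                predict_fn_or_cls=my_predict_fn,
+                requirements=[],  # no extra pip packages needed for this toy bundle
+            )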
""" - # TODO(ivan): remove `disable=too-many-branches` when get rid of `load_*` functions # pylint: disable=too-many-branches - check_args = [ - predict_fn_or_cls is not None, - load_predict_fn is not None and model is not None, - load_predict_fn is not None and load_model_fn is not None, - ] - - if sum(check_args) != 1: - raise ValueError( - "A model bundle consists of exactly {predict_fn_or_cls}, {load_predict_fn + model}, or {load_predict_fn + load_model_fn}." - ) - # TODO should we try to catch when people intentionally pass both model and load_model_fn as None? - - if requirements is None: - # TODO explore: does globals() actually work as expected? Should we use globals_copy instead? - requirements_inferred = find_packages_from_imports(globals()) - requirements = [ - f"{key}=={value}" - for key, value in requirements_inferred.items() - ] - logger.info( - "Using \n%s\n for model bundle %s", - requirements, - model_bundle_name, - ) - - # Prepare cloudpickle for external imports - if globals_copy: - for module in get_imports(globals_copy): - if module.__name__ == cloudpickle.__name__: - # Avoid recursion - # register_pickle_by_value does not work properly with itself - continue - cloudpickle.register_pickle_by_value(module) - - bundle: Union[ - Callable[[Any], Any], Dict[str, Any], None - ] # validate bundle - bundle_metadata = {} - # Create bundle - if predict_fn_or_cls: - bundle = predict_fn_or_cls - if inspect.isfunction(predict_fn_or_cls): - source_code = inspect.getsource(predict_fn_or_cls) - else: - source_code = inspect.getsource(predict_fn_or_cls.__class__) - bundle_metadata["predict_fn_or_cls"] = source_code - elif model is not None: - bundle = dict(model=model, load_predict_fn=load_predict_fn) - bundle_metadata["load_predict_fn"] = inspect.getsource( - load_predict_fn # type: ignore - ) - else: - bundle = dict( - load_model_fn=load_model_fn, load_predict_fn=load_predict_fn - ) - bundle_metadata["load_predict_fn"] = inspect.getsource( - load_predict_fn # type: ignore - ) - bundle_metadata["load_model_fn"] = inspect.getsource( - load_model_fn # type: ignore - ) - - serialized_bundle = cloudpickle.dumps(bundle) - raw_bundle_url = self._upload_data(data=serialized_bundle) - - schema_location = None - if bool(request_schema) ^ bool(response_schema): - raise ValueError( - "If request_schema is specified, then response_schema must also be specified." 
- ) - if request_schema is not None and response_schema is not None: - model_definitions = get_model_definitions( - request_schema=request_schema, - response_schema=response_schema, - ) - model_definitions_encoded = json.dumps(model_definitions).encode() - schema_location = self._upload_data(model_definitions_encoded) - - payload = dict( - packaging_type="cloudpickle", - bundle_name=model_bundle_name, - location=raw_bundle_url, - bundle_metadata=bundle_metadata, - requirements=requirements, - env_params=env_params, - schema_location=schema_location, - ) - - _add_app_config_to_bundle_create_payload(payload, app_config) - framework = ModelBundleFramework(env_params["framework_type"]) - env_params_copy = env_params.copy() - env_params_copy["framework_type"] = framework # type: ignore - env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore - with ApiClient(self.configuration) as api_client: - api_instance = DefaultApi(api_client) - payload = dict_not_none( - env_params=env_params_obj, - location=raw_bundle_url, - name=model_bundle_name, - requirements=requirements, - packaging_type=ModelBundlePackagingType("cloudpickle"), - metadata=bundle_metadata, - app_config=app_config, - schema_location=schema_location, - ) - create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore - api_instance.create_model_bundle_v1_model_bundles_post( - body=create_model_bundle_request, - skip_deserialization=True, - ) # resp["data"]["name"] should equal model_bundle_name # TODO check that a model bundle was created and no name collisions happened + self.create_model_bundle( + model_bundle_name=model_bundle_name, + env_params=env_params, + load_predict_fn=load_predict_fn, + predict_fn_or_cls=predict_fn_or_cls, + requirements=requirements, + model=model, + load_model_fn=load_model_fn, + bundle_url=bundle_url, + app_config=app_config, + globals_copy=globals_copy, + request_schema=request_schema, + response_schema=response_schema, + ) return ModelBundle(model_bundle_name) def create_model_endpoint( self, *, endpoint_name: str, - model_bundle: Union[ModelBundle, str], + model_bundle: Optional[Union[ModelBundle, str]] = None, cpus: int = 3, memory: str = "8Gi", storage: Optional[str] = None, @@ -661,8 +508,9 @@ def create_model_endpoint( endpoint_name: The name of the model endpoint you want to create. The name must be unique across all endpoints that you own. - model_bundle: The ``ModelBundle`` that the endpoint should serve. - + model_bundle: (deprecated) The ``ModelBundle`` that the endpoint should serve. Deprecated in favor of + model_bundle_id. + cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. 
This must be a positive @@ -743,11 +591,13 @@ def create_model_endpoint( logger.info("Creating new endpoint") with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) + if ( not isinstance(model_bundle, ModelBundle) or model_bundle.id is None ): model_bundle = self.get_model_bundle(model_bundle) + payload = dict_not_none( cpus=cpus, endpoint_type=ModelEndpointType(endpoint_type), @@ -760,7 +610,6 @@ def create_model_endpoint( memory=memory, metadata={}, min_workers=min_workers, - model_bundle_id=model_bundle.id, name=endpoint_name, per_worker=per_worker, post_inference_hooks=post_inference_hooks or [], @@ -951,6 +800,11 @@ def get_model_endpoint( "Endpoint should be one of the types 'sync' or 'async'" ) + """ + XXX: Returns a 500 + + Also need to pass in optional query params + """ def list_model_bundles(self) -> List[ModelBundle]: """ Returns a list of model bundles that the user owns. @@ -960,7 +814,9 @@ def list_model_bundles(self) -> List[ModelBundle]: """ with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) + query_params = {"name": self.model_endpoint.name} response = api_instance.list_model_bundles_v1_model_bundles_get( + query_params=query_params, skip_deserialization=True ) resp = json.loads(response.response.data) @@ -994,26 +850,26 @@ def get_model_bundle( def clone_model_bundle_with_changes( self, - model_bundle: Union[ModelBundle, str], + model_bundle_id: str, app_config: Optional[Dict] = None, ) -> ModelBundle: """ Clones an existing model bundle with changes to its app config. (More fields coming soon) Parameters: - model_bundle: The existing bundle or its ID. + model_bundle_id: The existing bundle or its ID. app_config: The new bundle's app config, if not passed in, the new bundle's ``app_config`` will be set to ``None`` Returns: A ``ModelBundle`` object """ - bundle_id = _model_bundle_to_id(model_bundle) with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) payload = dict_not_none( - original_model_bundle_id=bundle_id, + original_model_bundle_id=model_bundle_id, new_app_config=app_config, + ) clone_model_bundle_request = CloneModelBundleRequest(**payload) response = api_instance.clone_model_bundle_with_changes_v1_model_bundles_clone_with_changes_post( @@ -1416,6 +1272,821 @@ def get_batch_async_response(self, batch_job_id: str) -> Dict[str, Any]: resp = json.loads(response.response.data) return resp + ### + # V1 functions + ### + def create_model_bundle_from_dirs_v1( + self, + *, + model_bundle_name: str, + base_paths: List[str], + requirements_path: str, + env_params: Dict[str, str], + load_predict_fn_module_path: str, + load_model_fn_module_path: str, + app_config: Optional[Union[Dict[str, Any], str]] = None, + request_schema: Optional[Type[BaseModel]] = None, + response_schema: Optional[Type[BaseModel]] = None, + ) -> Dict[str, Any]: + """ + Packages up code from one or more local filesystem folders and uploads them as a bundle to Scale Launch. + In this mode, a bundle is just local code instead of a serialized object. + + For example, if you have a directory structure like so, and your current working directory is also ``my_root``: + + .. code-block:: text + + my_root/ + my_module1/ + __init__.py + ...files and directories + my_inference_file.py + my_module2/ + __init__.py + ...files and directories + + then calling ``create_model_bundle_from_dirs`` with ``base_paths=["my_module1", "my_module2"]`` essentially + creates a zip file without the root directory, e.g.: + + .. 
code-block:: text + + my_module1/ + __init__.py + ...files and directories + my_inference_file.py + my_module2/ + __init__.py + ...files and directories + + and these contents will be unzipped relative to the server side application root. Bear these points in mind when + referencing Python module paths for this bundle. For instance, if ``my_inference_file.py`` has ``def f(...)`` + as the desired inference loading function, then the `load_predict_fn_module_path` argument should be + `my_module1.my_inference_file.f`. + + + Parameters: + model_bundle_name: The name of the model bundle you want to create. The name must be unique across all + bundles that you own. + + base_paths: The paths on the local filesystem where the bundle code lives. + + requirements_path: A path on the local filesystem where a ``requirements.txt`` file lives. + + env_params: A dictionary that dictates environment information e.g. + the use of pytorch or tensorflow, which base image tag to use, etc. + Specifically, the dictionary should contain the following keys: + + - ``framework_type``: either ``tensorflow`` or ``pytorch``. + - PyTorch fields: + - ``pytorch_image_tag``: An image tag for the ``pytorch`` docker base image. The list of tags + can be found from https://hub.docker.com/r/pytorch/pytorch/tags. + - Example: + + .. code-block:: python + + { + "framework_type": "pytorch", + "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime" + } + + load_predict_fn_module_path: A python module path for a function that, when called with the output of + load_model_fn_module_path, returns a function that carries out inference. + + load_model_fn_module_path: A python module path for a function that returns a model. The output feeds into + the function located at load_predict_fn_module_path. + + app_config: Either a Dictionary that represents a YAML file contents or a local path to a YAML file. + + request_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the request body for the model bundle's endpoint. + + response_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the response for the model bundle's endpoint. + Note: If request_schema is specified, then response_schema must also be specified. + """ + with open(requirements_path, "r", encoding="utf-8") as req_f: + requirements = req_f.read().splitlines() + + tmpdir = tempfile.mkdtemp() + try: + zip_path = os.path.join(tmpdir, "bundle.zip") + _zip_directories(zip_path, base_paths) + with open(zip_path, "rb") as zip_f: + data = zip_f.read() + finally: + shutil.rmtree(tmpdir) + + raw_bundle_url = self._upload_data(data) + + schema_location = None + if bool(request_schema) ^ bool(response_schema): + raise ValueError( + "If request_schema is specified, then response_schema must also be specified." 
+ ) + if request_schema is not None and response_schema is not None: + model_definitions = get_model_definitions( + request_schema=request_schema, + response_schema=response_schema, + ) + model_definitions_encoded = json.dumps(model_definitions).encode() + schema_location = self._upload_data(model_definitions_encoded) + + bundle_metadata = { + "load_predict_fn_module_path": load_predict_fn_module_path, + "load_model_fn_module_path": load_model_fn_module_path, + } + + logger.info( + "create_model_bundle_from_dirs: raw_bundle_url=%s", + raw_bundle_url, + ) + payload = dict( + packaging_type="zip", + bundle_name=model_bundle_name, + location=raw_bundle_url, + bundle_metadata=bundle_metadata, + requirements=requirements, + env_params=env_params, + schema_location=schema_location, + ) + _add_app_config_to_bundle_create_payload(payload, app_config) + + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + framework = ModelBundleFramework(env_params["framework_type"]) + env_params_copy = env_params.copy() + env_params_copy["framework_type"] = framework # type: ignore + env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore + payload = dict_not_none( + env_params=env_params_obj, + location=raw_bundle_url, + name=model_bundle_name, + requirements=requirements, + packaging_type=ModelBundlePackagingType("zip"), + metadata=bundle_metadata, + app_config=payload.get("app_config"), + schema_location=schema_location, + ) + create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore + return api_instance.create_model_bundle_v1_model_bundles_post( + body=create_model_bundle_request, + skip_deserialization=True, + ) + + def create_model_bundle_v1( + self, + model_bundle_name: str, + env_params: Dict[str, str], + *, + load_predict_fn: Optional[ + Callable[[LaunchModel_T], Callable[[Any], Any]] + ] = None, + predict_fn_or_cls: Optional[Callable[[Any], Any]] = None, + requirements: Optional[List[str]] = None, + model: Optional[LaunchModel_T] = None, + load_model_fn: Optional[Callable[[], LaunchModel_T]] = None, + app_config: Optional[Union[Dict[str, Any], str]] = None, + globals_copy: Optional[Dict[str, Any]] = None, + request_schema: Optional[Type[BaseModel]] = None, + response_schema: Optional[Type[BaseModel]] = None, + ) -> Dict[str, Any]: + """ + Uploads and registers a model bundle to Scale Launch. + + A model bundle consists of exactly one of the following: + + - ``predict_fn_or_cls`` + - ``load_predict_fn + model`` + - ``load_predict_fn + load_model_fn`` + + Pre/post-processing code can be included inside load_predict_fn/model or in predict_fn_or_cls call. + + Parameters: + model_bundle_name: The name of the model bundle you want to create. The name must be unique across all + bundles that you own. + + predict_fn_or_cls: ``Function`` or a ``Callable`` class that runs end-to-end (pre/post processing and model inference) on the call. + i.e. ``predict_fn_or_cls(REQUEST) -> RESPONSE``. + + model: Typically a trained Neural Network, e.g. a Pytorch module. + + Exactly one of ``model`` and ``load_model_fn`` must be provided. + + load_model_fn: A function that, when run, loads a model. This function is essentially a deferred + wrapper around the ``model`` argument. + + Exactly one of ``model`` and ``load_model_fn`` must be provided. + + load_predict_fn: Function that, when called with a model, returns a function that carries out inference. 
+ + If ``model`` is specified, then this is equivalent + to: + ``load_predict_fn(model, app_config=optional_app_config]) -> predict_fn`` + + Otherwise, if ``load_model_fn`` is specified, then this is equivalent + to: + ``load_predict_fn(load_model_fn(), app_config=optional_app_config]) -> predict_fn`` + + In both cases, ``predict_fn`` is then the inference function, i.e.: + ``predict_fn(REQUEST) -> RESPONSE`` + + + requirements: A list of python package requirements, where each list element is of the form + ``==``, e.g. + + ``["tensorflow==2.3.0", "tensorflow-hub==0.11.0"]`` + + If you do not pass in a value for ``requirements``, then you must pass in ``globals()`` for the + ``globals_copy`` argument. + + app_config: Either a Dictionary that represents a YAML file contents or a local path to a YAML file. + + env_params: A dictionary that dictates environment information e.g. + the use of pytorch or tensorflow, which base image tag to use, etc. + Specifically, the dictionary should contain the following keys: + + - ``framework_type``: either ``tensorflow`` or ``pytorch``. + - PyTorch fields: + - ``pytorch_image_tag``: An image tag for the ``pytorch`` docker base image. The list of tags + can be found from https://hub.docker.com/r/pytorch/pytorch/tags. + - Example: + + .. code-block:: python + + { + "framework_type": "pytorch", + "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime" + } + + - Tensorflow fields: + - ``tensorflow_version``: Version of tensorflow, e.g. ``"2.3.0"``. + + globals_copy: Dictionary of the global symbol table. Normally provided by ``globals()`` built-in function. + + bundle_url: (Only used in self-hosted mode.) The desired location of bundle. + Overrides any value given by ``self.bundle_location_fn`` + + request_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the request body for the model bundle's endpoint. + + response_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the response for the model bundle's endpoint. + Note: If request_schema is specified, then response_schema must also be specified. + """ + check_args = [ + predict_fn_or_cls is not None, + load_predict_fn is not None and model is not None, + load_predict_fn is not None and load_model_fn is not None, + ] + + if sum(check_args) != 1: + raise ValueError( + "A model bundle consists of exactly {predict_fn_or_cls}, {load_predict_fn + model}, or {load_predict_fn + load_model_fn}." + ) + # TODO should we try to catch when people intentionally pass both model and load_model_fn as None? + + if requirements is None: + # TODO explore: does globals() actually work as expected? Should we use globals_copy instead? 
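+            # No explicit requirements were passed, so fall back to inferring
+            # package versions from imported modules.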
+ requirements_inferred = find_packages_from_imports(globals()) + requirements = [ + f"{key}=={value}" + for key, value in requirements_inferred.items() + ] + logger.info( + "Using \n%s\n for model bundle %s", + requirements, + model_bundle_name, + ) + + # Prepare cloudpickle for external imports + if globals_copy: + for module in get_imports(globals_copy): + if module.__name__ == cloudpickle.__name__: + # Avoid recursion + # register_pickle_by_value does not work properly with itself + continue + cloudpickle.register_pickle_by_value(module) + + bundle: Union[ + Callable[[Any], Any], Dict[str, Any], None + ] # validate bundle + bundle_metadata = {} + # Create bundle + if predict_fn_or_cls: + bundle = predict_fn_or_cls + if inspect.isfunction(predict_fn_or_cls): + source_code = inspect.getsource(predict_fn_or_cls) + else: + source_code = inspect.getsource(predict_fn_or_cls.__class__) + bundle_metadata["predict_fn_or_cls"] = source_code + elif model is not None: + bundle = dict(model=model, load_predict_fn=load_predict_fn) + bundle_metadata["load_predict_fn"] = inspect.getsource( + load_predict_fn # type: ignore + ) + else: + bundle = dict( + load_model_fn=load_model_fn, load_predict_fn=load_predict_fn + ) + bundle_metadata["load_predict_fn"] = inspect.getsource( + load_predict_fn # type: ignore + ) + bundle_metadata["load_model_fn"] = inspect.getsource( + load_model_fn # type: ignore + ) + + serialized_bundle = cloudpickle.dumps(bundle) + raw_bundle_url = self._upload_data(data=serialized_bundle) + + schema_location = None + if bool(request_schema) ^ bool(response_schema): + raise ValueError( + "If request_schema is specified, then response_schema must also be specified." + ) + if request_schema is not None and response_schema is not None: + model_definitions = get_model_definitions( + request_schema=request_schema, + response_schema=response_schema, + ) + model_definitions_encoded = json.dumps(model_definitions).encode() + schema_location = self._upload_data(model_definitions_encoded) + + payload = dict( + packaging_type="cloudpickle", + bundle_name=model_bundle_name, + location=raw_bundle_url, + bundle_metadata=bundle_metadata, + requirements=requirements, + env_params=env_params, + schema_location=schema_location, + ) + + _add_app_config_to_bundle_create_payload(payload, app_config) + framework = ModelBundleFramework(env_params["framework_type"]) + env_params_copy = env_params.copy() + env_params_copy["framework_type"] = framework # type: ignore + env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore + + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + payload = dict_not_none( + env_params=env_params_obj, + location=raw_bundle_url, + name=model_bundle_name, + requirements=requirements, + packaging_type=ModelBundlePackagingType("cloudpickle"), + metadata=bundle_metadata, + app_config=app_config, + schema_location=schema_location, + ) + create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore + response = api_instance.create_model_bundle_v1_model_bundles_post( + body=create_model_bundle_request, + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def list_model_bundles_v1(self) -> List[Dict[str, Any]]: + """ + Returns a list of model bundles that the user owns. + + Returns: + A list of JSON objects that correspond to the bundles that the user owns. 
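+
+        A short usage sketch, assuming ``client`` is an instance of this client:
+
+        .. code-block:: python
+
+            bundles = client.list_model_bundles_v1()
+            print(bundles)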
+ """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + response = api_instance.list_model_bundles_v1_model_bundles_get( + skip_deserialization=True + ) + return json.loads(response.response.data) + + def get_model_bundle_by_id_v1(self, model_bundle_id: str) -> Dict[str, Any]: + """ + Gets the details of a model bundle given its id. + + Parameters: + model_bundle_id: The ID of the model bundle to get. + + Returns: + A JSON representation of the bundle details. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_bundle_id": model_bundle_id}) + response = api_instance.get_model_bundle_v1_model_bundles_model_bundle_id_get( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def get_latest_model_bundle_v1(self, model_bundle_name: str) -> Dict[str, Any]: + """ + Gets the details of the latest model bundle that has a certain name. + + Parameters: + model_bundle_name: The name of the model bundle. + + Returns: + A JSON representation of the bundle details. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_name": model_bundle_name}) + response = api_instance.get_latest_model_bundle_v1_model_bundles_latest_get( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def create_model_endpoint_v1( + self, + *, + endpoint_name: str, + model_bundle_id: Optional[str] = None, + cpus: int = 3, + memory: str = "8Gi", + storage: Optional[str] = None, + gpus: int = 0, + min_workers: int = 1, + max_workers: int = 1, + per_worker: int = 10, + gpu_type: Optional[str] = None, + endpoint_type: str = "sync", + post_inference_hooks: Optional[List[PostInferenceHooks]] = None, + default_callback_url: Optional[str] = None, + update_if_exists: bool = False, + labels: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """ + Creates and registers a model endpoint in Scale Launch. The returned object is an instance of type ``Endpoint``, + which is a base class of either ``SyncEndpoint`` or ``AsyncEndpoint``. This is the object + to which you sent inference requests. + + Parameters: + endpoint_name: The name of the model endpoint you want to create. The name must be unique across + all endpoints that you own. + + model_bundle_id: The ID of the ``ModelBundle`` that the endpoint should serve. + + cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. + + memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. This must be a positive + amount of memory. + + storage: Amount of local ephemeral storage each worker should get, e.g. "4Gi", "512Mi", etc. This must + be a positive amount of storage. + + gpus: Number of gpus each worker should get, e.g. 0, 1, etc. + + min_workers: The minimum number of workers. Must be greater than or equal to 0. This should be determined + by computing the minimum throughput of your workload and dividing it by the throughput of a single + worker. This field must be at least ``1`` for synchronous endpoints. + + max_workers: The maximum number of workers. Must be greater than or equal to 0, and as well as + greater than or equal to ``min_workers``. 
This should be determined by computing the maximum throughput + of your workload and dividing it by the throughput of a single worker. + + per_worker: The maximum number of concurrent requests that an individual worker can service. Launch + automatically scales the number of workers for the endpoint so that each worker is processing + ``per_worker`` requests, subject to the limits defined by ``min_workers`` and ``max_workers``. + + - If the average number of concurrent requests per worker is lower than ``per_worker``, then the number + of workers will be reduced. + - Otherwise, if the average number of concurrent requests per worker is higher + than ``per_worker``, then the number of workers will be increased to meet the elevated traffic. + + Here is our recommendation for computing ``per_worker``: + + 1. Compute ``min_workers`` and ``max_workers`` per your minimum and maximum throughput requirements. + 2. Determine a value for the maximum number of concurrent requests in the workload. Divide this number + by ``max_workers``. Doing this ensures that the number of workers will "climb" to ``max_workers``. + + gpu_type: If specifying a non-zero number of gpus, this controls the type of gpu requested. Here are the + supported values: + + - ``nvidia-tesla-t4`` + - ``nvidia-ampere-a10`` + + endpoint_type: Either ``"sync"`` or ``"async"``. + + post_inference_hooks: List of hooks to trigger after inference tasks are served. + + default_callback_url: The default callback url to use for async endpoints. + This can be overridden in the task parameters for each individual task. + post_inference_hooks must contain "callback" for the callback to be triggered. + + update_if_exists: If ``True``, will attempt to update the endpoint if it exists. Otherwise, will + unconditionally try to create a new endpoint. Note that endpoint names for a given user must be unique, + so attempting to call this function with ``update_if_exists=False`` for an existing endpoint will raise + an error. + + labels: An optional dictionary of key/value pairs to associate with this endpoint. + + Returns: + A JSON representation of the endpoint. + + """ + if update_if_exists: + # Maybe we want a server route for updating by name at some point. + + # If we're trying to do an update but no such endpoint exists, then this will blow up. + # + endpoint_response = self.get_model_endpoint_by_name_v1(model_endpoint_name=endpoint_name) + + response = self.update_model_endpoint_v1( + model_endpoint_id=endpoint_response["id"], + model_bundle_id=model_bundle_id, + cpus=cpus, + memory=memory, + storage=storage, + gpus=gpus, + min_workers=min_workers, + max_workers=max_workers, + per_worker=per_worker, + gpu_type=gpu_type, + default_callback_url=default_callback_url, + ) + return response + else: + # Presumably, the user knows that the endpoint doesn't already exist, and so we can defer + # to the server to reject any duplicate creations. 
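+            # Build a CreateModelEndpointRequest from the arguments below and POST it
+            # to the model-endpoints route; the server enforces endpoint name uniqueness.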
+ logger.info("Creating new endpoint") + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + + payload = dict_not_none( + cpus=cpus, + endpoint_type=ModelEndpointType(endpoint_type), + gpus=gpus, + gpu_type=GpuType(gpu_type) + if gpu_type is not None + else None, + labels=labels or {}, + max_workers=max_workers, + memory=memory, + metadata={}, + min_workers=min_workers, + model_bundle_id=model_bundle_id, + name=endpoint_name, + per_worker=per_worker, + post_inference_hooks=post_inference_hooks or [], + default_callback_url=default_callback_url, + storage=storage, + ) + create_model_endpoint_request = CreateModelEndpointRequest( + **payload + ) + response = ( + api_instance.create_model_endpoint_v1_model_endpoints_post( + body=create_model_endpoint_request, + skip_deserialization=True, + ) + ) + return json.loads(response.response.data) + + def update_model_endpoint_v1( + self, + *, + model_endpoint_id: str, + model_bundle_id: Optional[str] = None, + cpus: Optional[float] = None, + memory: Optional[str] = None, + storage: Optional[str] = None, + gpus: Optional[int] = None, + min_workers: Optional[int] = None, + max_workers: Optional[int] = None, + per_worker: Optional[int] = None, + gpu_type: Optional[str] = None, + post_inference_hooks: Optional[List[PostInferenceHooks]] = None, + default_callback_url: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Edits an existing model endpoint. Here are the fields that **cannot** be edited on an existing endpoint: + + - The endpoint's name. + - The endpoint's type (i.e. you cannot go from a ``SyncEnpdoint`` to an ``AsyncEndpoint`` or vice versa. + + Parameters: + model_endpoint_id: The model endpoint (or its name) you want to edit. The name must be unique across + all endpoints that you own. + + model_bundle_id: The ``ModelBundle`` that the endpoint should serve. + + cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. + + memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. This must be a positive + amount of memory. + + storage: Amount of local ephemeral storage each worker should get, e.g. "4Gi", "512Mi", etc. This must + be a positive amount of storage. + + gpus: Number of gpus each worker should get, e.g. 0, 1, etc. + + min_workers: The minimum number of workers. Must be greater than or equal to 0. + + max_workers: The maximum number of workers. Must be greater than or equal to 0, and as well as + greater than or equal to ``min_workers``. + + per_worker: The maximum number of concurrent requests that an individual worker can service. Launch + automatically scales the number of workers for the endpoint so that each worker is processing + ``per_worker`` requests: + + - If the average number of concurrent requests per worker is lower than ``per_worker``, then the number + of workers will be reduced. + - Otherwise, if the average number of concurrent requests per worker is higher + than ``per_worker``, then the number of workers will be increased to meet the elevated traffic. + + gpu_type: If specifying a non-zero number of gpus, this controls the type of gpu requested. Here are the + supported values: + + - ``nvidia-tesla-t4`` + - ``nvidia-ampere-a10`` + + post_inference_hooks: List of hooks to trigger after inference tasks are served. + + default_callback_url: The default callback url to use for async endpoints. + This can be overridden in the task parameters for each individual task. 
+ post_inference_hooks must contain "callback" for the callback to be triggered. + + """ + logger.info("Editing existing endpoint") + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + + payload = dict_not_none( + cpus=cpus, + gpus=gpus, + gpu_type=GpuType(gpu_type) if gpu_type is not None else None, + max_workers=max_workers, + memory=memory, + min_workers=min_workers, + model_bundle_id=model_bundle_id, + per_worker=per_worker, + post_inference_hooks=post_inference_hooks or [], + default_callback_url=default_callback_url, + storage=storage, + ) + update_model_endpoint_request = UpdateModelEndpointRequest( + **payload + ) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.update_model_endpoint_v1_model_endpoints_model_endpoint_id_put( # type: ignore + body=update_model_endpoint_request, + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def list_model_endpoints_v1(self) -> List[Dict[str, Any]]: + """ + Lists all model endpoints that the user owns. + + Returns: + A list of ``ModelEndpoint`` objects. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + response = ( + api_instance.list_model_endpoints_v1_model_endpoints_get( + skip_deserialization=True + ) + ) + return json.loads(response.response.data) + + def get_model_endpoint_by_id_v1( + self, model_endpoint_id: str + ) -> Dict[str, Any]: + """ + Gets a model endpoint for a given ID. + + Parameters: + model_endpoint_id: The name of the endpoint to retrieve. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.get_model_endpoints_api_v1_model_endpoints_api_get( # type: ignore + path_params=path_params, + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def get_model_endpoint_by_name_v1( + self, model_endpoint_name: str + ) -> Dict[str, Any]: + """ + Gets a model endpoint for a given ID. + + Parameters: + model_endpoint_name: The name of the endpoint to retrieve. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + query_params = frozendict({"name": model_endpoint_name}) + response = api_instance.get_model_endpoints_api_v1_model_endpoints_api_get( # type: ignore + query_params=query_params, + skip_deserialization=True, + ) + return response + + def delete_model_endpoint_v1(self, model_endpoint_id: str) -> Dict[str, Any]: + """ + Deletes a model endpoint. + + Parameters: + model_endpoint_id: The model endpoint's ID. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.delete_model_endpoint_v1_model_endpoints_model_endpoint_id_delete( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def create_sync_inference_task_v1( + self, + model_endpoint_id: str, + url: Optional[str] = None, + args: Optional[Dict[str, Any]] = None, + return_pickled: Optional[bool] = False, + ) -> Dict[str, Any]: + """ + Creates an inference request against a synchronous endpoint. + + Parameters: + model_endpoint_id: The id of the endpoint that should receive the inference request. + url: The url to the inference payload. 
The contents in the file are sent as bytes to the bundle. + Exactly one of ``url`` and ``args`` must be specified. + args: A dictionary that is passed to the bundle. Exactly one of ``url`` and ``args`` must be specified. + return_pickled: Whether the python object returned is pickled, or directly written to the file returned. + """ + return self._sync_request( + endpoint_id=model_endpoint_id, + url=url, + args=args, + return_pickled=return_pickled, + ) + + def create_async_inference_task_v1( + self, + model_endpoint_id: str, + url: Optional[str] = None, + args: Optional[Dict[str, Any]] = None, + callback_url: Optional[str] = None, + return_pickled: Optional[bool] = False, + ) -> Dict[str, Any]: + """ + Creates an inference request against a synchronous endpoint. + + Parameters: + model_endpoint_id: The id of the endpoint that should receive the inference request. + url: The url to the inference payload. The contents in the file are sent as bytes to the bundle. + Exactly one of ``url`` and ``args`` must be specified. + args: A dictionary that is passed to the bundle. Exactly one of ``url`` and ``args`` must be specified. + callback_url: The callback url to use for this task. If None, then the + default_callback_url of the endpoint is used. The endpoint must specify + "callback" as a post-inference hook for the callback to be triggered. + return_pickled: Whether the python object returned is pickled, or directly written to the file returned. + """ + validate_task_request(url=url, args=args) + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + payload = dict_not_none( + return_pickled=return_pickled, + url=url, + args=args, + callback_url=callback_url, + ) + request = EndpointPredictRequest(**payload) + query_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.create_async_inference_task_v1_async_tasks_post( # type: ignore + body=request, + query_params=query_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def get_async_inference_task_v1( + self, + task_id: str, + ) -> Dict[str, Any]: + """ + Gets the current result of an async inference task id. + + Parameters: + task_id: The ID returned from create_async_inference_task_v1. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"task_id": task_id}) + response = api_instance.get_async_inference_task_v1_async_tasks_task_id_get( + path_params=path_params, + skip_deserialization=True, + ) + return json.loads(response.response.data) + def _zip_directory(zipf: ZipFile, path: str) -> None: for root, _, files in os.walk(path):
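+        # Walk the directory tree under `path` and add each file it contains to the zip archive.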