From 10a69de3a9cd13f836ce0fe04a58edc1449f9885 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Thu, 29 Aug 2024 04:23:09 +0000 Subject: [PATCH] Deprecate KFP v1 support --- build/BUILD | 1 - tfx/dependencies.py | 33 +- .../penguin/penguin_pipeline_kubeflow.py | 49 +- .../penguin/penguin_pipeline_kubeflow_test.py | 28 +- .../templates/taxi/kubeflow_runner.py | 100 ---- tfx/orchestration/data_types.py | 2 +- tfx/orchestration/kubeflow/base_component.py | 166 ------ .../kubeflow/base_component_test.py | 213 -------- .../kubeflow/kubeflow_dag_runner.py | 471 ------------------ .../kubeflow/kubeflow_dag_runner_test.py | 329 ------------ tfx/orchestration/kubeflow/proto/BUILD | 25 - .../kubeflow/proto/kubeflow.proto | 52 -- tfx/orchestration/pipeline.py | 2 +- .../handler/kubeflow_dag_runner_patcher.py | 86 ---- .../kubeflow_dag_runner_patcher_test.py | 71 --- tfx/v1/orchestration/experimental/__init__.py | 17 - tfx/v1/proto/__init__.py | 4 +- 17 files changed, 54 insertions(+), 1595 deletions(-) delete mode 100644 tfx/experimental/templates/taxi/kubeflow_runner.py delete mode 100644 tfx/orchestration/kubeflow/base_component.py delete mode 100644 tfx/orchestration/kubeflow/base_component_test.py delete mode 100644 tfx/orchestration/kubeflow/kubeflow_dag_runner.py delete mode 100644 tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py delete mode 100644 tfx/orchestration/kubeflow/proto/BUILD delete mode 100644 tfx/orchestration/kubeflow/proto/kubeflow.proto delete mode 100644 tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py delete mode 100644 tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py diff --git a/build/BUILD b/build/BUILD index 4d596ef5b2..60607e96b3 100644 --- a/build/BUILD +++ b/build/BUILD @@ -25,7 +25,6 @@ sh_binary( "//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py", "//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py", "//tfx/orchestration/experimental/core:component_generated_alert_pb2.py", - "//tfx/orchestration/kubeflow/proto:kubeflow_pb2.py", "//tfx/proto:bulk_inferrer_pb2.py", "//tfx/proto:distribution_validator_pb2.py", "//tfx/proto:evaluator_pb2.py", diff --git a/tfx/dependencies.py b/tfx/dependencies.py index 24b07a24cf..5fdb30e0ca 100644 --- a/tfx/dependencies.py +++ b/tfx/dependencies.py @@ -69,9 +69,8 @@ def make_pipeline_sdk_required_install_packages(): # TODO(b/176812386): Deprecate usage of jinja2 for placeholders. 'jinja2>=2.7.3,<4', # typing-extensions allows consistent & future-proof interface for typing. - # Since kfp<2 uses typing-extensions<4, lower bound is the latest 3.x, and - # upper bound is <5 as the semver started from 4.0 according to their doc. - 'typing-extensions>=3.10.0.2,<5', + # Upper bound is <5 as the semver started from 4.0 according to their doc. + 'typing-extensions<5', ] @@ -87,7 +86,7 @@ def make_required_install_packages(): 'google-cloud-bigquery>=3,<4', 'grpcio>=1.28.1,<2', 'keras-tuner>=1.0.4,<2,!=1.4.0,!=1.4.1', - 'kubernetes>=10.0.1,<13', + 'kubernetes>=10.0.1,<27', 'numpy>=1.16,<2', 'pyarrow>=10,<11', # TODO: b/358471141 - Orjson 3.10.7 breaks TFX OSS tests. @@ -146,9 +145,8 @@ def make_extra_packages_airflow(): def make_extra_packages_kfp(): """Prepare extra packages needed for Kubeflow Pipelines orchestrator.""" return [ - # TODO(b/304892416): Migrate from KFP SDK v1 to v2. - 'kfp>=1.8.14,<2', - 'kfp-pipeline-spec>0.1.13,<0.2', + 'kfp>=2', + 'kfp-pipeline-spec>=0.2.2', ] @@ -156,17 +154,20 @@ def make_extra_packages_test(): """Prepare extra packages needed for running unit tests.""" # Note: It is okay to pin packages to exact versions in this list to minimize # conflicts. - return make_extra_packages_airflow() + make_extra_packages_kfp() + [ - 'pytest>=5,<7', - ] + return ( + make_extra_packages_airflow() + + make_extra_packages_kfp() + + [ + 'pytest>=5,<7', + ] + ) def make_extra_packages_docker_image(): # Packages needed for tfx docker image. return [ - # TODO(b/304892416): Migrate from KFP SDK v1 to v2. - 'kfp>=1.8.14,<2', - 'kfp-pipeline-spec>0.1.13,<0.2', + 'kfp>=2', + 'kfp-pipeline-spec>=0.3.0', 'mmh>=2.2,<3', 'python-snappy>=0.5,<0.6', # Required for tfx/examples/penguin/penguin_utils_cloud_tuner.py @@ -194,10 +195,12 @@ def make_extra_packages_tf_ranking(): # Packages needed for tf-ranking which is used in tfx/examples/ranking. return [ 'tensorflow-ranking>=0.5,<0.6', - 'struct2tensor' + select_constraint( + 'struct2tensor' + + select_constraint( default='>=0.46.0,<0.47.0', nightly='>=0.47.0.dev', - git_master='@git+https://github.com/google/struct2tensor@master'), + git_master='@git+https://github.com/google/struct2tensor@master', + ), ] diff --git a/tfx/examples/penguin/penguin_pipeline_kubeflow.py b/tfx/examples/penguin/penguin_pipeline_kubeflow.py index 26c82cc02e..8c0e2e46ec 100644 --- a/tfx/examples/penguin/penguin_pipeline_kubeflow.py +++ b/tfx/examples/penguin/penguin_pipeline_kubeflow.py @@ -501,33 +501,28 @@ def main(): else: beam_pipeline_args = _beam_pipeline_args_by_runner['DirectRunner'] - if use_vertex: - dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner( - config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(), - output_filename=_pipeline_definition_file) - else: - dag_runner = tfx.orchestration.experimental.KubeflowDagRunner( - config=tfx.orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=tfx.orchestration.experimental - .get_default_kubeflow_metadata_config())) - - dag_runner.run( - create_pipeline( - pipeline_name=_pipeline_name, - pipeline_root=_pipeline_root, - data_root=_data_root, - module_file=_module_file, - enable_tuning=False, - enable_cache=True, - user_provided_schema_path=_user_provided_schema, - ai_platform_training_args=_ai_platform_training_args, - ai_platform_serving_args=_ai_platform_serving_args, - beam_pipeline_args=beam_pipeline_args, - use_cloud_component=use_cloud_component, - use_aip=use_aip, - use_vertex=use_vertex, - serving_model_dir=_serving_model_dir, - )) + dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner( + config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(), + output_filename=_pipeline_definition_file, + ) + dag_runner.run( + create_pipeline( + pipeline_name=_pipeline_name, + pipeline_root=_pipeline_root, + data_root=_data_root, + module_file=_module_file, + enable_tuning=False, + enable_cache=True, + user_provided_schema_path=_user_provided_schema, + ai_platform_training_args=_ai_platform_training_args, + ai_platform_serving_args=_ai_platform_serving_args, + beam_pipeline_args=beam_pipeline_args, + use_cloud_component=use_cloud_component, + use_aip=use_aip, + use_vertex=use_vertex, + serving_model_dir=_serving_model_dir, + ) + ) # To compile the pipeline: diff --git a/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py b/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py index d36178b9b5..f267749cfe 100644 --- a/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py +++ b/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py @@ -64,24 +64,16 @@ def testPenguinPipelineConstructionAndDefinitionFileExists( serving_model_dir=penguin_pipeline_kubeflow._serving_model_dir) self.assertLen(kubeflow_pipeline.components, 9) - if use_vertex: - v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner( - config=orchestration.experimental.KubeflowV2DagRunnerConfig(), - output_dir=self.tmp_dir, - output_filename=penguin_pipeline_kubeflow._pipeline_definition_file) - v2_dag_runner.run(kubeflow_pipeline) - file_path = os.path.join( - self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file) - self.assertTrue(fileio.exists(file_path)) - else: - v1_dag_runner = orchestration.experimental.KubeflowDagRunner( - config=orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=orchestration.experimental - .get_default_kubeflow_metadata_config())) - v1_dag_runner.run(kubeflow_pipeline) - file_path = os.path.join(self.tmp_dir, 'penguin-kubeflow.tar.gz') - self.assertTrue(fileio.exists(file_path)) - + v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner( + config=orchestration.experimental.KubeflowV2DagRunnerConfig(), + output_dir=self.tmp_dir, + output_filename=penguin_pipeline_kubeflow._pipeline_definition_file, + ) + v2_dag_runner.run(kubeflow_pipeline) + file_path = os.path.join( + self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file + ) + self.assertTrue(fileio.exists(file_path)) if __name__ == '__main__': tf.test.main() diff --git a/tfx/experimental/templates/taxi/kubeflow_runner.py b/tfx/experimental/templates/taxi/kubeflow_runner.py deleted file mode 100644 index 74d873f0f7..0000000000 --- a/tfx/experimental/templates/taxi/kubeflow_runner.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Define KubeflowDagRunner to run the pipeline using Kubeflow.""" - -import os -from absl import logging - -from tfx import v1 as tfx -from tfx.experimental.templates.taxi.pipeline import configs -from tfx.experimental.templates.taxi.pipeline import pipeline - -# TFX pipeline produces many output files and metadata. All output data will be -# stored under this OUTPUT_DIR. -OUTPUT_DIR = os.path.join('gs://', configs.GCS_BUCKET_NAME) - -# TFX produces two types of outputs, files and metadata. -# - Files will be created under PIPELINE_ROOT directory. -PIPELINE_ROOT = os.path.join(OUTPUT_DIR, 'tfx_pipeline_output', - configs.PIPELINE_NAME) - -# The last component of the pipeline, "Pusher" will produce serving model under -# SERVING_MODEL_DIR. -SERVING_MODEL_DIR = os.path.join(PIPELINE_ROOT, 'serving_model') - -# Specifies data file directory. DATA_PATH should be a directory containing CSV -# files for CsvExampleGen in this example. By default, data files are in the -# GCS path: `gs://{GCS_BUCKET_NAME}/tfx-template/data/`. Using a GCS path is -# recommended for KFP. -# -# One can optionally choose to use a data source located inside of the container -# built by the template, by specifying -# DATA_PATH = 'data'. Note that Dataflow does not support use container as a -# dependency currently, so this means CsvExampleGen cannot be used with Dataflow -# (step 8 in the template notebook). - -DATA_PATH = 'gs://{}/tfx-template/data/taxi/'.format(configs.GCS_BUCKET_NAME) - - -def run(): - """Define a kubeflow pipeline.""" - - # Metadata config. The defaults works work with the installation of - # KF Pipelines using Kubeflow. If installing KF Pipelines using the - # lightweight deployment option, you may need to override the defaults. - # If you use Kubeflow, metadata will be written to MySQL database inside - # Kubeflow cluster. - metadata_config = tfx.orchestration.experimental.get_default_kubeflow_metadata_config( - ) - - runner_config = tfx.orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=metadata_config, - tfx_image=configs.PIPELINE_IMAGE) - pod_labels = { - 'add-pod-env': 'true', - tfx.orchestration.experimental.LABEL_KFP_SDK_ENV: 'tfx-template' - } - tfx.orchestration.experimental.KubeflowDagRunner( - config=runner_config, pod_labels_to_attach=pod_labels - ).run( - pipeline.create_pipeline( - pipeline_name=configs.PIPELINE_NAME, - pipeline_root=PIPELINE_ROOT, - data_path=DATA_PATH, - # TODO(step 7): (Optional) Uncomment below to use BigQueryExampleGen. - # query=configs.BIG_QUERY_QUERY, - # TODO(step 5): (Optional) Set the path of the customized schema. - # schema_path=generated_schema_path, - preprocessing_fn=configs.PREPROCESSING_FN, - run_fn=configs.RUN_FN, - train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS), - eval_args=tfx.proto.EvalArgs(num_steps=configs.EVAL_NUM_STEPS), - eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD, - serving_model_dir=SERVING_MODEL_DIR, - # TODO(step 7): (Optional) Uncomment below to use provide GCP related - # config for BigQuery with Beam DirectRunner. - # beam_pipeline_args=configs - # .BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS, - # TODO(step 8): (Optional) Uncomment below to use Dataflow. - # beam_pipeline_args=configs.DATAFLOW_BEAM_PIPELINE_ARGS, - # TODO(step 9): (Optional) Uncomment below to use Cloud AI Platform. - # ai_platform_training_args=configs.GCP_AI_PLATFORM_TRAINING_ARGS, - # TODO(step 9): (Optional) Uncomment below to use Cloud AI Platform. - # ai_platform_serving_args=configs.GCP_AI_PLATFORM_SERVING_ARGS, - )) - - -if __name__ == '__main__': - logging.set_verbosity(logging.INFO) - run() diff --git a/tfx/orchestration/data_types.py b/tfx/orchestration/data_types.py index aa4bb12c4b..10e88ec696 100644 --- a/tfx/orchestration/data_types.py +++ b/tfx/orchestration/data_types.py @@ -145,7 +145,7 @@ def component_run_context_name(self) -> str: class RuntimeParameter(json_utils.Jsonable): """Runtime parameter. - Currently only supported on KubeflowDagRunner. + Currently only supported on KubeflowV2DagRunner. For protos, use text type RuntimeParameter, which holds the proto json string, e.g., `'{"num_steps": 5}'` for TrainArgs proto. diff --git a/tfx/orchestration/kubeflow/base_component.py b/tfx/orchestration/kubeflow/base_component.py deleted file mode 100644 index 11eeb34a87..0000000000 --- a/tfx/orchestration/kubeflow/base_component.py +++ /dev/null @@ -1,166 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubeflow Pipelines based implementation of TFX components. - -These components are lightweight wrappers around the KFP DSL's ContainerOp, -and ensure that the container gets called with the right set of input -arguments. It also ensures that each component exports named output -attributes that are consistent with those provided by the native TFX -components, thus ensuring that both types of pipeline definitions are -compatible. -Note: This requires Kubeflow Pipelines SDK to be installed. -""" - -from typing import Dict, List, Set - -from absl import logging -from kfp import dsl -from kubernetes import client as k8s_client -from tfx.dsl.components.base import base_node as tfx_base_node -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.proto.orchestration import pipeline_pb2 - -from google.protobuf import json_format - -# TODO(b/166202742): Consolidate container entrypoint with TFX image's default. -_COMMAND = ['python', '-m', 'tfx.orchestration.kubeflow.container_entrypoint'] - -_WORKFLOW_ID_KEY = 'WORKFLOW_ID' - - -def _encode_runtime_parameter(param: data_types.RuntimeParameter) -> str: - """Encode a runtime parameter into a placeholder for value substitution.""" - if param.ptype is int: - type_enum = pipeline_pb2.RuntimeParameter.INT - elif param.ptype is float: - type_enum = pipeline_pb2.RuntimeParameter.DOUBLE - else: - type_enum = pipeline_pb2.RuntimeParameter.STRING - type_str = pipeline_pb2.RuntimeParameter.Type.Name(type_enum) - return f'{param.name}={type_str}:{str(dsl.PipelineParam(name=param.name))}' - - -def _replace_placeholder(component: tfx_base_node.BaseNode) -> None: - """Replaces the RuntimeParameter placeholders with kfp.dsl.PipelineParam.""" - keys = list(component.exec_properties.keys()) - for key in keys: - exec_property = component.exec_properties[key] - if not isinstance(exec_property, data_types.RuntimeParameter): - continue - component.exec_properties[key] = str( - dsl.PipelineParam(name=exec_property.name)) - - -# TODO(hongyes): renaming the name to KubeflowComponent. -class BaseComponent: - """Base component for all Kubeflow pipelines TFX components. - - Returns a wrapper around a KFP DSL ContainerOp class, and adds named output - attributes that match the output names for the corresponding native TFX - components. - """ - - def __init__(self, - component: tfx_base_node.BaseNode, - depends_on: Set[dsl.ContainerOp], - pipeline: tfx_pipeline.Pipeline, - pipeline_root: dsl.PipelineParam, - tfx_image: str, - kubeflow_metadata_config: kubeflow_pb2.KubeflowMetadataConfig, - tfx_ir: pipeline_pb2.Pipeline, - pod_labels_to_attach: Dict[str, str], - runtime_parameters: List[data_types.RuntimeParameter], - metadata_ui_path: str = '/mlpipeline-ui-metadata.json'): - """Creates a new Kubeflow-based component. - - This class essentially wraps a dsl.ContainerOp construct in Kubeflow - Pipelines. - - Args: - component: The logical TFX component to wrap. - depends_on: The set of upstream KFP ContainerOp components that this - component will depend on. - pipeline: The logical TFX pipeline to which this component belongs. - pipeline_root: The pipeline root specified, as a dsl.PipelineParam - tfx_image: The container image to use for this component. - kubeflow_metadata_config: Configuration settings for connecting to the - MLMD store in a Kubeflow cluster. - tfx_ir: The TFX intermedia representation of the pipeline. - pod_labels_to_attach: Dict of pod labels to attach to the GKE pod. - runtime_parameters: Runtime parameters of the pipeline. - metadata_ui_path: File location for metadata-ui-metadata.json file. - """ - - _replace_placeholder(component) - - arguments = [ - '--pipeline_root', - pipeline_root, - '--kubeflow_metadata_config', - json_format.MessageToJson( - message=kubeflow_metadata_config, preserving_proto_field_name=True), - '--node_id', - component.id, - # TODO(b/182220464): write IR to pipeline_root and let - # container_entrypoint.py read it back to avoid future issue that IR - # exeeds the flag size limit. - '--tfx_ir', - json_format.MessageToJson(tfx_ir), - '--metadata_ui_path', - metadata_ui_path, - ] - - for param in runtime_parameters: - arguments.append('--runtime_parameter') - arguments.append(_encode_runtime_parameter(param)) - - self.container_op = dsl.ContainerOp( - name=component.id, - command=_COMMAND, - image=tfx_image, - arguments=arguments, - output_artifact_paths={ - 'mlpipeline-ui-metadata': metadata_ui_path, - }, - ) - - logging.info('Adding upstream dependencies for component %s', - self.container_op.name) - for op in depends_on: - logging.info(' -> Component: %s', op.name) - self.container_op.after(op) - - # TODO(b/140172100): Document the use of additional_pipeline_args. - if _WORKFLOW_ID_KEY in pipeline.additional_pipeline_args: - # Allow overriding pipeline's run_id externally, primarily for testing. - self.container_op.container.add_env_variable( - k8s_client.V1EnvVar( - name=_WORKFLOW_ID_KEY, - value=pipeline.additional_pipeline_args[_WORKFLOW_ID_KEY])) - else: - # Add the Argo workflow ID to the container's environment variable so it - # can be used to uniquely place pipeline outputs under the pipeline_root. - field_path = "metadata.labels['workflows.argoproj.io/workflow']" - self.container_op.container.add_env_variable( - k8s_client.V1EnvVar( - name=_WORKFLOW_ID_KEY, - value_from=k8s_client.V1EnvVarSource( - field_ref=k8s_client.V1ObjectFieldSelector( - field_path=field_path)))) - - if pod_labels_to_attach: - for k, v in pod_labels_to_attach.items(): - self.container_op.add_pod_label(k, v) diff --git a/tfx/orchestration/kubeflow/base_component_test.py b/tfx/orchestration/kubeflow/base_component_test.py deleted file mode 100644 index 5d4c1c54fc..0000000000 --- a/tfx/orchestration/kubeflow/base_component_test.py +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.kubeflow.base_component.""" - -import json - -from absl import logging -from kfp import dsl -import tensorflow as tf -from tfx.components.example_gen.csv_example_gen import component as csv_example_gen_component -from tfx.components.statistics_gen import component as statistics_gen_component -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import base_component -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.proto.orchestration import pipeline_pb2 - -from ml_metadata.proto import metadata_store_pb2 - - -class BaseComponentTest(tf.test.TestCase): - maxDiff = None # pylint: disable=invalid-name - _test_pipeline_name = 'test_pipeline' - - def setUp(self): - super().setUp() - example_gen = csv_example_gen_component.CsvExampleGen( - input_base='data_input') - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']).with_id('foo') - - pipeline = tfx_pipeline.Pipeline( - pipeline_name=self._test_pipeline_name, - pipeline_root='test_pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - test_pipeline_root = dsl.PipelineParam(name='pipeline-root-param') - - self._metadata_config = kubeflow_pb2.KubeflowMetadataConfig() - self._metadata_config.mysql_db_service_host.environment_variable = 'MYSQL_SERVICE_HOST' - self._tfx_ir = pipeline_pb2.Pipeline() - with dsl.Pipeline('test_pipeline'): - self.component = base_component.BaseComponent( - component=statistics_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[] - ) - self.tfx_component = statistics_gen - - def testContainerOpArguments(self): - expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'foo', - ] - try: - self.assertEqual( - self.component.container_op.arguments[:len(expected_args)], - expected_args) - - except AssertionError: - # Print out full arguments for debugging. - logging.error('==== BEGIN CONTAINER OP ARGUMENT DUMP ====') - logging.error(json.dumps(self.component.container_op.arguments, indent=2)) - logging.error('==== END CONTAINER OP ARGUMENT DUMP ====') - raise - - def testContainerOpName(self): - self.assertEqual('foo', self.tfx_component.id) - self.assertEqual('foo', self.component.container_op.name) - - -class BaseComponentWithPipelineParamTest(tf.test.TestCase): - """Test the usage of RuntimeParameter.""" - maxDiff = None # pylint: disable=invalid-name - _test_pipeline_name = 'test_pipeline' - - def setUp(self): - super().setUp() - - example_gen_output_config = data_types.RuntimeParameter( - name='example-gen-output-config', ptype=str) - - example_gen = csv_example_gen_component.CsvExampleGen( - input_base='data_root', output_config=example_gen_output_config) - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']).with_id('foo') - - test_pipeline_root = dsl.PipelineParam(name='pipeline-root-param') - pipeline = tfx_pipeline.Pipeline( - pipeline_name=self._test_pipeline_name, - pipeline_root='test_pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - self._metadata_config = kubeflow_pb2.KubeflowMetadataConfig() - self._metadata_config.mysql_db_service_host.environment_variable = 'MYSQL_SERVICE_HOST' - self._tfx_ir = pipeline_pb2.Pipeline() - with dsl.Pipeline('test_pipeline'): - self.example_gen = base_component.BaseComponent( - component=example_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[example_gen_output_config]) - self.statistics_gen = base_component.BaseComponent( - component=statistics_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[] - ) - - self.tfx_example_gen = example_gen - self.tfx_statistics_gen = statistics_gen - - def testContainerOpArguments(self): - statistics_gen_expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'foo', - '--tfx_ir', - '{}', - '--metadata_ui_path', - '/mlpipeline-ui-metadata.json', - ] - example_gen_expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'CsvExampleGen', - '--tfx_ir', - '{}', - '--metadata_ui_path', - '/mlpipeline-ui-metadata.json', - '--runtime_parameter', - 'example-gen-output-config=STRING:{{pipelineparam:op=;name=example-gen-output-config}}', - ] - try: - self.assertEqual( - self.statistics_gen.container_op - .arguments, - statistics_gen_expected_args) - self.assertEqual( - self.example_gen.container_op.arguments, - example_gen_expected_args) - except AssertionError: - # Print out full arguments for debugging. - logging.error('==== BEGIN STATISTICSGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error( - json.dumps(self.statistics_gen.container_op.arguments, indent=2)) - logging.error('==== END STATISTICSGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error('==== BEGIN EXAMPLEGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error( - json.dumps(self.example_gen.container_op.arguments, indent=2)) - logging.error('==== END EXAMPLEGEN CONTAINER OP ARGUMENT DUMP ====') - raise - - def testContainerOpName(self): - self.assertEqual('foo', self.tfx_statistics_gen.id) - self.assertEqual('foo', self.statistics_gen.container_op.name) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/kubeflow/kubeflow_dag_runner.py b/tfx/orchestration/kubeflow/kubeflow_dag_runner.py deleted file mode 100644 index 1d320aeaf5..0000000000 --- a/tfx/orchestration/kubeflow/kubeflow_dag_runner.py +++ /dev/null @@ -1,471 +0,0 @@ -# Copyright 2019 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""TFX runner for Kubeflow.""" - -import collections -import copy -import os -from typing import Any, Callable, Dict, List, Optional, Type, cast, MutableMapping -from absl import logging - -from kfp import compiler -from kfp import dsl -from kfp import gcp -from kubernetes import client as k8s_client -from tfx import version -from tfx.dsl.compiler import compiler as tfx_compiler -from tfx.dsl.components.base import base_component as tfx_base_component -from tfx.dsl.components.base import base_node -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration import tfx_runner -from tfx.orchestration.config import pipeline_config -from tfx.orchestration.kubeflow import base_component -from tfx.orchestration.kubeflow import utils -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.orchestration.launcher import base_component_launcher -from tfx.orchestration.launcher import in_process_component_launcher -from tfx.orchestration.launcher import kubernetes_component_launcher -from tfx.proto.orchestration import pipeline_pb2 -from tfx.utils import telemetry_utils - - -# OpFunc represents the type of a function that takes as input a -# dsl.ContainerOp and returns the same object. Common operations such as adding -# k8s secrets, mounting volumes, specifying the use of TPUs and so on can be -# specified as an OpFunc. -# See example usage here: -# https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/gcp.py -OpFunc = Callable[[dsl.ContainerOp], dsl.ContainerOp] - -# Default secret name for GCP credentials. This secret is installed as part of -# a typical Kubeflow installation when the component is GKE. -_KUBEFLOW_GCP_SECRET_NAME = 'user-gcp-sa' - -# Default TFX container image to use in KubeflowDagRunner. -DEFAULT_KUBEFLOW_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__,) - - -def _mount_config_map_op(config_map_name: str) -> OpFunc: - """Mounts all key-value pairs found in the named Kubernetes ConfigMap. - - All key-value pairs in the ConfigMap are mounted as environment variables. - - Args: - config_map_name: The name of the ConfigMap resource. - - Returns: - An OpFunc for mounting the ConfigMap. - """ - - def mount_config_map(container_op: dsl.ContainerOp): - config_map_ref = k8s_client.V1ConfigMapEnvSource( - name=config_map_name, optional=True) - container_op.container.add_env_from( - k8s_client.V1EnvFromSource(config_map_ref=config_map_ref)) - - return mount_config_map - - -def _mount_secret_op(secret_name: str) -> OpFunc: - """Mounts all key-value pairs found in the named Kubernetes Secret. - - All key-value pairs in the Secret are mounted as environment variables. - - Args: - secret_name: The name of the Secret resource. - - Returns: - An OpFunc for mounting the Secret. - """ - - def mount_secret(container_op: dsl.ContainerOp): - secret_ref = k8s_client.V1ConfigMapEnvSource( - name=secret_name, optional=True) - - container_op.container.add_env_from( - k8s_client.V1EnvFromSource(secret_ref=secret_ref)) - - return mount_secret - - -def get_default_pipeline_operator_funcs( - use_gcp_sa: bool = False) -> List[OpFunc]: - """Returns a default list of pipeline operator functions. - - Args: - use_gcp_sa: If true, mount a GCP service account secret to each pod, with - the name _KUBEFLOW_GCP_SECRET_NAME. - - Returns: - A list of functions with type OpFunc. - """ - # Enables authentication for GCP services if needed. - gcp_secret_op = gcp.use_gcp_secret(_KUBEFLOW_GCP_SECRET_NAME) - - # Mounts configmap containing Metadata gRPC server configuration. - mount_config_map_op = _mount_config_map_op('metadata-grpc-configmap') - if use_gcp_sa: - return [gcp_secret_op, mount_config_map_op] - else: - return [mount_config_map_op] - - -def get_default_kubeflow_metadata_config( -) -> kubeflow_pb2.KubeflowMetadataConfig: - """Returns the default metadata connection config for Kubeflow. - - Returns: - A config proto that will be serialized as JSON and passed to the running - container so the TFX component driver is able to communicate with MLMD in - a Kubeflow cluster. - """ - # The default metadata configuration for a Kubeflow Pipelines cluster is - # codified as a Kubernetes ConfigMap - # https://github.com/kubeflow/pipelines/blob/master/manifests/kustomize/base/metadata/metadata-grpc-configmap.yaml - - config = kubeflow_pb2.KubeflowMetadataConfig() - # The environment variable to use to obtain the Metadata gRPC service host in - # the cluster that is backing Kubeflow Metadata. Note that the key in the - # config map and therefore environment variable used, are lower-cased. - config.grpc_config.grpc_service_host.environment_variable = 'METADATA_GRPC_SERVICE_HOST' - # The environment variable to use to obtain the Metadata grpc service port in - # the cluster that is backing Kubeflow Metadata. - config.grpc_config.grpc_service_port.environment_variable = 'METADATA_GRPC_SERVICE_PORT' - - return config - - -def get_default_pod_labels() -> Dict[str, str]: - """Returns the default pod label dict for Kubeflow.""" - # KFP default transformers add pod env: - # https://github.com/kubeflow/pipelines/blob/0.1.32/sdk/python/kfp/compiler/_default_transformers.py - result = { - 'add-pod-env': 'true', - telemetry_utils.LABEL_KFP_SDK_ENV: 'tfx' - } - return result - - -def get_default_output_filename(pipeline_name: str) -> str: - return pipeline_name + '.tar.gz' - - -class KubeflowDagRunnerConfig(pipeline_config.PipelineConfig): - """Runtime configuration parameters specific to execution on Kubeflow.""" - - def __init__( - self, - pipeline_operator_funcs: Optional[List[OpFunc]] = None, - tfx_image: Optional[str] = None, - kubeflow_metadata_config: Optional[ - kubeflow_pb2.KubeflowMetadataConfig] = None, - # TODO(b/143883035): Figure out the best practice to put the - # SUPPORTED_LAUNCHER_CLASSES - supported_launcher_classes: Optional[List[Type[ - base_component_launcher.BaseComponentLauncher]]] = None, - metadata_ui_path: str = '/mlpipeline-ui-metadata.json', - **kwargs): - """Creates a KubeflowDagRunnerConfig object. - - The user can use pipeline_operator_funcs to apply modifications to - ContainerOps used in the pipeline. For example, to ensure the pipeline - steps mount a GCP secret, and a Persistent Volume, one can create config - object like so: - - from kfp import gcp, onprem - mount_secret_op = gcp.use_secret('my-secret-name) - mount_volume_op = onprem.mount_pvc( - "my-persistent-volume-claim", - "my-volume-name", - "/mnt/volume-mount-path") - - config = KubeflowDagRunnerConfig( - pipeline_operator_funcs=[mount_secret_op, mount_volume_op] - ) - - Args: - pipeline_operator_funcs: A list of ContainerOp modifying functions that - will be applied to every container step in the pipeline. - tfx_image: The TFX container image to use in the pipeline. - kubeflow_metadata_config: Runtime configuration to use to connect to - Kubeflow metadata. - supported_launcher_classes: A list of component launcher classes that are - supported by the current pipeline. List sequence determines the order in - which launchers are chosen for each component being run. - metadata_ui_path: File location for metadata-ui-metadata.json file. - **kwargs: keyword args for PipelineConfig. - """ - supported_launcher_classes = supported_launcher_classes or [ - in_process_component_launcher.InProcessComponentLauncher, - kubernetes_component_launcher.KubernetesComponentLauncher, - ] - super().__init__( - supported_launcher_classes=supported_launcher_classes, **kwargs) - self.pipeline_operator_funcs = ( - pipeline_operator_funcs or get_default_pipeline_operator_funcs()) - self.tfx_image = tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE - self.kubeflow_metadata_config = ( - kubeflow_metadata_config or get_default_kubeflow_metadata_config()) - self.metadata_ui_path = metadata_ui_path - - -class KubeflowDagRunner(tfx_runner.TfxRunner): - """Kubeflow Pipelines runner. - - Constructs a pipeline definition YAML file based on the TFX logical pipeline. - """ - - def __init__(self, - output_dir: Optional[str] = None, - output_filename: Optional[str] = None, - config: Optional[KubeflowDagRunnerConfig] = None, - pod_labels_to_attach: Optional[Dict[str, str]] = None): - """Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline. - - Args: - output_dir: An optional output directory into which to output the pipeline - definition files. Defaults to the current working directory. - output_filename: An optional output file name for the pipeline definition - file. Defaults to pipeline_name.tar.gz when compiling a TFX pipeline. - Currently supports .tar.gz, .tgz, .zip, .yaml, .yml formats. See - https://github.com/kubeflow/pipelines/blob/181de66cf9fa87bcd0fe9291926790c400140783/sdk/python/kfp/compiler/compiler.py#L851 - for format restriction. - config: An optional KubeflowDagRunnerConfig object to specify runtime - configuration when running the pipeline under Kubeflow. - pod_labels_to_attach: Optional set of pod labels to attach to GKE pod - spinned up for this pipeline. Default to the 3 labels: - 1. add-pod-env: true, - 2. pipeline SDK type, - 3. pipeline unique ID, - where 2 and 3 are instrumentation of usage tracking. - """ - if config and not isinstance(config, KubeflowDagRunnerConfig): - raise TypeError('config must be type of KubeflowDagRunnerConfig.') - super().__init__(config or KubeflowDagRunnerConfig()) - self._config = cast(KubeflowDagRunnerConfig, self._config) - self._output_dir = output_dir or os.getcwd() - self._output_filename = output_filename - self._compiler = compiler.Compiler() - self._tfx_compiler = tfx_compiler.Compiler() - self._params = [] # List of dsl.PipelineParam used in this pipeline. - self._params_by_component_id = collections.defaultdict(list) - self._deduped_parameter_names = set() # Set of unique param names used. - self._exit_handler = None - if pod_labels_to_attach is None: - self._pod_labels_to_attach = get_default_pod_labels() - else: - self._pod_labels_to_attach = pod_labels_to_attach - - def _parse_parameter_from_component( - self, component: tfx_base_component.BaseComponent) -> None: - """Extract embedded RuntimeParameter placeholders from a component. - - Extract embedded RuntimeParameter placeholders from a component, then append - the corresponding dsl.PipelineParam to KubeflowDagRunner. - - Args: - component: a TFX component. - """ - - deduped_parameter_names_for_component = set() - for parameter in component.exec_properties.values(): - if not isinstance(parameter, data_types.RuntimeParameter): - continue - # Ignore pipeline root because it will be added later. - if parameter.name == tfx_pipeline.ROOT_PARAMETER.name: - continue - if parameter.name in deduped_parameter_names_for_component: - continue - - deduped_parameter_names_for_component.add(parameter.name) - self._params_by_component_id[component.id].append(parameter) - if parameter.name not in self._deduped_parameter_names: - self._deduped_parameter_names.add(parameter.name) - # TODO(b/178436919): Create a test to cover default value rendering - # and move the external code reference over there. - # The default needs to be serialized then passed to dsl.PipelineParam. - # See - # https://github.com/kubeflow/pipelines/blob/f65391309650fdc967586529e79af178241b4c2c/sdk/python/kfp/dsl/_pipeline_param.py#L154 - dsl_parameter = dsl.PipelineParam( - name=parameter.name, value=str(parameter.default)) - self._params.append(dsl_parameter) - - def _parse_parameter_from_pipeline(self, - pipeline: tfx_pipeline.Pipeline) -> None: - """Extract all the RuntimeParameter placeholders from the pipeline.""" - - for component in pipeline.components: - self._parse_parameter_from_component(component) - - def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline, - pipeline_root: dsl.PipelineParam): - """Constructs a Kubeflow Pipeline graph. - - Args: - pipeline: The logical TFX pipeline to base the construction on. - pipeline_root: dsl.PipelineParam representing the pipeline root. - """ - component_to_kfp_op = {} - - for component in pipeline.components: - utils.replace_exec_properties(component) - tfx_ir = self._generate_tfx_ir(pipeline) - - # Assumption: There is a partial ordering of components in the list, i.e., - # if component A depends on component B and C, then A appears after B and C - # in the list. - for component in pipeline.components: - # Keep track of the set of upstream dsl.ContainerOps for this component. - depends_on = set() - - for upstream_component in component.upstream_nodes: - depends_on.add(component_to_kfp_op[upstream_component]) - - # remove the extra pipeline node information - tfx_node_ir = self._dehydrate_tfx_ir(tfx_ir, component.id) - - # Disable cache for exit_handler - if self._exit_handler and component.id == self._exit_handler.id: - tfx_node_ir.nodes[ - 0].pipeline_node.execution_options.caching_options.enable_cache = False - - kfp_component = base_component.BaseComponent( - component=component, - depends_on=depends_on, - pipeline=pipeline, - pipeline_root=pipeline_root, - tfx_image=self._config.tfx_image, - kubeflow_metadata_config=self._config.kubeflow_metadata_config, - pod_labels_to_attach=self._pod_labels_to_attach, - tfx_ir=tfx_node_ir, - metadata_ui_path=self._config.metadata_ui_path, - runtime_parameters=(self._params_by_component_id[component.id] + - [tfx_pipeline.ROOT_PARAMETER])) - - for operator in self._config.pipeline_operator_funcs: - kfp_component.container_op.apply(operator) - - component_to_kfp_op[component] = kfp_component.container_op - - # If exit handler defined create an exit handler and add all ops to it. - if self._exit_handler: - exit_op = component_to_kfp_op[self._exit_handler] - with dsl.ExitHandler(exit_op) as exit_handler_group: - exit_handler_group.name = utils.TFX_DAG_NAME - # KFP get_default_pipeline should have the pipeline object when invoked - # while compiling. This allows us to retrieve all ops from pipeline - # group (should be the only group in the pipeline). - pipeline_group = dsl.Pipeline.get_default_pipeline().groups[0] - - # Transfer all ops to exit_handler_group which will now contain all ops. - exit_handler_group.ops = pipeline_group.ops - # remove all ops from pipeline_group. Otherwise compiler fails in - # https://github.com/kubeflow/pipelines/blob/8aee62142aa13ae42b2dd18257d7e034861b7e5e/sdk/python/kfp/compiler/compiler.py#L893 - pipeline_group.ops = [] - - def _del_unused_field(self, node_id: str, message_dict: MutableMapping[str, - Any]): - for item in list(message_dict.keys()): - if item != node_id: - del message_dict[item] - - def _dehydrate_tfx_ir(self, original_pipeline: pipeline_pb2.Pipeline, - node_id: str) -> pipeline_pb2.Pipeline: - pipeline = copy.deepcopy(original_pipeline) - for node in pipeline.nodes: - if (node.WhichOneof('node') == 'pipeline_node' and - node.pipeline_node.node_info.id == node_id): - del pipeline.nodes[:] - pipeline.nodes.extend([node]) - break - - deployment_config = pipeline_pb2.IntermediateDeploymentConfig() - pipeline.deployment_config.Unpack(deployment_config) - self._del_unused_field(node_id, deployment_config.executor_specs) - self._del_unused_field(node_id, deployment_config.custom_driver_specs) - self._del_unused_field(node_id, - deployment_config.node_level_platform_configs) - pipeline.deployment_config.Pack(deployment_config) - return pipeline - - def _generate_tfx_ir( - self, pipeline: tfx_pipeline.Pipeline) -> Optional[pipeline_pb2.Pipeline]: - result = self._tfx_compiler.compile(pipeline) - return result - - def run(self, pipeline: tfx_pipeline.Pipeline): - """Compiles and outputs a Kubeflow Pipeline YAML definition file. - - Args: - pipeline: The logical TFX pipeline to use when building the Kubeflow - pipeline. - """ - # If exit handler is defined, append to existing pipeline components. - if self._exit_handler: - original_pipeline = pipeline - pipeline = copy.copy(original_pipeline) - pipeline.components = [*pipeline.components, self._exit_handler] - - for component in pipeline.components: - # TODO(b/187122662): Pass through pip dependencies as a first-class - # component flag. - if isinstance(component, tfx_base_component.BaseComponent): - component._resolve_pip_dependencies( # pylint: disable=protected-access - pipeline.pipeline_info.pipeline_root) - - # KFP DSL representation of pipeline root parameter. - dsl_pipeline_root = dsl.PipelineParam( - name=tfx_pipeline.ROOT_PARAMETER.name, - value=pipeline.pipeline_info.pipeline_root) - self._params.append(dsl_pipeline_root) - - def _construct_pipeline(): - """Constructs a Kubeflow pipeline. - - Creates Kubeflow ContainerOps for each TFX component encountered in the - logical pipeline definition. - """ - self._construct_pipeline_graph(pipeline, dsl_pipeline_root) - - # Need to run this first to get self._params populated. Then KFP compiler - # can correctly match default value with PipelineParam. - self._parse_parameter_from_pipeline(pipeline) - - file_name = self._output_filename or get_default_output_filename( - pipeline.pipeline_info.pipeline_name) - # Create workflow spec and write out to package. - self._compiler._create_and_write_workflow( # pylint: disable=protected-access - pipeline_func=_construct_pipeline, - pipeline_name=pipeline.pipeline_info.pipeline_name, - params_list=self._params, - package_path=os.path.join(self._output_dir, file_name)) - - def set_exit_handler(self, exit_handler: base_node.BaseNode): - """Set exit handler components for the Kubeflow dag runner. - - This feature is currently experimental without backward compatibility - gaurantee. - - Args: - exit_handler: exit handler component. - """ - if not exit_handler: - logging.error('Setting empty exit handler is not allowed.') - return - assert not exit_handler.downstream_nodes, ('Exit handler should not depend ' - 'on any other node.') - assert not exit_handler.upstream_nodes, ('Exit handler should not depend on' - ' any other node.') - self._exit_handler = exit_handler diff --git a/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py b/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py deleted file mode 100644 index 47ac982f48..0000000000 --- a/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py +++ /dev/null @@ -1,329 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.kubeflow.kubeflow_dag_runner.""" - -import json -import os -import tarfile -from typing import List - -from kfp import onprem -import tensorflow as tf -from tfx.components.statistics_gen import component as statistics_gen_component -from tfx.dsl.component.experimental import executor_specs -from tfx.dsl.component.experimental.annotations import Parameter -from tfx.dsl.component.experimental.decorators import component -from tfx.dsl.components.base import base_component -from tfx.dsl.io import fileio -from tfx.extensions.google_cloud_big_query.example_gen import component as big_query_example_gen_component -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.orchestration.kubeflow.decorators import FinalStatusStr -from tfx.proto import example_gen_pb2 -from tfx.types import component_spec -from tfx.utils import telemetry_utils -from tfx.utils import test_case_utils -import yaml - -from ml_metadata.proto import metadata_store_pb2 - - -@component -def say_hi(status: Parameter[str]): - print(status) - - -# 2-step pipeline under test. -def _two_step_pipeline() -> tfx_pipeline.Pipeline: - default_input_config = json.dumps({ - 'splits': [{ - 'name': 'single_split', - 'pattern': 'SELECT * FROM default-table' - }] - }) - input_config = data_types.RuntimeParameter( - name='input_config', ptype=str, default=default_input_config) - example_gen = big_query_example_gen_component.BigQueryExampleGen( - input_config=input_config, output_config=example_gen_pb2.Output()) - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']) - return tfx_pipeline.Pipeline( - pipeline_name='two_step_pipeline', - pipeline_root='pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - -class _DummySpec(component_spec.ComponentSpec): - INPUTS = {} - OUTPUTS = {} - PARAMETERS = {} - - -class _DummyComponent(base_component.BaseComponent): - SPEC_CLASS = _DummySpec - EXECUTOR_SPEC = executor_specs.TemplatedExecutorContainerSpec( - image='dummy:latest', command=['ls']) - - def __init__(self): - super().__init__(_DummySpec()) - - -def _container_component_pipeline() -> tfx_pipeline.Pipeline: - return tfx_pipeline.Pipeline( - pipeline_name='container_component_pipeline', - pipeline_root='pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[_DummyComponent()], - ) - - -class KubeflowDagRunnerTest(test_case_utils.TfxTest): - - def setUp(self): - super().setUp() - self._source_data_dir = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'testdata') - self.enter_context(test_case_utils.change_working_dir(self.tmp_dir)) - - def _compare_tfx_ir_against_testdata(self, args: List[str], golden_file: str): - index_of_tfx_ir_flag = args.index('--tfx_ir') - self.assertAllGreater(len(args), index_of_tfx_ir_flag) - real_tfx_ir = json.loads(args[index_of_tfx_ir_flag + 1]) - real_tfx_ir_str = json.dumps(real_tfx_ir, sort_keys=True) - with open(os.path.join(self._source_data_dir, - golden_file)) as tfx_ir_json_file: - formatted_tfx_ir = json.dumps(json.load(tfx_ir_json_file), sort_keys=True) - self.assertEqual(real_tfx_ir_str, formatted_tfx_ir) - - def testTwoStepPipeline(self): - """Sanity-checks the construction and dependencies for a 2-step pipeline.""" - kubeflow_dag_runner.KubeflowDagRunner().run(_two_step_pipeline()) - file_path = os.path.join(self.tmp_dir, 'two_step_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - big_query_container = [ - c for c in containers if c['name'] == 'bigqueryexamplegen' - ] - self.assertEqual(1, len(big_query_container)) - self.assertEqual([ - 'python', - '-m', - 'tfx.orchestration.kubeflow.container_entrypoint', - ], big_query_container[0]['container']['command']) - self.assertIn('--tfx_ir', big_query_container[0]['container']['args']) - self.assertIn('--node_id', big_query_container[0]['container']['args']) - self._compare_tfx_ir_against_testdata( - big_query_container[0]['container']['args'], - 'two_step_pipeline_post_dehydrate_ir.json') - - statistics_gen_container = [ - c for c in containers if c['name'] == 'statisticsgen' - ] - self.assertEqual(1, len(statistics_gen_container)) - - # Ensure the pod labels are correctly appended. - metadata = [ - c['metadata'] for c in pipeline['spec']['templates'] if 'dag' not in c - ] - for m in metadata: - self.assertEqual('tfx', m['labels'][telemetry_utils.LABEL_KFP_SDK_ENV]) - - # Ensure dependencies between components are captured. - dag = [c for c in pipeline['spec']['templates'] if 'dag' in c] - self.assertEqual(1, len(dag)) - - self.assertEqual( - { - 'tasks': [{ - 'name': 'bigqueryexamplegen', - 'template': 'bigqueryexamplegen', - 'arguments': { - 'parameters': [{ - 'name': 'input_config', - 'value': '{{inputs.parameters.input_config}}' - }, { - 'name': 'pipeline-root', - 'value': '{{inputs.parameters.pipeline-root}}' - }] - } - }, { - 'name': 'statisticsgen', - 'template': 'statisticsgen', - 'arguments': { - 'parameters': [{ - 'name': 'pipeline-root', - 'value': '{{inputs.parameters.pipeline-root}}' - }] - }, - 'dependencies': ['bigqueryexamplegen'], - }] - }, dag[0]['dag']) - - def testDefaultPipelineOperatorFuncs(self): - kubeflow_dag_runner.KubeflowDagRunner().run(_two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - def testMountGcpServiceAccount(self): - kubeflow_dag_runner.KubeflowDagRunner( - config=kubeflow_dag_runner.KubeflowDagRunnerConfig( - pipeline_operator_funcs=kubeflow_dag_runner - .get_default_pipeline_operator_funcs(use_gcp_sa=True))).run( - _two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - # Check that each container has default GCP credentials. - - container_0 = containers[0] - env = [ - env for env in container_0['container']['env'] - if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS' - ] - self.assertEqual(1, len(env)) - self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json', - env[0]['value']) - - container_1 = containers[0] - env = [ - env for env in container_1['container']['env'] - if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS' - ] - self.assertEqual(1, len(env)) - self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json', - env[0]['value']) - - def testVolumeMountingPipelineOperatorFuncs(self): - mount_volume_op = onprem.mount_pvc('my-persistent-volume-claim', - 'my-volume-name', - '/mnt/volume-mount-path') - config = kubeflow_dag_runner.KubeflowDagRunnerConfig( - pipeline_operator_funcs=[mount_volume_op]) - - kubeflow_dag_runner.KubeflowDagRunner(config=config).run( - _two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - container_templates = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(container_templates)) - - volumes = [{ - 'name': 'my-volume-name', - 'persistentVolumeClaim': { - 'claimName': 'my-persistent-volume-claim' - } - }] - - # Check that the PVC is specified for kfp<=0.1.31.1. - if 'volumes' in pipeline['spec']: - self.assertEqual(volumes, pipeline['spec']['volumes']) - - for template in container_templates: - # Check that each container has the volume mounted. - self.assertEqual([{ - 'name': 'my-volume-name', - 'mountPath': '/mnt/volume-mount-path' - }], template['container']['volumeMounts']) - - # Check that each template has the PVC specified for kfp>=0.1.31.2. - if 'volumes' in template: - self.assertEqual(volumes, template['volumes']) - - def testContainerComponent(self): - kubeflow_dag_runner.KubeflowDagRunner().run(_container_component_pipeline()) - file_path = os.path.join(self.tmp_dir, - 'container_component_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertLen(containers, 1) - component_args = containers[0]['container']['args'] - self.assertIn('--node_id', component_args) - - def testExitHandler(self): - dag_runner = kubeflow_dag_runner.KubeflowDagRunner() - dag_runner.set_exit_handler(say_hi(status=FinalStatusStr())) - pipeline = _container_component_pipeline() - pipeline.enable_cache = True - dag_runner.run(pipeline) - file_path = os.path.join(self.tmp_dir, - 'container_component_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - self.assertIn('onExit', pipeline['spec']) - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertLen(containers, 2) - exit_component_args = ' '.join(containers[1]['container']['args']) - self.assertIn('{{workflow.status}}', exit_component_args) - self.assertNotIn('enableCache', exit_component_args) - first_component_args = ' '.join(containers[0]['container']['args']) - self.assertNotIn('{{workflow.status}}', first_component_args) - self.assertIn('enableCache', first_component_args) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/kubeflow/proto/BUILD b/tfx/orchestration/kubeflow/proto/BUILD deleted file mode 100644 index b0ee822ee6..0000000000 --- a/tfx/orchestration/kubeflow/proto/BUILD +++ /dev/null @@ -1,25 +0,0 @@ -load("//tfx:tfx.bzl", "tfx_py_proto_library") - -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) # Apache 2.0 - -exports_files(["LICENSE"]) - -tfx_py_proto_library( - name = "kubeflow_proto_py_pb2", - srcs = ["kubeflow.proto"], -) diff --git a/tfx/orchestration/kubeflow/proto/kubeflow.proto b/tfx/orchestration/kubeflow/proto/kubeflow.proto deleted file mode 100644 index bab34bdc69..0000000000 --- a/tfx/orchestration/kubeflow/proto/kubeflow.proto +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019 Google LLC. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -syntax = "proto3"; - -package tfx.orchestration.kubeflow.proto; - -// ConfigValue specifies how Kubeflow components should obtain a runtime -// configuration parameter value. -message ConfigValue { - oneof value_from { - // Specifies a literal value to use. - string value = 1; - // Specifies that the parameter value should be obtained from the - // environment variable with this specified value. - string environment_variable = 2; - } -} - -// Message to specify the gRPC server configuration. -message KubeflowGrpcMetadataConfig { - // ML Metadata gRPC service host in the cluster. - ConfigValue grpc_service_host = 1; - // ML Metadata gRPC service port in the cluster. - ConfigValue grpc_service_port = 2; -} - -// Message to specify Metadata configuration. -message KubeflowMetadataConfig { - // Following mysql connection configuration fields will be deprecated soon in - // favor of oneof connection_config. - - ConfigValue mysql_db_service_host = 1 [deprecated = true]; - ConfigValue mysql_db_service_port = 2 [deprecated = true]; - ConfigValue mysql_db_name = 3 [deprecated = true]; - ConfigValue mysql_db_user = 4 [deprecated = true]; - ConfigValue mysql_db_password = 5 [deprecated = true]; - - oneof connection_config { - KubeflowGrpcMetadataConfig grpc_config = 7; - } -} diff --git a/tfx/orchestration/pipeline.py b/tfx/orchestration/pipeline.py index b2622eda97..6920441576 100644 --- a/tfx/orchestration/pipeline.py +++ b/tfx/orchestration/pipeline.py @@ -40,7 +40,7 @@ _MAX_PIPELINE_NAME_LENGTH = 63 # Pipeline root is by default specified as a RuntimeParameter when runnning on -# KubeflowDagRunner. This constant offers users an easy access to the pipeline +# KubeflowV2DagRunner. This constant offers users an easy access to the pipeline # root placeholder when defining a pipeline. For example, # # pusher = Pusher( diff --git a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py b/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py deleted file mode 100644 index 01ea50d940..0000000000 --- a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2021 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Patches KubeflowDagRunner to read and update argument during compilation.""" - -import os -import tempfile -import typing -from typing import Any, Callable, MutableMapping, Optional, Type - -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration import tfx_runner -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.tools.cli.handler import dag_runner_patcher - - -def _get_temporary_package_filename(pipeline_name: str, directory: str) -> str: - # mkstemp will create and open a file named 'temp_xxxxx.tar.gz'. - fd, path = tempfile.mkstemp('.tar.gz', f'temp_{pipeline_name}', directory) - os.close(fd) - return os.path.basename(path) - - -class KubeflowDagRunnerPatcher(dag_runner_patcher.DagRunnerPatcher): - """Patches KubeflowDagRunner.run() with several customizations for CLI.""" - - USE_TEMPORARY_OUTPUT_FILE = 'use_temporary_output_file' - OUTPUT_FILE_PATH = 'output_file_path' - - def __init__(self, - call_real_run: bool, - use_temporary_output_file: bool = False, - build_image_fn: Optional[Callable[[str], str]] = None): - """Initialize KubeflowDagRunnerPatcher. - - Args: - call_real_run: Specify KubeflowDagRunner.run() should be called. - use_temporary_output_file: If True, we will override the default value of - the pipeline package output path. Even if it is set to True, if users - specified a path in KubeflowDagRunner then this option will be ignored. - build_image_fn: If specified, call the function with the configured - tfx_image in the pipeline. The result of the function will be - substituted as a new tfx_image of the pipeline. - """ - super().__init__(call_real_run) - self._build_image_fn = build_image_fn - self._use_temporary_output_file = use_temporary_output_file - - def _before_run(self, runner: tfx_runner.TfxRunner, - pipeline: tfx_pipeline.Pipeline, - context: MutableMapping[str, Any]) -> None: - runner = typing.cast(kubeflow_dag_runner.KubeflowDagRunner, runner) - runner_config = typing.cast(kubeflow_dag_runner.KubeflowDagRunnerConfig, - runner.config) - if self._build_image_fn is not None: - # Replace the image for the pipeline with the newly built image name. - # This new image name will include the sha256 image id. - runner_config.tfx_image = self._build_image_fn(runner_config.tfx_image) - - # pylint: disable=protected-access - context[self.USE_TEMPORARY_OUTPUT_FILE] = ( - runner._output_filename is None and self._use_temporary_output_file) - if context[self.USE_TEMPORARY_OUTPUT_FILE]: - # Replace the output of the kfp compile to a temporary file. - # This file will be deleted after job submission in kubeflow_handler.py - runner._output_filename = _get_temporary_package_filename( - context[self.PIPELINE_NAME], runner._output_dir) - output_filename = ( - runner._output_filename or - kubeflow_dag_runner.get_default_output_filename( - context[self.PIPELINE_NAME])) - context[self.OUTPUT_FILE_PATH] = os.path.join(runner._output_dir, - output_filename) - - def get_runner_class(self) -> Type[tfx_runner.TfxRunner]: - return kubeflow_dag_runner.KubeflowDagRunner diff --git a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py b/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py deleted file mode 100644 index e1b2459caa..0000000000 --- a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2021 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.tools.cli.handler.kubeflow_dag_runner_patcher.""" - -import os -from unittest import mock - -import tensorflow as tf -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.tools.cli.handler import kubeflow_dag_runner_patcher -from tfx.utils import test_case_utils - - -class KubeflowDagRunnerPatcherTest(test_case_utils.TfxTest): - - def setUp(self): - super().setUp() - self.enter_context(test_case_utils.change_working_dir(self.tmp_dir)) - - def testPatcher(self): - given_image_name = 'foo/bar' - built_image_name = 'foo/bar@sha256:1234567890' - - mock_build_image_fn = mock.MagicMock(return_value=built_image_name) - patcher = kubeflow_dag_runner_patcher.KubeflowDagRunnerPatcher( - call_real_run=True, - build_image_fn=mock_build_image_fn, - use_temporary_output_file=True) - runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig( - tfx_image=given_image_name) - runner = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config) - pipeline = tfx_pipeline.Pipeline('dummy', 'dummy_root') - with patcher.patch() as context: - runner.run(pipeline) - self.assertTrue(context[patcher.USE_TEMPORARY_OUTPUT_FILE]) - self.assertIn(patcher.OUTPUT_FILE_PATH, context) - - mock_build_image_fn.assert_called_once_with(given_image_name) - self.assertEqual(runner_config.tfx_image, built_image_name) - - def testPatcherWithOutputFile(self): - output_filename = 'foo.tar.gz' - patcher = kubeflow_dag_runner_patcher.KubeflowDagRunnerPatcher( - call_real_run=False, - build_image_fn=None, - use_temporary_output_file=True) - runner = kubeflow_dag_runner.KubeflowDagRunner( - output_filename=output_filename) - pipeline = tfx_pipeline.Pipeline('dummy', 'dummy_root') - with patcher.patch() as context: - runner.run(pipeline) - self.assertFalse(context[patcher.USE_TEMPORARY_OUTPUT_FILE]) - self.assertEqual( - os.path.basename(context[patcher.OUTPUT_FILE_PATH]), output_filename) - self.assertEqual(runner._output_filename, output_filename) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/v1/orchestration/experimental/__init__.py b/tfx/v1/orchestration/experimental/__init__.py index 7963c45a1f..d222954eea 100644 --- a/tfx/v1/orchestration/experimental/__init__.py +++ b/tfx/v1/orchestration/experimental/__init__.py @@ -12,23 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. """TFX orchestration.experimental module.""" - -try: # pylint: disable=g-statement-before-imports - from tfx.orchestration.kubeflow import kubeflow_dag_runner # pylint: disable=g-import-not-at-top - from tfx.orchestration.kubeflow.decorators import exit_handler # pylint: disable=g-import-not-at-top - from tfx.orchestration.kubeflow.decorators import FinalStatusStr # pylint: disable=g-import-not-at-top - from tfx.utils import telemetry_utils # pylint: disable=g-import-not-at-top - - KubeflowDagRunner = kubeflow_dag_runner.KubeflowDagRunner - KubeflowDagRunnerConfig = kubeflow_dag_runner.KubeflowDagRunnerConfig - get_default_kubeflow_metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config - LABEL_KFP_SDK_ENV = telemetry_utils.LABEL_KFP_SDK_ENV - - del telemetry_utils - del kubeflow_dag_runner -except ImportError: # Import will fail without kfp package. - pass - try: from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner # pylint: disable=g-import-not-at-top diff --git a/tfx/v1/proto/__init__.py b/tfx/v1/proto/__init__.py index eb6bdb30a7..edfcd92c72 100644 --- a/tfx/v1/proto/__init__.py +++ b/tfx/v1/proto/__init__.py @@ -142,7 +142,7 @@ """ KubernetesConfig.__doc__ = """ -Kubernetes configuration. We currently only support the use case when infra validator is run by `orchestration.KubeflowDagRunner`. +Kubernetes configuration. Model server will be launched in the same namespace KFP is running on, as well as same service account will be used (unless specified). Model server will have `ownerReferences` to the infra validator, which delegates the strict cleanup guarantee to the kubernetes cluster. """ @@ -262,4 +262,4 @@ PairedExampleSkew.__doc__ = """ Configurations related to Example Diff on feature pairing level. -""" \ No newline at end of file +"""