From c695752af64d2a8b1904a89eeebeffd1d4a3e9a7 Mon Sep 17 00:00:00 2001 From: kmonte Date: Tue, 6 Aug 2024 18:43:47 -0700 Subject: [PATCH] Add maximum_active_task_schedulers method to Env PiperOrigin-RevId: 660179475 --- tfx/orchestration/experimental/core/env.py | 7 +++++++ tfx/orchestration/experimental/core/env_test.py | 3 +++ 2 files changed, 10 insertions(+) diff --git a/tfx/orchestration/experimental/core/env.py b/tfx/orchestration/experimental/core/env.py index a1381ecbd7..bf04dff145 100644 --- a/tfx/orchestration/experimental/core/env.py +++ b/tfx/orchestration/experimental/core/env.py @@ -158,6 +158,10 @@ def get_status_code_from_exception( Returns None if the exception is not a known type. """ + @abc.abstractmethod + def maximum_active_task_schedulers(self) -> int: + """Returns the maximum number of active task schedulers.""" + class _DefaultEnv(Env): """Default environment.""" @@ -244,6 +248,9 @@ def get_status_code_from_exception( ) -> Optional[int]: return None + def maximum_active_task_schedulers(self) -> int: + return 1 + _ENV = _DefaultEnv() diff --git a/tfx/orchestration/experimental/core/env_test.py b/tfx/orchestration/experimental/core/env_test.py index ec2c27c9b3..04f3506482 100644 --- a/tfx/orchestration/experimental/core/env_test.py +++ b/tfx/orchestration/experimental/core/env_test.py @@ -100,6 +100,9 @@ def record_orchestration_time(self, pipeline_run_id: str) -> None: def should_orchestrate(self, pipeline: pipeline_pb2.Pipeline) -> bool: raise NotImplementedError() + def maximum_active_task_schedulers(self) -> int: + raise NotImplementedError() + class EnvTest(test_utils.TfxTest):