Merge pull request #6402 from TomPham97:patch-1
PiperOrigin-RevId: 579332047
tfx-copybara committed Nov 3, 2023
2 parents 13c3e1b + 96b98af commit d9bf964
Showing 37 changed files with 757 additions and 408 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -3,6 +3,8 @@
## Major Features and Improvements

* Dropped python 3.8 support.
* Extend GetPipelineRunExecutions API to support filtering executions by
create_time, type.

## Breaking Changes

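
As an aside, not part of this commit: the GetPipelineRunExecutions request and response shapes are not shown in this view, so the sketch below only illustrates the kind of execution filtering the release note describes, expressed at the MLMD level under the assumption that pipeline-run executions are recorded in an ml-metadata store. The store location, execution type name, and timestamp cutoff are placeholders.

```python
import ml_metadata as mlmd
from ml_metadata.proto import metadata_store_pb2

# Hypothetical local MLMD store; a real deployment would reuse the
# orchestrator's connection config instead.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/tmp/mlmd.sqlite'
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE
)
store = mlmd.MetadataStore(connection_config)

# Filter executions by type and creation time, analogous to the filtering
# the extended API exposes; the type name and cutoff are illustrative only.
cutoff_ms = 1698800000000
executions = store.get_executions(
    list_options=mlmd.ListOptions(
        filter_query=(
            "type = 'tfx.components.trainer.component.Trainer' "
            f"AND create_time_since_epoch > {cutoff_ms}"
        )
    )
)
```
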
2 changes: 1 addition & 1 deletion docs/guide/examplegen.md
@@ -192,7 +192,7 @@ Span can be retrieved by using '{SPAN}' spec in the

* This spec matches digits and maps the data into the relevant SPAN numbers.
For example, 'data_{SPAN}-*.tfrecord' will collect files like
'data_12-a.tfrecord', 'date_12-b.tfrecord'.
'data_12-a.tfrecord', 'data_12-b.tfrecord'.
* Optionally, this spec can be specified with the width of the integers when
mapped. For example, 'data_{SPAN:2}.file' maps to files like 'data_02.file'
and 'data_27.file' (as inputs for Span-2 and Span-27 respectively), but does
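
As an aside, not part of this diff: a minimal sketch of how a span pattern like the one documented above is passed to an ExampleGen component, using the example_gen_pb2 protos from this repository. The input_base directory and split name are placeholders.

```python
from tfx.components import ImportExampleGen
from tfx.proto import example_gen_pb2

# Files such as 'data_12-a.tfrecord' and 'data_12-b.tfrecord' are both
# grouped into Span 12 by the {SPAN} part of the pattern.
input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='data_{SPAN}-*.tfrecord'),
])

example_gen = ImportExampleGen(
    input_base='/path/to/tfrecords',  # placeholder directory
    input_config=input_config,
)
```
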
29 changes: 19 additions & 10 deletions tfx/dsl/compiler/placeholder_utils.py
@@ -54,10 +54,13 @@ class ResolutionContext:
render all kinds of placeholders.
executor_spec: An executor spec proto for rendering context placeholder.
platform_config: A platform config proto for rendering context placeholder.
pipeline_platform_config: A pipeline-level config proto for rendering
context placeholder.
"""
exec_info: data_types.ExecutionInfo = None
executor_spec: message.Message = None
platform_config: message.Message = None
pipeline_platform_config: message.Message = None


# A Placeholder Expression can be resolved to the following types:
@@ -181,20 +184,26 @@ class _ExpressionResolver:

def __init__(self, context: ResolutionContext):
self._resolution_values = {
placeholder_pb2.Placeholder.Type.INPUT_ARTIFACT:
context.exec_info.input_dict,
placeholder_pb2.Placeholder.Type.OUTPUT_ARTIFACT:
context.exec_info.output_dict,
placeholder_pb2.Placeholder.Type.EXEC_PROPERTY:
context.exec_info.exec_properties,
placeholder_pb2.Placeholder.Type.INPUT_ARTIFACT: (
context.exec_info.input_dict
),
placeholder_pb2.Placeholder.Type.OUTPUT_ARTIFACT: (
context.exec_info.output_dict
),
placeholder_pb2.Placeholder.Type.EXEC_PROPERTY: (
context.exec_info.exec_properties
),
placeholder_pb2.Placeholder.Type.RUNTIME_INFO: {
ph.RuntimeInfoKey.EXECUTOR_SPEC.value: context.executor_spec,
ph.RuntimeInfoKey.PLATFORM_CONFIG.value: context.platform_config,
ph.RuntimeInfoKey.PIPELINE_PLATFORM_CONFIG.value: (
context.pipeline_platform_config
),
},
placeholder_pb2.Placeholder.Type.EXEC_INVOCATION:
context.exec_info.to_proto(),
placeholder_pb2.Placeholder.Type.ENVIRONMENT_VARIABLE:
os.environ.get,
placeholder_pb2.Placeholder.Type.EXEC_INVOCATION: (
context.exec_info.to_proto()
),
placeholder_pb2.Placeholder.Type.ENVIRONMENT_VARIABLE: os.environ.get,
}

def resolve(self, expression: placeholder_pb2.PlaceholderExpression) -> Any:
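
As an aside, not part of this diff: a minimal sketch of how the new pipeline_platform_config field rides along in a ResolutionContext, assuming keyword construction of both classes; the Any protos stand in for whatever platform-specific messages an orchestrator would actually supply, and the exec_properties value is illustrative.

```python
from google.protobuf import any_pb2
from tfx.dsl.compiler import placeholder_utils
from tfx.orchestration.portable import data_types

# Stand-in protos; a real orchestrator passes its own per-node and
# pipeline-level platform config messages here.
node_platform_config = any_pb2.Any()
pipeline_platform_config = any_pb2.Any()

context = placeholder_utils.ResolutionContext(
    exec_info=data_types.ExecutionInfo(exec_properties={'num_steps': 100}),
    platform_config=node_platform_config,
    pipeline_platform_config=pipeline_platform_config,
)
```
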
12 changes: 12 additions & 0 deletions tfx/dsl/control_flow/for_each_test.py
@@ -19,6 +19,7 @@
from tfx.dsl.components.base import base_node
from tfx.dsl.context_managers import dsl_context_registry
from tfx.dsl.control_flow import for_each
from tfx.orchestration import pipeline as pipeline_lib
from tfx.types import resolved_channel
from tfx.utils import test_case_utils

@@ -124,6 +125,17 @@ def testForEach_DifferentLoop_HasDifferentContext(self):
context2 = dsl_context_registry.get().get_contexts(c2)[-1]
self.assertNotEqual(context1, context2)

# TODO(b/247709394): This should be removed once subpipelines are supported.
def testForEach_Subpipeline_NotImplemented(self):
a = A()
with self.assertRaises(NotImplementedError):
with for_each.ForEach(a.outputs['aa']) as aa:
p_in = pipeline_lib.PipelineInputs({'aa': aa})
b = B(aa=p_in.inputs['aa'])
pipeline_lib.Pipeline(
pipeline_name='foo', components=b, inputs=p_in, outputs={}
)


if __name__ == '__main__':
tf.test.main()
12 changes: 8 additions & 4 deletions tfx/dsl/placeholder/placeholder.py
@@ -993,8 +993,9 @@ def exec_property(key: str) -> ExecPropertyPlaceholder:


class RuntimeInfoKey(enum.Enum):
PLATFORM_CONFIG = 'platform_config'
EXECUTOR_SPEC = 'executor_spec'
PLATFORM_CONFIG = 'platform_config'
PIPELINE_PLATFORM_CONFIG = 'pipeline_platform_config'


_RUNTIME_INFO_KEYS = frozenset(key.value for key in RuntimeInfoKey)
@@ -1004,9 +1005,12 @@ def runtime_info(key: str) -> RuntimeInfoPlaceholder:
"""Returns a Placeholder that contains runtime information for component.
Currently the runtime info includes following keys:
1. platform_config: A platform_config proto that contains platform specific
information.
2. executor_spec: The executor spec proto.
1. executor_spec: The executor spec proto.
2. platform_config: A proto that contains platform-specific information for
the current pipeline node.
3. pipeline_platform_config: A proto that contains platform-specific
information for the pipeline as a whole.
Args:
key: The key of the runtime information.
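
As an aside, not part of this diff: requesting the new key through the public placeholder API is a one-liner, as sketched below; how the resulting proto placeholder is then serialized into a command line depends on the executor and is not shown here.

```python
from tfx.dsl.placeholder import placeholder as ph

# Resolves at run time to the pipeline-level platform config proto
# (or to nothing if the orchestrator does not supply one).
pipeline_config_ph = ph.runtime_info('pipeline_platform_config')

# The existing keys remain available alongside the new one.
executor_spec_ph = ph.runtime_info('executor_spec')
node_config_ph = ph.runtime_info('platform_config')
```
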
21 changes: 0 additions & 21 deletions tfx/orchestration/data_types.py
@@ -54,27 +54,6 @@ def __init__(self,
self.use_cached_results = use_cached_results


class ExecutionInfo:
"""ExecutionInfo contains information populated during execution phase.
Attributes:
input_dict: Updated key -> List of types.Artifact for inputs that was used
during the actual execution.
output_dict: Updated key -> List of types.Artifact for outputs that was
generated during the actual execution.
exec_properties: execution properties used in this execution.
execution_id: Registered execution_id for the execution.
"""

def __init__(self, input_dict: Dict[str, List[types.Artifact]],
output_dict: Dict[str, List[types.Artifact]],
exec_properties: Dict[str, Any], execution_id: int):
self.input_dict = input_dict
self.output_dict = output_dict
self.exec_properties = exec_properties
self.execution_id = execution_id


class DriverArgs:
"""Args to driver from orchestration system.
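
As an aside, not part of this diff: the legacy class deleted above overlaps with ExecutionInfo in tfx.orchestration.portable.data_types, which is what the placeholder utilities earlier in this commit consume. A minimal sketch of populating that class, assuming keyword construction and showing only the fields named in the removed docstring:

```python
from tfx.orchestration.portable import data_types

# Artifact lists would normally hold tfx.types.Artifact instances; empty
# lists keep the sketch self-contained. Values are illustrative only.
exec_info = data_types.ExecutionInfo(
    execution_id=42,
    input_dict={'examples': []},
    output_dict={'model': []},
    exec_properties={'train_steps': 1000},
)
```
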
(Diff for another changed file; filename not shown in this view.)
@@ -217,7 +217,11 @@ def _generate_tasks_for_node(
Returns:
Returns a `Task` or `None` if task generation is deemed infeasible.
"""
logging.info('Generating task for node %s', node.node_info.id)
logging.info(
'[AsyncPipelineTaskGenerator._generate_tasks_for_node] invoked for'
' node %s',
node.node_info.id,
)
result = []
node_uid = task_lib.NodeUid.from_node(self._pipeline, node)

(Diff for another changed file; filename not shown in this view.)
@@ -20,6 +20,14 @@
from google.protobuf import message


def get_pipeline_platform_config(
deployment_config: pipeline_pb2.IntermediateDeploymentConfig,
) -> Optional[message.Message]:
"""Unsupported."""
del deployment_config
return None


def get_node_platform_config(
deployment_config: pipeline_pb2.IntermediateDeploymentConfig,
node_id: str,
(Diff for another changed file; filename not shown in this view.)
@@ -34,6 +34,13 @@ def make_deployment_config(

class DeploymentConfigUtilsTest(tf.test.TestCase):

def test_returns_none_pipeline_platform_config(self):
self.assertIsNone(
deployment_config_utils.get_pipeline_platform_config(
pipeline_pb2.IntermediateDeploymentConfig()
)
)

def test_returns_plain_platform_config(self):
expected_config = platform_config_pb2.DockerPlatformConfig(
docker_server_url='docker/server/url'
7 changes: 4 additions & 3 deletions tfx/orchestration/experimental/core/event_observer.py
@@ -303,9 +303,10 @@ def observe_event(event):
try:
observer_fn(event)
except Exception as e: # pylint: disable=broad-except
logging.exception(
"Exception raised by observer function when observing "
"event %s: %s", event, e)
logging.error("Exception caught while observing event: %s", event)
# Log exception separately as events can be very long and block the
# exception from being logged.
logging.exception("Exception: %s", e)

def dequeue():
try:
(The remaining changed files are not loaded in this view.)
