Skip to content

Commit

Permalink
Add util for migrating from stateful_working_dir to executor_output_dir
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 584314929
  • Loading branch information
kmonte authored and tfx-copybara committed Nov 21, 2023
1 parent f5f1464 commit 5bdf41d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
40 changes: 40 additions & 0 deletions tfx/orchestration/portable/outputs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,46 @@ def generate_output_artifacts(
return output_artifacts


# TODO(b/308452534): Remove this after we can guarantee that no jobs will use
# the old directory.
def migrate_executor_output_dir_from_stateful_working_directory(
execution_info: data_types.ExecutionInfo,
files: collections.abc.Sequence[str],
):
"""Copies files from stateful working dir to executor output dir.
Will not overwrite any files already existing in the executor output dir.
Args:
execution_info: Information for the execution that should have its files
migrated.
files: The relative file paths to be migrated.
"""
executor_output_dir = get_executor_output_dir(execution_info)
stateful_working_dir = execution_info.stateful_working_dir
found_paths = []
for file in files:
stateful_working_file = os.path.join(stateful_working_dir, file)
executor_output_file = os.path.join(executor_output_dir, file)

if fileio.exists(stateful_working_file) and not fileio.exists(
executor_output_file
):
# We may need to make the parent directories for the executor output dir.
executor_output_file_dir = os.path.dirname(executor_output_file)
if not fileio.exists(executor_output_file_dir):
fileio.makedirs(executor_output_file_dir)
found_paths.append(stateful_working_file)
fileio.copy(stateful_working_file, executor_output_file)

if found_paths:
logging.info(
'Executor output dir %s has had the following files migrated to it. %s',
executor_output_dir,
found_paths,
)


def get_executor_output_dir(execution_info: data_types.ExecutionInfo) -> str:
"""Generates executor output directory for a given execution info."""
return os.path.dirname(execution_info.execution_output_uri)
Expand Down
38 changes: 38 additions & 0 deletions tfx/orchestration/portable/outputs_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,44 @@ def testGenerateOutputArtifacts(self, exec_mode, artifact_name_prefix):
self.assertEqual(artifact_7.uri, outputs_utils.RESOLVED_AT_RUNTIME)
self.assertTrue(artifact_7.is_external)

def testMigrateExecutorOutputDirFromStatefulWorkingDir(self):
existing_file = 'already_exists.txt'
existing_file_text = 'already_written'
files = ['foo.txt', 'bar.txt', 'path/to/qux.txt', existing_file]
data = ['foo', 'bar', 'qux', 'should_not_be_written']
expected_data = ['foo', 'bar', 'qux', existing_file_text]

tmpdir = self.create_tempdir()
stateful_working_dir = os.path.join(
tmpdir.full_path, 'stateful_working_dir'
)
for file, datum in zip(files, data):
stateful_working_file = os.path.join(stateful_working_dir, file)
fileio.makedirs(os.path.dirname(stateful_working_file))
with fileio.open(stateful_working_file, 'w') as f:
f.write(datum)

executor_output = os.path.join(tmpdir.full_path, 'executor_output')
executor_output_file_uri = os.path.join(executor_output, 'foobar.pbtxt')
fileio.makedirs(executor_output)
# Test when there's an existing file in the executor output dir
with fileio.open(os.path.join(executor_output, existing_file), 'w') as f:
f.write(existing_file_text)

exec_info = data_types.ExecutionInfo(
stateful_working_dir=stateful_working_dir,
execution_output_uri=executor_output_file_uri,
)
outputs_utils.migrate_executor_output_dir_from_stateful_working_directory(
exec_info, files
)

for file, datum in zip(files, expected_data):
with self.subTest(f'Check {file}'):
with fileio.open(os.path.join(executor_output, file), 'r') as f:
actual_datum = f.read()
self.assertEqual(actual_datum, datum)

def testGetExecutorOutputDir(self):
execution_info = data_types.ExecutionInfo(
execution_output_uri=self._output_resolver().get_executor_output_uri(1)
Expand Down

0 comments on commit 5bdf41d

Please sign in to comment.