Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Let file sources choose a path for uploaded files #19154

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lib/galaxy/files/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
"""Write file at native path to target_path (relative to uri root).

:param target_path: url of the target file to write to within the filesource. e.g. `gxfiles://myftp1/myfile.txt`
Expand All @@ -231,6 +231,9 @@ def write_from(
:type user_context: _type_, optional
:param opts: A set of options to exercise additional control over the write_from method. Filesource specific, defaults to None
:type opts: Optional[FilesSourceOptions], optional
:return: Actual url of the written file, fixed by the service backing the FileSource. May differ from the target
path.
:rtype: str
"""

@abc.abstractmethod
Expand Down Expand Up @@ -504,10 +507,10 @@ def write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
self._ensure_writeable()
self._check_user_access(user_context)
self._write_from(target_path, native_path, user_context=user_context, opts=opts)
return self._write_from(target_path, native_path, user_context=user_context, opts=opts)

@abc.abstractmethod
def _write_from(
Expand All @@ -516,7 +519,7 @@ def _write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
pass

def realize_to(
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/files/sources/_pyfilesystem2.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,14 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
with open(native_path, "rb") as read_file:
openfs = self._open_fs(user_context=user_context, opts=opts)
dirname = fs.path.dirname(target_path)
if not openfs.isdir(dirname):
openfs.makedirs(dirname)
openfs.upload(target_path, read_file)
return target_path

def _resource_info_to_dict(self, dir_path, resource_info) -> AnyRemoteEntry:
name = resource_info.name
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/base64.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
raise NotImplementedError()

def score_url_match(self, url: str):
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/drs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
raise NotImplementedError()

def score_url_match(self, url: str):
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/files/sources/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
extra_props: FTPFilesSourceProperties
if opts and opts.extra_props:
extra_props = cast(FTPFilesSourceProperties, opts.extra_props)
Expand All @@ -69,6 +69,7 @@ def _write_from(
extra_props = {}
path, opts.extra_props = self._get_props_and_rel_path(extra_props, target_path)
super()._write_from(path, native_path, user_context=user_context, opts=opts)
return target_path

def _get_props_and_rel_path(
self, extra_props: FTPFilesSourceProperties, url: str
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
raise NotImplementedError()

def _serialization_props(self, user_context: OptionalUserContext = None) -> HTTPFilesSourceProperties:
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/files/sources/invenio.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,10 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
record_id, filename = self.parse_path(target_path)
self.repository.upload_file_to_draft_record(record_id, filename, native_path, user_context=user_context)
return target_path


class InvenioRepositoryInteractor(RDMRepositoryInteractor):
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/files/sources/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def _write_from(
native_path: str,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
effective_root = self._effective_root(user_context)
target_native_path = self._to_native_path(target_path, user_context=user_context)
if self.enforce_symlink_security:
Expand All @@ -136,6 +136,7 @@ def _write_from(
target_native_path_part = os.path.join(target_native_path_parent, f"_{target_native_path_name}.part")
shutil.copyfile(native_path, target_native_path_part)
os.rename(target_native_path_part, target_native_path)
return target_path

def _to_native_path(self, source_path: str, user_context: OptionalUserContext = None):
source_path = os.path.normpath(source_path)
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/files/sources/s3fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ def _write_from(
native_path,
user_context: OptionalUserContext = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
_props = self._serialization_props(user_context)
_bucket_name = _props.pop("bucket", "")
fs = self._open_fs(props=_props, opts=opts)
bucket_path = self._bucket_path(_bucket_name, target_path)
fs.upload(native_path, bucket_path)
return target_path

def _bucket_path(self, bucket_name: str, path: str):
if path.startswith("s3://"):
Expand Down
20 changes: 14 additions & 6 deletions lib/galaxy/managers/model_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,29 @@ def write_history_to(self, request: WriteHistoryTo):
export_files = "symlink" if request.include_files else None
target_uri = request.target_uri
user_context = self._build_user_context(request.user.user_id)
export_metadata = self.set_history_export_request_metadata(request)

exception_exporting_history: Optional[Exception] = None
try:
with model.store.get_export_store_factory(
export_store = model.store.get_export_store_factory(
self._app, model_store_format, export_files=export_files, user_context=user_context
)(target_uri) as export_store:
)(target_uri)
with export_store:
history = self._history_manager.by_id(request.history_id)
export_store.export_history(
history, include_hidden=request.include_hidden, include_deleted=request.include_deleted
)
self.set_history_export_result_metadata(request.export_association_id, export_metadata, success=True)
request.target_uri = str(export_store.file_source_uri) or request.target_uri
except Exception as e:
exception_exporting_history = e
raise
finally:
export_metadata = self.set_history_export_request_metadata(request)
self.set_history_export_result_metadata(
request.export_association_id, export_metadata, success=False, error=str(e)
request.export_association_id,
export_metadata,
success=not bool(exception_exporting_history),
error=str(exception_exporting_history) if exception_exporting_history else None,
)
raise
Copy link
Contributor Author

@kysrpex kysrpex Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before applying the set of changes from 53012bf, the method ModelStoreManager.set_history_export_request_metadata() instantiates a ExportObjectMetadata Pydantic model and dumps it to the database in the form of JSON as the field StoreExportAssociation.export_metadata. After the export is complete, the method set_history_export_result_metadata() takes the same instance of ExportObjectMetadata, instantiates a ExportObjectResultMetadata Pydantic model, sets it as the result_data of the ExportObjectMetadata instance, and then saves the ExportObjectMetadata Pydantic model in the form of JSON to the database again.

After applying the set of changes, the call to ModelStoreManager.set_history_export_request_metadata() is delayed until the file has already been saved to the file source, as the actual URI that the file source assigns to the file is otherwise unknown.

The URI assigned by the file source overwrites the original target URI in the request. This involves a slight deviation from the previous behavior: if for example, power gets cut at the right time, StoreExportAssociation.export_metadata may not exist despite the history having been already saved to the file source, because database writes happen within the finally: block.

Moreover, overwriting the original target URI from the request is formally wrong, because the actual URI assigned by the file source should be part of the export result metadata, as it becomes known when the export completes. However, that implies modifying the other parts of the codebase that reference the URI from the request.

Despite the slight deviation in behavior and the formal incorrectness, rather than jumping straight into fixing these issues, I think it makes sense to leave the chance for discussion open, as doing things this way may still be an interesting tradeoff. Let me know what you think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking...

After applying the set of changes, the call to ModelStoreManager.set_history_export_request_metadata() is delayed until the file has already been saved to the file source, as the actual URI that the file source assigns to the file is otherwise unknown.

If we merge the PR as it is, then we'd never see an export that is in progress in the list from the UI. If the file is large, that alone would justify attempting to fix

overwriting the original target URI from the request is formally wrong, because the actual URI assigned by the file source should be part of the export result metadata, as it becomes known when the export completes

right? I guess it makes sense to make an attempt.

Copy link
Contributor Author

@kysrpex kysrpex Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7a9a52a scratches this itch


def set_history_export_request_metadata(
self, request: Union[WriteHistoryTo, GenerateHistoryDownload]
Expand Down
55 changes: 41 additions & 14 deletions lib/galaxy/model/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
TYPE_CHECKING,
Union,
)
from urllib.parse import urlparse

from bdbag import bdbag_api as bdb
from boltons.iterutils import remap
Expand Down Expand Up @@ -2601,6 +2602,8 @@ class BcoExportOptions:


class BcoModelExportStore(WorkflowInvocationOnlyExportStore):
file_source_uri: Optional[StrPath] = None

def __init__(self, uri, export_options: BcoExportOptions, **kwds):
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
Expand All @@ -2620,10 +2623,14 @@ def _finalize(self):
core_biocompute_object, object_id = self._core_biocompute_object_and_object_id()
write_to_file(object_id, core_biocompute_object, self.out_file)
if self.file_source_uri:
file_source_uri = urlparse(self.file_source_uri)
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
self.file_source_uri = (
f"{file_source_uri.scheme}://{file_source_uri.netloc}"
+ file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
)

def _core_biocompute_object_and_object_id(self) -> Tuple[BioComputeObjectCore, str]:
assert self.app # need app.security to do anything...
Expand Down Expand Up @@ -2814,7 +2821,7 @@ def _finalize(self) -> None:


class ROCrateArchiveModelExportStore(DirectoryModelExportStore, WriteCrates):
file_source_uri: Optional[StrPath]
file_source_uri: Optional[StrPath] = None
out_file: StrPath

def __init__(self, uri: StrPath, **kwds) -> None:
Expand Down Expand Up @@ -2845,15 +2852,19 @@ def _finalize(self) -> None:
else:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_uri = urlparse(self.file_source_uri)
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(rval), rval
file_source.write_from(file_source_path.path, rval, user_context=self.user_context)
self.file_source_uri = (
f"{file_source_uri.scheme}://{file_source_uri.netloc}"
+ file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
)
shutil.rmtree(self.temp_output_dir)


class TarModelExportStore(DirectoryModelExportStore):
file_source_uri: Optional[StrPath]
file_source_uri: Optional[StrPath] = None
out_file: StrPath

def __init__(self, uri: StrPath, gzip: bool = True, **kwds) -> None:
Expand All @@ -2876,10 +2887,14 @@ def _finalize(self) -> None:
if self.file_source_uri:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_uri = urlparse(self.file_source_uri)
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
self.file_source_uri = (
f"{file_source_uri.scheme}://{file_source_uri.netloc}"
+ file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
)
shutil.rmtree(self.temp_output_dir)


Expand All @@ -2894,7 +2909,7 @@ def _finalize(self) -> None:


class BagArchiveModelExportStore(BagDirectoryModelExportStore):
file_source_uri: Optional[StrPath]
file_source_uri: Optional[StrPath] = None

def __init__(self, uri: StrPath, bag_archiver: str = "tgz", **kwds) -> None:
# bag_archiver in tgz, zip, tar
Expand All @@ -2919,10 +2934,14 @@ def _finalize(self) -> None:
else:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_uri = urlparse(self.file_source_uri)
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(rval)
file_source.write_from(file_source_path.path, rval, user_context=self.user_context)
self.file_source_uri = (
f"{file_source_uri.scheme}://{file_source_uri.netloc}"
+ file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
)
shutil.rmtree(self.temp_output_dir)


Expand All @@ -2932,13 +2951,21 @@ def get_export_store_factory(
export_files=None,
bco_export_options: Optional[BcoExportOptions] = None,
user_context=None,
) -> Callable[[StrPath], ModelExportStore]:
export_store_class: Union[
Type[TarModelExportStore],
Type[BagArchiveModelExportStore],
Type[ROCrateArchiveModelExportStore],
Type[BcoModelExportStore],
]
) -> Callable[
[StrPath],
Union[
TarModelExportStore,
BagArchiveModelExportStore,
ROCrateArchiveModelExportStore,
BcoModelExportStore,
],
]:
export_store_class: Type[Union[
TarModelExportStore,
BagArchiveModelExportStore,
ROCrateArchiveModelExportStore,
BcoModelExportStore,
]]
export_store_class_kwds = {
"app": app,
"export_files": export_files,
Expand Down
8 changes: 5 additions & 3 deletions lib/galaxy/tools/imp_exp/export_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ def main(argv=None):
# Create archive.
exit = create_archive(temp_directory, out_file, gzip=gzip)
if destination_uri is not None and exit == 0:
_write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
actual_uri = _write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
if destination_uri != actual_uri:
print(f"Saved history archive to {actual_uri}.")
return exit


def _write_to_destination(file_sources_path: str, out_file: str, destination_uri: str) -> str:
    """Upload ``out_file`` to ``destination_uri`` via the configured file sources.

    Returns the URI the file source actually wrote to, which may differ from
    the requested ``destination_uri``.
    """
    sources = get_file_sources(file_sources_path)
    source_path = sources.get_file_source_path(destination_uri)
    assert os.path.exists(out_file)
    # The file source decides the final path; propagate it back to the caller.
    return source_path.file_source.write_from(source_path.path, out_file)


def get_file_sources(file_sources_path: str) -> ConfiguredFileSources:
Expand Down
9 changes: 6 additions & 3 deletions test/unit/app/managers/test_user_file_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ def test_io(self, tmp_path):

temp_file = tmp_path / "tmp_file"
temp_file.write_text("Moo Cow", "utf-8")
file_source.write_from("/moo", str(temp_file))
actual_path = file_source.write_from("/moo", str(temp_file))
target = tmp_path / "round_trip"
file_source.realize_to("/moo", target)
assert "/moo" == actual_path
assert target.read_text("utf-8") == "Moo Cow"

def test_to_dict_filters_hidden(self, tmp_path):
Expand Down Expand Up @@ -271,7 +272,8 @@ def test_environment_injection(self, tmp_path):

temp_file = tmp_path / "tmp_file"
temp_file.write_text("Moo Cow", "utf-8")
file_source.write_from("/moo", str(temp_file))
actual_path = file_source.write_from("/moo", str(temp_file))
assert "/moo" == actual_path
assert expected_target.exists()
assert (expected_target / "moo").exists()

Expand All @@ -289,7 +291,8 @@ def test_environment_defaults(self, tmp_path):

temp_file = tmp_path / "tmp_file"
temp_file.write_text("Moo Cow", "utf-8")
file_source.write_from("/moo", str(temp_file))
actual_path = file_source.write_from("/moo", str(temp_file))
assert "/moo" == actual_path
assert expected_target.exists()
assert (expected_target / "moo").exists()

Expand Down
Loading
Loading