diff --git a/cytotable/convert.py b/cytotable/convert.py index 54104771..3181d0e7 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -1052,6 +1052,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals _source_chunk_to_parquet, ) from cytotable.sources import _gather_sources + from cytotable.utils import _expand_path # gather sources to be processed sources = _gather_sources( @@ -1065,6 +1066,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals if pathlib.Path(dest_path).is_file(): pathlib.Path(dest_path).unlink() + # expand the destination path + expanded_dest_path = _expand_path(path=dest_path) + # prepare offsets for chunked data export from source tables offsets_prepared = { source_group_name: [ @@ -1129,7 +1133,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals source=source, chunk_size=chunk_size, offset=offset, - dest_path=dest_path, + dest_path=expanded_dest_path, data_type_cast_map=data_type_cast_map, ), source_group_name=source_group_name, @@ -1170,7 +1174,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals source_group_name: _concat_source_group( source_group_name=source_group_name, source_group=source_group_vals["sources"], - dest_path=dest_path, + dest_path=expanded_dest_path, common_schema=source_group_vals["common_schema"], ).result() for source_group_name, source_group_vals in common_schema_determined.items() @@ -1187,7 +1191,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # join group merging as each mapped task run will need # full concat results sources=results, - dest_path=dest_path, + dest_path=expanded_dest_path, joins=joins, # get merging chunks by join columns join_group=join_group, @@ -1208,7 +1212,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # return results in common format which includes metadata # for lineage and debugging results = _concat_join_sources( - 
dest_path=dest_path, + dest_path=expanded_dest_path, join_sources=join_sources_result, sources=results, ).result() diff --git a/cytotable/sources.py b/cytotable/sources.py index 04d16ee3..516ea43e 100644 --- a/cytotable/sources.py +++ b/cytotable/sources.py @@ -13,7 +13,7 @@ @python_app def _build_path( path: Union[str, pathlib.Path, AnyPath], **kwargs -) -> Union[pathlib.Path, Any]: +) -> Union[pathlib.Path, AnyPath]: """ Build a path client or return local path. @@ -29,12 +29,12 @@ def _build_path( A local pathlib.Path or Cloudpathlib.AnyPath type path. """ - import pathlib + from cloudpathlib import CloudPath - from cloudpathlib import AnyPath, CloudPath + from cytotable.utils import _expand_path # form a path using cloudpathlib AnyPath, stripping certain characters - processed_path = AnyPath(str(path).strip("'\" ")) + processed_path = _expand_path(str(path).strip("'\" ")) # set the client for a CloudPath if isinstance(processed_path, CloudPath): diff --git a/cytotable/utils.py b/cytotable/utils.py index c70274cc..61b01f6d 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -6,7 +6,7 @@ import multiprocessing import os import pathlib -from typing import Dict, Union, cast +from typing import Any, Dict, Union, cast import duckdb import pyarrow as pa @@ -389,3 +389,33 @@ def _arrow_type_cast_if_specified( # else we retain the existing data field type return column + + +def _expand_path( + path: Union[str, pathlib.Path, AnyPath] +) -> Union[pathlib.Path, AnyPath]: + """ + Expands "~" user directory references with the user's home directory, and expands variable references with values from the environment. After user/variable expansion, the path is resolved and an absolute path is returned. + + Args: + path: Union[str, pathlib.Path, AnyPath]: + Path to expand. + + Returns: + Union[pathlib.Path, AnyPath] + A local pathlib.Path or Cloudpathlib.AnyPath type path. 
+ """ + + import os + import pathlib + + from cloudpathlib import AnyPath + + # expand environment variables (the path is resolved as absolute on return) + modifed_path = AnyPath(os.path.expandvars(path)) + + # note: we use pathlib.Path here to help expand local paths (~, etc) + if isinstance(modifed_path, pathlib.Path): + modifed_path = modifed_path.expanduser() + + return modifed_path.resolve() diff --git a/tests/test_convert.py b/tests/test_convert.py index 994d0642..fe1fceca 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -5,6 +5,7 @@ # pylint: disable=no-member,too-many-lines import itertools +import os import pathlib from shutil import copy from typing import Any, Dict, List, Tuple, cast @@ -14,6 +15,7 @@ import pyarrow as pa import pyarrow.compute as pc import pytest +from cloudpathlib import CloudPath from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor @@ -36,6 +38,7 @@ from cytotable.utils import ( _column_sort, _duckdb_reader, + _expand_path, _sqlite_mixed_type_query_to_parquet, ) @@ -60,6 +63,28 @@ def test_config(): ) == sorted(config_preset.keys()) +def test_extend_path(fx_tempdir: str): + """ + Tests _expand_path + """ + + # check that we have a pathlib path returned for local paths + assert isinstance(_expand_path(path=fx_tempdir), pathlib.Path) + + # check that we have a cloudpath path returned for simulated cloud path + assert isinstance(_expand_path(path=f"s3://{fx_tempdir}"), CloudPath) + + # test that `~` and `$HOME` resolve properly to home + home_dir = str(os.environ.get("HOME")) + assert _expand_path(path="~") == pathlib.Path(home_dir) + assert _expand_path(path="$HOME") == pathlib.Path(home_dir) + + # create a subdir and test path resolution to a root + subdir = f"{fx_tempdir}/test_subdir" + pathlib.Path(subdir).mkdir() + assert _expand_path(path=f"{subdir}/..") == pathlib.Path(fx_tempdir).resolve() + + def test_get_source_filepaths(fx_tempdir: str, 
data_dir_cellprofiler: str): """ Tests _get_source_filepaths