Skip to content

Commit

Permalink
Expand source and destination paths (#89)
Browse files Browse the repository at this point in the history
* add path expansion and resolution for sources and destinations

* check github actions performance

* remove testing branch

* Update cytotable/utils.py

Co-authored-by: Faisal Alquaddoomi <[email protected]>

* update type hints and imports

* move to isolated testing of expand_path

---------

Co-authored-by: Faisal Alquaddoomi <[email protected]>
  • Loading branch information
d33bs and falquaddoomi authored Aug 18, 2023
1 parent 162533a commit c0d2077
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 9 deletions.
12 changes: 8 additions & 4 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
_source_chunk_to_parquet,
)
from cytotable.sources import _gather_sources
from cytotable.utils import _expand_path

# gather sources to be processed
sources = _gather_sources(
Expand All @@ -1065,6 +1066,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
if pathlib.Path(dest_path).is_file():
pathlib.Path(dest_path).unlink()

# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)

# prepare offsets for chunked data export from source tables
offsets_prepared = {
source_group_name: [
Expand Down Expand Up @@ -1129,7 +1133,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
source=source,
chunk_size=chunk_size,
offset=offset,
dest_path=dest_path,
dest_path=expanded_dest_path,
data_type_cast_map=data_type_cast_map,
),
source_group_name=source_group_name,
Expand Down Expand Up @@ -1170,7 +1174,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
source_group_name: _concat_source_group(
source_group_name=source_group_name,
source_group=source_group_vals["sources"],
dest_path=dest_path,
dest_path=expanded_dest_path,
common_schema=source_group_vals["common_schema"],
).result()
for source_group_name, source_group_vals in common_schema_determined.items()
Expand All @@ -1187,7 +1191,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
# join group merging as each mapped task run will need
# full concat results
sources=results,
dest_path=dest_path,
dest_path=expanded_dest_path,
joins=joins,
# get merging chunks by join columns
join_group=join_group,
Expand All @@ -1208,7 +1212,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
# return results in common format which includes metadata
# for lineage and debugging
results = _concat_join_sources(
dest_path=dest_path,
dest_path=expanded_dest_path,
join_sources=join_sources_result,
sources=results,
).result()
Expand Down
8 changes: 4 additions & 4 deletions cytotable/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@python_app
def _build_path(
path: Union[str, pathlib.Path, AnyPath], **kwargs
) -> Union[pathlib.Path, Any]:
) -> Union[pathlib.Path, AnyPath]:
"""
Build a path client or return local path.
Expand All @@ -29,12 +29,12 @@ def _build_path(
A local pathlib.Path or Cloudpathlib.AnyPath type path.
"""

import pathlib
from cloudpathlib import CloudPath

from cloudpathlib import AnyPath, CloudPath
from cytotable.utils import _expand_path

# form a path using cloudpathlib AnyPath, stripping certain characters
processed_path = AnyPath(str(path).strip("'\" "))
processed_path = _expand_path(str(path).strip("'\" "))

# set the client for a CloudPath
if isinstance(processed_path, CloudPath):
Expand Down
32 changes: 31 additions & 1 deletion cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import multiprocessing
import os
import pathlib
from typing import Dict, Union, cast
from typing import Any, Dict, Union, cast

import duckdb
import pyarrow as pa
Expand Down Expand Up @@ -389,3 +389,33 @@ def _arrow_type_cast_if_specified(

# else we retain the existing data field type
return column


def _expand_path(
    path: Union[str, pathlib.Path, AnyPath]
) -> Union[pathlib.Path, AnyPath]:
    """
    Expand and resolve a local or cloud path.

    Environment variable references (for example ``$HOME``) are expanded
    first. When the result is a local path, ``~`` user directory references
    are expanded as well. The path's ``resolve()`` method is then called and
    its result returned (for local paths this yields an absolute path).

    Args:
        path: Union[str, pathlib.Path, AnyPath]:
            Path to expand.

    Returns:
        Union[pathlib.Path, AnyPath]
            A local pathlib.Path or Cloudpathlib.AnyPath type path.
    """

    import os
    import pathlib

    from cloudpathlib import AnyPath

    # expand environment variables on the textual form of the path;
    # handing a path object straight to os.path.expandvars makes it call
    # os.fspath(), which for a cloudpathlib CloudPath gives the local cache
    # location rather than the cloud URI
    modified_path = AnyPath(os.path.expandvars(str(path)))

    # note: only local pathlib.Path objects support `~` expansion
    if isinstance(modified_path, pathlib.Path):
        modified_path = modified_path.expanduser()

    return modified_path.resolve()
25 changes: 25 additions & 0 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pylint: disable=no-member,too-many-lines

import itertools
import os
import pathlib
from shutil import copy
from typing import Any, Dict, List, Tuple, cast
Expand All @@ -14,6 +15,7 @@
import pyarrow as pa
import pyarrow.compute as pc
import pytest
from cloudpathlib import CloudPath
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
Expand All @@ -36,6 +38,7 @@
from cytotable.utils import (
_column_sort,
_duckdb_reader,
_expand_path,
_sqlite_mixed_type_query_to_parquet,
)

Expand All @@ -60,6 +63,28 @@ def test_config():
) == sorted(config_preset.keys())


def test_extend_path(fx_tempdir: str):
    """
    Tests _expand_path
    """

    # check that we have a pathlib path returned for local paths
    assert isinstance(_expand_path(path=fx_tempdir), pathlib.Path)

    # check that we have a cloudpath path returned for simulated cloud path
    assert isinstance(_expand_path(path=f"s3://{fx_tempdir}"), CloudPath)

    # test that `~` and `$HOME` resolve properly to home
    # note: _expand_path returns a resolved path, so the expected home
    # directory is resolved too (it may be reached through a symlink)
    home_dir = pathlib.Path(str(os.environ.get("HOME"))).resolve()
    assert _expand_path(path="~") == home_dir
    assert _expand_path(path="$HOME") == home_dir

    # create a subdir and test path resolution to a root
    subdir = f"{fx_tempdir}/test_subdir"
    pathlib.Path(subdir).mkdir()
    assert _expand_path(path=f"{subdir}/..") == pathlib.Path(fx_tempdir).resolve()


def test_get_source_filepaths(fx_tempdir: str, data_dir_cellprofiler: str):
"""
Tests _get_source_filepaths
Expand Down

0 comments on commit c0d2077

Please sign in to comment.