Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand source and destination paths #89

Merged
merged 7 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
_source_chunk_to_parquet,
)
from cytotable.sources import _gather_sources
from cytotable.utils import _expand_path

# gather sources to be processed
sources = _gather_sources(
Expand All @@ -1058,6 +1059,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
if pathlib.Path(dest_path).is_file():
pathlib.Path(dest_path).unlink()

# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)

# prepare offsets for chunked data export from source tables
offsets_prepared = {
source_group_name: [
Expand Down Expand Up @@ -1122,7 +1126,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
source=source,
chunk_size=chunk_size,
offset=offset,
dest_path=dest_path,
dest_path=expanded_dest_path,
data_type_cast_map=data_type_cast_map,
),
source_group_name=source_group_name,
Expand Down Expand Up @@ -1163,7 +1167,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
source_group_name: _concat_source_group(
source_group_name=source_group_name,
source_group=source_group_vals["sources"],
dest_path=dest_path,
dest_path=expanded_dest_path,
common_schema=source_group_vals["common_schema"],
).result()
for source_group_name, source_group_vals in common_schema_determined.items()
Expand All @@ -1180,7 +1184,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
# join group merging as each mapped task run will need
# full concat results
sources=results,
dest_path=dest_path,
dest_path=expanded_dest_path,
joins=joins,
# get merging chunks by join columns
join_group=join_group,
Expand All @@ -1201,7 +1205,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
# return results in common format which includes metadata
# for lineage and debugging
results = _concat_join_sources(
dest_path=dest_path,
dest_path=expanded_dest_path,
join_sources=join_sources_result,
sources=results,
).result()
Expand Down
6 changes: 4 additions & 2 deletions cytotable/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ def _build_path(

import pathlib

from cloudpathlib import AnyPath, CloudPath
from cloudpathlib import CloudPath

from cytotable.utils import _expand_path

# form a path using cloudpathlib AnyPath, stripping certain characters
processed_path = AnyPath(str(path).strip("'\" "))
processed_path = _expand_path(str(path).strip("'\" "))

# set the client for a CloudPath
if isinstance(processed_path, CloudPath):
Expand Down
27 changes: 26 additions & 1 deletion cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import multiprocessing
import os
import pathlib
from typing import Dict, Union, cast
from typing import Any, Dict, Union, cast

import duckdb
import pyarrow as pa
Expand Down Expand Up @@ -389,3 +389,28 @@ def _arrow_type_cast_if_specified(

# else we retain the existing data field type
return column


def _expand_path(
    path: Union[str, pathlib.Path, CloudPath]
) -> Union[pathlib.Path, CloudPath]:
    """
    Expands environment variable and "~" user directory references in the
    provided path, then resolves the expanded path.

    Args:
        path: Union[str, pathlib.Path, CloudPath]:
            Path to expand.

    Returns:
        Union[pathlib.Path, CloudPath]
            A resolved local pathlib.Path, or a cloudpathlib CloudPath
            when the expanded path refers to a cloud location.
    """

    # expand environment variables using the textual form of the path.
    # note: str() is used rather than passing the object directly because
    # os.path.expandvars calls os.fspath() on its argument, and for a
    # CloudPath that yields the local cache location instead of the
    # cloud URI (which would turn a cloud path into a local one).
    modified_path = AnyPath(os.path.expandvars(str(path)))

    # note: only local pathlib paths can expand user references (~, etc)
    if isinstance(modified_path, pathlib.Path):
        modified_path = modified_path.expanduser()

    # resolve the path (absolute, with ".." segments collapsed for local paths)
    return modified_path.resolve()
26 changes: 25 additions & 1 deletion tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pylint: disable=no-member,too-many-lines

import itertools
import os
import pathlib
from shutil import copy
from typing import Any, Dict, List, Tuple, cast
Expand All @@ -14,6 +15,7 @@
import pyarrow as pa
import pyarrow.compute as pc
import pytest
from cloudpathlib import CloudPath
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
Expand All @@ -32,7 +34,7 @@
convert,
)
from cytotable.presets import config
from cytotable.sources import _get_source_filepaths, _infer_source_datatype
from cytotable.sources import _build_path, _get_source_filepaths, _infer_source_datatype
from cytotable.utils import (
_column_sort,
_duckdb_reader,
Expand Down Expand Up @@ -60,6 +62,28 @@ def test_config():
) == sorted(config_preset.keys())


def test_build_path(fx_tempdir: str):
    """
    Tests _build_path
    """

    # check that we have a pathlib path returned for local paths
    assert isinstance(_build_path.func(path=fx_tempdir), pathlib.Path)

    # check that we have a cloudpath path returned for simulated cloud path
    assert isinstance(_build_path.func(path=f"s3://{fx_tempdir}"), CloudPath)

    # test that `~` and `$HOME` resolve properly to home
    # note: the expected value is resolved as well because the built path
    # is resolved; without this the comparison fails whenever the home
    # directory contains a symlink.
    home_dir = pathlib.Path(str(os.environ.get("HOME"))).resolve()
    assert _build_path.func(path="~") == home_dir
    assert _build_path.func(path="$HOME") == home_dir

    # create a subdir and test path resolution to a root
    subdir = f"{fx_tempdir}/test_subdir"
    pathlib.Path(subdir).mkdir()
    assert _build_path.func(path=f"{subdir}/..") == pathlib.Path(fx_tempdir).resolve()


def test_get_source_filepaths(fx_tempdir: str, data_dir_cellprofiler: str):
"""
Tests _get_source_filepaths
Expand Down