Refactor code structure, update version to 0.9.0, and add JSON and CSV file handling classes
legout committed Jan 15, 2025
1 parent be78c5e commit e927188
Showing 17 changed files with 1,657 additions and 737 deletions.
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ keywords = [
name = "FlowerPower"
readme = "README.md"
requires-python = ">= 3.11"
version = "0.8.5.6"
version = "0.9.0"

[project.scripts]
flowerpower = "flowerpower.cli:app"
@@ -94,5 +94,9 @@ dev-dependencies = [
"jupyterlab>=4.3.0",
"deltalake>=0.21.0",
"datafusion>=42.0.0",
"orjson>=3.10.14",
"joblib>=1.4.2",
"sanic>=24.12.0",
"sanic-ext>=23.12.0",
]
managed = true
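The sanic, sanic-ext, and orjson additions line up with the HTTP utilities touched later in this commit. A minimal sketch, assuming these dependencies back a JSON API endpoint — the app name, route, and payload below are hypothetical and not part of the commit:

from dataclasses import dataclass

import orjson
from sanic import Sanic, response
from sanic_ext import validate

app = Sanic("flowerpower_demo")  # hypothetical app name


@dataclass
class RunRequest:
    pipeline: str
    inputs: dict | None = None


@app.post("/run")
@validate(json=RunRequest)  # sanic-ext parses and validates the JSON body
async def run(request, body: RunRequest):
    payload = {"pipeline": body.pipeline, "status": "queued"}
    # orjson.dumps returns bytes, so send a raw response with a JSON content type
    return response.raw(orjson.dumps(payload), content_type="application/json")


if __name__ == "__main__":
    app.run(port=8000, debug=True)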
3 changes: 1 addition & 2 deletions src/flowerpower/http/utils.py
@@ -1,5 +1,4 @@
from ..cli.utils import parse_dict_or_list_param

# from ..cli.utils import parse_dict_or_list_param
from pydantic import BaseModel, ValidationError as PydanticValidationError
from sanic_ext.exceptions import ValidationError

125 changes: 100 additions & 25 deletions src/flowerpower/io/base.py
@@ -379,7 +379,8 @@ def filter(
filter_expr (str | pl.Expr | pa.compute.Expression): Filter expression.
Returns:
pl.DataFrame | pl.LazyFrame | pa.Table | list[pl.DataFrame] | list[pl.LazyFrame] | list[pa.Table]: Filtered data.
pl.DataFrame | pl.LazyFrame | pa.Table | list[pl.DataFrame] | list[pl.LazyFrame]
| list[pa.Table]: Filtered data.
"""
if isinstance(self._data, pl.DataFrame | pl.LazyFrame):
pl_schema = (
@@ -643,29 +644,103 @@ class BaseFileWriter(BaseFileIO):
| pl.LazyFrame
| pa.Table
| pd.DataFrame
| list[pl.DataFrame]
| list[pl.LazyFrame]
| list[pa.Table]
| list[pd.DataFrame]
| dict[str, Any]
| list[pl.DataFrame | pl.LazyFrame | pa.Table | pd.DataFrame | dict[str, Any]]
)
mode: str = "append"
basename: str | None = None
concat: bool = False
mode: str = "append" # append, overwrite, delete_matching, error_if_exists

def model_post_init(self, __context):
if isinstance(self.data, list) and self.concat:
if isinstance(self.data[0], pl.DataFrame | pl.LazyFrame):
self.data = pl.concat(self.data, how="diagonal_relaxed")
elif isinstance(self.data[0], pa.Table):
self.data = pa.concat_tables(self.data, promote_options="permissive")
elif isinstance(self.data[0], pd.DataFrame):
self.data = pd.concat(self.data)
self._gen_paths()

def _gen_paths(self):
if isinstance(self.path, str) and isinstance(self.data, list):
if self.path.endswith(self.format):
self.path = [
f"{self.path}-{i}.{self.format}" for i in range(len(self.data))
]

def write(self, data, **kwargs):
self.fs.write
def write(
self,
data: (
pl.DataFrame
| pl.LazyFrame
| pa.Table
| pd.DataFrame
| dict[str, Any]
| list[
pl.DataFrame | pl.LazyFrame | pa.Table | pd.DataFrame | dict[str, Any]
]
) | None = None,
basename: str | None = None,
concat: bool = False,
mode: str = "append",
**kwargs,
):

self.fs.write_files(
data=data or self.data,
basename=basename or self.basename,
concat=concat or self.concat,
mode=mode or self.mode,
**kwargs,
)


class BaseDatasetWriter(BaseFileWriter):
data: (
pl.DataFrame
| pl.LazyFrame
| pa.Table
| pa.RecordBatch
| pa.RecordBatchReader
| pd.DataFrame
| dict[str, Any]
| list[
pl.DataFrame
| pl.LazyFrame
| pa.Table
| pa.RecordBatch
| pa.RecordBatchReader
| pd.DataFrame
| dict[str, Any]
]
)
basename: str | None = None
schema: pa.Schema | None = None
partition_by: str | list[str] | pds.Partitioning | None = None
compression: str = "zstd"
concat: bool = False
mode: str = "append" # append, overwrite, delete_matching, error_if_exists

def write_dataset(
self,
data: (
pl.DataFrame
| pl.LazyFrame
| pa.Table
| pa.RecordBatch
| pa.RecordBatchReader
| pd.DataFrame
| dict[str, Any]
| list[
pl.DataFrame
| pl.LazyFrame
| pa.Table
| pa.RecordBatch
| pa.RecordBatchReader
| pd.DataFrame
| dict[str, Any]
]
) | None = None,
basename: str | None = None,
schema: pa.Schema | None = None,
partition_by: str | list[str] | pds.Partitioning | None = None,
compression: str = "zstd",
concat: bool = False,
mode: str = "append",
**kwargs,
):
self.fs.write_pyarrow_dataset(
data=data or self.data,
path=self.path,
basename=basename or self.basename,
schema=schema or self.schema,
partition_by=partition_by or self.partition_by,
format=self.format,
compression=compression or self.compression,
concat=concat or self.concat,
mode=mode or self.mode,
**kwargs,
)
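For reference, a standalone sketch (not part of the commit) of the list-concatenation behavior that BaseFileWriter.model_post_init applies when concat=True; the sample frames are illustrative:

import pandas as pd
import polars as pl
import pyarrow as pa

pl_frames = [pl.DataFrame({"a": [1]}), pl.DataFrame({"a": [2], "b": ["x"]})]
# "diagonal_relaxed" aligns mismatched columns and fills the gaps with nulls
pl_combined = pl.concat(pl_frames, how="diagonal_relaxed")

pa_tables = [pa.table({"a": [1]}), pa.table({"a": [2], "b": ["x"]})]
# permissive promotion unifies the schemas before concatenating
pa_combined = pa.concat_tables(pa_tables, promote_options="permissive")

pd_frames = [pd.DataFrame({"a": [1]}), pd.DataFrame({"a": [2], "b": ["x"]})]
pd_combined = pd.concat(pd_frames)  # pandas aligns columns and inserts NaN

print(pl_combined, pa_combined, pd_combined, sep="\n\n")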
19 changes: 11 additions & 8 deletions src/flowerpower/io/loader/deltatable.py
@@ -1,15 +1,18 @@
import datetime as dt
# import datetime as dt

import datafusion as dtf
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pds

# import pandas as pd
# import pyarrow as pa
# import pyarrow.dataset as pds
from deltalake import DeltaTable
from hamilton.function_modifiers import dataloader

# from hamilton.function_modifiers import dataloader

from ...utils.polars import pl
from ..utils import get_dataframe_metadata, get_delta_metadata

# from ..utils import get_dataframe_metadata, get_delta_metadata


class DeltaTableLoader(DeltaTable):
@@ -70,6 +73,6 @@ def register_in_datafusion(
ctx.register_dataset(name, self.to_pyarrow_dataset())
else:
ctx.register_record_batches(name, [self.to_pyarrow_table().to_batches()])
table = table.to_table()
ctx.register_dataset(name, table)
# table = table.to_table()
# ctx.register_dataset(name, table)
return ctx
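A hedged usage sketch (not part of the commit) of the registration pattern shown in register_in_datafusion: register a PyArrow dataset with a DataFusion SessionContext and query it with SQL. The table name and data are illustrative.

import datafusion as dtf
import pyarrow as pa
import pyarrow.dataset as pds

table = pa.table({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})
dataset = pds.dataset(table)  # wrap the in-memory table as a dataset

ctx = dtf.SessionContext()
ctx.register_dataset("events", dataset)  # same call used by the loader

result = ctx.sql("SELECT id, value * 2 AS doubled FROM events").to_pandas()
print(result)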