Skip to content

Commit

Permalink
FIX-#6879: Convert the right DF to single partition before broadcasting in query_compiler.merge (#6880)
Browse files Browse the repository at this point in the history

Signed-off-by: arunjose696 <[email protected]>
Signed-off-by: Igoshev, Iaroslav <[email protected]>
Co-authored-by: Anatoly Myachev <[email protected]>
Co-authored-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
3 people authored Feb 13, 2024
1 parent 25d143f commit 9ff1c15
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 50 deletions.
29 changes: 29 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,35 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe":
partitions, new_index, new_columns, row_lengths, column_widths
)

def combine(self) -> "PandasDataframe":
    """
    Create a single partition PandasDataframe from the partitions of the current dataframe.

    Returns
    -------
    PandasDataframe
        A single partition PandasDataframe.
    """
    # Collapse the 2D partition grid into one partition via the partition manager.
    combined_partitions = self._partition_mgr_cls.combine(self._partitions)
    # Cached per-partition sizes collapse to their totals; an absent cache
    # stays absent so it can be lazily recomputed later.
    lengths_cache = self._row_lengths_cache
    widths_cache = self._column_widths_cache
    new_row_lengths = None if lengths_cache is None else [sum(lengths_cache)]
    new_column_widths = None if widths_cache is None else [sum(widths_cache)]
    result = self.__constructor__(
        combined_partitions,
        index=self.copy_index_cache(),
        columns=self.copy_columns_cache(),
        row_lengths=new_row_lengths,
        column_widths=new_column_widths,
        dtypes=self.copy_dtypes_cache(),
    )
    result.synchronize_labels()
    return result

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis(
self,
Expand Down
83 changes: 38 additions & 45 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
PersistentPickle,
ProgressBar,
)
from modin.core.dataframe.pandas.utils import concatenate
from modin.core.dataframe.pandas.utils import create_pandas_df_from_partitions
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger
Expand Down Expand Up @@ -781,50 +781,7 @@ def to_pandas(cls, partitions):
A pandas DataFrame
"""
retrieved_objects = cls.get_objects_from_partitions(partitions.flatten())
if all(
isinstance(obj, (pandas.DataFrame, pandas.Series))
for obj in retrieved_objects
):
height, width, *_ = tuple(partitions.shape) + (0,)
# restore 2d array
objs = iter(retrieved_objects)
retrieved_objects = [
[next(objs) for _ in range(width)] for __ in range(height)
]
else:
# Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables.
# This implementation comes from the fact that calling `partition.get`
# function is not always equivalent to `partition.to_pandas`.
retrieved_objects = [
[obj.to_pandas() for obj in part] for part in partitions
]
if all(
isinstance(part, pandas.Series) for row in retrieved_objects for part in row
):
axis = 0
elif all(
isinstance(part, pandas.DataFrame)
for row in retrieved_objects
for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in retrieved_objects
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)
return create_pandas_df_from_partitions(retrieved_objects, partitions.shape)

@classmethod
def to_numpy(cls, partitions, **kwargs):
Expand Down Expand Up @@ -1141,6 +1098,42 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs):
preprocessed_func = cls.preprocess_func(func)
return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]

@classmethod
def combine(cls, partitions):
    """
    Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition.

    Parameters
    ----------
    partitions : np.ndarray
        The partitions which have to be converted to a single partition.

    Returns
    -------
    np.ndarray
        A NumPy 2D array of a single partition.
    """

    def to_pandas_remote(df, partition_shape, *dfs):
        """Copy of ``cls.to_pandas()`` method adapted for a remote function."""
        return create_pandas_df_from_partitions(
            (df,) + dfs, partition_shape, called_from_remote=True
        )

    remote_concat = cls.preprocess_func(to_pandas_remote)
    shape = partitions.shape
    flat_parts = partitions.flatten()
    # Materialize partitions that support lazy/virtual evaluation so that
    # their underlying block references are available below.
    for i in range(len(flat_parts)):
        part = flat_parts[i]
        if hasattr(part, "force_materialization"):
            flat_parts[i] = part.force_materialization()
    block_refs = [part.list_of_blocks[0] for part in flat_parts[1:]]
    # NOTE(review): the anchor partition is taken from the ORIGINAL array
    # (``partitions.flat[0]``), not from the materialized copy — behavior
    # preserved as-is from the original implementation.
    combined = partitions.flat[0].apply(remote_concat, shape, *block_refs)
    return np.array([combined]).reshape(1, -1)

@classmethod
@wait_computations_if_benchmark_mode
def apply_func_to_select_indices(
Expand Down
68 changes: 65 additions & 3 deletions modin/core/dataframe/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import pandas
from pandas.api.types import union_categoricals

from modin.error_message import ErrorMessage

def concatenate(dfs):

def concatenate(dfs, copy=True):
"""
Concatenate pandas DataFrames with saving 'category' dtype.
Expand All @@ -28,6 +30,8 @@ def concatenate(dfs):
----------
dfs : list
List of pandas DataFrames to concatenate.
copy : bool, default: True
Make explicit copy when creating dataframe.
Returns
-------
Expand Down Expand Up @@ -60,8 +64,66 @@ def concatenate(dfs):
i, pandas.Categorical(df.iloc[:, i], categories=union.categories)
)
# `ValueError: buffer source array is read-only` if copy==False
if len(dfs) == 1:
if len(dfs) == 1 and copy:
# concat doesn't make a copy if len(dfs) == 1,
# so do it explicitly
return dfs[0].copy()
return pandas.concat(dfs, copy=True)
return pandas.concat(dfs, copy=copy)


def create_pandas_df_from_partitions(
    partition_data, partition_shape, called_from_remote=False
):
    """
    Convert partition data of multiple dataframes to a single dataframe.

    Parameters
    ----------
    partition_data : list
        List of pandas DataFrames or list of Object references holding pandas DataFrames.
    partition_shape : int or tuple
        Shape of the partitions NumPy array.
    called_from_remote : bool, default: False
        Flag used to check if explicit copy should be done in concat.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame.
    """
    all_pandas = all(
        isinstance(obj, (pandas.DataFrame, pandas.Series)) for obj in partition_data
    )
    if all_pandas:
        # Restore the 2D grid layout from the flat sequence of objects.
        height, width, *_ = tuple(partition_shape) + (0,)
        flat = iter(partition_data)
        grid = [[next(flat) for _ in range(width)] for _ in range(height)]
    else:
        # Partitions do not always contain pandas objects, for example, hdk uses
        # pyarrow tables. This implementation comes from the fact that calling
        # `partition.get` function is not always equivalent to `partition.to_pandas`.
        grid = [[obj.to_pandas() for obj in row] for row in partition_data]

    flattened = [part for row in grid for part in row]
    if all(isinstance(part, pandas.Series) for part in flattened):
        axis = 0
    elif all(isinstance(part, pandas.DataFrame) for part in flattened):
        axis = 1
    else:
        # Mixed Series/DataFrame partitions indicate an internal bug.
        ErrorMessage.catch_bugs_and_request_email(True)

    def _is_empty(part):
        # A Series is empty when it holds no values; a DataFrame is only
        # considered empty here when it also carries no columns.
        return part.empty and (
            not isinstance(part, pandas.DataFrame) or len(part.columns) == 0
        )

    row_frames = []
    for row in grid:
        if all(_is_empty(part) for part in row):
            continue
        row_frames.append(pandas.concat(list(row), axis=axis, copy=False))
    if not row_frames:
        return pandas.DataFrame()
    # Remote callers already own the concatenated data, so the defensive
    # copy inside `concatenate` can be skipped.
    return concatenate(row_frames, not called_from_remote)
6 changes: 4 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ def merge(self, right, **kwargs):
left_index = kwargs.get("left_index", False)
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)
right_to_broadcast = right._modin_frame.combine()

if how in ["left", "inner"] and left_index is False and right_index is False:
kwargs["sort"] = False
Expand Down Expand Up @@ -620,7 +621,7 @@ def map_func(
axis=1,
func=map_func,
enumerate_partitions=how == "left",
other=right._modin_frame,
other=right_to_broadcast,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
keep_partitioning=False,
Expand Down Expand Up @@ -681,6 +682,7 @@ def join(self, right, **kwargs):
on = kwargs.get("on", None)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
right_to_broadcast = right._modin_frame.combine()

if how in ["left", "inner"]:

Expand All @@ -697,7 +699,7 @@ def map_func(left, right, kwargs=kwargs): # pragma: no cover
num_splits=merge_partitioning(
self._modin_frame, right._modin_frame, axis=1
),
other=right._modin_frame,
other=right_to_broadcast,
)
)
return new_self.sort_rows_by_column_values(on) if sort else new_self
Expand Down

0 comments on commit 9ff1c15

Please sign in to comment.