From 9ff1c156263f6bb853339ad24261bfe2d82384b8 Mon Sep 17 00:00:00 2001 From: Arun Jose <40291569+arunjose696@users.noreply.github.com> Date: Tue, 13 Feb 2024 13:09:22 +0100 Subject: [PATCH] FIX-#6879: Convert the right DF to single partition before broadcasting in query_compiler.merge (#6880) Signed-off-by: arunjose696 Signed-off-by: Igoshev, Iaroslav Co-authored-by: Anatoly Myachev Co-authored-by: Igoshev, Iaroslav --- .../dataframe/pandas/dataframe/dataframe.py | 29 +++++++ .../pandas/partitioning/partition_manager.py | 83 +++++++++---------- modin/core/dataframe/pandas/utils.py | 68 ++++++++++++++- .../storage_formats/pandas/query_compiler.py | 6 +- 4 files changed, 136 insertions(+), 50 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 920b9b18583..e3ad15e1154 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2762,6 +2762,35 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe": partitions, new_index, new_columns, row_lengths, column_widths ) + def combine(self) -> "PandasDataframe": + """ + Create a single partition PandasDataframe from the partitions of the current dataframe. + + Returns + ------- + PandasDataframe + A single partition PandasDataframe. + """ + partitions = self._partition_mgr_cls.combine(self._partitions) + result = self.__constructor__( + partitions, + index=self.copy_index_cache(), + columns=self.copy_columns_cache(), + row_lengths=( + [sum(self._row_lengths_cache)] + if self._row_lengths_cache is not None + else None + ), + column_widths=( + [sum(self._column_widths_cache)] + if self._column_widths_cache is not None + else None + ), + dtypes=self.copy_dtypes_cache(), + ) + result.synchronize_labels() + return result + @lazy_metadata_decorator(apply_axis="both") def apply_full_axis( self, diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 2ccbd1de47f..27def624ca6 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -34,7 +34,7 @@ PersistentPickle, ProgressBar, ) -from modin.core.dataframe.pandas.utils import concatenate +from modin.core.dataframe.pandas.utils import create_pandas_df_from_partitions from modin.core.storage_formats.pandas.utils import compute_chunksize from modin.error_message import ErrorMessage from modin.logging import ClassLogger @@ -781,50 +781,7 @@ def to_pandas(cls, partitions): A pandas DataFrame """ retrieved_objects = cls.get_objects_from_partitions(partitions.flatten()) - if all( - isinstance(obj, (pandas.DataFrame, pandas.Series)) - for obj in retrieved_objects - ): - height, width, *_ = tuple(partitions.shape) + (0,) - # restore 2d array - objs = iter(retrieved_objects) - retrieved_objects = [ - [next(objs) for _ in range(width)] for __ in range(height) - ] - else: - # Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables. - # This implementation comes from the fact that calling `partition.get` - # function is not always equivalent to `partition.to_pandas`. - retrieved_objects = [ - [obj.to_pandas() for obj in part] for part in partitions - ] - if all( - isinstance(part, pandas.Series) for row in retrieved_objects for part in row - ): - axis = 0 - elif all( - isinstance(part, pandas.DataFrame) - for row in retrieved_objects - for part in row - ): - axis = 1 - else: - ErrorMessage.catch_bugs_and_request_email(True) - - def is_part_empty(part): - return part.empty and ( - not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0) - ) - - df_rows = [ - pandas.concat([part for part in row], axis=axis, copy=False) - for row in retrieved_objects - if not all(is_part_empty(part) for part in row) - ] - if len(df_rows) == 0: - return pandas.DataFrame() - else: - return concatenate(df_rows) + return create_pandas_df_from_partitions(retrieved_objects, partitions.shape) @classmethod def to_numpy(cls, partitions, **kwargs): @@ -1141,6 +1098,42 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs): preprocessed_func = cls.preprocess_func(func) return [obj.apply(preprocessed_func, **kwargs) for obj in partitions] + @classmethod + def combine(cls, partitions): + """ + Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition. + + Parameters + ---------- + partitions : np.ndarray + The partitions which have to be converted to a single partition. + + Returns + ------- + np.ndarray + A NumPy 2D array of a single partition. + """ + + def to_pandas_remote(df, partition_shape, *dfs): + """Copy of ``cls.to_pandas()`` method adapted for a remote function.""" + return create_pandas_df_from_partitions( + (df,) + dfs, partition_shape, called_from_remote=True + ) + + preprocessed_func = cls.preprocess_func(to_pandas_remote) + partition_shape = partitions.shape + partitions_flattened = partitions.flatten() + for idx, part in enumerate(partitions_flattened): + if hasattr(part, "force_materialization"): + partitions_flattened[idx] = part.force_materialization() + partition_refs = [ + partition.list_of_blocks[0] for partition in partitions_flattened[1:] + ] + combined_partition = partitions.flat[0].apply( + preprocessed_func, partition_shape, *partition_refs + ) + return np.array([combined_partition]).reshape(1, -1) + @classmethod @wait_computations_if_benchmark_mode def apply_func_to_select_indices( diff --git a/modin/core/dataframe/pandas/utils.py b/modin/core/dataframe/pandas/utils.py index 98304ba89c3..6ecd8cba67b 100644 --- a/modin/core/dataframe/pandas/utils.py +++ b/modin/core/dataframe/pandas/utils.py @@ -17,8 +17,10 @@ import pandas from pandas.api.types import union_categoricals +from modin.error_message import ErrorMessage -def concatenate(dfs): + +def concatenate(dfs, copy=True): """ Concatenate pandas DataFrames with saving 'category' dtype. @@ -28,6 +30,8 @@ def concatenate(dfs): ---------- dfs : list List of pandas DataFrames to concatenate. + copy : bool, default: True + Make explicit copy when creating dataframe. Returns ------- @@ -60,8 +64,66 @@ def concatenate(dfs): i, pandas.Categorical(df.iloc[:, i], categories=union.categories) ) # `ValueError: buffer source array is read-only` if copy==False - if len(dfs) == 1: + if len(dfs) == 1 and copy: # concat doesn't make a copy if len(dfs) == 1, # so do it explicitly return dfs[0].copy() - return pandas.concat(dfs, copy=True) + return pandas.concat(dfs, copy=copy) + + +def create_pandas_df_from_partitions( + partition_data, partition_shape, called_from_remote=False +): + """ + Convert partition data of multiple dataframes to a single dataframe. + + Parameters + ---------- + partition_data : list + List of pandas DataFrames or list of Object references holding pandas DataFrames. + partition_shape : int or tuple + Shape of the partitions NumPy array. + called_from_remote : bool, default: False + Flag used to check if explicit copy should be done in concat. + + Returns + ------- + pandas.DataFrame + A pandas DataFrame. + """ + if all( + isinstance(obj, (pandas.DataFrame, pandas.Series)) for obj in partition_data + ): + height, width, *_ = tuple(partition_shape) + (0,) + # restore 2d array + objs = iter(partition_data) + partition_data = [[next(objs) for _ in range(width)] for __ in range(height)] + else: + # Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables. + # This implementation comes from the fact that calling `partition.get` + # function is not always equivalent to `partition.to_pandas`. + partition_data = [[obj.to_pandas() for obj in part] for part in partition_data] + if all(isinstance(part, pandas.Series) for row in partition_data for part in row): + axis = 0 + elif all( + isinstance(part, pandas.DataFrame) for row in partition_data for part in row + ): + axis = 1 + else: + ErrorMessage.catch_bugs_and_request_email(True) + + def is_part_empty(part): + return part.empty and ( + not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0) + ) + + df_rows = [ + pandas.concat([part for part in row], axis=axis, copy=False) + for row in partition_data + if not all(is_part_empty(part) for part in row) + ] + copy = not called_from_remote + if len(df_rows) == 0: + return pandas.DataFrame() + else: + return concatenate(df_rows, copy) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 90a3557f067..715efb3c4e8 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -520,6 +520,7 @@ def merge(self, right, **kwargs): left_index = kwargs.get("left_index", False) right_index = kwargs.get("right_index", False) sort = kwargs.get("sort", False) + right_to_broadcast = right._modin_frame.combine() if how in ["left", "inner"] and left_index is False and right_index is False: kwargs["sort"] = False @@ -620,7 +621,7 @@ def map_func( axis=1, func=map_func, enumerate_partitions=how == "left", - other=right._modin_frame, + other=right_to_broadcast, # We're going to explicitly change the shape across the 1-axis, # so we want for partitioning to adapt as well keep_partitioning=False, @@ -681,6 +682,7 @@ def join(self, right, **kwargs): on = kwargs.get("on", None) how = kwargs.get("how", "left") sort = kwargs.get("sort", False) + right_to_broadcast = right._modin_frame.combine() if how in ["left", "inner"]: @@ -697,7 +699,7 @@ def map_func(left, right, kwargs=kwargs): # pragma: no cover num_splits=merge_partitioning( self._modin_frame, right._modin_frame, axis=1 ), - other=right._modin_frame, + other=right_to_broadcast, ) ) return new_self.sort_rows_by_column_values(on) if sort else new_self