From 5924629cef24022a3f68b3f647576988b2c86ce8 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Thu, 19 Dec 2024 21:53:06 +0000 Subject: [PATCH 01/11] Handle pandas timestamp with nanosecs precision Signed-off-by: Srinath Krishnamachari --- python/ray/data/_internal/table_block.py | 18 +++- python/ray/data/tests/test_map.py | 121 +++++++++++++++++++++++ 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 2164fcb99ea7..44475adbe67b 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -56,19 +56,24 @@ def __init__(self, block_type): self._block_type = block_type def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: + from datetime import datetime + + import pandas as pd + import pyarrow as pa + if isinstance(item, TableRow): item = item.as_pydict() elif isinstance(item, np.ndarray): item = {TENSOR_COLUMN_NAME: item} if not isinstance(item, collections.abc.Mapping): raise ValueError( - "Returned elements of an TableBlock must be of type `dict`, " + "Returned elements of a TableBlock must be of type `dict`, " "got {} (type {}).".format(item, type(item)) ) item_column_names = item.keys() if self._column_names is not None: - # Check all added rows have same columns. + # Check all added rows have the same columns. if item_column_names != self._column_names: raise ValueError( "Current row has different columns compared to previous rows. " @@ -80,6 +85,15 @@ def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: self._column_names = item_column_names for key, value in item.items(): + if isinstance(value, (pd.Timestamp, np.datetime64)): + # If it's a pandas Timestamp or numpy datetime64, convert to pyarrow + # Timestamp + value = pa.array([value], type=pa.timestamp("ns"))[0] + elif isinstance(value, datetime): + # Convert Python datetime to pandas Timestamp with nanosecond precision + value = pd.Timestamp(value) + value = pa.array([value], type=pa.timestamp("ns"))[0] + if is_array_like(value) and not isinstance(value, np.ndarray): value = np.array(value) self._columns[key].append(value) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index a29e5925815f..f9839da5379f 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -4,6 +4,7 @@ import os import threading import time +from datetime import datetime from typing import Iterator import numpy as np @@ -330,6 +331,126 @@ def map_generator(item: dict) -> Iterator[int]: ] +# Helper function to process timestamp data in nanosecs +def process_timestamp_data(row): + # Convert numpy.datetime64 to pd.Timestamp if needed + if isinstance(row["timestamp"], np.datetime64): + row["timestamp"] = pd.Timestamp(row["timestamp"]) + + # Add 1ns to timestamp + row["timestamp"] = row["timestamp"] + pd.Timedelta(1, "ns") + return row + + +# Helper function to create a sample timestamp nanosecs DataFrame +def create_timestamp_dataframe(): + return pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": pd.to_datetime( + [ + "2024-01-01 00:00:00.123456789", + "2024-01-02 00:00:00.987654321", + "2024-01-03 00:00:00.111222333", + ] + ), + "value": [10.123456789, 20.987654321, 30.111222333], + } + ) + + +# Handle Timestamp nanoseconds +@pytest.mark.parametrize( + "df, expected_df", + [ + ( + create_timestamp_dataframe(), + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": pd.to_datetime( + [ + "2024-01-01 00:00:00.123456790", + "2024-01-02 00:00:00.987654322", + "2024-01-03 00:00:00.111222334", + ] + ), + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), + ) + ], +) +def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): + ray_data = ray.data.from_pandas(df) + result = ray_data.map(process_timestamp_data) + processed_df = result.to_pandas() + assert processed_df.shape == df.shape, "DataFrame shapes do not match" + pd.testing.assert_frame_equal(processed_df, expected_df) + assert (processed_df["timestamp"] - df["timestamp"]).iloc[0] == pd.Timedelta( + 1, "ns" + ) + assert (processed_df["timestamp"] - df["timestamp"]).iloc[1] == pd.Timedelta( + 1, "ns" + ) + assert (processed_df["timestamp"] - df["timestamp"]).iloc[2] == pd.Timedelta( + 1, "ns" + ) + + +# Handle numpy.datetime64 values +@pytest.mark.parametrize( + "df", + [ + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": [ + np.datetime64("2024-01-01T00:00:00.123456789"), + np.datetime64("2024-01-02T00:00:00.987654321"), + np.datetime64("2024-01-03T00:00:00.111222333"), + ], + "value": [10.123456789, 20.987654321, 30.111222333], + } + ) + ], +) +def test_map_numpy_datetime(df, ray_start_regular_shared): + ray_data = ray.data.from_pandas(df) + result = ray_data.map(process_timestamp_data) + processed_df = result.to_pandas() + # Ensure numpy.datetime64 is correctly converted to pandas Timestamp + assert isinstance(processed_df["timestamp"].iloc[0], pd.Timestamp) + + # Check that the timestamp has been incremented by 1ns + assert (processed_df["timestamp"] - df["timestamp"]).min() == pd.Timedelta(1, "ns") + + +# Handle Python datetime objects +@pytest.mark.parametrize( + "df", + [ + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": [ + datetime(2024, 1, 1, 0, 0, 0, 123456), + datetime(2024, 1, 2, 0, 0, 0, 987654), + datetime(2024, 1, 3, 0, 0, 0, 111222), + ], + "value": [10.123456789, 20.987654321, 30.111222333], + } + ) + ], +) +def test_map_python_datetime(df, ray_start_regular_shared): + ray_data = ray.data.from_pandas(df) + result = ray_data.map(process_timestamp_data) + processed_df = result.to_pandas() + # Ensure Python datetime is correctly converted to pandas Timestamp + assert isinstance(processed_df["timestamp"].iloc[0], pd.Timestamp) + + def test_add_column(ray_start_regular_shared): """Tests the add column API.""" From ba975b5628e5a5ae4a9aae34989b66a641560a7c Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Mon, 23 Dec 2024 04:07:00 +0000 Subject: [PATCH 02/11] Addressed review comments Signed-off-by: Srinath Krishnamachari --- python/ray/data/_internal/table_block.py | 4 - python/ray/data/tests/test_map.py | 150 +++++++++++++---------- 2 files changed, 86 insertions(+), 68 deletions(-) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 44475adbe67b..43285adc13fb 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -89,10 +89,6 @@ def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: # If it's a pandas Timestamp or numpy datetime64, convert to pyarrow # Timestamp value = pa.array([value], type=pa.timestamp("ns"))[0] - elif isinstance(value, datetime): - # Convert Python datetime to pandas Timestamp with nanosecond precision - value = pd.Timestamp(value) - value = pa.array([value], type=pa.timestamp("ns"))[0] if is_array_like(value) and not isinstance(value, np.ndarray): value = np.array(value) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index f9839da5379f..fd306c9db464 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -331,7 +331,7 @@ def map_generator(item: dict) -> Iterator[int]: ] -# Helper function to process timestamp data in nanosecs +# Helper function to process timestamp data in nanoseconds def process_timestamp_data(row): # Convert numpy.datetime64 to pd.Timestamp if needed if isinstance(row["timestamp"], np.datetime64): @@ -342,29 +342,23 @@ def process_timestamp_data(row): return row -# Helper function to create a sample timestamp nanosecs DataFrame -def create_timestamp_dataframe(): - return pd.DataFrame( - { - "id": [1, 2, 3], - "timestamp": pd.to_datetime( - [ - "2024-01-01 00:00:00.123456789", - "2024-01-02 00:00:00.987654321", - "2024-01-03 00:00:00.111222333", - ] - ), - "value": [10.123456789, 20.987654321, 30.111222333], - } - ) - - -# Handle Timestamp nanoseconds @pytest.mark.parametrize( "df, expected_df", [ - ( - create_timestamp_dataframe(), + pytest.param( + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": pd.to_datetime( + [ + "2024-01-01 00:00:00.123456789", + "2024-01-02 00:00:00.987654321", + "2024-01-03 00:00:00.111222333", + ] + ), + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), pd.DataFrame( { "id": [1, 2, 3], @@ -378,6 +372,7 @@ def create_timestamp_dataframe(): "value": [10.123456789, 20.987654321, 30.111222333], } ), + id="nanoseconds_increment", ) ], ) @@ -385,70 +380,97 @@ def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() - assert processed_df.shape == df.shape, "DataFrame shapes do not match" pd.testing.assert_frame_equal(processed_df, expected_df) - assert (processed_df["timestamp"] - df["timestamp"]).iloc[0] == pd.Timedelta( - 1, "ns" - ) - assert (processed_df["timestamp"] - df["timestamp"]).iloc[1] == pd.Timedelta( - 1, "ns" - ) - assert (processed_df["timestamp"] - df["timestamp"]).iloc[2] == pd.Timedelta( - 1, "ns" - ) -# Handle numpy.datetime64 values @pytest.mark.parametrize( - "df", + "df, expected_df", [ - pd.DataFrame( - { - "id": [1, 2, 3], - "timestamp": [ - np.datetime64("2024-01-01T00:00:00.123456789"), - np.datetime64("2024-01-02T00:00:00.987654321"), - np.datetime64("2024-01-03T00:00:00.111222333"), - ], - "value": [10.123456789, 20.987654321, 30.111222333], - } + pytest.param( + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": [ + np.datetime64("2024-01-01T00:00:00.123456789"), + np.datetime64("2024-01-02T00:00:00.987654321"), + np.datetime64("2024-01-03T00:00:00.111222333"), + ], + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": pd.to_datetime( + [ + "2024-01-01 00:00:00.123456790", + "2024-01-02 00:00:00.987654322", + "2024-01-03 00:00:00.111222334", + ] + ), + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), + id="numpy_datetime64", ) ], ) -def test_map_numpy_datetime(df, ray_start_regular_shared): +def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() - # Ensure numpy.datetime64 is correctly converted to pandas Timestamp - assert isinstance(processed_df["timestamp"].iloc[0], pd.Timestamp) - - # Check that the timestamp has been incremented by 1ns - assert (processed_df["timestamp"] - df["timestamp"]).min() == pd.Timedelta(1, "ns") + pd.testing.assert_frame_equal(processed_df, expected_df) -# Handle Python datetime objects @pytest.mark.parametrize( - "df", + "df, expected_df", [ - pd.DataFrame( - { - "id": [1, 2, 3], - "timestamp": [ - datetime(2024, 1, 1, 0, 0, 0, 123456), - datetime(2024, 1, 2, 0, 0, 0, 987654), - datetime(2024, 1, 3, 0, 0, 0, 111222), - ], - "value": [10.123456789, 20.987654321, 30.111222333], - } + pytest.param( + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": [ + datetime(2024, 1, 1, 0, 0, 0, 123456), + datetime(2024, 1, 2, 0, 0, 0, 987654), + datetime(2024, 1, 3, 0, 0, 0, 111222), + ], + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), + pd.DataFrame( + { + "id": [1, 2, 3], + "timestamp": pd.to_datetime( + [ + "2024-01-01 00:00:00.123456", + "2024-01-02 00:00:00.987654", + "2024-01-03 00:00:00.111222", + ] + ), + "value": [10.123456789, 20.987654321, 30.111222333], + } + ), + id="python_datetime", ) ], ) -def test_map_python_datetime(df, ray_start_regular_shared): +def test_map_python_datetime(df, expected_df, ray_start_regular_shared): + # Convert the input DataFrame to Ray dataset ray_data = ray.data.from_pandas(df) + + # Apply the processing function to the Ray dataset result = ray_data.map(process_timestamp_data) + + # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() - # Ensure Python datetime is correctly converted to pandas Timestamp - assert isinstance(processed_df["timestamp"].iloc[0], pd.Timestamp) + + # Normalize timestamps to microseconds for comparison + # Applying ceil to round up the timestamps to ensure deterministic rounding behavior + processed_df["timestamp"] = processed_df["timestamp"].dt.round("us") + expected_df["timestamp"] = expected_df["timestamp"].dt.round("us") + + # Compare the processed DataFrame with the expected DataFrame + pd.testing.assert_frame_equal(processed_df, expected_df) def test_add_column(ray_start_regular_shared): From 11e962b30fb9f8608b04d95892d3bf56fcc54a87 Mon Sep 17 00:00:00 2001 From: srinathk10 <68668616+srinathk10@users.noreply.github.com> Date: Sun, 22 Dec 2024 21:02:50 -0800 Subject: [PATCH 03/11] Update table_block.py Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com> --- python/ray/data/_internal/table_block.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 43285adc13fb..f5b665d7fe4f 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -56,11 +56,8 @@ def __init__(self, block_type): self._block_type = block_type def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: - from datetime import datetime - import pandas as pd import pyarrow as pa - if isinstance(item, TableRow): item = item.as_pydict() elif isinstance(item, np.ndarray): From 18fd010a81fa735917794dbc924cabc8146e47ad Mon Sep 17 00:00:00 2001 From: srinathk10 <68668616+srinathk10@users.noreply.github.com> Date: Sun, 22 Dec 2024 21:32:05 -0800 Subject: [PATCH 04/11] Update table_block.py Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com> --- python/ray/data/_internal/table_block.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index f5b665d7fe4f..0366a1060b90 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -58,6 +58,7 @@ def __init__(self, block_type): def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: import pandas as pd import pyarrow as pa + if isinstance(item, TableRow): item = item.as_pydict() elif isinstance(item, np.ndarray): From df76bce2105f4f383a83bee89fdd275e0966f4c1 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Mon, 23 Dec 2024 20:43:27 +0000 Subject: [PATCH 05/11] Test fix Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index fd306c9db464..96f6af8d4eee 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -339,6 +339,10 @@ def process_timestamp_data(row): # Add 1ns to timestamp row["timestamp"] = row["timestamp"] + pd.Timedelta(1, "ns") + + # Ensure the timestamp column is in the expected dtype (datetime64[ns]) + row["timestamp"] = pd.to_datetime(row["timestamp"], errors='raise') + return row From 9acadd592aa9667134adb7e2eefec0d412c882f3 Mon Sep 17 00:00:00 2001 From: srinathk10 <68668616+srinathk10@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:31:38 -0800 Subject: [PATCH 06/11] Update test_map.py Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com> --- python/ray/data/tests/test_map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 96f6af8d4eee..fdc5bb67c42e 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -341,7 +341,7 @@ def process_timestamp_data(row): row["timestamp"] = row["timestamp"] + pd.Timedelta(1, "ns") # Ensure the timestamp column is in the expected dtype (datetime64[ns]) - row["timestamp"] = pd.to_datetime(row["timestamp"], errors='raise') + row["timestamp"] = pd.to_datetime(row["timestamp"], errors="raise") return row From cb6570d85f74f05512a225d42012b8394ec11c26 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Mon, 23 Dec 2024 21:27:53 +0000 Subject: [PATCH 07/11] Fix lint issues Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index fdc5bb67c42e..dbc449d55dbe 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -446,9 +446,9 @@ def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): "id": [1, 2, 3], "timestamp": pd.to_datetime( [ - "2024-01-01 00:00:00.123456", - "2024-01-02 00:00:00.987654", - "2024-01-03 00:00:00.111222", + "2024-01-01 00:00:00.123456001", + "2024-01-02 00:00:00.987654001", + "2024-01-03 00:00:00.111222001", ] ), "value": [10.123456789, 20.987654321, 30.111222333], @@ -470,8 +470,8 @@ def test_map_python_datetime(df, expected_df, ray_start_regular_shared): # Normalize timestamps to microseconds for comparison # Applying ceil to round up the timestamps to ensure deterministic rounding behavior - processed_df["timestamp"] = processed_df["timestamp"].dt.round("us") - expected_df["timestamp"] = expected_df["timestamp"].dt.round("us") + processed_df["timestamp"] = processed_df["timestamp"].dt.round("ns") + expected_df["timestamp"] = expected_df["timestamp"].dt.round("ns") # Compare the processed DataFrame with the expected DataFrame pd.testing.assert_frame_equal(processed_df, expected_df) From 80ed7c7655e35205d260e2536c127cbff95f9af6 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Mon, 23 Dec 2024 22:23:32 +0000 Subject: [PATCH 08/11] Test fixes Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index dbc449d55dbe..fdc67db4c8c4 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -384,6 +384,8 @@ def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() + # Ensure the 'timestamp' column is of the correct dtype + processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) pd.testing.assert_frame_equal(processed_df, expected_df) @@ -423,6 +425,8 @@ def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() + # Ensure the 'timestamp' column is of the correct dtype + processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) pd.testing.assert_frame_equal(processed_df, expected_df) @@ -468,6 +472,9 @@ def test_map_python_datetime(df, expected_df, ray_start_regular_shared): # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() + # Ensure the 'timestamp' column is of the correct dtype + processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) + # Normalize timestamps to microseconds for comparison # Applying ceil to round up the timestamps to ensure deterministic rounding behavior processed_df["timestamp"] = processed_df["timestamp"].dt.round("ns") From f6d52bdcdf430454d352c3a0069c0aff466b1358 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Tue, 24 Dec 2024 00:02:48 +0000 Subject: [PATCH 09/11] Test fix Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index fdc67db4c8c4..2237f9f5142c 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -384,8 +384,7 @@ def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() - # Ensure the 'timestamp' column is of the correct dtype - processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) + processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") pd.testing.assert_frame_equal(processed_df, expected_df) @@ -425,8 +424,7 @@ def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) processed_df = result.to_pandas() - # Ensure the 'timestamp' column is of the correct dtype - processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) + processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") pd.testing.assert_frame_equal(processed_df, expected_df) @@ -471,9 +469,7 @@ def test_map_python_datetime(df, expected_df, ray_start_regular_shared): # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() - - # Ensure the 'timestamp' column is of the correct dtype - processed_df["timestamp"] = pd.to_datetime(processed_df["timestamp"]) + processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") # Normalize timestamps to microseconds for comparison # Applying ceil to round up the timestamps to ensure deterministic rounding behavior From 50b05d070dac1d4968e92c59e2a29a779d25521d Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Tue, 24 Dec 2024 00:02:48 +0000 Subject: [PATCH 10/11] Test fix Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 2237f9f5142c..b30deb3cc5f3 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -383,7 +383,13 @@ def process_timestamp_data(row): def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) + # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() + # Convert PyArrow Timestamps to Pandas Timestamps + processed_df["timestamp"] = processed_df["timestamp"].apply( + lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + ) + # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") pd.testing.assert_frame_equal(processed_df, expected_df) @@ -423,7 +429,13 @@ def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): ray_data = ray.data.from_pandas(df) result = ray_data.map(process_timestamp_data) + # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() + # Convert PyArrow Timestamps to Pandas Timestamps + processed_df["timestamp"] = processed_df["timestamp"].apply( + lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + ) + # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") pd.testing.assert_frame_equal(processed_df, expected_df) @@ -463,12 +475,14 @@ def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): def test_map_python_datetime(df, expected_df, ray_start_regular_shared): # Convert the input DataFrame to Ray dataset ray_data = ray.data.from_pandas(df) - - # Apply the processing function to the Ray dataset result = ray_data.map(process_timestamp_data) - # Convert the result back to a Pandas DataFrame processed_df = result.to_pandas() + # Convert PyArrow Timestamps to Pandas Timestamps + processed_df["timestamp"] = processed_df["timestamp"].apply( + lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + ) + # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") # Normalize timestamps to microseconds for comparison From 427e753c10132245d80c67d9b3ef57a4c0da5940 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Tue, 24 Dec 2024 02:18:01 +0000 Subject: [PATCH 11/11] Lint errors Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_map.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index b30deb3cc5f3..8dfa78492273 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -387,7 +387,9 @@ def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared): processed_df = result.to_pandas() # Convert PyArrow Timestamps to Pandas Timestamps processed_df["timestamp"] = processed_df["timestamp"].apply( - lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + lambda x: pd.Timestamp(x.as_py()) + if isinstance(x, pa.lib.TimestampScalar) + else x ) # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") @@ -433,7 +435,9 @@ def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared): processed_df = result.to_pandas() # Convert PyArrow Timestamps to Pandas Timestamps processed_df["timestamp"] = processed_df["timestamp"].apply( - lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + lambda x: pd.Timestamp(x.as_py()) + if isinstance(x, pa.lib.TimestampScalar) + else x ) # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]") @@ -480,7 +484,9 @@ def test_map_python_datetime(df, expected_df, ray_start_regular_shared): processed_df = result.to_pandas() # Convert PyArrow Timestamps to Pandas Timestamps processed_df["timestamp"] = processed_df["timestamp"].apply( - lambda x: pd.Timestamp(x.as_py()) if isinstance(x, pa.lib.TimestampScalar) else x + lambda x: pd.Timestamp(x.as_py()) + if isinstance(x, pa.lib.TimestampScalar) + else x ) # Ensure the dtype is correct processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")