diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py
index 2164fcb99ea7..0366a1060b90 100644
--- a/python/ray/data/_internal/table_block.py
+++ b/python/ray/data/_internal/table_block.py
@@ -56,19 +56,22 @@ def __init__(self, block_type):
         self._block_type = block_type
 
     def add(self, item: Union[dict, TableRow, np.ndarray]) -> None:
+        import pandas as pd
+        import pyarrow as pa
+
         if isinstance(item, TableRow):
             item = item.as_pydict()
         elif isinstance(item, np.ndarray):
             item = {TENSOR_COLUMN_NAME: item}
         if not isinstance(item, collections.abc.Mapping):
             raise ValueError(
-                "Returned elements of an TableBlock must be of type `dict`, "
+                "Returned elements of a TableBlock must be of type `dict`, "
                 "got {} (type {}).".format(item, type(item))
             )
 
         item_column_names = item.keys()
         if self._column_names is not None:
-            # Check all added rows have same columns.
+            # Check all added rows have the same columns.
             if item_column_names != self._column_names:
                 raise ValueError(
                     "Current row has different columns compared to previous rows. "
@@ -80,6 +83,11 @@ def add(self, item: Union[dict, TableRow, np.ndarray]) -> None:
             self._column_names = item_column_names
 
         for key, value in item.items():
+            if isinstance(value, (pd.Timestamp, np.datetime64)):
+                # Convert pandas Timestamp / numpy datetime64 values to a pyarrow
+                # timestamp scalar so nanosecond precision is preserved.
+                value = pa.array([value], type=pa.timestamp("ns"))[0]
+
             if is_array_like(value) and not isinstance(value, np.ndarray):
                 value = np.array(value)
             self._columns[key].append(value)
diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py
index a29e5925815f..8dfa78492273 100644
--- a/python/ray/data/tests/test_map.py
+++ b/python/ray/data/tests/test_map.py
@@ -4,6 +4,7 @@
 import os
 import threading
 import time
+from datetime import datetime
 from typing import Iterator
 
 import numpy as np
@@ -330,6 +331,175 @@ def map_generator(item: dict) -> Iterator[int]:
     ]
 
 
+# Helper function that bumps each row's timestamp by one nanosecond.
+def process_timestamp_data(row):
+    # Convert numpy.datetime64 to pd.Timestamp if needed.
+    if isinstance(row["timestamp"], np.datetime64):
+        row["timestamp"] = pd.Timestamp(row["timestamp"])
+
+    # Add 1ns to the timestamp.
+    row["timestamp"] = row["timestamp"] + pd.Timedelta(1, "ns")
+
+    # Ensure the timestamp column is in the expected dtype (datetime64[ns]).
+    row["timestamp"] = pd.to_datetime(row["timestamp"], errors="raise")
+
+    return row
+
+
+@pytest.mark.parametrize(
+    "df, expected_df",
+    [
+        pytest.param(
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": pd.to_datetime(
+                        [
+                            "2024-01-01 00:00:00.123456789",
+                            "2024-01-02 00:00:00.987654321",
+                            "2024-01-03 00:00:00.111222333",
+                        ]
+                    ),
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": pd.to_datetime(
+                        [
+                            "2024-01-01 00:00:00.123456790",
+                            "2024-01-02 00:00:00.987654322",
+                            "2024-01-03 00:00:00.111222334",
+                        ]
+                    ),
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            id="nanoseconds_increment",
+        )
+    ],
+)
+def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared):
+    ray_data = ray.data.from_pandas(df)
+    result = ray_data.map(process_timestamp_data)
+    # Convert the result back to a Pandas DataFrame.
+    processed_df = result.to_pandas()
+    # Convert PyArrow Timestamps to Pandas Timestamps.
+    processed_df["timestamp"] = processed_df["timestamp"].apply(
+        lambda x: pd.Timestamp(x.as_py())
+        if isinstance(x, pa.lib.TimestampScalar)
+        else x
+    )
+    # Ensure the dtype is correct.
+    processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
+    pd.testing.assert_frame_equal(processed_df, expected_df)
+
+
+@pytest.mark.parametrize(
+    "df, expected_df",
+    [
+        pytest.param(
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": [
+                        np.datetime64("2024-01-01T00:00:00.123456789"),
+                        np.datetime64("2024-01-02T00:00:00.987654321"),
+                        np.datetime64("2024-01-03T00:00:00.111222333"),
+                    ],
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": pd.to_datetime(
+                        [
+                            "2024-01-01 00:00:00.123456790",
+                            "2024-01-02 00:00:00.987654322",
+                            "2024-01-03 00:00:00.111222334",
+                        ]
+                    ),
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            id="numpy_datetime64",
+        )
+    ],
+)
+def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared):
+    ray_data = ray.data.from_pandas(df)
+    result = ray_data.map(process_timestamp_data)
+    # Convert the result back to a Pandas DataFrame.
+    processed_df = result.to_pandas()
+    # Convert PyArrow Timestamps to Pandas Timestamps.
+    processed_df["timestamp"] = processed_df["timestamp"].apply(
+        lambda x: pd.Timestamp(x.as_py())
+        if isinstance(x, pa.lib.TimestampScalar)
+        else x
+    )
+    # Ensure the dtype is correct.
+    processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
+    pd.testing.assert_frame_equal(processed_df, expected_df)
+
+
+@pytest.mark.parametrize(
+    "df, expected_df",
+    [
+        pytest.param(
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": [
+                        datetime(2024, 1, 1, 0, 0, 0, 123456),
+                        datetime(2024, 1, 2, 0, 0, 0, 987654),
+                        datetime(2024, 1, 3, 0, 0, 0, 111222),
+                    ],
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            pd.DataFrame(
+                {
+                    "id": [1, 2, 3],
+                    "timestamp": pd.to_datetime(
+                        [
+                            "2024-01-01 00:00:00.123456001",
+                            "2024-01-02 00:00:00.987654001",
+                            "2024-01-03 00:00:00.111222001",
+                        ]
+                    ),
+                    "value": [10.123456789, 20.987654321, 30.111222333],
+                }
+            ),
+            id="python_datetime",
+        )
+    ],
+)
+def test_map_python_datetime(df, expected_df, ray_start_regular_shared):
+    # Convert the input DataFrame to a Ray dataset.
+    ray_data = ray.data.from_pandas(df)
+    result = ray_data.map(process_timestamp_data)
+    # Convert the result back to a Pandas DataFrame.
+    processed_df = result.to_pandas()
+    # Convert PyArrow Timestamps to Pandas Timestamps.
+    processed_df["timestamp"] = processed_df["timestamp"].apply(
+        lambda x: pd.Timestamp(x.as_py())
+        if isinstance(x, pa.lib.TimestampScalar)
+        else x
+    )
+    # Ensure the dtype is correct.
+    processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
+
+    # Round both frames to nanosecond precision so the comparison is
+    # deterministic regardless of how the timestamps were materialized.
+    processed_df["timestamp"] = processed_df["timestamp"].dt.round("ns")
+    expected_df["timestamp"] = expected_df["timestamp"].dt.round("ns")
+
+    # Compare the processed DataFrame with the expected DataFrame.
+    pd.testing.assert_frame_equal(processed_df, expected_df)
+
+
 def test_add_column(ray_start_regular_shared):
     """Tests the add column API."""
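
Note (illustrative, not part of the patch): a minimal sketch of the behavior this change enables. A row UDF that returns `pd.Timestamp` values should now keep nanosecond precision end to end, because the block builder stores such values as pyarrow `timestamp("ns")` scalars. The column name `ts` and the helper `bump` below are invented for the example; it assumes a local Ray installation.

    import pandas as pd
    import ray

    df = pd.DataFrame({"ts": pd.to_datetime(["2024-01-01 00:00:00.123456789"])})

    def bump(row):
        # Row values may arrive as np.datetime64; normalize first.
        ts = pd.Timestamp(row["ts"])
        # Returning a pd.Timestamp previously lost sub-microsecond precision
        # when the output block was rebuilt; with this patch the value
        # survives as a nanosecond-precision Arrow timestamp.
        row["ts"] = ts + pd.Timedelta(1, "ns")
        return row

    out = ray.data.from_pandas(df).map(bump).to_pandas()
    print(out["ts"])  # expected: 2024-01-01 00:00:00.123456790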