Handle pandas timestamp with nanosecond precision #49370

Open
wants to merge 12 commits into master
12 changes: 10 additions & 2 deletions python/ray/data/_internal/table_block.py
@@ -56,19 +56,22 @@ def __init__(self, block_type):
self._block_type = block_type

def add(self, item: Union[dict, TableRow, np.ndarray]) -> None:
import pandas as pd
import pyarrow as pa

if isinstance(item, TableRow):
item = item.as_pydict()
elif isinstance(item, np.ndarray):
item = {TENSOR_COLUMN_NAME: item}
if not isinstance(item, collections.abc.Mapping):
raise ValueError(
"Returned elements of an TableBlock must be of type `dict`, "
"Returned elements of a TableBlock must be of type `dict`, "
"got {} (type {}).".format(item, type(item))
)

item_column_names = item.keys()
if self._column_names is not None:
# Check all added rows have the same columns.
if item_column_names != self._column_names:
raise ValueError(
"Current row has different columns compared to previous rows. "
@@ -80,6 +83,11 @@ def add(self, item: Union[dict, TableRow, np.ndarray]) -> None:
self._column_names = item_column_names

for key, value in item.items():
if isinstance(value, (pd.Timestamp, np.datetime64)):
                # Convert pandas Timestamp / numpy datetime64 to a pyarrow
                # timestamp scalar so nanosecond precision is not lost.
value = pa.array([value], type=pa.timestamp("ns"))[0]
Contributor: if the target block type is pandas, will converting it to pyarrow Timestamp be compatible?

Contributor: We should probably put this in the arrow_block.py subclass
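
To illustrate that suggestion, here is a rough sketch (mine, not part of this PR) of what moving the coercion into the Arrow-specific builder could look like. It assumes arrow_block.py defines an ArrowBlockBuilder that subclasses TableBlockBuilder; the override shown is only one possible shape.

```python
# Hypothetical sketch, not part of this PR: coerce timestamp-like scalars in the
# Arrow-specific builder and leave the generic TableBlockBuilder.add untouched.
import numpy as np
import pandas as pd
import pyarrow as pa

from ray.data._internal.table_block import TableBlockBuilder


class ArrowBlockBuilder(TableBlockBuilder):  # class/file names assumed
    def add(self, item):
        if isinstance(item, dict):
            item = {
                key: (
                    # Cast to an Arrow timestamp[ns] scalar so the nanosecond
                    # component survives block construction.
                    pa.array([value], type=pa.timestamp("ns"))[0]
                    if isinstance(value, (pd.Timestamp, np.datetime64))
                    else value
                )
                for key, value in item.items()
            }
        super().add(item)
```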

Contributor: Also, can you comment that the purpose of this conversion is to avoid losing precision?


Contributor: Are there any ways that this conversion can fail or precision can be lost? I imagine probably not but might be good to note anything here if there are.

Hey again, piping in from the original issue.

If this is already a datetime object, hasn't it already lost nanosecond precision? Is it too late to perform this coercion?

Contributor Author: Default behavior is type coercion, as we discovered in the bug from the test case. This is explicitly handling the type conversion for nanoseconds.
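
For context, a small standalone check (mine, not from the PR) of where nanosecond precision lives: pandas Timestamp and numpy datetime64 carry nanoseconds, a plain Python datetime does not, and an explicit timestamp("ns") cast keeps the full value on the Arrow side.

```python
# Illustration only, not part of the PR.
import numpy as np
import pandas as pd
import pyarrow as pa

ts = pd.Timestamp("2024-01-01 00:00:00.123456789")
print(ts.nanosecond)       # 789 -> pandas Timestamp keeps nanoseconds
print(ts.to_pydatetime())  # 2024-01-01 00:00:00.123456 -> datetime truncates to microseconds

print(np.datetime64("2024-01-01T00:00:00.123456789"))  # nanoseconds preserved

# An explicit timestamp("ns") cast keeps the full value as an Arrow scalar.
scalar = pa.array([ts], type=pa.timestamp("ns"))[0]
print(scalar.value)        # 1704067200123456789 -> epoch nanoseconds, nothing dropped
```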

if is_array_like(value) and not isinstance(value, np.ndarray):
value = np.array(value)
self._columns[key].append(value)
170 changes: 170 additions & 0 deletions python/ray/data/tests/test_map.py
@@ -4,6 +4,7 @@
import os
import threading
import time
from datetime import datetime
from typing import Iterator

import numpy as np
@@ -330,6 +331,175 @@ def map_generator(item: dict) -> Iterator[int]:
]


# Helper used by the timestamp tests below: adds 1 ns to each row's timestamp.
def process_timestamp_data(row):
# Convert numpy.datetime64 to pd.Timestamp if needed
if isinstance(row["timestamp"], np.datetime64):
row["timestamp"] = pd.Timestamp(row["timestamp"])

# Add 1ns to timestamp
row["timestamp"] = row["timestamp"] + pd.Timedelta(1, "ns")

# Ensure the timestamp column is in the expected dtype (datetime64[ns])
row["timestamp"] = pd.to_datetime(row["timestamp"], errors="raise")

return row


@pytest.mark.parametrize(
"df, expected_df",
[
pytest.param(
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": pd.to_datetime(
[
"2024-01-01 00:00:00.123456789",
"2024-01-02 00:00:00.987654321",
"2024-01-03 00:00:00.111222333",
]
),
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": pd.to_datetime(
[
"2024-01-01 00:00:00.123456790",
"2024-01-02 00:00:00.987654322",
"2024-01-03 00:00:00.111222334",
]
),
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
id="nanoseconds_increment",
)
],
)
def test_map_timestamp_nanosecs(df, expected_df, ray_start_regular_shared):
Contributor:
• can you document the purpose of each test?
• maybe put them in test_pandas.py, as the issue is specific to Pandas.

ray_data = ray.data.from_pandas(df)
result = ray_data.map(process_timestamp_data)
# Convert the result back to a Pandas DataFrame
processed_df = result.to_pandas()
# Convert PyArrow Timestamps to Pandas Timestamps
processed_df["timestamp"] = processed_df["timestamp"].apply(
lambda x: pd.Timestamp(x.as_py())
if isinstance(x, pa.lib.TimestampScalar)
else x
)
# Ensure the dtype is correct
processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
pd.testing.assert_frame_equal(processed_df, expected_df)


@pytest.mark.parametrize(
"df, expected_df",
[
pytest.param(
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": [
np.datetime64("2024-01-01T00:00:00.123456789"),
np.datetime64("2024-01-02T00:00:00.987654321"),
np.datetime64("2024-01-03T00:00:00.111222333"),
],
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": pd.to_datetime(
[
"2024-01-01 00:00:00.123456790",
"2024-01-02 00:00:00.987654322",
"2024-01-03 00:00:00.111222334",
]
),
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
id="numpy_datetime64",
)
],
)
def test_map_numpy_datetime(df, expected_df, ray_start_regular_shared):
Contributor: Nit: It seems like test_map_numpy_datetime and test_map_timestamp_nanosecs have the same body? Can probably move this to be one test with two different sets of parameters.

ray_data = ray.data.from_pandas(df)
result = ray_data.map(process_timestamp_data)
# Convert the result back to a Pandas DataFrame
processed_df = result.to_pandas()
# Convert PyArrow Timestamps to Pandas Timestamps
processed_df["timestamp"] = processed_df["timestamp"].apply(
lambda x: pd.Timestamp(x.as_py())
if isinstance(x, pa.lib.TimestampScalar)
else x
)
# Ensure the dtype is correct
processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
pd.testing.assert_frame_equal(processed_df, expected_df)
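
Following the nit above, a minimal sketch (not part of the PR) of folding the two identical bodies into one parametrized test. The test name, the _EXPECTED_TIMESTAMPS constant, and the parameter ids are my own; it assumes the same imports, fixtures, and process_timestamp_data helper as the rest of this file.

```python
# Sketch of the reviewer's suggestion, not part of the PR: one parametrized test
# covering both input flavors, since the two test bodies above are identical.
_EXPECTED_TIMESTAMPS = pd.to_datetime(
    [
        "2024-01-01 00:00:00.123456790",
        "2024-01-02 00:00:00.987654322",
        "2024-01-03 00:00:00.111222334",
    ]
)


@pytest.mark.parametrize(
    "timestamps",
    [
        pytest.param(
            pd.to_datetime(
                [
                    "2024-01-01 00:00:00.123456789",
                    "2024-01-02 00:00:00.987654321",
                    "2024-01-03 00:00:00.111222333",
                ]
            ),
            id="pandas_timestamp",
        ),
        pytest.param(
            [
                np.datetime64("2024-01-01T00:00:00.123456789"),
                np.datetime64("2024-01-02T00:00:00.987654321"),
                np.datetime64("2024-01-03T00:00:00.111222333"),
            ],
            id="numpy_datetime64",
        ),
    ],
)
def test_map_timestamp_nanosecs_combined(timestamps, ray_start_regular_shared):
    """Nanosecond timestamps should survive a map() round trip for both
    pandas Timestamp and numpy datetime64 inputs."""
    df = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "timestamp": timestamps,
            "value": [10.123456789, 20.987654321, 30.111222333],
        }
    )
    expected_df = df.copy()
    expected_df["timestamp"] = _EXPECTED_TIMESTAMPS
    processed_df = ray.data.from_pandas(df).map(process_timestamp_data).to_pandas()
    # Convert PyArrow Timestamps to Pandas Timestamps, as in the tests above.
    processed_df["timestamp"] = processed_df["timestamp"].apply(
        lambda x: pd.Timestamp(x.as_py())
        if isinstance(x, pa.lib.TimestampScalar)
        else x
    )
    processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")
    pd.testing.assert_frame_equal(processed_df, expected_df)
```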


@pytest.mark.parametrize(
"df, expected_df",
[
pytest.param(
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": [
datetime(2024, 1, 1, 0, 0, 0, 123456),
datetime(2024, 1, 2, 0, 0, 0, 987654),
datetime(2024, 1, 3, 0, 0, 0, 111222),
],
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
pd.DataFrame(
{
"id": [1, 2, 3],
"timestamp": pd.to_datetime(
[
"2024-01-01 00:00:00.123456001",
"2024-01-02 00:00:00.987654001",
"2024-01-03 00:00:00.111222001",
]
),
"value": [10.123456789, 20.987654321, 30.111222333],
}
),
id="python_datetime",
)
],
)
def test_map_python_datetime(df, expected_df, ray_start_regular_shared):
# Convert the input DataFrame to Ray dataset
ray_data = ray.data.from_pandas(df)
result = ray_data.map(process_timestamp_data)
# Convert the result back to a Pandas DataFrame
processed_df = result.to_pandas()
# Convert PyArrow Timestamps to Pandas Timestamps
processed_df["timestamp"] = processed_df["timestamp"].apply(
lambda x: pd.Timestamp(x.as_py())
if isinstance(x, pa.lib.TimestampScalar)
else x
)
# Ensure the dtype is correct
processed_df["timestamp"] = processed_df["timestamp"].astype("datetime64[ns]")

    # Round both timestamp columns to nanosecond resolution so the comparison
    # is deterministic.
processed_df["timestamp"] = processed_df["timestamp"].dt.round("ns")
expected_df["timestamp"] = expected_df["timestamp"].dt.round("ns")

# Compare the processed DataFrame with the expected DataFrame
pd.testing.assert_frame_equal(processed_df, expected_df)


def test_add_column(ray_start_regular_shared):
"""Tests the add column API."""
