[Data] Inconsistent behavior with Ray Data and timestamps #49297

Closed
NellyWhads opened this issue Dec 17, 2024 · 8 comments · Fixed by #49370
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues P1 Issue that should be fixed within a few weeks

Comments

@NellyWhads

NellyWhads commented Dec 17, 2024

What happened + What you expected to happen

*Note: this example is provided as a gist rather than inline code to make it easier to reproduce and trace.

Two odd behaviors are identified:

  1. When working with flat-structured datasets, Ray Data performs destructive type conversions when using the .map() API.
  2. When working with nested-structured datasets (i.e. data with sub-structs), Ray Data performs destructive type conversions when using the .map_batches() (or .take_batch()) API.

The first issue is isolated to just ray.data, while the second is demonstrated to fail with the pandas API as well.

This currently blocks me from using Ray Data, as I cannot extract timestamps consistently across my data sources.

Versions / Dependencies

Name: numpy
Version: 1.24.4

Name: pandas
Version: 2.0.3

Name: pyarrow
Version: 17.0.0

Name: ray
Version: 2.10.0

Name: host
Version: Darwin [hostname-redacted] 23.5.0 Darwin Kernel Version 23.5.0: Wed May 1 20:13:18 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6030 arm64

Reproduction script

Gist: https://gist.github.com/NellyWhads/fdfb261a027be7e7bc87bec91d9e9035

Issue Severity

High: It blocks me from completing my task.

@NellyWhads NellyWhads added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 17, 2024
@jcotant1 jcotant1 added the data Ray Data-related issues label Dec 17, 2024
@PawaritL

PawaritL commented Dec 17, 2024

@jcotant1 the crux of both issues is how we handle nanoseconds:

  1. For the 1st behavior: .map() + .take_batch(..., batch_format="pandas") truncates the time information (see the ns field; a minimal sketch follows this list).
  2. For the 2nd behavior (timestamps within a struct):
    • Currently it casts timestamps with nanosecond precision to integers, while those with lower precision are preserved as Python datetime objects. This prevents any caller from relying on the type and requires them to coerce the value back to the appropriate type before every use.
    • Since Python's datetime and timedelta objects don't support nanoseconds, @NellyWhads asked whether Ray can standardize around np.datetime64 and/or pd.Timestamp (perhaps depending on the batch format).
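
A minimal sketch of the first behavior (my own illustrative repro, not the original gist; exact output depends on the Ray version):

import ray
import pandas as pd

ds = ray.data.from_pandas(pd.DataFrame({
    # Nanosecond-precision timestamp.
    "ts": pd.to_datetime(["2024-01-01 00:00:00.123456789"]),
}))

# An identity map should be a no-op, yet the pandas batch loses the ns digits.
batch = ds.map(lambda row: row).take_batch(1, batch_format="pandas")
print(batch["ts"].iloc[0])  # 2024-01-01 00:00:00.123456 on affected versions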

@richardliaw richardliaw added P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 17, 2024
@NellyWhads
Author

NellyWhads commented Dec 17, 2024

Based on the warnings logged by Ray Data, map_batches() / take_batch() is the API users should target, since it is more consistent with the interface to the Ray training runner. If this is the case, the 2nd issue is of much higher priority than the first.
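
For reference, a minimal sketch of the struct case through this path (a hypothetical reconstruction, not the gist itself):

import pandas as pd
import pyarrow as pa
import ray

# A struct column holding a nanosecond-precision timestamp.
ts = pa.array([pd.Timestamp("2024-01-01 00:00:00.000000001")], type=pa.timestamp("ns"))
struct = pa.StructArray.from_arrays([ts], fields=[pa.field("start_time", pa.timestamp("ns"))])
ds = ray.data.from_arrow(pa.table({"interval": struct}))

batch = ds.take_batch(1, batch_format="pandas")
# The ns timestamp inside the struct comes back as a plain int,
# e.g. {'start_time': 1704067200000000001}.
print(batch["interval"].iloc[0])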

@srinathk10
Contributor

Test case for this:

import ray
import pandas as pd

# Initialize Ray
ray.init(ignore_reinit_error=True)

# Sample flat-structured dataset (pandas DataFrame)
df = pd.DataFrame({
    'id': [1, 2, 3],
    'timestamp': pd.to_datetime(['2024-01-01 00:00:00.123456789',
                                 '2024-01-02 00:00:00.987654321',
                                 '2024-01-03 00:00:00.111222333']),
    'value': [10.123456789, 20.987654321, 30.111222333]
})

# Ray data setup
ray_data = ray.data.from_pandas(df)

# Define a function to apply with .map()
def process_data(row):
    # Example of applying a transformation
    row['timestamp'] = row['timestamp'] + pd.Timedelta(1, 'ns')  # add 1ns to timestamp
    return row

# Apply the .map() function
result = ray_data.map(process_data)

# Convert back to pandas for inspection
processed_df = result.to_pandas()

# Print the original and processed data
print("Original DataFrame:")
print(df)
print("\nProcessed DataFrame after .map():")
print(processed_df)

Original DataFrame:
   id                     timestamp      value
0   1 2024-01-01 00:00:00.123456789  10.123457
1   2 2024-01-02 00:00:00.987654321  20.987654
2   3 2024-01-03 00:00:00.111222333  30.111222

Processed DataFrame after .map():
   id                  timestamp      value
0   1 2024-01-01 00:00:00.123456  10.123457
1   2 2024-01-02 00:00:00.987654  20.987654
2   3 2024-01-03 00:00:00.111222  30.111222
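
Note the truncation: the trailing nanosecond digits (…789, …321, …333) are lost to microsecond precision after the .map() round trip, and the added 1 ns disappears with them.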

@PawaritL

PawaritL commented Dec 18, 2024

For the 2nd issue, I believe it's because Ray Data relies on PyArrow's pandas integration. PyArrow doesn't seem to respect the conversion table (https://arrow.apache.org/docs/python/pandas.html#arrow-pandas-conversion) if the timestamps are inside a StructArray.

Repro:

import pandas as pd
import pyarrow as pa

base_timestamp = pd.Timestamp("2024-01-01")

start_times = pa.array([base_timestamp + pd.Timedelta(nanoseconds=i) for i in range(0, 5)], type=pa.timestamp('ns'))
end_times = pa.array([base_timestamp + pd.Timedelta(nanoseconds=j) for j in range(1, 6)], type=pa.timestamp('ns'))

struct_array = pa.StructArray.from_arrays(
    [start_times, end_times],
    fields=[
        pa.field("start_time", pa.timestamp('ns')),
        pa.field("end_time", pa.timestamp('ns')),
    ],
)
# when not using StructArray, conversion works as expected per https://arrow.apache.org/docs/python/pandas.html#arrow-pandas-conversion
table_without_struct = pa.Table.from_arrays([start_times, end_times], ["start_timestamp", "end_timestamp"])
df_without_struct = table_without_struct.to_pandas()
print(df_without_struct.iloc[0]) # returns pd.Series with dtype=datetime64[ns]

print("--------")
# however when using pyarrow StructArray, pyarrow converts timestamps with nanosecond precision to ints rather than respecting the conversion table
# this is true even without using Ray
table_with_struct = pa.Table.from_arrays([struct_array], ["timestamps"])
df_with_struct = table_with_struct.to_pandas()
print(df_with_struct["timestamps"].iloc[0]) # returns a dict of ints

@NellyWhads
Author

NellyWhads commented Dec 18, 2024

I believe that's the exact issue. I'm unfortunately not sure if pandas will be able to iterate on a fix quickly in this case. Perhaps the ray team has a better relationship with the pandas devs than I (a relative stranger) may have?

A pandas-centric fix also means that users are forced to use a specific version of pandas in their environments.

@srinathk10
Contributor

srinathk10 commented Dec 18, 2024

Would you be able to convert the timestamp to int64? This should preserve nanosecond precision.

import ray
import pandas as pd

# Initialize Ray
ray.init(ignore_reinit_error=True)

# Sample flat-structured dataset (pandas DataFrame)
df = pd.DataFrame({
    'id': [1, 2, 3],
    'timestamp': pd.to_datetime(['2024-01-01 00:00:00.123456789',
                                 '2024-01-02 00:00:00.987654321',
                                 '2024-01-03 00:00:00.111222333']),
    'value': [10.123456789, 20.987654321, 30.111222333]
})

# Convert timestamp to nanoseconds (int64) before passing to Ray
df['timestamp_ns'] = df['timestamp'].astype('int64')  # Convert to nanoseconds since epoch
df = df.drop(columns=['timestamp'])  # Drop original timestamp if not needed

# Ray data setup
ray_data = ray.data.from_pandas(df)

# Define a function to apply with .map()
def process_data(row):
    # Add 1 nanosecond
    row['timestamp_ns'] += 1
    return row

# Apply the .map() function
result = ray_data.map(process_data)

# Convert back to pandas for inspection
processed_df = result.to_pandas()

# Restore timestamp from nanoseconds
processed_df['timestamp'] = pd.to_datetime(processed_df['timestamp_ns'])  # Convert back to timestamp
processed_df = processed_df.drop(columns=['timestamp_ns'])  # Drop intermediate column if not needed

# Print the original and processed DataFrames
print("Original DataFrame:")
print(df)
print("\nProcessed DataFrame after .map():")
print(processed_df)

Original DataFrame:
   id      value         timestamp_ns
0   1  10.123457  1704067200123456789
1   2  20.987654  1704153600987654321
2   3  30.111222  1704240000111222333

Processed DataFrame after .map():
   id      value                     timestamp
0   1  10.123457 2024-01-01 00:00:00.123456790
1   2  20.987654 2024-01-02 00:00:00.987654322
2   3  30.111222 2024-01-03 00:00:00.111222334

@NellyWhads
Author

Unfortunately, this means that whoever uses the data is now responsible for remembering to convert it back to a timestamp (in each transform they apply via map_batches()).

This is how we currently "patch" the behavior, but it requires an extra API layer that is rather expensive to maintain, instead of just using Ray Data as a product directly.
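
For illustration, a sketch of what such a wrapper layer might look like (a hypothetical helper; names and columns are illustrative):

import pandas as pd

def with_restored_timestamps(fn, ns_columns):
    # Wraps a map_batches() transform so the transform body sees real
    # timestamps while Ray Data only ever stores int64 nanoseconds.
    def wrapped(batch: pd.DataFrame) -> pd.DataFrame:
        for col in ns_columns:
            batch[col] = pd.to_datetime(batch[col])   # int64 ns -> datetime64[ns]
        batch = fn(batch)
        for col in ns_columns:
            batch[col] = batch[col].astype("int64")   # datetime64[ns] -> int64 ns
        return batch
    return wrapped

# Usage:
# ds.map_batches(with_restored_timestamps(my_transform, ["timestamp_ns"]), batch_format="pandas")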

@to3i

to3i commented Dec 19, 2024

I actually ran into the same bug; however, I only became aware of it when Ray Data raised an error during pa.concat_tables while unifying the schemas of two blocks.

This happens in cases where the timestamp columns of two blocks are inferred with different types:
2024-01-01 00:00:00.123456790 -> 2024-01-01 00:00:00.123456 -> timestamp[us] (microseconds), whereas
2024-01-01 00:00:00.123000090 -> 2024-01-01 00:00:00.123 -> timestamp[ms] (milliseconds)
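
A minimal sketch of this failure mode (illustrative values, not the actual pipeline):

import pyarrow as pa

# Two blocks whose timestamp columns were inferred at different precisions.
block_us = pa.table({"ts": pa.array([1], type=pa.timestamp("us"))})
block_ms = pa.table({"ts": pa.array([1], type=pa.timestamp("ms"))})

try:
    pa.concat_tables([block_us, block_ms])
except pa.ArrowInvalid as e:
    print(e)  # schemas differ: timestamp[us] vs timestamp[ms]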

roshankathawate pushed a commit to roshankathawate/ray that referenced this issue Jan 7, 2025
## Why are these changes needed?
Handle pandas timestamp with nanosecs precision

## Related issue number

"Closes ray-project#49297"

---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
roshankathawate pushed a commit to roshankathawate/ray that referenced this issue Jan 9, 2025
## Why are these changes needed?
Handle pandas timestamp with nanosecs precision

## Related issue number

"Closes ray-project#49297"

---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Roshan Kathawate <[email protected]>