Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_* #285

Open

praateekmahajan wants to merge 3 commits into main

Conversation

@praateekmahajan (Collaborator) commented on Oct 8, 2024

This PR introduces a new code path that uses dask_cudf.read_json / dask_cudf.read_parquet directly rather than our existing from_map implementation.

Important

The new implementation supports fewer use cases than the original; however, it covers the most frequently used ones (i.e., jsonl and parquet files without add_filename).

Caution

The new and old implementations produce different outputs for the same input, specifically with respect to add_filename and input_meta.

Differences between the old and new implementations

| Discussion point | New implementation | Existing implementation |
| --- | --- | --- |
| Underlying API | `dask_cudf.read_*` | `dd.from_map(read_single_partition)` |
| backend | Only supports cudf | Works with pandas / cudf |
| filetype | Only supports jsonl and parquet | Supports json, along with jsonl and parquet |
| add_filename | 1. Only when filetype is jsonl. 2. Output contains the complete filepath rather than just the filename. | 1. Supports all filetypes; however, when filetype is jsonl it now goes through the new implementation's code path. 2. Output contains only the filename. |
| input_meta | prune_columns and dtype are still parsed from it, but the returned schema is not affected | The returned schema contains only the keys in input_meta, due to the behavior of from_map |
| meta as **kwarg | Not required, since the first file is used to infer the schema | Required; otherwise it can result in OOM (see benchmark row 2 below) |
| columns | Works as expected | Causes a slowdown, at least when filetype is json |
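
For context, here is a minimal sketch of the kind of dask_cudf call the new code path builds on, per the table above. The file path, `lines=True`, and column names are illustrative assumptions, not the PR's exact code; the argument names follow dask_cudf's public `read_json` API.

```python
import dask_cudf

# Illustrative only: the new path delegates to dask_cudf.read_* with a
# blocksize, letting cudf infer the schema from the data itself instead of
# requiring an explicit `meta`.
ddf = dask_cudf.read_json(
    "/data/my_dataset/*.jsonl",  # hypothetical input location
    lines=True,                  # jsonl input
    blocksize="1gb",             # target bytes per partition
)

# For jsonl, column selection happens after the read; read_parquet accepts
# `columns=` directly, so no post-read selection is needed there.
ddf = ddf[["id", "text"]]        # hypothetical column names
```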

Benchmarking

Reading 6,000 files of ~25 MB each (~145 GB total) across 8 GPUs.

| add_filename | partition_size | input_meta | Using dask.read_json (#285) | Providing meta in dask.from_map (#291) |
| --- | --- | --- | --- | --- |
| False | 2gb | Specified | 24.9 s ± 330 ms | 25.9 s ± 520 ms |
| False | 2gb | None | 24.9 s ± 470 ms | OOM |
| True | 2gb | Specified | 55 s ± 177 ms | 53.2 s ± 350 ms per loop |
| True | 2gb | None | 54.8 s ± 248 ms | 64 s ± 289 ms per loop |
GPU utilization screenshots (images not reproduced here):

- Using dask.read_json (#285): the first two runs are add_filename=False, the latter two are add_filename=True, where we see lower utilization.
- Providing meta in dask.from_map (#291): the first run is add_filename=False, and the latter runs are add_filename=True, where we see lower utilization.

Usage

# Add snippet demonstrating usage
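
A minimal sketch of how the new parameter might be used. The import path, input location, and chosen values are assumptions based on this PR's diff, not the verified final API.

```python
from nemo_curator.datasets import DocumentDataset  # import path assumed

# Read a directory of jsonl files through the dask_cudf-backed path by
# providing `blocksize` instead of `files_per_partition`.
dataset = DocumentDataset.read_json(
    "/path/to/jsonl_dir/",     # hypothetical input location
    backend="cudf",            # the new path only supports the cudf backend
    blocksize="1gb",           # target bytes per partition (new in this PR)
    files_per_partition=None,  # leave unset to take the new code path
)
```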

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Praateek <[email protected]>
praateekmahajan changed the title from "Trying dasks' read_json" to "Trying dask_cudf's read_json" on Oct 8, 2024
praateekmahajan changed the title from "Trying dask_cudf's read_json" to "[DRAFT] Trying dask_cudf's read_json" on Oct 9, 2024
praateekmahajan changed the title from "[DRAFT] Trying dask_cudf's read_json" to "[DRAFT] Trying dask_cudf's read_json / read_parquet" on Oct 9, 2024
Signed-off-by: Praateek <[email protected]>
Signed-off-by: Praateek <[email protected]>
praateekmahajan changed the title from "[DRAFT] Trying dask_cudf's read_json / read_parquet" to "Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_*" on Nov 15, 2024
praateekmahajan marked this pull request as ready for review on November 15, 2024
columns: List[str],
add_filename: bool,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
# TODO : Reviewer TAL if filetype check is needed

Originally I added the "json" and "jsonl" check because columns is already added as a kwarg to read_parquet, so I didn't want to filter twice in the Parquet case.
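
A rough illustration of that point follows. This is a sketch, not the PR's actual code; the function name and parameters are placeholders.

```python
from typing import List, Optional


def select_columns(df, file_type: str, columns: Optional[List[str]]):
    """Sketch: for json/jsonl the column selection happens after the read,
    while for parquet `columns` was already passed to dask_cudf.read_parquet,
    so selecting again here would filter twice."""
    if file_type in ("json", "jsonl") and columns is not None:
        df = df[columns]
    return df
```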

@@ -43,7 +43,8 @@ def read_json(
     cls,
     input_files: Union[str, List[str]],
     backend: str = "pandas",
-    files_per_partition: int = 1,
+    files_per_partition: Optional[int] = None,
+    blocksize: Optional[str] = "1gb",

As you mentioned, it might be nice to add some of the information from https://github.com/rapidsai/cudf/blob/81cd4a00f8ae0dcde359ac11b53a9b3d855e50e2/docs/dask_cudf/source/best_practices.rst#use-parquet to NeMo Curator's best practices.

This might also be of interest to you: rapidsai/cudf#17250
