[Data] Use AggregateFn instead of groupby.count for unique() #49298

Open
wingkitlee0 opened this issue Dec 17, 2024 · 0 comments · May be fixed by #49296
Labels
data Ray Data-related issues enhancement Request for new feature and/or capability P2 Important issue, but not time-critical

Comments

@wingkitlee0
Copy link
Contributor

Description

I tried AggregateFn instead of groupby.count for unique() the other day, and it was about 10x faster:

Groupby performs a sort, roughly O((n log n) / parallelism), whereas the AggregateFn approach should be O(n / parallelism).
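As an aside, the accumulate/merge pattern behind this can be sketched in plain Python (no Ray) to show why it avoids a sort; `accumulate_block`, `merge`, and `blocks` below are illustrative stand-ins for the AggregateFn hooks and Ray Data blocks, not real Ray APIs:

```python
from functools import reduce

def accumulate_block(acc: set, block: list) -> set:
    # One O(len(block)) pass over each block; no sorting involved.
    return acc | set(block)

def merge(a: set, b: set) -> set:
    # Combining two partial results is linear in their sizes.
    return a | b

# Hypothetical partitioned data, standing in for Ray Data blocks.
blocks = [[1, 2, 3, 2], [3, 4, 5], [5, 6, 1]]

partials = [accumulate_block(set(), block) for block in blocks]
unique_items = sorted(reduce(merge, partials))
print(unique_items)  # [1, 2, 3, 4, 5, 6]
```

Since each block is accumulated independently, this parallelizes naturally, and the final merge only touches the (typically much smaller) sets of distinct values.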

I have a draft PR that uses the existing aggregation mechanics; the following example script does essentially the same thing:

import numpy as np
import ray
import ray.data
from ray.data.aggregate import AggregateFn

def experiment1(random_state=None) -> list:
    """Baseline: Dataset.unique(), currently implemented via groupby."""
    if random_state is None:
        random_state = np.random.default_rng()

    ds = ray.data.from_items(random_state.integers(0, 10000, size=1_000_000))

    return ds.unique("item")

def get_unique_func(col: str) -> AggregateFn:
    return AggregateFn(
        init=lambda x: set(),
        # Keep the accumulators as sets until finalize, so merges stay cheap.
        merge=lambda a, b: a | b,
        accumulate_block=lambda a, block: a | set(block[col]),
        name="unique",
        finalize=lambda a: list(a),
    )

def experiment2(random_state=None) -> list:
    """Alternative: single-pass set-union aggregation via AggregateFn."""
    if random_state is None:
        random_state = np.random.default_rng()

    ds = ray.data.from_items(random_state.integers(0, 10000, size=1_000_000))

    return ds.aggregate(get_unique_func("item"))["unique"]

def main():
    random_state = np.random.default_rng(1234)

    ray.init()

    unique_items = experiment1(random_state)
    print(f"Number of unique items: {len(unique_items)}")
    print(f"{type(unique_items)=}")

    unique_items = experiment2(random_state)
    print(f"Number of unique items: {len(unique_items)}")
    print(f"{type(unique_items)=}")

if __name__ == "__main__":
    main()

Use case

It is useful for quickly getting basic statistics of a dataset.

@wingkitlee0 wingkitlee0 added enhancement Request for new feature and/or capability triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 17, 2024
@wingkitlee0 wingkitlee0 changed the title [Data] Using AggregateFn instead of groupby.count for unique() [Data] Use AggregateFn instead of groupby.count for unique() Dec 17, 2024
@wingkitlee0 wingkitlee0 linked a pull request Dec 17, 2024 that will close this issue
8 tasks
@jcotant1 jcotant1 added the data Ray Data-related issues label Dec 17, 2024
@richardliaw richardliaw added P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 17, 2024