Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task-Complexity Classifier #364

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ All of our text pipelines have great multilingual support.
- [Heuristic Filtering](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html)
- Classifier Filtering
- [fastText](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html)
- GPU-Accelerated models: [Domain, Quality, and Safety Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html)
- GPU-Accelerated models: [Domain, Quality, Safety, and Task-Complexity Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html)
- **GPU-Accelerated Deduplication**
- [Exact Deduplication](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html)
- [Fuzzy Deduplication](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html) via MinHash Locality Sensitive Hashing
Expand Down
5 changes: 4 additions & 1 deletion docs/user-guide/api/classifiers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ Classifiers
:members:

.. autoclass:: nemo_curator.classifiers.AegisClassifier
:members:
:members:

.. autoclass:: nemo_curator.classifiers.TaskComplexityClassifier
:members:
1 change: 1 addition & 0 deletions docs/user-guide/cpuvsgpu.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The following NeMo Curator modules are GPU based.

* Domain Classification
* Quality Classification
* Task-Complexity Classification

GPU modules store the ``DocumentDataset`` using a ``cudf`` backend instead of a ``pandas`` one.
To read a dataset into GPU memory, one could use the following function call.
Expand Down
9 changes: 8 additions & 1 deletion docs/user-guide/distributeddataclassification.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ NeMo Curator provides a module to help users run inference with pre-trained mode
This is achieved by chunking the datasets across multiple computing nodes, each equipped with multiple GPUs, to accelerate the classification task in a distributed manner.
Since the classification of a single text document is independent of other documents within the dataset, we can distribute the workload across multiple nodes and GPUs to perform parallel processing.

Domain, quality, content safety, and educational content models are tasks we include as examples within our module.
Domain, quality, content safety, educational content, and task-complexity models are tasks we include as examples within our module.

Here, we summarize why each is useful for training an LLM:

Expand All @@ -27,6 +27,8 @@ Here, we summarize why each is useful for training an LLM:

- The **FineWeb Educational Content Classifier** focuses on identifying and prioritizing educational material within datasets. This classifier is especially useful for training LLMs on specialized educational content, which can improve their performance on knowledge-intensive tasks. Models trained on high-quality educational content demonstrate enhanced capabilities on academic benchmarks such as MMLU and ARC, showcasing the classifier's impact on improving the knowledge-intensive task performance of LLMs.

- The **Task-Complexity Classifier** TODO

-----------------------------------------
Usage
-----------------------------------------
Expand Down Expand Up @@ -178,6 +180,11 @@ For example, to create a dataset with only highly educational content (scores 4
high_edu_dataset = result_dataset[result_dataset["fineweb-edu-score-int"] >= 4]
high_edu_dataset.to_json("high_educational_content/")

Task-Complexity Classifier
^^^^^^^^^^^^^^^^^^^^^^^^^^

TODO

-----------------------------------------
CrossFit Integration
-----------------------------------------
Expand Down
66 changes: 66 additions & 0 deletions examples/classifiers/task_complexity_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import time

from nemo_curator.classifiers import TaskComplexityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import ArgumentHelper


def main(args):
global_st = time.time()

# Input can be a string or list
input_file_path = "/path/to/data"
output_file_path = "./"

client_args = ArgumentHelper.parse_client_args(args)
client_args["cluster_type"] = "gpu"
client = get_client(**client_args)

input_dataset = DocumentDataset.read_json(
input_file_path, backend="cudf", add_filename=True
)

# TODO: filter_by=[]
task_complexity_classifier = TaskComplexityClassifier()
result_dataset = task_complexity_classifier(dataset=input_dataset)

result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True)

global_et = time.time()
print(
f"Total time taken for task-complexity classifier inference: {global_et-global_st} s",
flush=True,
)

client.close()


def attach_args(
parser=argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
),
):
argumentHelper = ArgumentHelper(parser)
argumentHelper.add_distributed_classifier_cluster_args()

return argumentHelper.parser


if __name__ == "__main__":
main(attach_args().parse_args())
2 changes: 2 additions & 0 deletions nemo_curator/classifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from .domain import DomainClassifier
from .fineweb_edu import FineWebEduClassifier
from .quality import QualityClassifier
from .task_complexity import TaskComplexityClassifier

__all__ = [
"DomainClassifier",
"QualityClassifier",
"AegisClassifier",
"FineWebEduClassifier",
"TaskComplexityClassifier",
]
137 changes: 137 additions & 0 deletions nemo_curator/classifiers/task_complexity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from dataclasses import dataclass
from typing import List, Optional

os.environ["RAPIDS_NO_INITIALIZE"] = "1"
from crossfit.backend.torch.hf.model import HFModel
from transformers import AutoConfig, AutoTokenizer

from nemo_curator.classifiers.base import (
DistributedDataClassifier,
HFDeberta,
_get_suggest_memory_for_classifier,
_run_classifier_helper,
)
from nemo_curator.datasets import DocumentDataset

TASK_COMPLEXITY_IDENTIFIER = "TODO"


@dataclass
class TaskComplexityModelConfig:
model: str = "microsoft/deberta-v3-base"
fc_dropout: float = 0.2
max_len: int = 512


class TaskComplexityModel(HFModel):
def __init__(
self,
config: TaskComplexityModelConfig,
autocast: bool = False,
max_mem_gb: Optional[int] = None,
):
self.config = config
self.autocast = autocast
if max_mem_gb is None:
max_mem_gb = _get_suggest_memory_for_classifier()

super().__init__(self.config.model, max_mem_gb=max_mem_gb)

def load_model(self, device: str = "cuda"):
model = HFDeberta.from_pretrained(TASK_COMPLEXITY_IDENTIFIER)
model.set_autocast(self.autocast)
model = model.to(device)
return model.eval()

def load_tokenizer(self):
return AutoTokenizer.from_pretrained(TASK_COMPLEXITY_IDENTIFIER)

def load_config(self):
return AutoConfig.from_pretrained(TASK_COMPLEXITY_IDENTIFIER)


class TaskComplexityClassifier(DistributedDataClassifier):
"""
TaskComplexityClassifier TODO.
This class is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets.

Attributes:
filter_by (list[str], optional): The classes to filter the dataset by.
If None, all classes will be included. Defaults to None.
batch_size (int): The number of samples per batch for inference. Defaults to 256.
text_field (str): The field in the dataset that should be classified.
# TODO: Clarify output column names
pred_column (str): The column name where predictions will be stored. Defaults to "pred".
prob_column (str, optional): The column name where prediction probabilities will be stored. Defaults to None.
Comment on lines +77 to +79
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am anticipating refactoring the TaskComplexityClassifier more, based on our internal model card reference. I currently do not have access to the model itself, so this is mostly placeholder code based off of the DomainClassifier.

max_chars (int): The maximum number of characters in each document to consider for classification. Defaults to 2000.
device_type (str): The type of device to use for inference, either "cuda" or "cpu". Defaults to "cuda".
autocast (bool): Whether to use mixed precision for faster inference. Defaults to True.
max_mem_gb (int, optional): The maximum amount of memory in GB to allocate for the model. If None,
it defaults to the available GPU memory minus 4 GB.

"""

def __init__(
self,
filter_by: Optional[List[str]] = None,
batch_size: int = 256,
text_field: str = "text",
pred_column: str = "pred",
prob_column: Optional[str] = None,
max_chars: int = 2000,
device_type: str = "cuda",
autocast: bool = True,
max_mem_gb: Optional[int] = None,
):
config = AutoConfig.from_pretrained(TASK_COMPLEXITY_IDENTIFIER)

self.text_field = text_field
self.prob_column = prob_column
self.labels = list(config.label2id.keys())
self.labels.sort(key=lambda x: config.label2id[x])
self.out_dim = len(self.labels)

model = TaskComplexityModel(
config=TaskComplexityModelConfig, autocast=autocast, max_mem_gb=max_mem_gb
)

super().__init__(
model=model,
labels=self.labels,
filter_by=filter_by,
batch_size=batch_size,
out_dim=self.out_dim,
pred_column=pred_column,
max_chars=max_chars,
device_type=device_type,
autocast=autocast,
)

def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset:
print("Starting task-complexity classifier inference", flush=True)
df = dataset.df
df = _run_classifier_helper(
df=df,
model=self.model,
labels=self.labels,
max_chars=self.max_chars,
batch_size=self.batch_size,
label_col=self.pred_column,
text_field=self.text_field,
prob_col=self.prob_column,
)
return DocumentDataset(df)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import warnings

os.environ["RAPIDS_NO_INITIALIZE"] = "1"

from nemo_curator.classifiers import TaskComplexityClassifier
from nemo_curator.datasets import DocumentDataset

# Get relevant args
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_remaining_files
from nemo_curator.utils.script_utils import ArgumentHelper

warnings.filterwarnings("ignore")


def main():
args = ArgumentHelper.parse_distributed_classifier_args(
description="Run task-complexity classifier inference."
).parse_args()
print(f"Arguments parsed = {args}", flush=True)
client_args = ArgumentHelper.parse_client_args(args)
client_args["cluster_type"] = "gpu"
client = get_client(**client_args)
print("Starting task-complexity classifier inference", flush=True)
global_st = time.time()
files_per_run = len(client.scheduler_info()["workers"]) * 2

if not os.path.exists(args.output_data_dir):
os.makedirs(args.output_data_dir)

# Some times jsonl files are stored as .json
# So to handle that case we can pass the input_file_extension
if args.input_file_extension is not None:
input_file_extension = args.input_file_extension
else:
input_file_extension = args.input_file_type

input_files = get_remaining_files(
args.input_data_dir, args.output_data_dir, input_file_extension
)
print(f"Total input files {len(input_files)}", flush=True)

if args.input_file_type == "pickle":
add_filename = False
else:
add_filename = True

task_complexity_classifier = TaskComplexityClassifier(
text_field=args.input_text_field,
max_chars=args.max_chars,
batch_size=args.batch_size,
autocast=args.autocast,
max_mem_gb=args.max_mem_gb_classifier,
)

for file_batch_id, i in enumerate(range(0, len(input_files), files_per_run)):
batch_st = time.time()
current_batch_files = input_files[i : i + files_per_run]
print(
f"File Batch ID {file_batch_id}: total input files {len(current_batch_files)}",
flush=True,
)
df = read_data(
input_files=current_batch_files,
file_type=args.input_file_type,
add_filename=add_filename,
)
df = task_complexity_classifier(DocumentDataset(df)).df
print(f"Total input Dask DataFrame partitions {df.npartitions}", flush=True)

write_to_disk(
df=df,
output_file_dir=args.output_data_dir,
write_to_filename=add_filename,
output_type=args.output_file_type,
)
batch_et = time.time()
print(
f"File Batch ID {file_batch_id}: completed in {batch_et-batch_st} seconds",
flush=True,
)

global_et = time.time()
print(
f"Total time taken for task-complexity classifier inference: {global_et-global_st} s",
flush=True,
)
client.close()


def console_script():
main()


if __name__ == "__main__":
console_script()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ domain_classifier_inference = "nemo_curator.scripts.classifiers.domain_classifie
quality_classifier_inference = "nemo_curator.scripts.classifiers.quality_classifier_inference:console_script"
aegis_classifier_inference = "nemo_curator.scripts.classifiers.aegis_classifier_inference:console_script"
fineweb_edu_classifier_inference = "nemo_curator.scripts.classifiers.fineweb_edu_classifier_inference:console_script"
task_complexity_classifier_inference = "nemo_curator.scripts.classifiers.task_complexity_classifier_inference:console_script"
verify_classification_results = "nemo_curator.scripts.verify_classification_results:console_script"
blend_datasets = "nemo_curator.scripts.blend_datasets:console_script"
semdedup_extract_embeddings = "nemo_curator.scripts.semdedup.compute_embeddings:console_script"
Expand Down