From bffdab3d6b210e2254a06b691819943c17d62e39 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 13 Nov 2024 15:55:01 -0800 Subject: [PATCH] initial commit Signed-off-by: Sarah Yurick --- README.md | 2 +- docs/user-guide/api/classifiers.rst | 5 +- docs/user-guide/cpuvsgpu.rst | 1 + .../distributeddataclassification.rst | 9 +- .../classifiers/task_complexity_example.py | 66 +++++++++ nemo_curator/classifiers/__init__.py | 2 + nemo_curator/classifiers/task_complexity.py | 137 ++++++++++++++++++ .../task_complexity_classifier_inference.py | 112 ++++++++++++++ pyproject.toml | 1 + 9 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 examples/classifiers/task_complexity_example.py create mode 100644 nemo_curator/classifiers/task_complexity.py create mode 100644 nemo_curator/scripts/classifiers/task_complexity_classifier_inference.py diff --git a/README.md b/README.md index 5cba9d103..0c9e0afc1 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ All of our text pipelines have great multilingual support. - [Heuristic Filtering](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html) - Classifier Filtering - [fastText](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html) - - GPU-Accelerated models: [Domain, Quality, and Safety Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html) + - GPU-Accelerated models: [Domain, Quality, Safety, and Task-Complexity Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html) - **GPU-Accelerated Deduplication** - [Exact Deduplication](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html) - [Fuzzy Deduplication](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html) via MinHash Locality Sensitive Hashing diff --git a/docs/user-guide/api/classifiers.rst b/docs/user-guide/api/classifiers.rst index fa468af27..99b9661fa 100644 --- a/docs/user-guide/api/classifiers.rst +++ b/docs/user-guide/api/classifiers.rst @@ -12,4 +12,7 @@ Classifiers :members: .. autoclass:: nemo_curator.classifiers.AegisClassifier - :members: \ No newline at end of file + :members: + +.. autoclass:: nemo_curator.classifiers.TaskComplexityClassifier + :members: diff --git a/docs/user-guide/cpuvsgpu.rst b/docs/user-guide/cpuvsgpu.rst index 683723b29..0002a97a3 100644 --- a/docs/user-guide/cpuvsgpu.rst +++ b/docs/user-guide/cpuvsgpu.rst @@ -69,6 +69,7 @@ The following NeMo Curator modules are GPU based. * Domain Classification * Quality Classification + * Task-Complexity Classification GPU modules store the ``DocumentDataset`` using a ``cudf`` backend instead of a ``pandas`` one. To read a dataset into GPU memory, one could use the following function call. diff --git a/docs/user-guide/distributeddataclassification.rst b/docs/user-guide/distributeddataclassification.rst index b411896be..10fb00543 100644 --- a/docs/user-guide/distributeddataclassification.rst +++ b/docs/user-guide/distributeddataclassification.rst @@ -15,7 +15,7 @@ NeMo Curator provides a module to help users run inference with pre-trained mode This is achieved by chunking the datasets across multiple computing nodes, each equipped with multiple GPUs, to accelerate the classification task in a distributed manner. 
Since the classification of a single text document is independent of other documents within the dataset, we can distribute the workload across multiple nodes and GPUs to perform parallel processing. -Domain, quality, content safety, and educational content models are tasks we include as examples within our module. +Domain, quality, content safety, educational content, and task-complexity models are tasks we include as examples within our module. Here, we summarize why each is useful for training an LLM: @@ -27,6 +27,8 @@ Here, we summarize why each is useful for training an LLM: - The **FineWeb Educational Content Classifier** focuses on identifying and prioritizing educational material within datasets. This classifier is especially useful for training LLMs on specialized educational content, which can improve their performance on knowledge-intensive tasks. Models trained on high-quality educational content demonstrate enhanced capabilities on academic benchmarks such as MMLU and ARC, showcasing the classifier's impact on improving the knowledge-intensive task performance of LLMs. +- The **Task-Complexity Classifier** TODO + ----------------------------------------- Usage ----------------------------------------- @@ -178,6 +180,11 @@ For example, to create a dataset with only highly educational content (scores 4 high_edu_dataset = result_dataset[result_dataset["fineweb-edu-score-int"] >= 4] high_edu_dataset.to_json("high_educational_content/") +Task-Complexity Classifier +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +TODO + ----------------------------------------- CrossFit Integration ----------------------------------------- diff --git a/examples/classifiers/task_complexity_example.py b/examples/classifiers/task_complexity_example.py new file mode 100644 index 000000000..c8408728d --- /dev/null +++ b/examples/classifiers/task_complexity_example.py @@ -0,0 +1,66 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import time + +from nemo_curator.classifiers import TaskComplexityClassifier +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client +from nemo_curator.utils.script_utils import ArgumentHelper + + +def main(args): + global_st = time.time() + + # Input can be a string or list + input_file_path = "/path/to/data" + output_file_path = "./" + + client_args = ArgumentHelper.parse_client_args(args) + client_args["cluster_type"] = "gpu" + client = get_client(**client_args) + + input_dataset = DocumentDataset.read_json( + input_file_path, backend="cudf", add_filename=True + ) + + # TODO: filter_by=[] + task_complexity_classifier = TaskComplexityClassifier() + result_dataset = task_complexity_classifier(dataset=input_dataset) + + result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True) + + global_et = time.time() + print( + f"Total time taken for task-complexity classifier inference: {global_et-global_st} s", + flush=True, + ) + + client.close() + + +def attach_args( + parser=argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ), +): + argumentHelper = ArgumentHelper(parser) + argumentHelper.add_distributed_classifier_cluster_args() + + return argumentHelper.parser + + +if __name__ == "__main__": + main(attach_args().parse_args()) diff --git a/nemo_curator/classifiers/__init__.py b/nemo_curator/classifiers/__init__.py index f10d63c15..8e297217c 100644 --- a/nemo_curator/classifiers/__init__.py +++ b/nemo_curator/classifiers/__init__.py @@ -19,10 +19,12 @@ from .domain import DomainClassifier from .fineweb_edu import FineWebEduClassifier from .quality import QualityClassifier +from .task_complexity import TaskComplexityClassifier __all__ = [ "DomainClassifier", "QualityClassifier", "AegisClassifier", "FineWebEduClassifier", + "TaskComplexityClassifier", ] diff --git a/nemo_curator/classifiers/task_complexity.py b/nemo_curator/classifiers/task_complexity.py new file mode 100644 index 000000000..599f11e0a --- /dev/null +++ b/nemo_curator/classifiers/task_complexity.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import os +from dataclasses import dataclass +from typing import List, Optional + +os.environ["RAPIDS_NO_INITIALIZE"] = "1" +from crossfit.backend.torch.hf.model import HFModel +from transformers import AutoConfig, AutoTokenizer + +from nemo_curator.classifiers.base import ( + DistributedDataClassifier, + HFDeberta, + _get_suggest_memory_for_classifier, + _run_classifier_helper, +) +from nemo_curator.datasets import DocumentDataset + +TASK_COMPLEXITY_IDENTIFIER = "TODO" + + +@dataclass +class TaskComplexityModelConfig: + model: str = "microsoft/deberta-v3-base" + fc_dropout: float = 0.2 + max_len: int = 512 + + +class TaskComplexityModel(HFModel): + def __init__( + self, + config: TaskComplexityModelConfig, + autocast: bool = False, + max_mem_gb: Optional[int] = None, + ): + self.config = config + self.autocast = autocast + if max_mem_gb is None: + max_mem_gb = _get_suggest_memory_for_classifier() + + super().__init__(self.config.model, max_mem_gb=max_mem_gb) + + def load_model(self, device: str = "cuda"): + model = HFDeberta.from_pretrained(TASK_COMPLEXITY_IDENTIFIER) + model.set_autocast(self.autocast) + model = model.to(device) + return model.eval() + + def load_tokenizer(self): + return AutoTokenizer.from_pretrained(TASK_COMPLEXITY_IDENTIFIER) + + def load_config(self): + return AutoConfig.from_pretrained(TASK_COMPLEXITY_IDENTIFIER) + + +class TaskComplexityClassifier(DistributedDataClassifier): + """ + TaskComplexityClassifier TODO. + This class is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. + + Attributes: + filter_by (list[str], optional): The classes to filter the dataset by. + If None, all classes will be included. Defaults to None. + batch_size (int): The number of samples per batch for inference. Defaults to 256. + text_field (str): The field in the dataset that should be classified. + # TODO: Clarify output column names + pred_column (str): The column name where predictions will be stored. Defaults to "pred". + prob_column (str, optional): The column name where prediction probabilities will be stored. Defaults to None. + max_chars (int): The maximum number of characters in each document to consider for classification. Defaults to 2000. + device_type (str): The type of device to use for inference, either "cuda" or "cpu". Defaults to "cuda". + autocast (bool): Whether to use mixed precision for faster inference. Defaults to True. + max_mem_gb (int, optional): The maximum amount of memory in GB to allocate for the model. If None, + it defaults to the available GPU memory minus 4 GB. 
+ + """ + + def __init__( + self, + filter_by: Optional[List[str]] = None, + batch_size: int = 256, + text_field: str = "text", + pred_column: str = "pred", + prob_column: Optional[str] = None, + max_chars: int = 2000, + device_type: str = "cuda", + autocast: bool = True, + max_mem_gb: Optional[int] = None, + ): + config = AutoConfig.from_pretrained(TASK_COMPLEXITY_IDENTIFIER) + + self.text_field = text_field + self.prob_column = prob_column + self.labels = list(config.label2id.keys()) + self.labels.sort(key=lambda x: config.label2id[x]) + self.out_dim = len(self.labels) + + model = TaskComplexityModel( + config=TaskComplexityModelConfig, autocast=autocast, max_mem_gb=max_mem_gb + ) + + super().__init__( + model=model, + labels=self.labels, + filter_by=filter_by, + batch_size=batch_size, + out_dim=self.out_dim, + pred_column=pred_column, + max_chars=max_chars, + device_type=device_type, + autocast=autocast, + ) + + def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: + print("Starting task-complexity classifier inference", flush=True) + df = dataset.df + df = _run_classifier_helper( + df=df, + model=self.model, + labels=self.labels, + max_chars=self.max_chars, + batch_size=self.batch_size, + label_col=self.pred_column, + text_field=self.text_field, + prob_col=self.prob_column, + ) + return DocumentDataset(df) diff --git a/nemo_curator/scripts/classifiers/task_complexity_classifier_inference.py b/nemo_curator/scripts/classifiers/task_complexity_classifier_inference.py new file mode 100644 index 000000000..c4d5465fe --- /dev/null +++ b/nemo_curator/scripts/classifiers/task_complexity_classifier_inference.py @@ -0,0 +1,112 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import warnings + +os.environ["RAPIDS_NO_INITIALIZE"] = "1" + +from nemo_curator.classifiers import TaskComplexityClassifier +from nemo_curator.datasets import DocumentDataset + +# Get relevant args +from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk +from nemo_curator.utils.file_utils import get_remaining_files +from nemo_curator.utils.script_utils import ArgumentHelper + +warnings.filterwarnings("ignore") + + +def main(): + args = ArgumentHelper.parse_distributed_classifier_args( + description="Run task-complexity classifier inference." 
+ ).parse_args() + print(f"Arguments parsed = {args}", flush=True) + client_args = ArgumentHelper.parse_client_args(args) + client_args["cluster_type"] = "gpu" + client = get_client(**client_args) + print("Starting task-complexity classifier inference", flush=True) + global_st = time.time() + files_per_run = len(client.scheduler_info()["workers"]) * 2 + + if not os.path.exists(args.output_data_dir): + os.makedirs(args.output_data_dir) + + # Some times jsonl files are stored as .json + # So to handle that case we can pass the input_file_extension + if args.input_file_extension is not None: + input_file_extension = args.input_file_extension + else: + input_file_extension = args.input_file_type + + input_files = get_remaining_files( + args.input_data_dir, args.output_data_dir, input_file_extension + ) + print(f"Total input files {len(input_files)}", flush=True) + + if args.input_file_type == "pickle": + add_filename = False + else: + add_filename = True + + task_complexity_classifier = TaskComplexityClassifier( + text_field=args.input_text_field, + max_chars=args.max_chars, + batch_size=args.batch_size, + autocast=args.autocast, + max_mem_gb=args.max_mem_gb_classifier, + ) + + for file_batch_id, i in enumerate(range(0, len(input_files), files_per_run)): + batch_st = time.time() + current_batch_files = input_files[i : i + files_per_run] + print( + f"File Batch ID {file_batch_id}: total input files {len(current_batch_files)}", + flush=True, + ) + df = read_data( + input_files=current_batch_files, + file_type=args.input_file_type, + add_filename=add_filename, + ) + df = task_complexity_classifier(DocumentDataset(df)).df + print(f"Total input Dask DataFrame partitions {df.npartitions}", flush=True) + + write_to_disk( + df=df, + output_file_dir=args.output_data_dir, + write_to_filename=add_filename, + output_type=args.output_file_type, + ) + batch_et = time.time() + print( + f"File Batch ID {file_batch_id}: completed in {batch_et-batch_st} seconds", + flush=True, + ) + + global_et = time.time() + print( + f"Total time taken for task-complexity classifier inference: {global_et-global_st} s", + flush=True, + ) + client.close() + + +def console_script(): + main() + + +if __name__ == "__main__": + console_script() diff --git a/pyproject.toml b/pyproject.toml index a69dd7160..897f8c76b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,6 +139,7 @@ domain_classifier_inference = "nemo_curator.scripts.classifiers.domain_classifie quality_classifier_inference = "nemo_curator.scripts.classifiers.quality_classifier_inference:console_script" aegis_classifier_inference = "nemo_curator.scripts.classifiers.aegis_classifier_inference:console_script" fineweb_edu_classifier_inference = "nemo_curator.scripts.classifiers.fineweb_edu_classifier_inference:console_script" +task_complexity_classifier_inference = "nemo_curator.scripts.classifiers.task_complexity_classifier_inference:console_script" verify_classification_results = "nemo_curator.scripts.verify_classification_results:console_script" blend_datasets = "nemo_curator.scripts.blend_datasets:console_script" semdedup_extract_embeddings = "nemo_curator.scripts.semdedup.compute_embeddings:console_script"
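
A minimal usage sketch for the new TaskComplexityClassifier, following the same pattern as the other classifiers in distributeddataclassification.rst and the example script added in this patch. The input path, output directory, and batch size are illustrative placeholders, get_client(cluster_type="gpu") is assumed to start a local GPU-backed Dask client as the example script implies, and no label names are shown because the model checkpoint behind TASK_COMPLEXITY_IDENTIFIER (and therefore the label set) is still marked TODO.

from nemo_curator.classifiers import TaskComplexityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client

# The classifier runs on GPU, so start a GPU-backed Dask client.
client = get_client(cluster_type="gpu")

# Illustrative input path; any JSONL dataset with a "text" field works.
input_dataset = DocumentDataset.read_json(
    "books_dataset/", backend="cudf", add_filename=True
)

# Predictions are written to the default "pred" column; once the label set
# is finalized, filter_by=[...] can be passed to keep only matching documents.
classifier = TaskComplexityClassifier(batch_size=256)
result_dataset = classifier(dataset=input_dataset)

result_dataset.to_json(
    output_file_dir="task_complexity_results/", write_to_filename=True
)

client.close()

The same flow is exposed on the command line through the task_complexity_classifier_inference entry point added to pyproject.toml.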