
Commit 839c081

resolve PR comments
Signed-off-by: Rucha Apte <[email protected]>
ruchaa-apte committed Nov 6, 2024
1 parent 0cd5d5e commit 839c081
Showing 3 changed files with 39 additions and 36 deletions.
1 change: 1 addition & 0 deletions tutorials/dapt-curation/README.md
@@ -37,6 +37,7 @@ The tutorial follows the steps below:<br>
- Heuristic-based quality filtering (number of lines, word count, top N-grams, etc.)
- Fix unicode errors via ftfy
- PII redaction
- GPU-accelerated fuzzy and semantic deduplication
- Step 6: Save the filtered and curated data <br>
- Step 7: Blend datasets and shuffle
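
A minimal sketch of what the heuristic-filtering and unicode-fix steps listed above look like in code. It assumes NeMo Curator's `Sequential`, `ScoreFilter`, `Modify`, `WordCountFilter`, and `UnicodeReformatter` APIs (none of which appear in this diff), and the file path is illustrative; the tutorial's actual pipeline lives in `code/main.py` and `code/utils.py`:

```python
from nemo_curator import Modify, ScoreFilter, Sequential
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import WordCountFilter
from nemo_curator.modifiers import UnicodeReformatter

# Read JSONL documents with a "text" field into a Dask-backed dataset.
dataset = DocumentDataset.read_json("data/documents.jsonl")

# Heuristic quality filter (word count) followed by an ftfy-based unicode fix.
pipeline = Sequential(
    [
        ScoreFilter(WordCountFilter(min_words=50), text_field="text", score_type=int),
        Modify(UnicodeReformatter(), text_field="text"),
    ]
)

curated = pipeline(dataset)
curated.to_json("curated_output/", write_to_filename=False)
```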

56 changes: 31 additions & 25 deletions tutorials/dapt-curation/code/main.py
@@ -32,7 +32,8 @@
filter_text,
redact_code,
fuzzy_dedupe,
semantic_dedupe
semantic_dedupe,
rm_dir
)

import nemo_curator as nc
@@ -171,33 +172,39 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
print("Executing the curation pipeline...")
dataset_text = curation_steps_text(orig_dataset_text)
dataset_code = curation_steps_code(orig_dataset_code)

print("Executing the semantic dedupe pipeline...")
gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))
gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf"))
sem_dedupe_config_yaml_path = os.path.join(CONFIG_DIR, 'text_semantic_dedupe_config.yaml')
duplicates = semantic_dedupe(dataset=gpu_dataset_text, sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path, type='text')
unique_ids = duplicates.df.to_backend('pandas').compute()['id']
semantic_dataset_text = DocumentDataset(gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)])

print("Executing the fuzzy dedupe pipeline...")
fuzzy_dataset_text = fuzzy_dedupe(dataset=semantic_dataset_text, type='text')
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, type='code')

fuzzy_dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
fuzzy_dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")

final_dataset_text = fuzzy_dataset_text.persist()
final_dataset_code = fuzzy_dataset_code.persist()


print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
print(f"After dataprep for text files: {len(dataset_text.df)}")
print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}")
print(f"After fuzzy dedupe for text files: {len(fuzzy_dataset_text.df)}")

print(f"Original dataset length for code files: {len(orig_dataset_code.df)}")
print(f"After dataprep length for code files: {len(dataset_code.df)}")
print(f"After fuzzy dedupe: {len(fuzzy_dataset_code.df)}")

if args.device == 'gpu':
print("Executing the semantic dedupe pipeline...")
gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))
gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf"))
sem_dedupe_config_yaml_path = os.path.join(CONFIG_DIR, 'text_semantic_dedupe_config.yaml')
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "semantic_dedupe", "text")
rm_dir(CACHE_DIR)
duplicates = semantic_dedupe(dataset=gpu_dataset_text, sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path, cache=CACHE_DIR)
unique_ids = duplicates.df.to_backend('pandas').compute()['id']
semantic_dataset_text = DocumentDataset(gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)])
print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}")

print("Executing the fuzzy dedupe pipeline...")
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(dataset=semantic_dataset_text, cache=CACHE_DIR)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
rm_dir(CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)

dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
print(f"After fuzzy dedupe for text files: {len(dataset_text.df)}")
print(f"After fuzzy dedupe: {len(dataset_code.df)}")

final_dataset_text = dataset_text.persist()
final_dataset_code = dataset_code.persist()

print("Writing the results to disk...")
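
The GPU branch above hops each Dask collection from a pandas backend to cuDF for deduplication and back again via `to_backend`. A standalone sketch of that round-trip, assuming `dask` and `dask-cudf` are installed (not part of the commit):

```python
import pandas as pd
import dask.dataframe as dd

# A pandas-backed Dask DataFrame, like the output of the CPU curation steps.
df = dd.from_pandas(
    pd.DataFrame({"id": [0, 1, 2], "text": ["a", "b", "a"]}), npartitions=2
)

gpu_df = df.to_backend("cudf")        # move partitions onto the GPU
# ... run GPU-only modules (fuzzy/semantic dedupe) against gpu_df here ...
cpu_df = gpu_df.to_backend("pandas")  # return to pandas for CPU-side writing
print(cpu_df.compute())
```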

@@ -259,7 +266,6 @@ def main():
args = ArgumentHelper(parser).add_distributed_args().parse_args()
# Limit the total number of workers to ensure we don't run out of memory.
args.n_workers = min(args.n_workers, 8)
args.device='gpu'
print("Args: ", args)

# Download all the sources and get the list of text and code files.
18 changes: 7 additions & 11 deletions tutorials/dapt-curation/code/utils.py
@@ -292,7 +292,7 @@ def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
return DocumentDataset(deduped)


def fuzzy_dedupe(dataset: DocumentDataset, type: str = 'text') -> DocumentDataset:
def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
"""
Removes near-duplicate documents and code lines
@@ -303,12 +303,8 @@ def fuzzy_dedupe(dataset: DocumentDataset, type: str = 'text') -> DocumentDatase
Returns:
DocumentDataset: The deduplicated dataset.
"""
cache_dir = f"./workspace/fuzzy_dedupe_cache/{type}"
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")

fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache_dir,
cache_dir=cache,
id_field="id",
text_field="text",
seed=42,
@@ -333,7 +329,7 @@ def fuzzy_dedupe(dataset: DocumentDataset, type: str = 'text') -> DocumentDatase
return DocumentDataset(deduped)
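
With the cache location now injected by the caller rather than hard-coded to `./workspace/fuzzy_dedupe_cache/<type>`, a typical call mirrors `main.py` above (a sketch; `SCRIPT_DIR_PATH`, `rm_dir`, and `gpu_dataset_text` are the names used there):

```python
import os

# Clear any stale cache from a previous run, then deduplicate on the GPU.
cache_dir = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(cache_dir)
fuzzy_dataset_text = fuzzy_dedupe(dataset=gpu_dataset_text, cache=cache_dir)
```

Letting the caller own the cache path (and clear it with `rm_dir`) keeps filesystem handling in one place and lets `semantic_dedupe` follow the same pattern.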


def semantic_dedupe(dataset: DocumentDataset, sem_dedupe_config_yaml_path:str, type:str= 'text'):
def semantic_dedupe(dataset: DocumentDataset, sem_dedupe_config_yaml_path:str, cache_dir:str):
"""
Perform semantic deduplication on the given dataset.
@@ -344,10 +340,6 @@ def semantic_dedupe(dataset: DocumentDataset, sem_dedupe_config_yaml_path:str, t
Returns:
The deduplicated DocumentDataset.
"""
cache_dir = f"./workspace/semantic_dedupe/{type}"
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")

partition_lengths = dataset.df.map_partitions(len).compute()
non_empty_partitions = [i for i, length in enumerate(partition_lengths) if length > 0]
dataset.df = dataset.df.partitions[non_empty_partitions]
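
The three lines above drop empty Dask partitions before deduplication, presumably so the downstream embedding and clustering steps never see a zero-row partition. The same pattern in isolation (a sketch, not part of the commit):

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({"id": list(range(6)), "text": list("aabbcc")}), npartitions=3
)
df = df[df["id"] < 2]  # leaves some partitions empty

# Keep only partitions that still contain rows.
lengths = df.map_partitions(len).compute()
non_empty = [i for i, n in enumerate(lengths) if n > 0]
df = df.partitions[non_empty]
```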
@@ -399,3 +391,7 @@ def score_document(self, text: str) -> bool:

def keep_document(self, score) -> bool:
return score

def rm_dir(cache_dir):
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")
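
The new `rm_dir` helper shells out to `rm -rf`, which assumes a POSIX environment; a more portable equivalent (a sketch, not part of this commit) could use `shutil`:

```python
import shutil

def rm_dir_portable(cache_dir: str) -> None:
    # Remove the directory tree if it exists; ignore_errors avoids raising
    # when the path is missing.
    shutil.rmtree(cache_dir, ignore_errors=True)
```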
