diff --git a/demo/curation/local_cuartion_demo.py b/demo/curation/local_cuartion_demo.py
index aa06b32..7261b4f 100644
--- a/demo/curation/local_cuartion_demo.py
+++ b/demo/curation/local_cuartion_demo.py
@@ -49,7 +49,7 @@ class AnnotationData(BaseModel):
         extract_json=annotation_data.extract_json,
         kpi_mapping_path=annotation_data.kpi_mapping_path,
         create_neg_samples=True,
-        neg_pos_ratio=1,
+        neg_sample_rate=1,
     ).create_curator_df()
 
     curator.to_csv(output_file_path_main, index=False)
diff --git a/src/osc_transformer_presteps/relevance_detection_dataset_curation/curator.py b/src/osc_transformer_presteps/relevance_detection_dataset_curation/curator.py
index 48540b7..4b6b38c 100644
--- a/src/osc_transformer_presteps/relevance_detection_dataset_curation/curator.py
+++ b/src/osc_transformer_presteps/relevance_detection_dataset_curation/curator.py
@@ -8,6 +8,7 @@ import re
 from typing import List, Tuple, Optional
 
 import pandas as pd
+from Levenshtein import distance as levenshtein_distance
 from pathlib import Path
 from pydantic import BaseModel, FilePath
 
@@ -28,7 +29,8 @@ class Curator:
         annotation_folder (str): path to the folder containing annotation files
         extract_json (str): path to the JSON file containing extracted content
         kpi_mapping_path (str): path to KPI Mapping csv
-        neg_pos_ratio (int): ratio of negative to positive examples
+
+        neg_sample_rate (int): number of negative samples per positive example
         create_neg_samples (bool): whether to create negative samples
     """
 
@@ -38,7 +40,7 @@ def __init__(
         annotation_folder: str,
         extract_json: Path,
         kpi_mapping_path: str,
-        neg_pos_ratio: int = 1,
+        neg_sample_rate: int = 1,
         create_neg_samples: bool = False,
     ) -> None:
         """Initialize the constructor for Curator object."""
@@ -46,7 +48,7 @@ def __init__(
         self.extract_json = extract_json
         self.json_file_name = os.path.basename(extract_json).replace("_output", "")
         self.kpi_mapping_path = kpi_mapping_path
-        self.neg_pos_ratio = neg_pos_ratio
+        self.neg_sample_rate = neg_sample_rate
         self.create_neg_samples = create_neg_samples
         self.pdf_content = self.load_pdf_content()
 
@@ -105,7 +107,8 @@ def create_pos_examples(
     ) -> Tuple[List[Tuple[Optional[str], str]], bool]:
         """Create positive examples based on the provided row from a DataFrame.
 
-        Returns a list of matching sentences or an empty list, along with a flag indicating if sentences were found in the JSON.
+        Returns a list of matching sentences or an empty list, along with a flag
+        indicating if sentences were found in the JSON.
""" value: str = row["relevant_paragraphs"] cleaned_value: str = self.clean_text(value) @@ -128,9 +131,11 @@ def create_pos_examples( return ([(None, "")], False) # Return with in_json_flag as False source_page = str(row["source_page"]) - match = re.search(r"\d+", source_page) - page_number = match.group() if match else None + if match: + page_number = match.group() + else: + return ([(None, "")], False) if page_number in self.pdf_content: matching_sentences = [ @@ -140,19 +145,43 @@ def create_pos_examples( if any(sentence in para for sentence in sentences) ] - # Flag to know if sentence is available in json or not - in_json_flag = bool(matching_sentences) - return ( - matching_sentences - if matching_sentences - else [(None, sentence) for sentence in sentences], - in_json_flag, - ) + if matching_sentences: + # If matching sentences found, return them with in_json_flag as True + return matching_sentences, True + + # If no exact match found, find the closest paragraph and its ID + closest_para, para_id = self._get_closest_paragraph(sentences, page_number) + + if closest_para: + return [(para_id, closest_para)], True - return ([(None, "")], False) # Return with in_json_flag as False + # If no relevant paragraph found, return with in_json_flag as False + return ([(None, "")], False) + + def _get_closest_paragraph( + self, sentences: List[str], page_number: str + ) -> Tuple[Optional[str], Optional[str]]: + """Find the closest paragraph on the given page and return it with its ID.""" + closest_para = None + closest_para_id = None + min_distance = float("inf") + + # Iterate over paragraphs on the page and compute Levenshtein distance + for key_inner in self.pdf_content[page_number]: + para = self.pdf_content[page_number][key_inner]["paragraph"] + + # Compute the minimum distance between the target sentences and this paragraph + for sentence in sentences: + dist = levenshtein_distance(sentence, para) + if dist < min_distance: + min_distance = dist + closest_para = para + closest_para_id = key_inner # Store the unique paragraph ID + + return closest_para, closest_para_id def create_neg_examples(self, row: pd.Series) -> List[str]: - """Create negative examples based on the provided row from a DataFrame. + """Create negative examples excluding relevant paragraphs or close ones. Returns a list of context paragraphs or an empty list. 
""" @@ -164,13 +193,32 @@ def create_neg_examples(self, row: pd.Series) -> List[str]: ): return [""] + # Flatten all paragraphs from the PDF content paragraphs = [ self.pdf_content[key_outer][key_inner]["paragraph"] for key_outer in self.pdf_content for key_inner in self.pdf_content[key_outer] ] - context = random.choices(paragraphs[1:], k=self.neg_pos_ratio) + relevant_paragraphs = row["relevant_paragraphs"] + + # Filter out paragraphs that are identical or too similar + def is_similar(paragraph): + return any( + levenshtein_distance(paragraph, rel_para) + <= 5 # Adjust threshold if needed + for rel_para in relevant_paragraphs + ) + + negative_paragraphs = [p for p in paragraphs if not is_similar(p)] + + # Randomly select `neg_sample_rate` paragraphs from the filtered list + context = ( + random.choices(negative_paragraphs, k=self.neg_sample_rate) + if negative_paragraphs + else [""] + ) + return context def create_examples_annotate(self) -> List[pd.DataFrame]: @@ -209,18 +257,21 @@ def create_examples_annotate(self) -> List[pd.DataFrame]: (para, unique_para_id) for unique_para_id, para in pos_examples ] - contexts = [ - (pos_contexts, 1), - ( - [ - (neg_example, None) - for neg_example in self.create_neg_examples(row.copy()) - ] - if self.create_neg_samples - else [("", None)], - 0, - ), - ] + if self.create_neg_samples: + contexts = [ + (pos_contexts, 1), + ( + [ + (neg_example, None) + for neg_example in self.create_neg_examples(row.copy()) + ] + if self.create_neg_samples + else [("", None)], + 0, + ), + ] + else: + contexts = [(pos_contexts, 1)] for context, label in contexts: if ( @@ -278,17 +329,23 @@ def create_curator_df(self) -> pd.DataFrame: kpi_df = pd.read_csv( self.kpi_mapping_path, usecols=["kpi_id", "question"] ) + merged_df = pd.merge(new_df, kpi_df, on="kpi_id", how="left") result_df = merged_df.rename(columns={"answer": "annotation_answer"}) - result_df.loc[result_df["label"] == 0, "in_extraction_data_flag"] = ( - bool(0) - ) + # result_df.loc[result_df["label"] == 0, "in_extraction_data_flag"] = ( + # bool(0) + # ) result_df.loc[ result_df["in_extraction_data_flag"] == 0, "unique_paragraph_id" ] = None + result_df["annotation_file_name"] = Path(self.annotation_folder).name + result_df["annotation_file_row"] += 2 + result_df = result_df[ + result_df["context"].notna() & (result_df["context"] != "") + ] # Reorder columns as specified in columns_order result_df = result_df[columns_order] diff --git a/src/osc_transformer_presteps/run_local_relevance_curation.py b/src/osc_transformer_presteps/run_local_relevance_curation.py index eea5ca1..beb0c09 100644 --- a/src/osc_transformer_presteps/run_local_relevance_curation.py +++ b/src/osc_transformer_presteps/run_local_relevance_curation.py @@ -70,17 +70,20 @@ def run_local_curation( kpi_mapping_file_path: str = typer.Argument( help="This is the path to kpi_mapping.csv file" ), + output_path: str = typer.Argument( + help="Path to directory to save the output curated file.", + ), create_neg_samples: bool = typer.Option( False, "--create_neg_samples", show_default=True, help="Boolean to declare if you want to include negative samples in your dataset.", ), - neg_pos_ratio: int = typer.Option( + neg_sample_rate: int = typer.Option( 1, - "--neg_pos_ratio", + "--neg_sample_rate", show_default=True, - help="Ratio of number of negative samples you want per positive samples.", + help="Number of negative samples you want per positive samples.", ), logs_folder: str = typer.Option( default=None, @@ -152,7 +155,7 @@ def 
            annotation_file_path=annotation_temp,
            kpi_mapping_file_path=kpi_mapping_temp,
            create_neg_samples=create_neg_samples,
-            neg_pos_ratio=neg_pos_ratio,
+            neg_sample_rate=neg_sample_rate,
         )
         curated_data.to_csv("Curated_dataset.csv", index=False)
         _logger.info(
@@ -160,7 +163,11 @@ def resolve_path(path_name: str, cwd: Path) -> Path:
         )
 
     elif extracted_json_temp.is_dir():
-        files = [f for f in extracted_json_temp.iterdir() if f.is_file()]
+        files = [
+            f
+            for f in extracted_json_temp.iterdir()
+            if f.is_file() and f.name.endswith(".json")
+        ]
         curator_df = pd.DataFrame()
 
         for file in files:
@@ -170,13 +177,13 @@ def resolve_path(path_name: str, cwd: Path) -> Path:
                 annotation_file_path=annotation_temp,
                 kpi_mapping_file_path=kpi_mapping_temp,
                 create_neg_samples=create_neg_samples,
-                neg_pos_ratio=neg_pos_ratio,
+                neg_sample_rate=neg_sample_rate,
             )
             curator_df = pd.concat([curator_df, temp_df], ignore_index=True)
             _logger.info(f"Added info from file {file.stem}.json to the curation file.")
 
         timestamp = datetime.now().strftime("%d%m%Y_%H%M")
-        csv_filename = f"Curated_dataset_{timestamp}.csv"
+        csv_filename = Path(output_path) / f"Curated_dataset_{timestamp}.csv"
         curator_df.to_csv(csv_filename, index=False)
 
     _logger.info("Curation ended.")
@@ -187,7 +194,7 @@ def curate_one_file(
     annotation_file_path: Path,
     kpi_mapping_file_path: Path,
     create_neg_samples: bool,
-    neg_pos_ratio: int,
+    neg_sample_rate: int,
 ):
     """Curate data for a given file to a given folder for a specific setting.
 
@@ -198,7 +205,7 @@
         extract_json=dir_extracted_json_name,
         kpi_mapping_path=kpi_mapping_file_path,
         create_neg_samples=create_neg_samples,
-        neg_pos_ratio=neg_pos_ratio,
+        neg_sample_rate=neg_sample_rate,
     ).create_curator_df()
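
Reviewer note: the sketch below is a minimal, standalone illustration of the Levenshtein-based lookup that the `_get_closest_paragraph` and `is_similar` additions in this patch rely on. The helper name `closest_paragraph`, the sample page dictionary, and the sentences used here are illustrative assumptions, not code or data from the repository; it only assumes the same `Levenshtein` package that the patch already imports.

# Illustrative sketch only: mimics the closest-paragraph selection added in curator.py.
from typing import Dict, List, Optional, Tuple

from Levenshtein import distance as levenshtein_distance


def closest_paragraph(
    page: Dict[str, str], sentences: List[str]
) -> Tuple[Optional[str], Optional[str]]:
    """Return (paragraph, paragraph_id) with the smallest edit distance to any target sentence."""
    best_para: Optional[str] = None
    best_id: Optional[str] = None
    min_distance = float("inf")
    for para_id, para in page.items():
        for sentence in sentences:
            dist = levenshtein_distance(sentence, para)
            if dist < min_distance:
                min_distance = dist
                best_para, best_id = para, para_id
    return best_para, best_id


if __name__ == "__main__":
    # Made-up page content; in the patch the real data comes from Curator.pdf_content.
    page = {
        "para_1": "Total Scope 1 emissions were 1.2 Mt CO2e in 2020.",
        "para_2": "The board met four times during the reporting year.",
    }
    # A near-duplicate of para_1 with small wording differences still resolves to para_1.
    print(closest_paragraph(page, ["Total Scope 1 emissions were 1.2Mt CO2e in 2020"]))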