From e12a2757e70954a6c11f45c60ddd5fce4a4dd446 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 13:43:55 +0000 Subject: [PATCH 01/15] Add V2 version of the Corpus class Allows O(1) access to any segment or recording --- lib/corpus.py | 164 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 163 insertions(+), 1 deletion(-) diff --git a/lib/corpus.py b/lib/corpus.py index d89b1d3c..f7040c6c 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -163,7 +163,7 @@ def segments(self) -> Iterable[Segment]: """ for r in self.recordings: yield from r.segments - for sc in self.subcorpora: + for sc in self.subcorpora: yield from sc.segments() def get_segment_by_name(self, name: str) -> Segment: @@ -301,6 +301,154 @@ def _dump_internal(self, out: TextIO, indentation: str = ""): out.write("%s\n" % (indentation,)) +class CorpusV2(Corpus): + """ + This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus + attribute is set. Corpora with include statements can be read but are written back as a single file. + + The difference with respect to :class:`i6_core.lib.corpus.Corpus` is that in this class we can access + any recording or segment in practically O(1). + """ + + def __init__(self): + super().__init__() + + self.parent_corpus: Optional[Corpus] = None + + self.subcorpora: Dict[str, Corpus] = {} + self.recordings: Dict[str, RecordingV2] = {} + + def segments(self) -> Iterable[Segment]: + """ + :return: an iterator over all segments within the corpus + """ + for r in self.recordings: + yield from r.segments.values() + for sc in self.subcorpora: + yield from sc.segments() + + def get_segment_by_name(self, name: str) -> Segment: + """ + :return: the segment specified by its name + """ + for seg in self.segments(): + if seg.name == name: + return seg + assert False, f"Segment '{name}' was not found in corpus" + + def get_segment_by_full_name(self, name: str) -> Optional[Segment]: + """ + :return: the segment specified by its full name + """ + if name == "": + # Found nothing. + return None + + if name in self.segments: + return self.segments[name] + else: + subcorpus_name = name.split("/")[0] + segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus) + + def get_recording_by_full_name(self, name: str) -> Optional[Segment]: + """ + :return: the segment specified by its full name + """ + if name == "": + # Found nothing. + return None + + if name in self.segments: + return self.segments[name] + else: + subcorpus_name = name.split("/")[0] + segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus) + + def all_recordings(self) -> Iterable[Recording]: + yield from self.recordings.values() + for sc in self.subcorpora.values(): + yield from sc.all_recordings() + + def all_speakers(self) -> Iterable[Speaker]: + yield from self.speakers.values() + for sc in self.subcorpora: + yield from sc.all_speakers() + + def top_level_recordings(self) -> Iterable[Recording]: + yield from self.recordings.values() + + def top_level_subcorpora(self) -> Iterable[Corpus]: + yield from self.subcorpora.values() + + def top_level_speakers(self) -> Iterable[Speaker]: + yield from self.speakers.values() + + def remove_recording(self, recording: Recording): + del self.recordings[recording.name] + for sc in self.subcorpora.values(): + sc.remove_recording(recording) + + def remove_recordings(self, recordings: List[Recording]): + for recording in recordings: + del self.recordings[recording.name] + for sc in self.subcorpora: + sc.remove_recordings(recordings) + + def add_recording(self, recording: Recording): + assert isinstance(recording, Recording) + recording.corpus = self + self.recordings[recording.name] = recording + + def add_subcorpus(self, corpus: Corpus): + assert isinstance(corpus, Corpus) + corpus.parent_corpus = self + self.subcorpora[corpus.name] = corpus + + def add_speaker(self, speaker: Speaker): + assert isinstance(speaker, Speaker) + self.speakers[speaker.name] = speaker + + def fullname(self) -> str: + if self.parent_corpus is not None: + return self.parent_corpus.fullname() + "/" + self.name + else: + return self.name + + def filter_segments(self, filter_function: FilterFunction): + """ + filter all segments (including in subcorpora) using filter_function + :param filter_function: takes arguments corpus, recording and segment, returns True if segment should be kept + """ + for r in self.recordings.values(): + r.segments = [s for s in r.segments.values() if filter_function(self, r, s)] + for sc in self.subcorpora: + sc.filter_segments(filter_function) + + def _dump_internal(self, out: TextIO, indentation: str = ""): + if self.parent_corpus is None: + out.write('\n' % self.name) + else: + out.write('%s\n' % (indentation, self.name)) + + for s in self.speakers.values(): + s.dump(out, indentation + " ") + if self.speaker_name is not None: + out.write('%s \n' % (indentation, self.speaker_name)) + + for r in self.recordings.values(): + r.dump(out, indentation + " ") + + for sc in self.subcorpora.values(): + sc._dump_internal(out, indentation + " ") + + if self.parent_corpus is None: + out.write("\n") + else: + out.write("%s\n" % (indentation,)) + + class Recording(NamedEntity, CorpusSection): def __init__(self): super().__init__() @@ -338,6 +486,20 @@ def add_segment(self, segment: Segment): self.segments.append(segment) +class RecordingV2(Recording): + def __init__(self): + super().__init__() + self.audio: Optional[str] = None + self.corpus: Optional[Corpus] = None + self.segments: Dict[str, Segment] = {} + + def add_segment(self, segment: Segment): + assert isinstance(segment, Segment) + segment.recording = self + self.segments[segment.name] = segment + + + class Segment(NamedEntity): def __init__(self): super().__init__() From 5bd8bc70c614eac57375ebaf10995538cacaf780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 14:00:46 +0000 Subject: [PATCH 02/15] Several improvements Typing improvements, fixes on accessing segment/recording by full name --- lib/corpus.py | 54 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index f7040c6c..5d27b26b 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -331,12 +331,10 @@ def get_segment_by_name(self, name: str) -> Segment: """ :return: the segment specified by its name """ - for seg in self.segments(): - if seg.name == name: - return seg - assert False, f"Segment '{name}' was not found in corpus" + assert name in self.segments, f"Segment '{name}' was not found in corpus" + return self.segments[name] - def get_segment_by_full_name(self, name: str) -> Optional[Segment]: + def get_segment_by_full_name(self, name: str) -> Optional[Segment]: """ :return: the segment specified by its full name """ @@ -349,9 +347,19 @@ def get_segment_by_full_name(self, name: str) -> Optional[Segment]: else: subcorpus_name = name.split("/")[0] segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + if self.name == subcorpus_name: + # The name was the own corpus'. This can happen when giving the full segment name. + # Ignore the former part. + subcorpus_name = name.split("/")[0] + segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + + assert subcorpus_name in self.subcorpora, ( + f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' " + "not found in the list of subcorpora: {list(self.subcorpora.keys())}." + ) return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus) - def get_recording_by_full_name(self, name: str) -> Optional[Segment]: + def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]: """ :return: the segment specified by its full name """ @@ -359,14 +367,24 @@ def get_recording_by_full_name(self, name: str) -> Optional[Segment]: # Found nothing. return None - if name in self.segments: - return self.segments[name] + if name in self.recordings: + return self.recordings[name] else: subcorpus_name = name.split("/")[0] - segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] - return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus) - - def all_recordings(self) -> Iterable[Recording]: + recording_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + if self.name == subcorpus_name: + # The name was the own corpus'. This can happen when giving the full recording name. + # Ignore the former part. + subcorpus_name = name.split("/")[0] + recording_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + + assert subcorpus_name in self.subcorpora, ( + f"Subcorpus '{subcorpus_name}' required for accessing recording '{name}' " + "not found in the list of subcorpora: {list(self.subcorpora.keys())}." + ) + return self.subcorpora[subcorpus_name].get_recording_by_full_name(recording_name_from_subcorpus) + + def all_recordings(self) -> Iterable[RecordingV2]: yield from self.recordings.values() for sc in self.subcorpora.values(): yield from sc.all_recordings() @@ -376,32 +394,32 @@ def all_speakers(self) -> Iterable[Speaker]: for sc in self.subcorpora: yield from sc.all_speakers() - def top_level_recordings(self) -> Iterable[Recording]: + def top_level_recordings(self) -> Iterable[RecordingV2]: yield from self.recordings.values() - def top_level_subcorpora(self) -> Iterable[Corpus]: + def top_level_subcorpora(self) -> Iterable[CorpusV2]: yield from self.subcorpora.values() def top_level_speakers(self) -> Iterable[Speaker]: yield from self.speakers.values() - def remove_recording(self, recording: Recording): + def remove_recording(self, recording: RecordingV2): del self.recordings[recording.name] for sc in self.subcorpora.values(): sc.remove_recording(recording) - def remove_recordings(self, recordings: List[Recording]): + def remove_recordings(self, recordings: List[RecordingV2]): for recording in recordings: del self.recordings[recording.name] for sc in self.subcorpora: sc.remove_recordings(recordings) - def add_recording(self, recording: Recording): + def add_recording(self, recording: RecordingV2): assert isinstance(recording, Recording) recording.corpus = self self.recordings[recording.name] = recording - def add_subcorpus(self, corpus: Corpus): + def add_subcorpus(self, corpus: CorpusV2): assert isinstance(corpus, Corpus) corpus.parent_corpus = self self.subcorpora[corpus.name] = corpus From f3188b32b14aaa5d1eb2059ca8a4703be8f7e52b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 14:02:34 +0000 Subject: [PATCH 03/15] Black --- lib/corpus.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 5d27b26b..fa346ed2 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -1,6 +1,7 @@ """ Helper functions and classes for Bliss xml corpus loading and writing """ + from __future__ import annotations __all__ = ["NamedEntity", "CorpusSection", "Corpus", "Recording", "Segment", "Speaker"] @@ -163,8 +164,9 @@ def segments(self) -> Iterable[Segment]: """ for r in self.recordings: yield from r.segments + for sc in self.subcorpora: - yield from sc.segments() + yield from sc.segments() def get_segment_by_name(self, name: str) -> Segment: """ @@ -346,12 +348,12 @@ def get_segment_by_full_name(self, name: str) -> Optional[Segment]: return self.segments[name] else: subcorpus_name = name.split("/")[0] - segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + segment_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] if self.name == subcorpus_name: # The name was the own corpus'. This can happen when giving the full segment name. # Ignore the former part. subcorpus_name = name.split("/")[0] - segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + segment_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] assert subcorpus_name in self.subcorpora, ( f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' " @@ -371,12 +373,12 @@ def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]: return self.recordings[name] else: subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] if self.name == subcorpus_name: # The name was the own corpus'. This can happen when giving the full recording name. # Ignore the former part. subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/"):] + recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] assert subcorpus_name in self.subcorpora, ( f"Subcorpus '{subcorpus_name}' required for accessing recording '{name}' " @@ -517,7 +519,6 @@ def add_segment(self, segment: Segment): self.segments[segment.name] = segment - class Segment(NamedEntity): def __init__(self): super().__init__() From e045629445d3ac830a53eb2172984a77d46fccab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 14:03:34 +0000 Subject: [PATCH 04/15] Fix indentation --- lib/corpus.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index fa346ed2..9c8a884c 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -165,8 +165,8 @@ def segments(self) -> Iterable[Segment]: for r in self.recordings: yield from r.segments - for sc in self.subcorpora: - yield from sc.segments() + for sc in self.subcorpora: + yield from sc.segments() def get_segment_by_name(self, name: str) -> Segment: """ From 824ffd1bffe3996119e17026d44cfdb0f5226478 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 14:04:59 +0000 Subject: [PATCH 05/15] Fix subcorpus access --- lib/corpus.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 9c8a884c..3320faed 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -326,7 +326,7 @@ def segments(self) -> Iterable[Segment]: """ for r in self.recordings: yield from r.segments.values() - for sc in self.subcorpora: + for sc in self.subcorpora.values(): yield from sc.segments() def get_segment_by_name(self, name: str) -> Segment: @@ -393,7 +393,7 @@ def all_recordings(self) -> Iterable[RecordingV2]: def all_speakers(self) -> Iterable[Speaker]: yield from self.speakers.values() - for sc in self.subcorpora: + for sc in self.subcorpora.values(): yield from sc.all_speakers() def top_level_recordings(self) -> Iterable[RecordingV2]: @@ -443,7 +443,7 @@ def filter_segments(self, filter_function: FilterFunction): """ for r in self.recordings.values(): r.segments = [s for s in r.segments.values() if filter_function(self, r, s)] - for sc in self.subcorpora: + for sc in self.subcorpora.values(): sc.filter_segments(filter_function) def _dump_internal(self, out: TextIO, indentation: str = ""): From 92c646cb4ec7539d01243fa7798b159932214242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 15:31:38 +0000 Subject: [PATCH 06/15] Update load function to allow V2 classes to be loaded --- lib/corpus.py | 65 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 3320faed..26540395 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -57,45 +57,63 @@ def startElement(self, name: str, attrs: Dict[str, str]): assert len(self.elements) == 1, " may only occur as the root element" e.name = attrs["name"] elif name == "subcorpus": - assert isinstance(e, Corpus), " may only occur within a or element" - subcorpus = Corpus() + assert isinstance( + e, (Corpus, CorpusV2) + ), " may only occur within a or element" + subcorpus = type(e)() subcorpus.name = attrs["name"] subcorpus.parent_corpus = e - e.subcorpora.append(subcorpus) + if isinstance(e, Corpus): + e.subcorpora.append(subcorpus) + elif isinstance(e, CorpusV2): + e.subcorpora[subcorpus.name] = subcorpus self.elements.append(subcorpus) elif name == "include": - assert isinstance(e, Corpus), " may only occur within a or element" + assert isinstance( + e, (Corpus, CorpusV2) + ), " may only occur within a or element" path = os.path.join(os.path.dirname(self.path), attrs["file"]) - c = Corpus() + c = type(e)() c.load(path) if c.name != e.name: print( "Warning: included corpus (%s) has a different name than the current corpus (%s)" % (c.name, e.name) ) - for sc in c.subcorpora: - sc.parent_corpus = e.parent_corpus - for r in c.recordings: - r.corpus = e - e.subcorpora.extend(c.subcorpora) - e.recordings.extend(c.recordings) + if isinstance(e, Corpus): + for sc in c.subcorpora: + sc.parent_corpus = e.parent_corpus + for r in c.recordings: + r.corpus = e + e.subcorpora.extend(c.subcorpora) + e.recordings.extend(c.recordings) + elif isinstance(e, CorpusV2): + for sc in c.subcorpora.values(): + sc.parent_corpus = e.parent_corpus + for r in c.recordings.values(): + r.corpus = e + e.subcorpora.update({sc.name: sc for sc in c.subcorpora.values()}) + e.recordings.update({r.name: r for r in c.recordings.values()}) e.speakers.update(c.speakers) elif name == "recording": - assert isinstance(e, Corpus), " may only occur within a or element" - rec = Recording() + assert isinstance( + e, (Corpus, CorpusV2) + ), " may only occur within a or element" + if isinstance(e, Corpus): + rec = Recording() + elif isinstance(e, CorpusV2): + rec = RecordingV2() rec.name = attrs["name"] rec.audio = attrs["audio"] - rec.corpus = e - e.recordings.append(rec) + e.add_recording(rec) self.elements.append(rec) elif name == "segment": - assert isinstance(e, Recording), " may only occur within a element" + assert isinstance(e, (Recording, RecordingV2)), " may only occur within a element" seg = Segment() seg.name = attrs.get("name", str(len(e.segments) + 1)) seg.start = float(attrs.get("start", "0.0")) seg.end = float(attrs.get("end", "0.0")) seg.track = int(attrs["track"]) if "track" in attrs else None - seg.recording = e - e.segments.append(seg) + e.add_segment(seg) self.elements.append(seg) elif name == "speaker-description": assert isinstance( @@ -518,6 +536,17 @@ def add_segment(self, segment: Segment): segment.recording = self self.segments[segment.name] = segment + def dump(self, out: TextIO, indentation: str = ""): + out.write('%s\n' % (indentation, self.name, self.audio)) + + for s in self.speakers.values(): + s.dump(out, indentation + " ") + if self.speaker_name is not None: + out.write('%s \n' % (indentation, self.speaker_name)) + + for s in self.segments.values(): + s.dump(out, indentation + " ") + class Segment(NamedEntity): def __init__(self): From 7b7f4b841b69fea4bf70b7cb9297ebd23032d0e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 15:51:50 +0000 Subject: [PATCH 07/15] Improve get_segment_by_full_name function --- lib/corpus.py | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 26540395..4fb42615 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -356,28 +356,45 @@ def get_segment_by_name(self, name: str) -> Segment: def get_segment_by_full_name(self, name: str) -> Optional[Segment]: """ - :return: the segment specified by its full name + Obtains a segment from the corpus given its full name in the corpus. + + :param name: The full name of the segment. + :return: The segment to be searched for, or `None` if not found. + """ + split_segment_name = name.split("/") + potential_corpus_name = split_segment_name[0] + if self.name == potential_corpus_name: + # The name was the own corpus'. This can happen the first iteration when giving the full segment name, + # for instance 'base_corpus/subcorpus1/subcorpus2/recording/segment'. + # Get rid of the first part for the search. + name = split_segment_name[1:] + + return self._get_segment_by_full_name(name) + + def _get_segment_by_full_name(self, name: str) -> Optional[Segment]: + """ + :param name: The name of the segment to be searched for, relative to the current corpus. + :return: The segment whose name coincides with :param:`name` relative to the current corpus, + or `None` if not found. """ if name == "": # Found nothing. return None - if name in self.segments: - return self.segments[name] + # Base case: the recording name comes first, and the segment name appears immediately afterwards. + recording_name = name.split("/")[0] + segment_name = name[len(f"{recording_name}/") :] + if recording_name in self.recordings: + return self.recordings[recording_name][segment_name] else: - subcorpus_name = name.split("/")[0] - segment_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - if self.name == subcorpus_name: - # The name was the own corpus'. This can happen when giving the full segment name. - # Ignore the former part. - subcorpus_name = name.split("/")[0] - segment_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - + # Recursive case: look one level deeper to the indicated subcorpus. + subcorpus_name = recording_name + segment_full_name_from_subcorpus = segment_name assert subcorpus_name in self.subcorpora, ( f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' " "not found in the list of subcorpora: {list(self.subcorpora.keys())}." ) - return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus) + return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_full_name_from_subcorpus) def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]: """ From bfd6bbcea4b2c1c0a071e8aa2ca53389598dec87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 16:08:36 +0000 Subject: [PATCH 08/15] Don't make V2 classes inherit from V1 classes --- lib/corpus.py | 60 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 4fb42615..ce5c34d4 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -60,7 +60,7 @@ def startElement(self, name: str, attrs: Dict[str, str]): assert isinstance( e, (Corpus, CorpusV2) ), " may only occur within a or element" - subcorpus = type(e)() + subcorpus = () subcorpus.name = attrs["name"] subcorpus.parent_corpus = e if isinstance(e, Corpus): @@ -321,7 +321,7 @@ def _dump_internal(self, out: TextIO, indentation: str = ""): out.write("%s\n" % (indentation,)) -class CorpusV2(Corpus): +class CorpusV2(NamedEntity, CorpusSection): """ This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus attribute is set. Corpora with include statements can be read but are written back as a single file. @@ -333,9 +333,9 @@ class CorpusV2(Corpus): def __init__(self): super().__init__() - self.parent_corpus: Optional[Corpus] = None + self.parent_corpus: Optional[CorpusV2] = None - self.subcorpora: Dict[str, Corpus] = {} + self.subcorpora: Dict[str, CorpusV2] = {} self.recordings: Dict[str, RecordingV2] = {} def segments(self) -> Iterable[Segment]: @@ -452,7 +452,7 @@ def remove_recordings(self, recordings: List[RecordingV2]): sc.remove_recordings(recordings) def add_recording(self, recording: RecordingV2): - assert isinstance(recording, Recording) + assert isinstance(recording, RecordingV2) recording.corpus = self self.recordings[recording.name] = recording @@ -481,6 +481,26 @@ def filter_segments(self, filter_function: FilterFunction): for sc in self.subcorpora.values(): sc.filter_segments(filter_function) + def load(self, path: str): + """ + :param path: corpus .xml or .xml.gz + """ + open_fun = gzip.open if path.endswith(".gz") else open + + with open_fun(path, "rt") as f: + handler = CorpusParser(self, path) + sax.parse(f, handler) + + def dump(self, path: str): + """ + :param path: target .xml or .xml.gz path + """ + open_fun = gzip.open if path.endswith(".gz") else open + + with open_fun(path, "wt") as f: + f.write('\n') + self._dump_internal(f) + def _dump_internal(self, out: TextIO, indentation: str = ""): if self.parent_corpus is None: out.write('\n' % self.name) @@ -541,17 +561,30 @@ def add_segment(self, segment: Segment): self.segments.append(segment) -class RecordingV2(Recording): +class RecordingV2(NamedEntity, CorpusSection): + """ + Represents a recording, which is an entity composed of an audio file and a set of segments. + + The difference with respect to :class:`i6_core.lib.corpus.Recording` is that this class allows access + from the parent corpus to any segment in O(1). + """ + def __init__(self): super().__init__() self.audio: Optional[str] = None - self.corpus: Optional[Corpus] = None + self.corpus: Optional[CorpusV2] = None self.segments: Dict[str, Segment] = {} - def add_segment(self, segment: Segment): - assert isinstance(segment, Segment) - segment.recording = self - self.segments[segment.name] = segment + def fullname(self) -> str: + return self.corpus.fullname() + "/" + self.name + + def speaker(self, speaker_name: Optional[str] = None) -> Speaker: + if speaker_name is None: + speaker_name = self.speaker_name + if speaker_name in self.speakers: + return self.speakers[speaker_name] + else: + return self.corpus.speaker(speaker_name, self.default_speaker) def dump(self, out: TextIO, indentation: str = ""): out.write('%s\n' % (indentation, self.name, self.audio)) @@ -564,6 +597,11 @@ def dump(self, out: TextIO, indentation: str = ""): for s in self.segments.values(): s.dump(out, indentation + " ") + def add_segment(self, segment: Segment): + assert isinstance(segment, Segment) + segment.recording = self + self.segments[segment.name] = segment + class Segment(NamedEntity): def __init__(self): From 230585bad1b9314943068bed769dc927e33194d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Tue, 24 Sep 2024 16:18:41 +0000 Subject: [PATCH 09/15] Fix subcorpus creation --- lib/corpus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/corpus.py b/lib/corpus.py index ce5c34d4..65490e55 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -60,7 +60,7 @@ def startElement(self, name: str, attrs: Dict[str, str]): assert isinstance( e, (Corpus, CorpusV2) ), " may only occur within a or element" - subcorpus = () + subcorpus = type(e)() subcorpus.name = attrs["name"] subcorpus.parent_corpus = e if isinstance(e, Corpus): From 224b61877f42f4a861d863631749020c182ef631 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Wed, 25 Sep 2024 08:24:14 +0000 Subject: [PATCH 10/15] Remove redundant get_segment_by_name, add get_segment_map get_segment_map allows for more explicit control to the user --- lib/corpus.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 65490e55..04e96dcf 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -347,12 +347,17 @@ def segments(self) -> Iterable[Segment]: for sc in self.subcorpora.values(): yield from sc.segments() - def get_segment_by_name(self, name: str) -> Segment: + def segment_map(self) -> Dict[str, Segment]: """ - :return: the segment specified by its name + :return: A mapping from full segment names into the actual segments. + Note that this is similar to what the function :func:`get_segment_by_full_name` does, + but giving more control to the user. """ - assert name in self.segments, f"Segment '{name}' was not found in corpus" - return self.segments[name] + seg_map = {seg.fullname(): seg for rec in self.recordings.values() for seg in rec.segments.values()} + for sc in self.subcorpora.values(): + seg_map.update(sc.segment_map()) + + return seg_map def get_segment_by_full_name(self, name: str) -> Optional[Segment]: """ From 81582f4cd94219116ca2e0d3ad581829a1dd1c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Wed, 25 Sep 2024 08:43:52 +0000 Subject: [PATCH 11/15] Fix function name --- lib/corpus.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 04e96dcf..aa05c05e 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -374,9 +374,9 @@ def get_segment_by_full_name(self, name: str) -> Optional[Segment]: # Get rid of the first part for the search. name = split_segment_name[1:] - return self._get_segment_by_full_name(name) + return self._get_segment_by_relative_name(name) - def _get_segment_by_full_name(self, name: str) -> Optional[Segment]: + def _get_segment_by_relative_name(self, name: str) -> Segment: """ :param name: The name of the segment to be searched for, relative to the current corpus. :return: The segment whose name coincides with :param:`name` relative to the current corpus, From a10e4273ba8a7a44986ff077a9f0193128719dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Thu, 26 Sep 2024 10:16:03 +0000 Subject: [PATCH 12/15] Remove V2 implementation, add `get_{recording,segment}_mapping` --- lib/corpus.py | 254 +++----------------------------------------------- 1 file changed, 11 insertions(+), 243 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index aa05c05e..80442d70 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -320,213 +320,17 @@ def _dump_internal(self, out: TextIO, indentation: str = ""): else: out.write("%s\n" % (indentation,)) - -class CorpusV2(NamedEntity, CorpusSection): - """ - This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus - attribute is set. Corpora with include statements can be read but are written back as a single file. - - The difference with respect to :class:`i6_core.lib.corpus.Corpus` is that in this class we can access - any recording or segment in practically O(1). - """ - - def __init__(self): - super().__init__() - - self.parent_corpus: Optional[CorpusV2] = None - - self.subcorpora: Dict[str, CorpusV2] = {} - self.recordings: Dict[str, RecordingV2] = {} - - def segments(self) -> Iterable[Segment]: - """ - :return: an iterator over all segments within the corpus - """ - for r in self.recordings: - yield from r.segments.values() - for sc in self.subcorpora.values(): - yield from sc.segments() - - def segment_map(self) -> Dict[str, Segment]: - """ - :return: A mapping from full segment names into the actual segments. - Note that this is similar to what the function :func:`get_segment_by_full_name` does, - but giving more control to the user. - """ - seg_map = {seg.fullname(): seg for rec in self.recordings.values() for seg in rec.segments.values()} - for sc in self.subcorpora.values(): - seg_map.update(sc.segment_map()) - - return seg_map - - def get_segment_by_full_name(self, name: str) -> Optional[Segment]: - """ - Obtains a segment from the corpus given its full name in the corpus. - - :param name: The full name of the segment. - :return: The segment to be searched for, or `None` if not found. - """ - split_segment_name = name.split("/") - potential_corpus_name = split_segment_name[0] - if self.name == potential_corpus_name: - # The name was the own corpus'. This can happen the first iteration when giving the full segment name, - # for instance 'base_corpus/subcorpus1/subcorpus2/recording/segment'. - # Get rid of the first part for the search. - name = split_segment_name[1:] - - return self._get_segment_by_relative_name(name) - - def _get_segment_by_relative_name(self, name: str) -> Segment: - """ - :param name: The name of the segment to be searched for, relative to the current corpus. - :return: The segment whose name coincides with :param:`name` relative to the current corpus, - or `None` if not found. - """ - if name == "": - # Found nothing. - return None - - # Base case: the recording name comes first, and the segment name appears immediately afterwards. - recording_name = name.split("/")[0] - segment_name = name[len(f"{recording_name}/") :] - if recording_name in self.recordings: - return self.recordings[recording_name][segment_name] - else: - # Recursive case: look one level deeper to the indicated subcorpus. - subcorpus_name = recording_name - segment_full_name_from_subcorpus = segment_name - assert subcorpus_name in self.subcorpora, ( - f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' " - "not found in the list of subcorpora: {list(self.subcorpora.keys())}." - ) - return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_full_name_from_subcorpus) - - def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]: - """ - :return: the segment specified by its full name - """ - if name == "": - # Found nothing. - return None - - if name in self.recordings: - return self.recordings[name] - else: - subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - if self.name == subcorpus_name: - # The name was the own corpus'. This can happen when giving the full recording name. - # Ignore the former part. - subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - - assert subcorpus_name in self.subcorpora, ( - f"Subcorpus '{subcorpus_name}' required for accessing recording '{name}' " - "not found in the list of subcorpora: {list(self.subcorpora.keys())}." - ) - return self.subcorpora[subcorpus_name].get_recording_by_full_name(recording_name_from_subcorpus) - - def all_recordings(self) -> Iterable[RecordingV2]: - yield from self.recordings.values() - for sc in self.subcorpora.values(): - yield from sc.all_recordings() - - def all_speakers(self) -> Iterable[Speaker]: - yield from self.speakers.values() - for sc in self.subcorpora.values(): - yield from sc.all_speakers() - - def top_level_recordings(self) -> Iterable[RecordingV2]: - yield from self.recordings.values() - - def top_level_subcorpora(self) -> Iterable[CorpusV2]: - yield from self.subcorpora.values() - - def top_level_speakers(self) -> Iterable[Speaker]: - yield from self.speakers.values() - - def remove_recording(self, recording: RecordingV2): - del self.recordings[recording.name] - for sc in self.subcorpora.values(): - sc.remove_recording(recording) - - def remove_recordings(self, recordings: List[RecordingV2]): - for recording in recordings: - del self.recordings[recording.name] - for sc in self.subcorpora: - sc.remove_recordings(recordings) - - def add_recording(self, recording: RecordingV2): - assert isinstance(recording, RecordingV2) - recording.corpus = self - self.recordings[recording.name] = recording - - def add_subcorpus(self, corpus: CorpusV2): - assert isinstance(corpus, Corpus) - corpus.parent_corpus = self - self.subcorpora[corpus.name] = corpus - - def add_speaker(self, speaker: Speaker): - assert isinstance(speaker, Speaker) - self.speakers[speaker.name] = speaker - - def fullname(self) -> str: - if self.parent_corpus is not None: - return self.parent_corpus.fullname() + "/" + self.name - else: - return self.name - - def filter_segments(self, filter_function: FilterFunction): - """ - filter all segments (including in subcorpora) using filter_function - :param filter_function: takes arguments corpus, recording and segment, returns True if segment should be kept - """ - for r in self.recordings.values(): - r.segments = [s for s in r.segments.values() if filter_function(self, r, s)] - for sc in self.subcorpora.values(): - sc.filter_segments(filter_function) - - def load(self, path: str): + def get_segment_mapping(self) -> Dict[str, Segment]: """ - :param path: corpus .xml or .xml.gz + :return: Mapping from segment fullnames to actual segments. """ - open_fun = gzip.open if path.endswith(".gz") else open - - with open_fun(path, "rt") as f: - handler = CorpusParser(self, path) - sax.parse(f, handler) + return {seg.fullname(): seg for seg in self.segments()} - def dump(self, path: str): + def get_recording_mapping(self) -> Dict[str, Recording]: """ - :param path: target .xml or .xml.gz path + :return: Mapping from recording fullnames to actual recordings. """ - open_fun = gzip.open if path.endswith(".gz") else open - - with open_fun(path, "wt") as f: - f.write('\n') - self._dump_internal(f) - - def _dump_internal(self, out: TextIO, indentation: str = ""): - if self.parent_corpus is None: - out.write('\n' % self.name) - else: - out.write('%s\n' % (indentation, self.name)) - - for s in self.speakers.values(): - s.dump(out, indentation + " ") - if self.speaker_name is not None: - out.write('%s \n' % (indentation, self.speaker_name)) - - for r in self.recordings.values(): - r.dump(out, indentation + " ") - - for sc in self.subcorpora.values(): - sc._dump_internal(out, indentation + " ") - - if self.parent_corpus is None: - out.write("\n") - else: - out.write("%s\n" % (indentation,)) + return {rec.fullname(): rec for rec in self.recordings} class Recording(NamedEntity, CorpusSection): @@ -565,47 +369,11 @@ def add_segment(self, segment: Segment): segment.recording = self self.segments.append(segment) - -class RecordingV2(NamedEntity, CorpusSection): - """ - Represents a recording, which is an entity composed of an audio file and a set of segments. - - The difference with respect to :class:`i6_core.lib.corpus.Recording` is that this class allows access - from the parent corpus to any segment in O(1). - """ - - def __init__(self): - super().__init__() - self.audio: Optional[str] = None - self.corpus: Optional[CorpusV2] = None - self.segments: Dict[str, Segment] = {} - - def fullname(self) -> str: - return self.corpus.fullname() + "/" + self.name - - def speaker(self, speaker_name: Optional[str] = None) -> Speaker: - if speaker_name is None: - speaker_name = self.speaker_name - if speaker_name in self.speakers: - return self.speakers[speaker_name] - else: - return self.corpus.speaker(speaker_name, self.default_speaker) - - def dump(self, out: TextIO, indentation: str = ""): - out.write('%s\n' % (indentation, self.name, self.audio)) - - for s in self.speakers.values(): - s.dump(out, indentation + " ") - if self.speaker_name is not None: - out.write('%s \n' % (indentation, self.speaker_name)) - - for s in self.segments.values(): - s.dump(out, indentation + " ") - - def add_segment(self, segment: Segment): - assert isinstance(segment, Segment) - segment.recording = self - self.segments[segment.name] = segment + def get_segment_mapping(self) -> Dict[str, Segment]: + """ + :return: Mapping from segment fullnames to actual segments. + """ + return {seg.fullname(): seg for seg in self.segments} class Segment(NamedEntity): From 17c96b8daeed963e03ad70c38f97a8104af31793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Thu, 26 Sep 2024 15:09:36 +0000 Subject: [PATCH 13/15] Remove any reference to V2 --- lib/corpus.py | 46 +++++++++++++--------------------------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 80442d70..767ee46a 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -57,57 +57,37 @@ def startElement(self, name: str, attrs: Dict[str, str]): assert len(self.elements) == 1, " may only occur as the root element" e.name = attrs["name"] elif name == "subcorpus": - assert isinstance( - e, (Corpus, CorpusV2) - ), " may only occur within a or element" + assert isinstance(e, Corpus), " may only occur within a or element" subcorpus = type(e)() subcorpus.name = attrs["name"] subcorpus.parent_corpus = e - if isinstance(e, Corpus): - e.subcorpora.append(subcorpus) - elif isinstance(e, CorpusV2): - e.subcorpora[subcorpus.name] = subcorpus + e.subcorpora.append(subcorpus) self.elements.append(subcorpus) elif name == "include": - assert isinstance( - e, (Corpus, CorpusV2) - ), " may only occur within a or element" + assert isinstance(e, Corpus), " may only occur within a or element" path = os.path.join(os.path.dirname(self.path), attrs["file"]) - c = type(e)() + c = Corpus() c.load(path) if c.name != e.name: print( "Warning: included corpus (%s) has a different name than the current corpus (%s)" % (c.name, e.name) ) - if isinstance(e, Corpus): - for sc in c.subcorpora: - sc.parent_corpus = e.parent_corpus - for r in c.recordings: - r.corpus = e - e.subcorpora.extend(c.subcorpora) - e.recordings.extend(c.recordings) - elif isinstance(e, CorpusV2): - for sc in c.subcorpora.values(): - sc.parent_corpus = e.parent_corpus - for r in c.recordings.values(): - r.corpus = e - e.subcorpora.update({sc.name: sc for sc in c.subcorpora.values()}) - e.recordings.update({r.name: r for r in c.recordings.values()}) + for sc in c.subcorpora: + sc.parent_corpus = e.parent_corpus + for r in c.recordings: + r.corpus = e + e.subcorpora.extend(c.subcorpora) + e.recordings.extend(c.recordings) e.speakers.update(c.speakers) elif name == "recording": - assert isinstance( - e, (Corpus, CorpusV2) - ), " may only occur within a or element" - if isinstance(e, Corpus): - rec = Recording() - elif isinstance(e, CorpusV2): - rec = RecordingV2() + assert isinstance(e, Corpus), " may only occur within a or element" + rec = Recording() rec.name = attrs["name"] rec.audio = attrs["audio"] e.add_recording(rec) self.elements.append(rec) elif name == "segment": - assert isinstance(e, (Recording, RecordingV2)), " may only occur within a element" + assert isinstance(e, Recording), " may only occur within a element" seg = Segment() seg.name = attrs.get("name", str(len(e.segments) + 1)) seg.start = float(attrs.get("start", "0.0")) From 02722e0bc090f314cc1afbfa9584c2d452aa545f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Thu, 3 Oct 2024 07:29:08 +0000 Subject: [PATCH 14/15] Improve some small details --- lib/corpus.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index 767ee46a..51e1246c 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -58,7 +58,7 @@ def startElement(self, name: str, attrs: Dict[str, str]): e.name = attrs["name"] elif name == "subcorpus": assert isinstance(e, Corpus), " may only occur within a or element" - subcorpus = type(e)() + subcorpus = Corpus() subcorpus.name = attrs["name"] subcorpus.parent_corpus = e e.subcorpora.append(subcorpus) @@ -162,7 +162,6 @@ def segments(self) -> Iterable[Segment]: """ for r in self.recordings: yield from r.segments - for sc in self.subcorpora: yield from sc.segments() From 120cc76b6638d58b1884d52b138c2c55cfcb06bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= <31628502+Icemole@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:03:25 +0200 Subject: [PATCH 15/15] Fix access to recordings in nested subcorpora Co-authored-by: michelwi --- lib/corpus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/corpus.py b/lib/corpus.py index 51e1246c..076cb048 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -309,7 +309,7 @@ def get_recording_mapping(self) -> Dict[str, Recording]: """ :return: Mapping from recording fullnames to actual recordings. """ - return {rec.fullname(): rec for rec in self.recordings} + return {rec.fullname(): rec for rec in self.all_recordings()} class Recording(NamedEntity, CorpusSection):