Skip to content

Commit

Permalink
Add V2 version of the Corpus class
Browse files Browse the repository at this point in the history
Allows O(1) access to any segment or recording
  • Loading branch information
Icemole committed Sep 24, 2024
1 parent ef25c89 commit e12a275
Showing 1 changed file with 163 additions and 1 deletion.
164 changes: 163 additions & 1 deletion lib/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def segments(self) -> Iterable[Segment]:
"""
for r in self.recordings:
yield from r.segments
for sc in self.subcorpora:
for sc in self.subcorpora:
yield from sc.segments()

def get_segment_by_name(self, name: str) -> Segment:
Expand Down Expand Up @@ -301,6 +301,154 @@ def _dump_internal(self, out: TextIO, indentation: str = ""):
out.write("%s</subcorpus>\n" % (indentation,))


class CorpusV2(Corpus):
"""
This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus
attribute is set. Corpora with include statements can be read but are written back as a single file.
The difference with respect to :class:`i6_core.lib.corpus.Corpus` is that in this class we can access
any recording or segment in practically O(1).
"""

def __init__(self):
super().__init__()

self.parent_corpus: Optional[Corpus] = None

self.subcorpora: Dict[str, Corpus] = {}
self.recordings: Dict[str, RecordingV2] = {}

def segments(self) -> Iterable[Segment]:
"""
:return: an iterator over all segments within the corpus
"""
for r in self.recordings:
yield from r.segments.values()
for sc in self.subcorpora:
yield from sc.segments()

def get_segment_by_name(self, name: str) -> Segment:
"""
:return: the segment specified by its name
"""
for seg in self.segments():
if seg.name == name:
return seg
assert False, f"Segment '{name}' was not found in corpus"

def get_segment_by_full_name(self, name: str) -> Optional[Segment]:
"""
:return: the segment specified by its full name
"""
if name == "":
# Found nothing.
return None

if name in self.segments:
return self.segments[name]
else:
subcorpus_name = name.split("/")[0]
segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):]
return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus)

def get_recording_by_full_name(self, name: str) -> Optional[Segment]:
"""
:return: the segment specified by its full name
"""
if name == "":
# Found nothing.
return None

if name in self.segments:
return self.segments[name]
else:
subcorpus_name = name.split("/")[0]
segment_name_from_subcorpus = name[len(f"{subcorpus_name}/"):]
return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_name_from_subcorpus)

def all_recordings(self) -> Iterable[Recording]:
yield from self.recordings.values()
for sc in self.subcorpora.values():
yield from sc.all_recordings()

def all_speakers(self) -> Iterable[Speaker]:
yield from self.speakers.values()
for sc in self.subcorpora:
yield from sc.all_speakers()

def top_level_recordings(self) -> Iterable[Recording]:
yield from self.recordings.values()

def top_level_subcorpora(self) -> Iterable[Corpus]:
yield from self.subcorpora.values()

def top_level_speakers(self) -> Iterable[Speaker]:
yield from self.speakers.values()

def remove_recording(self, recording: Recording):
del self.recordings[recording.name]
for sc in self.subcorpora.values():
sc.remove_recording(recording)

def remove_recordings(self, recordings: List[Recording]):
for recording in recordings:
del self.recordings[recording.name]
for sc in self.subcorpora:
sc.remove_recordings(recordings)

def add_recording(self, recording: Recording):
assert isinstance(recording, Recording)
recording.corpus = self
self.recordings[recording.name] = recording

def add_subcorpus(self, corpus: Corpus):
assert isinstance(corpus, Corpus)
corpus.parent_corpus = self
self.subcorpora[corpus.name] = corpus

def add_speaker(self, speaker: Speaker):
assert isinstance(speaker, Speaker)
self.speakers[speaker.name] = speaker

def fullname(self) -> str:
if self.parent_corpus is not None:
return self.parent_corpus.fullname() + "/" + self.name
else:
return self.name

def filter_segments(self, filter_function: FilterFunction):
"""
filter all segments (including in subcorpora) using filter_function
:param filter_function: takes arguments corpus, recording and segment, returns True if segment should be kept
"""
for r in self.recordings.values():
r.segments = [s for s in r.segments.values() if filter_function(self, r, s)]
for sc in self.subcorpora:
sc.filter_segments(filter_function)

def _dump_internal(self, out: TextIO, indentation: str = ""):
if self.parent_corpus is None:
out.write('<corpus name="%s">\n' % self.name)
else:
out.write('%s<subcorpus name="%s">\n' % (indentation, self.name))

for s in self.speakers.values():
s.dump(out, indentation + " ")
if self.speaker_name is not None:
out.write('%s <speaker name="%s"/>\n' % (indentation, self.speaker_name))

for r in self.recordings.values():
r.dump(out, indentation + " ")

for sc in self.subcorpora.values():
sc._dump_internal(out, indentation + " ")

if self.parent_corpus is None:
out.write("</corpus>\n")
else:
out.write("%s</subcorpus>\n" % (indentation,))


class Recording(NamedEntity, CorpusSection):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -338,6 +486,20 @@ def add_segment(self, segment: Segment):
self.segments.append(segment)


class RecordingV2(Recording):
def __init__(self):
super().__init__()
self.audio: Optional[str] = None
self.corpus: Optional[Corpus] = None
self.segments: Dict[str, Segment] = {}

def add_segment(self, segment: Segment):
assert isinstance(segment, Segment)
segment.recording = self
self.segments[segment.name] = segment



class Segment(NamedEntity):
def __init__(self):
super().__init__()
Expand Down

0 comments on commit e12a275

Please sign in to comment.