From ddceb75f051c1f4645c7ebc82b1f7a00a54ef6d1 Mon Sep 17 00:00:00 2001 From: Nick Rossenbach Date: Thu, 18 Jan 2024 11:19:50 +0900 Subject: [PATCH 1/3] Change BlissFfmpegJob threading - force ffmpeg to use 1 thread for input and output processing each - set higher number of default cpu cores - remove some tk.uncached_path calls --- audio/ffmpeg.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/audio/ffmpeg.py b/audio/ffmpeg.py index 8aa491aa..1cda5993 100644 --- a/audio/ffmpeg.py +++ b/audio/ffmpeg.py @@ -112,7 +112,8 @@ def __init__( if self.error_threshold > 0: self.out_failed_files = self.output_path("failed_files.txt") - self.rqmt = {"time": 4, "cpu": 4, "mem": 8} + # e.g. 1 core for python and 4x2 cores for ffmpeg, one for input processing and one for output processing + self.rqmt = {"time": 4, "cpu": 9, "mem": 8} def tasks(self): yield Task("run", rqmt=self.rqmt) @@ -124,11 +125,11 @@ def tasks(self): def run(self): c = corpus.Corpus() - c.load(tk.uncached_path(self.corpus_file)) + c.load(self.corpus_file.get_path()) from multiprocessing import pool - p = pool.Pool(self.rqmt["cpu"]) + p = pool.Pool(self.rqmt["cpu"] // 2) p.map(self._perform_ffmpeg, c.all_recordings()) for r in c.all_recordings(): @@ -138,7 +139,7 @@ def run(self): if self.recover_duration: c.dump("temp_corpus.xml.gz") else: - c.dump(tk.uncached_path(self.out_corpus)) + c.dump(self.out_corpus.get_path()) if self.out_failed_files is not None: with open(self.out_failed_files.get_path(), "wt") as out: @@ -195,13 +196,9 @@ def _perform_ffmpeg(self, recording: corpus.Recording): target = os.path.join(self.out_audio_folder.get_path(), audio_filename) if not os.path.exists(target): logging.info(f"try converting {target}") - command_head = [ - self.ffmpeg_binary, - "-hide_banner", - "-y", - ] + command_head = [self.ffmpeg_binary, "-hide_banner", "-y", "-threads", "1"] command_in = ["-i", recording.audio] - command_out = [target] + command_out = ["-threads", "1", 
target] in_options = self.ffmpeg_input_options or [] out_options = self.ffmpeg_options or [] command = command_head + in_options + command_in + out_options + command_out From 495fb8eb2c3b2c385fa57d7a301923ebbcaa6d3f Mon Sep 17 00:00:00 2001 From: Nick Rossenbach Date: Mon, 22 Jan 2024 16:13:18 +0900 Subject: [PATCH 2/3] add cpu_rqmt as parameter --- audio/ffmpeg.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/audio/ffmpeg.py b/audio/ffmpeg.py index 1cda5993..a7cb65ed 100644 --- a/audio/ffmpeg.py +++ b/audio/ffmpeg.py @@ -82,6 +82,7 @@ def __init__( hash_binary: bool = False, ffmpeg_input_options: Optional[List[str]] = None, error_threshold: int = 0, + cpu_rqmt: int = 9, ): """ @@ -94,6 +95,7 @@ def __init__( in which case the binary needs to be hashed :param ffmpeg_input_options: list of ffmpeg parameters thare are applied for reading the input files :param error_threshold: Allow upto this many files to fail conversion before failing this job + :param cpu_rqmt: number of cpu cores to use """ self.corpus_file = corpus_file self.ffmpeg_input_options = ffmpeg_input_options @@ -113,7 +115,7 @@ def __init__( self.out_failed_files = self.output_path("failed_files.txt") # e.g. 
1 core for python and 4x2 cores for ffmpeg, one for input processing and one for output processing - self.rqmt = {"time": 4, "cpu": 9, "mem": 8} + self.rqmt = {"time": 4, "cpu": cpu_rqmt, "mem": 8} def tasks(self): yield Task("run", rqmt=self.rqmt) @@ -221,4 +223,5 @@ def hash(cls, kwargs): d.pop("ffmpeg_binary") if kwargs["error_threshold"] == 0: d.pop("error_threshold") + d.pop("cpu_rqmt") return super().hash(d) From c717767a451063b3e4008afa0a96c98018764d7b Mon Sep 17 00:00:00 2001 From: Nick Rossenbach Date: Mon, 22 Jan 2024 16:18:00 +0900 Subject: [PATCH 3/3] add cpu_rqmt also for encoding job --- audio/encoding.py | 3 +++ audio/ffmpeg.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/audio/encoding.py b/audio/encoding.py index 9b5b2f9b..b1910d5d 100644 --- a/audio/encoding.py +++ b/audio/encoding.py @@ -33,6 +33,7 @@ def __init__( input_codec: Optional[str] = None, input_codec_options: Optional[List[str]] = None, error_threshold: int = 0, + cpu_rqmt: int = 9, ): """ For all parameter holds that "None" means to use the ffmpeg defaults, which depend on the input file @@ -59,6 +60,7 @@ def __init__( :param input_codec: specify the codec of the input file :param input_codec_options: specify additional codec specific options for the in_codec :param error_threshold: Allow upto this many files to fail conversion before failing this job + :param cpu_rqmt: number of cpu cores to use """ ffmpeg_input_options = [] ffmpeg_options = [] @@ -107,4 +109,5 @@ def __init__( ffmpeg_binary=ffmpeg_binary, hash_binary=hash_binary, error_threshold=error_threshold, + cpu_rqmt=cpu_rqmt, ) diff --git a/audio/ffmpeg.py b/audio/ffmpeg.py index a7cb65ed..9809b8eb 100644 --- a/audio/ffmpeg.py +++ b/audio/ffmpeg.py @@ -82,7 +82,7 @@ def __init__( hash_binary: bool = False, ffmpeg_input_options: Optional[List[str]] = None, error_threshold: int = 0, - cpu_rqmt: int = 9, + cpu_rqmt: int = 5, ): """