diff --git a/auto_editor/analyze.py b/auto_editor/analyze.py index 890ef3225..d0dda3e67 100644 --- a/auto_editor/analyze.py +++ b/auto_editor/analyze.py @@ -4,15 +4,16 @@ import re from dataclasses import dataclass from fractions import Fraction +from math import ceil from typing import TYPE_CHECKING import av import numpy as np +from av.audio.fifo import AudioFifo from av.subtitles.subtitle import AssSubtitle from auto_editor import version from auto_editor.utils.subtitle_tools import convert_ass_to_text -from auto_editor.wavfile import read if TYPE_CHECKING: from collections.abc import Iterator @@ -22,7 +23,6 @@ from numpy.typing import NDArray from auto_editor.ffwrapper import FileInfo - from auto_editor.output import Ensure from auto_editor.utils.bar import Bar from auto_editor.utils.log import Log @@ -30,7 +30,6 @@ @dataclass(slots=True) class FileSetup: src: FileInfo - ensure: Ensure strict: bool tb: Fraction bar: Bar @@ -89,6 +88,41 @@ def obj_tag(tag: str, tb: Fraction, obj: dict[str, Any]) -> str: return key +def iter_audio(src, tb: Fraction, stream: int = 0) -> Iterator[float]: + fifo = AudioFifo() + try: + container = av.open(src.path, "r") + audio_stream = container.streams.audio[stream] + sample_rate = audio_stream.rate + + exact_size = (1 / tb) * sample_rate + accumulated_error = 0 + + # Resample so that audio data is between [-1, 1] + resampler = av.AudioResampler( + av.AudioFormat("flt"), audio_stream.layout, sample_rate + ) + + for frame in container.decode(audio=stream): + frame.pts = None # Skip time checks + + for reframe in resampler.resample(frame): + fifo.write(reframe) + + while fifo.samples >= ceil(exact_size): + size_with_error = exact_size + accumulated_error + current_size = round(size_with_error) + accumulated_error = size_with_error - current_size + + audio_chunk = fifo.read(current_size) + assert audio_chunk is not None + arr = audio_chunk.to_ndarray().flatten() + yield float(np.max(np.abs(arr))) + + finally: + container.close() + + def iter_motion(src, tb, stream: int, blur: int, width: int) -> Iterator[float]: container = av.open(src.path, "r") @@ -138,7 +172,6 @@ def iter_motion(src, tb, stream: int, blur: int, width: int) -> Iterator[float]: @dataclass(slots=True) class Levels: - ensure: Ensure src: FileInfo tb: Fraction bar: Bar @@ -151,24 +184,16 @@ def media_length(self) -> int: if (arr := self.read_cache("audio", {"stream": 0})) is not None: return len(arr) - sr, samples = read(self.ensure.audio(self.src, 0)) - samp_count = len(samples) - del samples - - samp_per_ticks = sr / self.tb - ticks = int(samp_count / samp_per_ticks) - self.log.debug(f"Audio Length: {ticks}") - self.log.debug( - f"... without rounding: {float(samp_count / samp_per_ticks)}" - ) - return ticks + result = sum(1 for _ in iter_audio(self.src, self.tb, 0)) + self.log.debug(f"Audio Length: {result}") + return result # If there's no audio, get length in video metadata. - with av.open(f"{self.src.path}") as cn: - if len(cn.streams.video) < 1: + with av.open(self.src.path) as container: + if len(container.streams.video) == 0: self.log.error("Could not get media duration") - video = cn.streams.video[0] + video = container.streams.video[0] if video.duration is None or video.time_base is None: dur = 0 @@ -213,56 +238,70 @@ def cache(self, tag: str, obj: dict[str, Any], arr: np.ndarray) -> np.ndarray: return arr def audio(self, stream: int) -> NDArray[np.float64]: - if stream > len(self.src.audios) - 1: + if stream >= len(self.src.audios): raise LevelError(f"audio: audio stream '{stream}' does not exist.") if (arr := self.read_cache("audio", {"stream": stream})) is not None: return arr - sr, samples = read(self.ensure.audio(self.src, stream)) - - if len(samples) == 0: - raise LevelError(f"audio: stream '{stream}' has no samples.") - - def get_max_volume(s: np.ndarray) -> float: - return max(float(np.max(s)), -float(np.min(s))) - - max_volume = get_max_volume(samples) - self.log.debug(f"Max volume: {max_volume}") + with av.open(self.src.path, "r") as container: + audio = container.streams.audio[stream] + if audio.duration is not None and audio.time_base is not None: + inaccurate_dur = int(audio.duration * audio.time_base * self.tb) + elif container.duration is not None: + inaccurate_dur = int(container.duration / av.time_base * self.tb) + else: + inaccurate_dur = 1024 - samp_count = samples.shape[0] - samp_per_ticks = sr / self.tb + bar = self.bar + bar.start(inaccurate_dur, "Analyzing audio volume") - if samp_per_ticks < 1: - self.log.error( - f"audio: stream '{stream}'\n Samplerate ({sr}) must be greater than " - f"or equal to timebase ({self.tb})\n" - " Try `-fps 30` and/or `--sample-rate 48000`" - ) + result = np.zeros((inaccurate_dur), dtype=np.float64) + index = 0 + for value in iter_audio(self.src, self.tb, stream): + if index > len(result) - 1: + result = np.concatenate( + (result, np.zeros((len(result)), dtype=np.float64)) + ) + result[index] = value + bar.tick(index) + index += 1 - audio_ticks = int(samp_count / samp_per_ticks) - self.log.debug( - f"analyze: audio length: {audio_ticks} ({float(samp_count / samp_per_ticks)})" - ) - self.bar.start(audio_ticks, "Analyzing audio volume") + bar.end() + return self.cache("audio", {"stream": stream}, result[:index]) - threshold_list = np.zeros((audio_ticks), dtype=np.float64) + def motion(self, stream: int, blur: int, width: int) -> NDArray[np.float64]: + if stream >= len(self.src.videos): + raise LevelError(f"motion: video stream '{stream}' does not exist.") - if max_volume == 0: # Prevent dividing by zero - return threshold_list + mobj = {"stream": stream, "width": width, "blur": blur} + if (arr := self.read_cache("motion", mobj)) is not None: + return arr - # Determine when audio is silent or loud. - for i in range(audio_ticks): - if i % 500 == 0: - self.bar.tick(i) + with av.open(self.src.path, "r") as container: + video = container.streams.video[stream] + inaccurate_dur = ( + 1024 + if video.duration is None or video.time_base is None + else int(video.duration * video.time_base * self.tb) + ) - start = int(i * samp_per_ticks) - end = min(int((i + 1) * samp_per_ticks), samp_count) + bar = self.bar + bar.start(inaccurate_dur, "Analyzing motion") - threshold_list[i] = get_max_volume(samples[start:end]) / max_volume + result = np.zeros((inaccurate_dur), dtype=np.float64) + index = 0 + for value in iter_motion(self.src, self.tb, stream, blur, width): + if index > len(result) - 1: + result = np.concatenate( + (result, np.zeros((len(result)), dtype=np.float64)) + ) + result[index] = value + bar.tick(index) + index += 1 - self.bar.end() - return self.cache("audio", {"stream": stream}, threshold_list) + bar.end() + return self.cache("motion", mobj, result[:index]) def subtitle( self, @@ -336,37 +375,3 @@ def subtitle( container.close() return result - - def motion(self, stream: int, blur: int, width: int) -> NDArray[np.float64]: - if stream >= len(self.src.videos): - raise LevelError(f"motion: video stream '{stream}' does not exist.") - - mobj = {"stream": stream, "width": width, "blur": blur} - if (arr := self.read_cache("motion", mobj)) is not None: - return arr - - with av.open(self.src.path, "r") as container: - video = container.streams.video[stream] - inaccurate_dur = ( - 1024 - if video.duration is None or video.time_base is None - else int(video.duration * video.time_base * self.tb) - ) - - bar = self.bar - bar.start(inaccurate_dur, "Analyzing motion") - - threshold_list = np.zeros((inaccurate_dur), dtype=np.float64) - index = 0 - - for value in iter_motion(self.src, self.tb, stream, blur, width): - if index > len(threshold_list) - 1: - threshold_list = np.concatenate( - (threshold_list, np.zeros((len(threshold_list)), dtype=np.float64)) - ) - threshold_list[index] = value - bar.tick(index) - index += 1 - - bar.end() - return self.cache("motion", mobj, threshold_list[:index]) diff --git a/auto_editor/edit.py b/auto_editor/edit.py index f8d976f3c..e7723465e 100644 --- a/auto_editor/edit.py +++ b/auto_editor/edit.py @@ -202,10 +202,8 @@ def edit_media( else: samplerate = args.sample_rate - ensure = Ensure(ffmpeg, bar, samplerate, temp, log) - if tl is None: - tl = make_timeline(sources, ensure, args, samplerate, bar, temp, log) + tl = make_timeline(sources, args, samplerate, bar, temp, log) if export["export"] == "timeline": from auto_editor.formats.json import make_json_timeline @@ -216,7 +214,7 @@ def edit_media( if args.preview: from auto_editor.preview import preview - preview(ensure, tl, temp, log) + preview(tl, temp, log) return if export["export"] == "json": @@ -265,6 +263,8 @@ def make_media(tl: v3, output: str) -> None: sub_output = [] apply_later = False + ensure = Ensure(ffmpeg, bar, samplerate, temp, log) + if ctr.default_sub != "none" and not args.sn: sub_output = make_new_subtitles(tl, ensure, temp) diff --git a/auto_editor/make_layers.py b/auto_editor/make_layers.py index 0ca6f0419..ed0c51fb9 100644 --- a/auto_editor/make_layers.py +++ b/auto_editor/make_layers.py @@ -18,7 +18,6 @@ if TYPE_CHECKING: from numpy.typing import NDArray - from auto_editor.output import Ensure from auto_editor.utils.bar import Bar from auto_editor.utils.chunks import Chunks from auto_editor.utils.log import Log @@ -75,7 +74,6 @@ def make_av(src: FileInfo, all_clips: list[list[Clip]]) -> tuple[VSpace, ASpace] def run_interpreter_for_edit_option( text: str, filesetup: FileSetup ) -> NDArray[np.bool_]: - ensure = filesetup.ensure src = filesetup.src tb = filesetup.tb bar = filesetup.bar @@ -87,8 +85,8 @@ def run_interpreter_for_edit_option( if log.is_debug: log.debug(f"edit: {parser}") - env["timebase"] = filesetup.tb - env["@levels"] = Levels(ensure, src, tb, bar, temp, log) + env["timebase"] = tb + env["@levels"] = Levels(src, tb, bar, temp, log) env["@filesetup"] = filesetup results = interpret(env, parser) @@ -139,7 +137,6 @@ def parse_time(val: str, arr: NDArray, tb: Fraction) -> int: # raises: `CoerceE def make_timeline( sources: list[FileInfo], - ensure: Ensure, args: Args, sr: int, bar: Bar, @@ -169,7 +166,7 @@ def make_timeline( concat = np.concatenate for i, src in enumerate(sources): - filesetup = FileSetup(src, ensure, len(sources) < 2, tb, bar, temp, log) + filesetup = FileSetup(src, len(sources) < 2, tb, bar, temp, log) edit_result = run_interpreter_for_edit_option(method, filesetup) mut_margin(edit_result, start_margin, end_margin) diff --git a/auto_editor/preview.py b/auto_editor/preview.py index 7b922a39e..56613ffaf 100644 --- a/auto_editor/preview.py +++ b/auto_editor/preview.py @@ -6,7 +6,6 @@ from typing import TextIO from auto_editor.analyze import Levels -from auto_editor.output import Ensure from auto_editor.timeline import v3 from auto_editor.utils.bar import Bar from auto_editor.utils.func import to_timecode @@ -49,7 +48,7 @@ def all_cuts(tl: v3, in_len: int) -> list[int]: return cut_lens -def preview(ensure: Ensure, tl: v3, temp: str, log: Log) -> None: +def preview(tl: v3, temp: str, log: Log) -> None: log.conwrite("") tb = tl.tb @@ -66,7 +65,7 @@ def preview(ensure: Ensure, tl: v3, temp: str, log: Log) -> None: in_len = 0 for src in all_sources: - in_len += Levels(ensure, src, tb, Bar("none"), temp, log).media_length + in_len += Levels(src, tb, Bar("none"), temp, log).media_length out_len = tl.out_len() diff --git a/auto_editor/render/subtitle.py b/auto_editor/render/subtitle.py index f84fa860c..33ecc7034 100644 --- a/auto_editor/render/subtitle.py +++ b/auto_editor/render/subtitle.py @@ -49,7 +49,7 @@ def parse(self, text: str, codec: str) -> None: self.codec = codec self.contents = [] - if codec == "ass": + if codec == "ass" or codec == "ssa": time_code = re.compile(r"(.*)(\d+:\d+:[\d.]+)(.*)(\d+:\d+:[\d.]+)(.*)") elif codec == "webvtt": time_code = re.compile(r"()(\d+:[\d.]+)( --> )(\d+:[\d.]+)(\n.*)") diff --git a/auto_editor/subcommands/levels.py b/auto_editor/subcommands/levels.py index dfcd1ec81..44514c87d 100644 --- a/auto_editor/subcommands/levels.py +++ b/auto_editor/subcommands/levels.py @@ -1,20 +1,16 @@ from __future__ import annotations -import math import sys from dataclasses import dataclass, field from fractions import Fraction from typing import TYPE_CHECKING -import av import numpy as np -from av.audio.fifo import AudioFifo -from auto_editor.analyze import LevelError, Levels, iter_motion -from auto_editor.ffwrapper import FFmpeg, initFileInfo +from auto_editor.analyze import LevelError, Levels, iter_audio, iter_motion +from auto_editor.ffwrapper import initFileInfo from auto_editor.lang.palet import env from auto_editor.lib.contracts import is_bool, is_nat, is_nat1, is_str, is_void, orc -from auto_editor.output import Ensure from auto_editor.utils.bar import Bar from auto_editor.utils.cmdkw import ( ParserError, @@ -40,8 +36,6 @@ class LevelArgs: input: list[str] = field(default_factory=list) edit: str = "audio" timebase: Fraction | None = None - ffmpeg_location: str | None = None - my_ffmpeg: bool = False help: bool = False @@ -59,12 +53,6 @@ def levels_options(parser: ArgumentParser) -> ArgumentParser: type=frame_rate, help="Set custom timebase", ) - parser.add_argument("--ffmpeg-location", help="Point to your custom ffmpeg file") - parser.add_argument( - "--my-ffmpeg", - flag=True, - help="Use the ffmpeg on your PATH instead of the one packaged", - ) return parser @@ -95,47 +83,10 @@ def print_arr_gen(arr: Iterator[int | float]) -> None: print("") -def iter_audio(src, tb: Fraction, stream: int = 0) -> Iterator[float]: - fifo = AudioFifo() - try: - container = av.open(src.path, "r") - audio_stream = container.streams.audio[stream] - sample_rate = audio_stream.rate - - exact_size = (1 / tb) * sample_rate - accumulated_error = 0 - - # Resample so that audio data is between [-1, 1] - resampler = av.AudioResampler( - av.AudioFormat("flt"), audio_stream.layout, sample_rate - ) - - for frame in container.decode(audio=stream): - frame.pts = None # Skip time checks - - for reframe in resampler.resample(frame): - fifo.write(reframe) - - while fifo.samples >= math.ceil(exact_size): - size_with_error = exact_size + accumulated_error - current_size = round(size_with_error) - accumulated_error = size_with_error - current_size - - audio_chunk = fifo.read(current_size) - assert audio_chunk is not None - arr = audio_chunk.to_ndarray().flatten() - yield float(np.max(np.abs(arr))) - - finally: - container.close() - - def main(sys_args: list[str] = sys.argv[1:]) -> None: parser = levels_options(ArgumentParser("levels")) args = parser.parse_args(LevelArgs, sys_args) - ffmpeg = FFmpeg(args.ffmpeg_location, args.my_ffmpeg) - bar = Bar("none") temp = setup_tempdir(None, Log()) log = Log(quiet=True, temp=temp) @@ -147,7 +98,6 @@ def main(sys_args: list[str] = sys.argv[1:]) -> None: src = sources[0] tb = src.get_fps() if args.timebase is None else args.timebase - ensure = Ensure(ffmpeg, bar, src.get_sr(), temp, log) if ":" in args.edit: method, attrs = args.edit.split(":", 1) @@ -182,7 +132,7 @@ def main(sys_args: list[str] = sys.argv[1:]) -> None: except ParserError as e: log.error(e) - levels = Levels(ensure, src, tb, bar, temp, log) + levels = Levels(src, tb, bar, temp, log) try: if method == "audio": print_arr_gen(iter_audio(src, tb, **obj)) diff --git a/auto_editor/subcommands/repl.py b/auto_editor/subcommands/repl.py index 2423cbb56..6db2d2506 100644 --- a/auto_editor/subcommands/repl.py +++ b/auto_editor/subcommands/repl.py @@ -6,11 +6,10 @@ import auto_editor from auto_editor.analyze import FileSetup, Levels -from auto_editor.ffwrapper import FFmpeg, initFileInfo +from auto_editor.ffwrapper import initFileInfo from auto_editor.lang.palet import ClosingError, Lexer, Parser, env, interpret from auto_editor.lib.data_structs import print_str from auto_editor.lib.err import MyError -from auto_editor.output import Ensure from auto_editor.utils.bar import Bar from auto_editor.utils.func import setup_tempdir from auto_editor.utils.log import Log @@ -48,12 +47,6 @@ def repl_options(parser: ArgumentParser) -> ArgumentParser: type=frame_rate, help="Set custom timebase", ) - parser.add_argument("--ffmpeg-location", help="Point to your custom ffmpeg file") - parser.add_argument( - "--my-ffmpeg", - flag=True, - help="Use the ffmpeg on your PATH instead of the one packaged", - ) parser.add_argument( "--temp-dir", metavar="PATH", @@ -68,16 +61,14 @@ def main(sys_args: list[str] = sys.argv[1:]) -> None: if args.input: temp = setup_tempdir(args.temp_dir, Log()) log = Log(quiet=True, temp=temp) - ffmpeg = FFmpeg(args.ffmpeg_location, args.my_ffmpeg, False) strict = len(args.input) < 2 sources = [initFileInfo(path, log) for path in args.input] src = sources[0] tb = src.get_fps() if args.timebase is None else args.timebase bar = Bar("modern") - ensure = Ensure(ffmpeg, bar, src.get_sr(), temp, log) env["timebase"] = tb - env["@levels"] = Levels(ensure, src, tb, bar, temp, log) - env["@filesetup"] = FileSetup(src, ensure, strict, tb, bar, temp, log) + env["@levels"] = Levels(src, tb, bar, temp, log) + env["@filesetup"] = FileSetup(src, strict, tb, bar, temp, log) print(f"Auto-Editor {auto_editor.version} ({auto_editor.__version__})") text = None