Skip to content

Commit

Permalink
Turn FileInfo into a dataclass
Browse files Browse the repository at this point in the history
  • Loading branch information
WyattBlue committed Nov 3, 2023
1 parent 88d348a commit 43dae5a
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 181 deletions.
4 changes: 2 additions & 2 deletions auto_editor/edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from typing import Any

from auto_editor.ffwrapper import FFmpeg, FileInfo
from auto_editor.ffwrapper import FFmpeg, FileInfo, initFileInfo
from auto_editor.lib.contracts import is_int, is_str
from auto_editor.make_layers import make_timeline
from auto_editor.output import Ensure, mux_quality_media
Expand Down Expand Up @@ -129,7 +129,7 @@ def make_sources(
if path in used_paths:
inputs.append(used_paths[path])
else:
sources[str(i)] = FileInfo(path, ffmpeg, log, str(i))
sources[str(i)] = initFileInfo(path, ffmpeg, log, str(i))
inputs.append(i)
used_paths[path] = i
i += 1
Expand Down
312 changes: 156 additions & 156 deletions auto_editor/ffwrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
from auto_editor.utils.func import get_stdout
from auto_editor.utils.log import Log

IMG_CODECS = ("png", "mjpeg", "webp")
SUB_EXTS = {"mov_text": "srt", "ass": "ass", "webvtt": "vtt"}


class FFmpeg:
__slots__ = ("debug", "show_cmd", "path", "version")
Expand Down Expand Up @@ -166,17 +163,16 @@ class SubtitleStream:
lang: str | None


@dataclass(slots=True)
class FileInfo:
__slots__ = (
"path",
"bitrate",
"duration",
"description",
"videos",
"audios",
"subtitles",
"label",
)
path: Path
bitrate: str | None
duration: float
description: str | None
videos: tuple[VideoStream, ...]
audios: tuple[AudioStream, ...]
subtitles: tuple[SubtitleStream, ...]
label: str

def get_res(self) -> tuple[int, int]:
if self.videos:
Expand All @@ -193,152 +189,156 @@ def get_sr(self) -> int:
return self.audios[0].samplerate
return 48000

def __init__(self, path: str, ffmpeg: FFmpeg, log: Log, label: str = ""):
self.label = label
self.path = Path(path)
self.videos: list[VideoStream] = []
self.audios: list[AudioStream] = []
self.subtitles: list[SubtitleStream] = []
self.description = None
self.duration = 0.0

_dir = os.path.dirname(ffmpeg.path)
_ext = os.path.splitext(ffmpeg.path)[1]
ffprobe = os.path.join(_dir, f"ffprobe{_ext}")

def initFileInfo(path: str, ffmpeg: FFmpeg, log: Log, label: str = "") -> FileInfo:
videos: list[VideoStream] = []
audios: list[AudioStream] = []
subtitles: list[SubtitleStream] = []
description = None
duration = 0.0

_dir = os.path.dirname(ffmpeg.path)
_ext = os.path.splitext(ffmpeg.path)[1]
ffprobe = os.path.join(_dir, f"ffprobe{_ext}")

try:
info = get_stdout(
[
ffprobe,
"-v",
"-8",
"-print_format",
"json",
"-show_streams",
"-show_format",
path,
]
)
except FileNotFoundError:
log.nofile(ffprobe)
except Exception as e:
log.error(e)

@overload
def get_attr(name: str, dic: dict[str, str], op: Literal[True]) -> str | None:
...

@overload
def get_attr(name: str, dic: dict[str, str]) -> str:
...

def get_attr(name: str, dic: dict[str, str], op: bool = False) -> str | None:
if name in dic:
if isinstance(dic[name], str):
return dic[name]
log.error(f"'{name}' must be a string")
if not op:
log.error(f"'{name}' must be in ffprobe json")
return None

try:
json_info = Parser(Lexer("ffprobe", info)).expr()
if not isinstance(json_info, dict):
raise MyError("Expected Object")
if "streams" not in json_info:
raise MyError("Key 'streams' not found")
if "format" not in json_info:
raise MyError("Key 'format' not found")
except MyError as e:
log.error(f"{path}: Could not read ffprobe JSON\n{e}")

bitrate: str | None = None
if "bit_rate" in json_info["format"]:
bitrate = json_info["format"]["bit_rate"]
if "tags" in json_info["format"] and "description" in json_info["format"]["tags"]:
description = json_info["format"]["tags"]["description"]

if "duration" in json_info["format"]:
try:
info = get_stdout(
[
ffprobe,
"-v",
"-8",
"-print_format",
"json",
"-show_streams",
"-show_format",
path,
]
)
except FileNotFoundError:
log.nofile(ffprobe)
except Exception as e:
log.error(e)

@overload
def get_attr(name: str, dic: dict[str, str], op: Literal[True]) -> str | None:
...

@overload
def get_attr(name: str, dic: dict[str, str]) -> str:
...

def get_attr(name: str, dic: dict[str, str], op: bool = False) -> str | None:
if name in dic:
if isinstance(dic[name], str):
return dic[name]
log.error(f"'{name}' must be a string")
if not op:
log.error(f"'{name}' must be in ffprobe json")
return None
duration = float(json_info["format"]["duration"])
except Exception:
pass

img_codecs = ("png", "mjpeg", "webp")

for stream in json_info["streams"]:
lang = None
br = None
if "tags" in stream and "language" in stream["tags"]:
lang = stream["tags"]["language"]
if "bit_rate" in stream:
br = stream["bit_rate"]

codec_type = get_attr("codec_type", stream)

if codec_type in ("video", "audio", "subtitle"):
codec = get_attr("codec_name", stream)

if codec_type == "video":
pix_fmt = get_attr("pix_fmt", stream)
vduration = get_attr("duration", stream, True)
color_range = get_attr("color_range", stream, True)
color_space = get_attr("color_space", stream, True)
color_primaries = get_attr("color_primaries", stream, True)
color_transfer = get_attr("color_transfer", stream, True)
sar = get_attr("sample_aspect_ratio", stream, True)
fps_str = get_attr("r_frame_rate", stream)
time_base_str = get_attr("time_base", stream)

try:
json_info = Parser(Lexer("ffprobe", info)).expr()
if not isinstance(json_info, dict):
raise MyError("Expected Object")
if "streams" not in json_info:
raise MyError("Key 'streams' not found")
if "format" not in json_info:
raise MyError("Key 'format' not found")
except MyError as e:
log.error(f"{path}: Could not read ffprobe JSON\n{e}")

self.bitrate: str | None = None
if "bit_rate" in json_info["format"]:
self.bitrate = json_info["format"]["bit_rate"]
if (
"tags" in json_info["format"]
and "description" in json_info["format"]["tags"]
):
self.description = json_info["format"]["tags"]["description"]

if "duration" in json_info["format"]:
try:
self.duration = float(json_info["format"]["duration"])
except Exception:
pass

for stream in json_info["streams"]:
lang = None
br = None
if "tags" in stream and "language" in stream["tags"]:
lang = stream["tags"]["language"]
if "bit_rate" in stream:
br = stream["bit_rate"]

codec_type = get_attr("codec_type", stream)

if codec_type in ("video", "audio", "subtitle"):
codec = get_attr("codec_name", stream)

if codec_type == "video":
pix_fmt = get_attr("pix_fmt", stream)
vduration = get_attr("duration", stream, True)
color_range = get_attr("color_range", stream, True)
color_space = get_attr("color_space", stream, True)
color_primaries = get_attr("color_primaries", stream, True)
color_transfer = get_attr("color_transfer", stream, True)
sar = get_attr("sample_aspect_ratio", stream, True)
fps_str = get_attr("r_frame_rate", stream)
time_base_str = get_attr("time_base", stream)

try:
fps = Fraction(fps_str)
except ZeroDivisionError:
fps = Fraction(0)
except ValueError:
log.error(f"Could not convert fps '{fps_str}' to Fraction.")

if fps < 1 and codec in IMG_CODECS:
fps = Fraction(25)
if fps == 0:
fps = Fraction(30)

try:
time_base = Fraction(time_base_str)
except (ValueError, ZeroDivisionError):
if codec not in IMG_CODECS:
log.error(
f"Could not convert time_base '{time_base_str}' to Fraction."
)
time_base = Fraction(0)

self.videos.append(
VideoStream(
stream["width"],
stream["height"],
codec,
fps,
vduration,
sar,
time_base,
pix_fmt,
color_range,
color_space,
color_primaries,
color_transfer,
br,
lang,
fps = Fraction(fps_str)
except ZeroDivisionError:
fps = Fraction(0)
except ValueError:
log.error(f"Could not convert fps '{fps_str}' to Fraction.")

if fps < 1 and codec in img_codecs:
fps = Fraction(25)
if fps == 0:
fps = Fraction(30)

try:
time_base = Fraction(time_base_str)
except (ValueError, ZeroDivisionError):
if codec not in img_codecs:
log.error(
f"Could not convert time_base '{time_base_str}' to Fraction."
)
time_base = Fraction(0)

videos.append(
VideoStream(
stream["width"],
stream["height"],
codec,
fps,
vduration,
sar,
time_base,
pix_fmt,
color_range,
color_space,
color_primaries,
color_transfer,
br,
lang,
)
if codec_type == "audio":
sr = int(stream["sample_rate"])

if "channels" not in stream:
log.error("audio stream has no 'channels' data")
c = stream["channels"]

adur = get_attr("duration", stream, True)
self.audios.append(AudioStream(codec, sr, c, adur, br, lang))
if codec_type == "subtitle":
ext = SUB_EXTS.get(codec, "vtt")
self.subtitles.append(SubtitleStream(codec, ext, lang))
)
if codec_type == "audio":
sr = int(stream["sample_rate"])

if "channels" not in stream:
log.error("audio stream has no 'channels' data")
c = stream["channels"]

adur = get_attr("duration", stream, True)
audios.append(AudioStream(codec, sr, c, adur, br, lang))
if codec_type == "subtitle":
sub_exts = {"mov_text": "srt", "ass": "ass", "webvtt": "vtt"}
ext = sub_exts.get(codec, "vtt")
subtitles.append(SubtitleStream(codec, ext, lang))

v = tuple(videos)
a = tuple(audios)
s = tuple(subtitles)
return FileInfo(Path(path), bitrate, duration, description, v, a, s, label)
4 changes: 2 additions & 2 deletions auto_editor/formats/fcp11.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, Any, cast
from xml.etree.ElementTree import Element, ElementTree, SubElement, indent

from auto_editor.ffwrapper import FFmpeg, FileInfo
from auto_editor.ffwrapper import FFmpeg, FileInfo, initFileInfo

from .utils import make_tracks_dir

Expand Down Expand Up @@ -80,7 +80,7 @@ def fraction(val: int) -> str:
ffmpeg.run(
["-i", f"{src.path.resolve()}", "-map", f"0:a:{i}", f"{newtrack}"]
)
all_srcs.append(FileInfo(f"{newtrack}", ffmpeg, log))
all_srcs.append(initFileInfo(f"{newtrack}", ffmpeg, log))
all_refs.append(f"r{(i + 1) * 2}")

fcpxml = Element("fcpxml", version="1.10" if flavor == "resolve" else "1.11")
Expand Down
Loading

0 comments on commit 43dae5a

Please sign in to comment.