Skip to content

Commit

Permalink
Move iter_motion to analyze.py
Browse files Browse the repository at this point in the history
  • Loading branch information
WyattBlue committed Jul 27, 2024
1 parent 3617b4a commit bfd5173
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 101 deletions.
118 changes: 66 additions & 52 deletions auto_editor/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
from fractions import Fraction
from typing import TYPE_CHECKING

import av
import numpy as np
from av.subtitles.subtitle import AssSubtitle

from auto_editor import version
from auto_editor.utils.subtitle_tools import convert_ass_to_text
from auto_editor.wavfile import read

if TYPE_CHECKING:
from collections.abc import Iterator
from fractions import Fraction
from typing import Any

Expand Down Expand Up @@ -86,6 +89,53 @@ def obj_tag(tag: str, tb: Fraction, obj: dict[str, Any]) -> str:
return key


def iter_motion(src, tb, stream: int, blur: int, width: int) -> Iterator[float]:
container = av.open(src.path, "r")

video = container.streams.video[stream]
video.thread_type = "AUTO"

prev_frame = None
current_frame = None
total_pixels = src.videos[0].width * src.videos[0].height
index = 0
prev_index = -1

graph = av.filter.Graph()
graph.link_nodes(
graph.add_buffer(template=video),
graph.add("scale", f"{width}:-1"),
graph.add("format", "gray"),
graph.add("gblur", f"sigma={blur}"),
graph.add("buffersink"),
).configure()

for unframe in container.decode(video):
if unframe.pts is None:
continue

graph.push(unframe)
frame = graph.pull()
assert frame.time is not None
index = round(frame.time * tb)

current_frame = frame.to_ndarray()
if prev_frame is None:
value = 0.0
else:
# Use `int16` to avoid underflow with `uint8` datatype
diff = np.abs(prev_frame.astype(np.int16) - current_frame.astype(np.int16))
value = np.count_nonzero(diff) / total_pixels

for _ in range(index - prev_index):
yield value

prev_frame = current_frame
prev_index = index

container.close()


@dataclass(slots=True)
class Levels:
ensure: Ensure
Expand Down Expand Up @@ -114,8 +164,6 @@ def media_length(self) -> int:
return ticks

# If there's no audio, get length in video metadata.
import av

with av.open(f"{self.src.path}") as cn:
if len(cn.streams.video) < 1:
self.log.error("Could not get media duration")
Expand Down Expand Up @@ -232,9 +280,6 @@ def subtitle(
except re.error as e:
self.log.error(e)

import av
from av.subtitles.subtitle import AssSubtitle

try:
container = av.open(self.src.path, "r")
subtitle_stream = container.streams.subtitles[stream]
Expand Down Expand Up @@ -293,66 +338,35 @@ def subtitle(
return result

def motion(self, stream: int, blur: int, width: int) -> NDArray[np.float64]:
import av

if stream >= len(self.src.videos):
raise LevelError(f"motion: video stream '{stream}' does not exist.")

mobj = {"stream": stream, "width": width, "blur": blur}
if (arr := self.read_cache("motion", mobj)) is not None:
return arr

container = av.open(f"{self.src.path}", "r")

video = container.streams.video[stream]
video.thread_type = "AUTO"
with av.open(self.src.path, "r") as container:
video = container.streams.video[stream]
inaccurate_dur = (
1024
if video.duration is None or video.time_base is None
else int(video.duration * video.time_base * self.tb)
)

inaccurate_dur = 1 if video.duration is None else video.duration
self.bar.start(inaccurate_dur, "Analyzing motion")
bar = self.bar
bar.start(inaccurate_dur, "Analyzing motion")

prev_frame = None
current_frame = None
total_pixels = self.src.videos[0].width * self.src.videos[0].height
threshold_list = np.zeros((inaccurate_dur), dtype=np.float64)
index = 0

graph = av.filter.Graph()
graph.link_nodes(
graph.add_buffer(template=video),
graph.add("scale", f"{width}:-1"),
graph.add("format", "gray"),
graph.add("gblur", f"sigma={blur}"),
graph.add("buffersink"),
).configure()

threshold_list = np.zeros((1024), dtype=np.float64)

for unframe in container.decode(video):
graph.push(unframe)
frame = graph.pull()

# Showing progress ...
assert frame.time is not None
index = int(frame.time * self.tb)
if frame.pts is not None:
self.bar.tick(frame.pts)

current_frame = frame.to_ndarray()

for value in iter_motion(self.src, self.tb, stream, blur, width):
if index > len(threshold_list) - 1:
threshold_list = np.concatenate(
(threshold_list, np.zeros((len(threshold_list)), dtype=np.float64)),
axis=0,
)

if prev_frame is not None:
# Use `int16` to avoid underflow with `uint8` datatype
diff = np.abs(
prev_frame.astype(np.int16) - current_frame.astype(np.int16)
(threshold_list, np.zeros((len(threshold_list)), dtype=np.float64))
)
threshold_list[index] = np.count_nonzero(diff) / total_pixels
threshold_list[index] = value
bar.tick(index)
index += 1

prev_frame = current_frame

self.bar.end()
container.close()
bar.end()
return self.cache("motion", mobj, threshold_list[:index])
50 changes: 1 addition & 49 deletions auto_editor/subcommands/levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
from av.audio.fifo import AudioFifo

from auto_editor.analyze import LevelError, Levels
from auto_editor.analyze import LevelError, Levels, iter_motion
from auto_editor.ffwrapper import FFmpeg, initFileInfo
from auto_editor.lang.palet import env
from auto_editor.lib.contracts import is_bool, is_nat, is_nat1, is_str, is_void, orc
Expand Down Expand Up @@ -130,53 +130,6 @@ def iter_audio(src, tb: Fraction, stream: int = 0) -> Iterator[float]:
container.close()


def iter_motion(src, tb, stream: int, blur: int, width: int) -> Iterator[float]:
container = av.open(src.path, "r")

video = container.streams.video[stream]
video.thread_type = "AUTO"

prev_frame = None
current_frame = None
total_pixels = src.videos[0].width * src.videos[0].height
index = 0
prev_index = 0

graph = av.filter.Graph()
graph.link_nodes(
graph.add_buffer(template=video),
graph.add("scale", f"{width}:-1"),
graph.add("format", "gray"),
graph.add("gblur", f"sigma={blur}"),
graph.add("buffersink"),
).configure()

for unframe in container.decode(video):
if unframe.pts is None:
continue

graph.push(unframe)
frame = graph.pull()
assert frame.time is not None
index = round(frame.time * tb)

current_frame = frame.to_ndarray()
if prev_frame is None:
value = 0.0
else:
# Use `int16` to avoid underflow with `uint8` datatype
diff = np.abs(prev_frame.astype(np.int16) - current_frame.astype(np.int16))
value = np.count_nonzero(diff) / total_pixels

for _ in range(index - prev_index):
yield value

prev_frame = current_frame
prev_index = index

container.close()


def main(sys_args: list[str] = sys.argv[1:]) -> None:
parser = levels_options(ArgumentParser("levels"))
args = parser.parse_args(LevelArgs, sys_args)
Expand Down Expand Up @@ -232,7 +185,6 @@ def main(sys_args: list[str] = sys.argv[1:]) -> None:
levels = Levels(ensure, src, tb, bar, temp, log)
try:
if method == "audio":
# print_arr(levels.audio(**obj))
print_arr_gen(iter_audio(src, tb, **obj))
elif method == "motion":
print_arr_gen(iter_motion(src, tb, **obj))
Expand Down

0 comments on commit bfd5173

Please sign in to comment.