Skip to content

Commit

Permalink
Refactor get live snapshot (#549)
Browse files Browse the repository at this point in the history
* refactor get snapshot from recording

* delete old code

* upgrade deps
  • Loading branch information
gBillal authored Jan 10, 2025
1 parent acaffad commit b4a5a37
Show file tree
Hide file tree
Showing 18 changed files with 93 additions and 304 deletions.
24 changes: 12 additions & 12 deletions nerves_fw/mix.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rtsp/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
"ex_sdp": {:hex, :ex_sdp, "1.1.1", "1a7b049491e5ec02dad9251c53d960835dc5631321ae978ec331831f3e4f6d5f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "1b13a72ac9c5c695b8824dbdffc671be8cbb4c0d1ccb4ff76a04a6826759f233"},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"ham": {:hex, :ham, "0.3.0", "7cd031b4a55fba219c11553e7b13ba73bd86eab4034518445eff1e038cb9a44d", [:mix], [], "hexpm", "7d6c6b73d7a6a83233876cc1b06a4d9b5de05562b228effda4532f9a49852bf6"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"membrane_core": {:hex, :membrane_core, "1.1.2", "3ca206893e1d3739a24d5092d21c06fcb4db326733a1798f9788fc53abb74829", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a989fd7e0516a7e66f5fb63950b1027315b7f8c8d82d8d685e178b0fb780901b"},
Expand Down
9 changes: 3 additions & 6 deletions ui/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,9 @@ if config_env() == :prod do
log_json? = System.get_env("EXNVR_JSON_LOGGER", "true") == "true"

if log_json? do
config :logger_json, :backend,
metadata: :all,
formatter: LoggerJSON.Formatters.BasicLogger,
on_init: :disabled

config :logger, level: :info, backends: [LoggerJSON]
config :logger, :default_handler,
level: :info,
formatter: {LoggerJSON.Formatters.Basic, []}
end

config :ex_nvr,
Expand Down
18 changes: 15 additions & 3 deletions ui/lib/ex_nvr/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ defmodule ExNVR.Decoder do
end
end

@type t :: %__MODULE__{mod: module(), state: any()}
defstruct [:mod, :state]

@spec new(atom()) :: {:ok, {module(), decoder()}} | error()
def new(:h264) do
case H264.init() do
Expand All @@ -68,7 +71,16 @@ defmodule ExNVR.Decoder do
end
end

@spec new!(atom()) :: {module(), decoder()}
def new!(:h264), do: {H264, H264.init!()}
def new!(:h265), do: {H265, H265.init!()}
@spec new!(atom()) :: t()
def new!(:h264), do: %__MODULE__{mod: H264, state: H264.init!()}
def new!(:h265), do: %__MODULE__{mod: H265, state: H265.init!()}

@spec decode(t(), Buffer.t()) :: {:ok, [Buffer.t()]} | error()
def decode(%__MODULE__{mod: mod, state: state}, buffer), do: mod.decode(state, buffer)

@spec decode!(t(), Buffer.t()) :: [Buffer.t()]
def decode!(%__MODULE__{mod: mod, state: state}, buffer), do: mod.decode!(state, buffer)

@spec flush!(t()) :: [Buffer.t()]
def flush!(%__MODULE__{mod: mod, state: state}), do: mod.flush!(state)
end
37 changes: 21 additions & 16 deletions ui/lib/ex_nvr/elements/cvs_bufferer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule ExNVR.Elements.CVSBufferer do
decode it to get the latest snapshot
"""

use Membrane.Filter
use Membrane.Sink

require ExNVR.Utils

Expand All @@ -24,23 +24,26 @@ defmodule ExNVR.Elements.CVSBufferer do
),
availability: :always

def_output_pad :output,
flow_control: :auto,
accepted_format:
any_of(
%H264{alignment: :au},
%H265{alignment: :au}
),
availability: :on_request

@impl true
def handle_init(_ctx, _options) do
{[], %{cvs: [], stream_format: nil}}
{[], %{cvs: [], decoder: nil, width: 0, height: 0}}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
{[], %{state | stream_format: stream_format}}
codec =
case stream_format do
%H264{} -> :h264
%H265{} -> :h265
end

{[],
%{
state
| decoder: ExNVR.Decoder.new!(codec),
width: stream_format.width,
height: stream_format.height
}}
end

@impl true
Expand All @@ -54,12 +57,14 @@ defmodule ExNVR.Elements.CVSBufferer do
end

@impl true
def handle_pad_added(Pad.ref(:output, _ref) = pad, _ctx, state) do
actions =
def handle_parent_notification(:snapshot, _ctx, state) do
{:ok, snapshot} =
state.cvs
|> Enum.reverse()
|> Enum.map(&{:buffer, {pad, &1}})
|> ExNVR.MediaUtils.decode_last(state.decoder)
|> Map.get(:payload)
|> Turbojpeg.yuv_to_jpeg(state.width, state.height, 75, :I420)

{[stream_format: {pad, state.stream_format}] ++ actions ++ [end_of_stream: pad], state}
{[notify_parent: {:snapshot, snapshot}], state}
end
end
81 changes: 0 additions & 81 deletions ui/lib/ex_nvr/elements/snapshot_bin.ex

This file was deleted.

2 changes: 1 addition & 1 deletion ui/lib/ex_nvr/pipeline/output/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule ExNVR.Pipeline.Output.Socket do
alias Membrane.{H264, H265}

def_options encoding: [
spec: ExNVR.Pipelines.encoding(),
spec: ExNVR.Pipelines.Main.encoding(),
description: "The video encoding"
]

Expand Down
2 changes: 1 addition & 1 deletion ui/lib/ex_nvr/pipeline/output/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule ExNVR.Pipeline.Output.Storage do
require ExNVR.Utils
require Membrane.Logger

import __MODULE__.MediaUtils
import ExNVR.MediaUtils

alias ExMP4.Writer
alias ExNVR.Model.Device
Expand Down
5 changes: 2 additions & 3 deletions ui/lib/ex_nvr/pipeline/output/thumbnailer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ defmodule ExNVR.Pipeline.Output.Thumbnailer do
out_height = div(state.thumbnail_width * format.height, format.width)
out_height = out_height - rem(out_height, 2)

{decoder, decoder_state} = Decoder.new!(codec)
decoder = Decoder.new!(codec)
scaler = Image.Scaler.new!(format.width, format.height, state.thumbnail_width, out_height)

{[],
%{
state
| thumbnail_height: out_height,
decoder: decoder,
decoder_state: decoder_state,
scaler: scaler
}}
end
Expand Down Expand Up @@ -99,7 +98,7 @@ defmodule ExNVR.Pipeline.Output.Thumbnailer do
end

defp decode(state, buffer) do
with {:ok, []} <- state.decoder.decode(state.decoder_state, buffer) do
with {:ok, []} <- Decoder.decode(state.decoder, buffer) do
state.decoder.flush(state.decoder_state)
end
end
Expand Down
22 changes: 4 additions & 18 deletions ui/lib/ex_nvr/pipelines/main.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ defmodule ExNVR.Pipelines.Main do
child(:hls_sink, %Output.HLS{
location: Path.join(Utils.hls_dir(device.id), "live"),
segment_name_prefix: "live"
}),
child(:snapshooter, ExNVR.Elements.SnapshotBin)
})
] ++ build_device_spec(device)

# Set device state and make last active run inactive
Expand Down Expand Up @@ -252,10 +251,10 @@ defmodule ExNVR.Pipelines.Main do
end

@impl true
def handle_call({:live_snapshot, image_format}, ctx, state) do
def handle_call({:live_snapshot, _image_format}, ctx, state) do
case state.live_snapshot_waiting_pids do
[] ->
{[spec: link_live_snapshot_elements(state, image_format)],
{[notify_child: {{:snapshooter, :main_stream}, :snapshot}],
%{state | live_snapshot_waiting_pids: [ctx.from]}}

pids ->
Expand Down Expand Up @@ -338,7 +337,7 @@ defmodule ExNVR.Pipelines.Main do
|> get_child(:hls_sink),
get_child(:tee)
|> via_out(:video_output)
|> child({:cvs_bufferer, :main_stream}, ExNVR.Elements.CVSBufferer),
|> child({:snapshooter, :main_stream}, ExNVR.Elements.CVSBufferer),
get_child(:tee)
|> via_out(:video_output)
|> child({:stats_reporter, :main_stream}, %VideoStreamStatReporter{
Expand Down Expand Up @@ -410,19 +409,6 @@ defmodule ExNVR.Pipelines.Main do
end
end

defp link_live_snapshot_elements(state, image_format) do
ref = make_ref()

[
get_child({:cvs_bufferer, :main_stream})
|> via_out(Pad.ref(:output, ref))
|> via_in(Pad.ref(:input, ref),
options: [format: image_format, encoding: state.main_stream_video_track.encoding]
)
|> get_child(:snapshooter)
]
end

defp maybe_update_device_and_report(%{device: %{state: device_state}} = state, device_state),
do: state

Expand Down
55 changes: 0 additions & 55 deletions ui/lib/ex_nvr/pipelines/snapshot.ex

This file was deleted.

14 changes: 4 additions & 10 deletions ui/lib/ex_nvr/recordings.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,13 @@ defmodule ExNVR.Recordings do
method = Keyword.get(opts, :method, :before)

with {:ok, reader} <- ExMP4.Reader.new(path) do
track = ExMP4.Reader.tracks(reader) |> Enum.find(&(&1.type == :video))
track = ExMP4.Reader.track(reader, :video)
offset = ExMP4.Helper.timescalify(offset, :microsecond, track.timescale)
samples = read_samples(reader, track, offset, method)

{decoder, decoder_state} = ExNVR.Decoder.new!(track.media)

samples
reader
|> read_samples(track, offset, method)
|> Stream.map(&%Membrane.Buffer{payload: &1.payload, dts: &1.dts, pts: &1.pts})
|> Stream.map(&decoder.decode!(decoder_state, &1))
|> Enum.to_list()
|> Kernel.++(decoder.flush!(decoder_state))
|> List.flatten()
|> List.last()
|> ExNVR.MediaUtils.decode_last(ExNVR.Decoder.new!(track.media))
|> then(fn buffer ->
datetime =
DateTime.add(
Expand Down
Loading

0 comments on commit b4a5a37

Please sign in to comment.