diff --git a/lib/asciinema/streaming/live_stream_server.ex b/lib/asciinema/streaming/live_stream_server.ex index 297fa7651..f45290454 100644 --- a/lib/asciinema/streaming/live_stream_server.ex +++ b/lib/asciinema/streaming/live_stream_server.ex @@ -20,7 +20,7 @@ defmodule Asciinema.Streaming.LiveStreamServer do end def reset(stream_id, {_, _} = vt_size, vt_init \\ nil, stream_time \\ nil, theme \\ nil) do - GenServer.call(via_tuple(stream_id), {:reset, vt_size, vt_init, stream_time, theme}) + GenServer.call(via_tuple(stream_id), {:reset, {vt_size, vt_init, stream_time, theme}}) end def feed(stream_id, event) do @@ -77,11 +77,8 @@ defmodule Asciinema.Streaming.LiveStreamServer do {:reply, :ok, %{state | producer: pid}} end - def handle_call( - {:reset, vt_size, vt_init, stream_time, theme}, - {pid, _} = _from, - %{producer: pid} = state - ) do + def handle_call({:reset, args}, {pid, _} = _from, %{producer: pid} = state) do + {vt_size, vt_init, stream_time, theme} = args stream_time = stream_time || 0.0 state = reset_stream(state, vt_size, stream_time, theme) @@ -98,13 +95,14 @@ defmodule Asciinema.Streaming.LiveStreamServer do {:reply, :ok, state} end - def handle_call({:reset, _vt_size, _vt_init, _stream_time, _theme}, _from, state) do + def handle_call({:reset, _args}, _from, state) do Logger.info("stream/#{state.stream_id}: rejecting reset from non-leader producer") {:reply, {:error, :leadership_lost}, state} end - def handle_call({:feed, {time, data} = event}, {pid, _} = _from, %{producer: pid} = state) do + def handle_call({:feed, event}, {pid, _} = _from, %{producer: pid} = state) do + {time, data} = event new_size = Vt.feed(state.vt, data) publish(state.stream_id, %Update{ diff --git a/lib/asciinema_web/live/live_stream_status_live.ex b/lib/asciinema_web/live/live_stream_status_live.ex index 3aa39ca9f..cb3b6c353 100644 --- a/lib/asciinema_web/live/live_stream_status_live.ex +++ b/lib/asciinema_web/live/live_stream_status_live.ex @@ -73,8 +73,9 @@ defmodule AsciinemaWeb.LiveStreamStatusLive do end end - def handle_info(%LiveStreamServer.Update{event: e, data: {_, _, time, _}}, socket) + def handle_info(%LiveStreamServer.Update{event: e} = update, socket) when e in [:info, :reset] do + {_, _, time, _} = update.data started_at = Timex.shift(Timex.now(), milliseconds: -round(time * 1000.0)) socket = diff --git a/lib/asciinema_web/live_stream_consumer_socket.ex b/lib/asciinema_web/live_stream_consumer_socket.ex index 4f34d412e..5e1f18068 100644 --- a/lib/asciinema_web/live_stream_consumer_socket.ex +++ b/lib/asciinema_web/live_stream_consumer_socket.ex @@ -44,16 +44,16 @@ defmodule AsciinemaWeb.LiveStreamConsumerSocket do @impl true def websocket_info(message, state) - def websocket_info(%LiveStreamServer.Update{event: e, data: data}, state) + def websocket_info(%LiveStreamServer.Update{event: e} = update, state) when e in [:info, :reset] do - {{cols, rows}, _, _, _} = data + {{cols, rows}, _, _, _} = update.data Logger.debug("consumer/#{state.stream_id}: reset (#{cols}x#{rows})") - {:reply, reset_message(data), %{state | reset: true}} + {:reply, reset_message(update.data), %{state | reset: true}} end - def websocket_info(%LiveStreamServer.Update{event: :feed, data: data}, %{reset: true} = state) do - {:reply, feed_message(data), state} + def websocket_info(%LiveStreamServer.Update{event: :feed} = update, %{reset: true} = state) do + {:reply, feed_message(update.data), state} end def websocket_info(%LiveStreamServer.Update{}, %{reset: false} = state) do diff --git a/lib/asciinema_web/live_stream_producer_socket.ex b/lib/asciinema_web/live_stream_producer_socket.ex index acdac48e1..4f27d6fc4 100644 --- a/lib/asciinema_web/live_stream_producer_socket.ex +++ b/lib/asciinema_web/live_stream_producer_socket.ex @@ -97,7 +97,8 @@ defmodule AsciinemaWeb.LiveStreamProducerSocket do def websocket_info(:parser_check, state), do: {:ok, state} - def websocket_info(:bucket_fill, %{bucket: bucket} = state) do + def websocket_info(:bucket_fill, state) do + bucket = state.bucket tokens = min(bucket.size, bucket.tokens + bucket.fill_amount) if tokens > bucket.tokens && tokens < bucket.size do @@ -180,8 +181,8 @@ defmodule AsciinemaWeb.LiveStreamProducerSocket do {:error, {:invalid_vt_size, size}} end - defp run_command({:feed, {time, data}}, %{status: :online} = state) do - with :ok <- LiveStreamServer.feed(state.stream_id, {time, data}) do + defp run_command({:feed, args}, %{status: :online} = state) do + with :ok <- LiveStreamServer.feed(state.stream_id, args) do {:ok, state} end end