From 622551efc753dac2b6167346f727080604066fae Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Wed, 18 Sep 2024 14:25:22 -0600 Subject: [PATCH] Drop support for {:error, reason, event} Format --- lib/commanded/event/handler.ex | 66 ++----------------- .../event_handler_batch_telemetry_test.exs | 6 +- test/event/event_handler_batch_test.exs | 3 +- test/event/support/batch_handler.ex | 6 +- .../support/error_handling_batch_handler.ex | 32 +++++---- 5 files changed, 35 insertions(+), 78 deletions(-) diff --git a/lib/commanded/event/handler.ex b/lib/commanded/event/handler.ex index fa7ca61c..d8a45d35 100644 --- a/lib/commanded/event/handler.ex +++ b/lib/commanded/event/handler.ex @@ -521,7 +521,6 @@ defmodule Commanded.Event.Handler do :ok | {:ok, new_state :: any()} | {:error, reason :: any()} - | {:error, reason :: any(), domain_event} @doc """ Called when an event `handle/2` callback returns an error. @@ -601,7 +600,7 @@ defmodule Commanded.Event.Handler do """ @callback error( error :: term(), - failed_event :: domain_event | nil, + failed_event :: domain_event | [domain_event] | nil, failure_context :: FailureContext.t() ) :: {:retry, context :: map() | FailureContext.t()} @@ -1107,7 +1106,8 @@ defmodule Commanded.Event.Handler do defp handle_batch(events, context \\ %{}, handler) - defp handle_batch(events, context, %Handler{last_seen_event: last_seen_event} = state) when is_number(last_seen_event) do + defp handle_batch(events, context, %Handler{last_seen_event: last_seen_event} = state) + when is_number(last_seen_event) do %{event_number: last_event_number} = last_event = List.last(events) if last_event_number <= last_seen_event do @@ -1118,7 +1118,7 @@ defmodule Commanded.Event.Handler do confirm_receipt([last_event], state) else events - |> Enum.reject(& &1.event_number <= last_seen_event) + |> Enum.reject(&(&1.event_number <= last_seen_event)) |> do_handle_batch(context, state) end end @@ -1156,60 +1156,7 @@ defmodule Commanded.Event.Handler do failure_context = build_failure_context(nil, context, state) retry_fun = fn context, state -> handle_batch(events, context, state) end - # For now, we pass the last event so that we can acknowledge it if the error callback - # returns :skip. In the future, we should make handle_event_error/5 be able to accept - # a list events so that the error/3 callback can get the list of events - handle_event_error(error, List.last(events), failure_context, state, retry_fun) - - {:error, reason, event} -> - error = {:error, reason} - # This is actually a sort of single-event-error, so handle it like that. - # We acknowledge what came before, have the error handler tell us what to - # do, and on retry, retry the rest of the batch. - - {success, recorded_event, left} = - case Enum.chunk_by(events, fn e -> e.data == event end) do - [[recorded_event], left] -> {[], recorded_event, left} - [success, [recorded_event]] -> {success, recorded_event, []} - [success, [recorded_event], left] -> {success, recorded_event, left} - end - - last_successful_event_id = - case List.last(success) do - %{event_id: id} -> id - _ -> nil - end - - telemetry_metadata = - telemetry_metadata - |> Map.put(:recorded_event, recorded_event) - |> Map.put(:last_event_id, last_successful_event_id) - |> Map.put(:event_count, length(success)) - |> Map.put(:error, reason) - - telemetry_stop(start_time, telemetry_metadata, :batch) - - confirm_receipt(success, state) - - log_event_error(error, recorded_event, state) - - failure_context = build_failure_context(recorded_event, context, state) - # A tricky bit here: if the error action is :skip, then we will confirm - # receipt, which updates the state, and then retry the whole batch including - # the just-acknowledged event but above, we will see that we've done that one before - # and skip it. - retry_fun = fn context, state -> - events = - case Map.get(context, :failure_action) do - :skip -> left - _ -> [recorded_event | left] - end - - context = Map.delete(context, :failure_action) - handle_batch(events, context, state) - end - - handle_event_error(error, recorded_event, failure_context, state, retry_fun) + handle_event_error(error, events, failure_context, state, retry_fun) invalid -> error = @@ -1300,8 +1247,9 @@ defmodule Commanded.Event.Handler do ) do data = case maybe_failed_event do - nil -> nil + events when is_list(events) -> Enum.map(events, fn %RecordedEvent{data: data} -> data end) %RecordedEvent{data: data} -> data + nil -> nil end %Handler{handler_module: handler_module} = state diff --git a/test/event/event_handler_batch_telemetry_test.exs b/test/event/event_handler_batch_telemetry_test.exs index 0656f849..bb748814 100644 --- a/test/event/event_handler_batch_telemetry_test.exs +++ b/test/event/event_handler_batch_telemetry_test.exs @@ -72,13 +72,13 @@ defmodule Commanded.Event.EventHandlerBatchTelemetryTest do assert metadata.event_count == 4 assert_receive {[:commanded, :event, :batch, :stop], _measurements, metadata} - assert metadata.event_count == 2 + assert metadata.event_count == 4 end test "should emit `[:commanded, :event, :batch, :exception]` telemetry from thrown exception" do events = [ - %ReplyEvent{reply_to: self(), value: :error}, - %ReplyEvent{reply_to: self(), value: :error} + %ReplyEvent{reply_to: self(), value: :raise}, + %ReplyEvent{reply_to: self(), value: :raise} ] metadata = %{"key" => "value"} diff --git a/test/event/event_handler_batch_test.exs b/test/event/event_handler_batch_test.exs index ca0128ff..ec72c417 100644 --- a/test/event/event_handler_batch_test.exs +++ b/test/event/event_handler_batch_test.exs @@ -56,8 +56,7 @@ defmodule Commanded.Event.EventHandlerBatchTest do Handler.handle_info({:events, recorded_events}, state) assert_received {:error, :skipping} - assert_received {:batch, _, 2} - assert_received {:acked, ^last_recorded_event} + refute_receive {:acked, ^last_recorded_event} end test "should stop" do diff --git a/test/event/support/batch_handler.ex b/test/event/support/batch_handler.ex index 650ed79c..3d6865a7 100644 --- a/test/event/support/batch_handler.ex +++ b/test/event/support/batch_handler.ex @@ -26,9 +26,9 @@ defmodule Commanded.Event.BatchHandler do :ok - {event, _metadata} -> - Logger.info("Handle specific bad event") - {:error, :bad_value, event} + {_event, _metadata} -> + Logger.info("Make the entire batch fail if an error is encountered") + {:error, :bad_value} end end diff --git a/test/event/support/error_handling_batch_handler.ex b/test/event/support/error_handling_batch_handler.ex index ec4f616f..1409cef8 100644 --- a/test/event/support/error_handling_batch_handler.ex +++ b/test/event/support/error_handling_batch_handler.ex @@ -11,12 +11,16 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do require Logger @impl true - def handle_batch([{%ErrorEvent{strategy: "retry"} = event, _metadata} | _rest]) do - {:error, :bad_value, event} + def handle_batch([{%ErrorEvent{strategy: "retry"}, _metadata} | _rest]) do + {:error, :bad_value} end - def handle_batch([{%ErrorEvent{strategy: "skip"} = event, _metadata} | _rest]) do - {:error, :skipping, event} + def handle_batch([{%ErrorEvent{strategy: "skip"}, _metadata} | _rest]) do + {:error, :skipping} + end + + def handle_batch([{%ReplyEvent{value: :raise}, _metadata} | _rest]) do + raise ArgumentError, "Raise" end def handle_batch([{%ReplyEvent{reply_to: reply_to, value: value}, _metadata} | _rest]) @@ -26,9 +30,9 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do :ok end - def handle_batch([{event, _metadata} | _rest]) do + def handle_batch([{_event, _metadata} | _rest]) do Logger.info("Returning bad value error") - {:error, :bad_value, event} + {:error, :bad_value} end def handle_batch(events) do @@ -37,20 +41,26 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do end @impl true - def error({:error, :skipping}, %ErrorEvent{reply_to: reply_to}, _failure_context) do + def error({:error, :skipping}, [%ErrorEvent{reply_to: reply_to} | _], _failure_context) do send(reply_to, {:error, :skipping}) :skip end - def error({:error, reason}, %ErrorEvent{strategy: "default"} = event, _failure_context) do - %ErrorEvent{reply_to: reply_to} = event + def error( + {:error, reason}, + [%ErrorEvent{strategy: "default", reply_to: reply_to} | _], + _failure_context + ) do send(reply_to, {:error, :stopping}) {:stop, reason} end - def error({:error, _reason}, %ErrorEvent{strategy: "retry"} = event, failure_context) do - %ErrorEvent{reply_to: reply_to} = event + def error( + {:error, _reason}, + [%ErrorEvent{strategy: "retry", reply_to: reply_to} | _], + failure_context + ) do %FailureContext{context: context} = failure_context context = Map.update(context, :failures, 1, fn failures -> failures + 1 end)