Skip to content

Commit

Permalink
Drop support for {:error, reason, event}
Browse files Browse the repository at this point in the history
Format
  • Loading branch information
fmterrorf committed Sep 18, 2024
1 parent 110f19b commit 622551e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 78 deletions.
66 changes: 7 additions & 59 deletions lib/commanded/event/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ defmodule Commanded.Event.Handler do
:ok
| {:ok, new_state :: any()}
| {:error, reason :: any()}
| {:error, reason :: any(), domain_event}

@doc """
Called when an event `handle/2` callback returns an error.
Expand Down Expand Up @@ -601,7 +600,7 @@ defmodule Commanded.Event.Handler do
"""
@callback error(
error :: term(),
failed_event :: domain_event | nil,
failed_event :: domain_event | [domain_event] | nil,
failure_context :: FailureContext.t()
) ::
{:retry, context :: map() | FailureContext.t()}
Expand Down Expand Up @@ -1107,7 +1106,8 @@ defmodule Commanded.Event.Handler do

defp handle_batch(events, context \\ %{}, handler)

defp handle_batch(events, context, %Handler{last_seen_event: last_seen_event} = state) when is_number(last_seen_event) do
defp handle_batch(events, context, %Handler{last_seen_event: last_seen_event} = state)
when is_number(last_seen_event) do
%{event_number: last_event_number} = last_event = List.last(events)

if last_event_number <= last_seen_event do
Expand All @@ -1118,7 +1118,7 @@ defmodule Commanded.Event.Handler do
confirm_receipt([last_event], state)
else
events
|> Enum.reject(& &1.event_number <= last_seen_event)
|> Enum.reject(&(&1.event_number <= last_seen_event))
|> do_handle_batch(context, state)
end
end
Expand Down Expand Up @@ -1156,60 +1156,7 @@ defmodule Commanded.Event.Handler do

failure_context = build_failure_context(nil, context, state)
retry_fun = fn context, state -> handle_batch(events, context, state) end
# For now, we pass the last event so that we can acknowledge it if the error callback
# returns :skip. In the future, we should make handle_event_error/5 be able to accept
# a list events so that the error/3 callback can get the list of events
handle_event_error(error, List.last(events), failure_context, state, retry_fun)

{:error, reason, event} ->
error = {:error, reason}
# This is actually a sort of single-event-error, so handle it like that.
# We acknowledge what came before, have the error handler tell us what to
# do, and on retry, retry the rest of the batch.

{success, recorded_event, left} =
case Enum.chunk_by(events, fn e -> e.data == event end) do
[[recorded_event], left] -> {[], recorded_event, left}
[success, [recorded_event]] -> {success, recorded_event, []}
[success, [recorded_event], left] -> {success, recorded_event, left}
end

last_successful_event_id =
case List.last(success) do
%{event_id: id} -> id
_ -> nil
end

telemetry_metadata =
telemetry_metadata
|> Map.put(:recorded_event, recorded_event)
|> Map.put(:last_event_id, last_successful_event_id)
|> Map.put(:event_count, length(success))
|> Map.put(:error, reason)

telemetry_stop(start_time, telemetry_metadata, :batch)

confirm_receipt(success, state)

log_event_error(error, recorded_event, state)

failure_context = build_failure_context(recorded_event, context, state)
# A tricky bit here: if the error action is :skip, then we will confirm
# receipt, which updates the state, and then retry the whole batch including
# the just-acknowledged event but above, we will see that we've done that one before
# and skip it.
retry_fun = fn context, state ->
events =
case Map.get(context, :failure_action) do
:skip -> left
_ -> [recorded_event | left]
end

context = Map.delete(context, :failure_action)
handle_batch(events, context, state)
end

handle_event_error(error, recorded_event, failure_context, state, retry_fun)
handle_event_error(error, events, failure_context, state, retry_fun)

invalid ->
error =
Expand Down Expand Up @@ -1300,8 +1247,9 @@ defmodule Commanded.Event.Handler do
) do
data =
case maybe_failed_event do
nil -> nil
events when is_list(events) -> Enum.map(events, fn %RecordedEvent{data: data} -> data end)
%RecordedEvent{data: data} -> data
nil -> nil
end

%Handler{handler_module: handler_module} = state
Expand Down
6 changes: 3 additions & 3 deletions test/event/event_handler_batch_telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ defmodule Commanded.Event.EventHandlerBatchTelemetryTest do
assert metadata.event_count == 4

assert_receive {[:commanded, :event, :batch, :stop], _measurements, metadata}
assert metadata.event_count == 2
assert metadata.event_count == 4
end

test "should emit `[:commanded, :event, :batch, :exception]` telemetry from thrown exception" do
events = [
%ReplyEvent{reply_to: self(), value: :error},
%ReplyEvent{reply_to: self(), value: :error}
%ReplyEvent{reply_to: self(), value: :raise},
%ReplyEvent{reply_to: self(), value: :raise}
]

metadata = %{"key" => "value"}
Expand Down
3 changes: 1 addition & 2 deletions test/event/event_handler_batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ defmodule Commanded.Event.EventHandlerBatchTest do

Handler.handle_info({:events, recorded_events}, state)
assert_received {:error, :skipping}
assert_received {:batch, _, 2}
assert_received {:acked, ^last_recorded_event}
refute_receive {:acked, ^last_recorded_event}
end

test "should stop" do
Expand Down
6 changes: 3 additions & 3 deletions test/event/support/batch_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ defmodule Commanded.Event.BatchHandler do

:ok

{event, _metadata} ->
Logger.info("Handle specific bad event")
{:error, :bad_value, event}
{_event, _metadata} ->
Logger.info("Make the entire batch fail if an error is encountered")
{:error, :bad_value}
end
end

Expand Down
32 changes: 21 additions & 11 deletions test/event/support/error_handling_batch_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do
require Logger

@impl true
def handle_batch([{%ErrorEvent{strategy: "retry"} = event, _metadata} | _rest]) do
{:error, :bad_value, event}
def handle_batch([{%ErrorEvent{strategy: "retry"}, _metadata} | _rest]) do
{:error, :bad_value}
end

def handle_batch([{%ErrorEvent{strategy: "skip"} = event, _metadata} | _rest]) do
{:error, :skipping, event}
def handle_batch([{%ErrorEvent{strategy: "skip"}, _metadata} | _rest]) do
{:error, :skipping}
end

def handle_batch([{%ReplyEvent{value: :raise}, _metadata} | _rest]) do
raise ArgumentError, "Raise"
end

def handle_batch([{%ReplyEvent{reply_to: reply_to, value: value}, _metadata} | _rest])
Expand All @@ -26,9 +30,9 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do
:ok
end

def handle_batch([{event, _metadata} | _rest]) do
def handle_batch([{_event, _metadata} | _rest]) do
Logger.info("Returning bad value error")
{:error, :bad_value, event}
{:error, :bad_value}
end

def handle_batch(events) do
Expand All @@ -37,20 +41,26 @@ defmodule Commanded.Event.ErrorHandlingBatchHandler do
end

@impl true
def error({:error, :skipping}, %ErrorEvent{reply_to: reply_to}, _failure_context) do
def error({:error, :skipping}, [%ErrorEvent{reply_to: reply_to} | _], _failure_context) do
send(reply_to, {:error, :skipping})

:skip
end

def error({:error, reason}, %ErrorEvent{strategy: "default"} = event, _failure_context) do
%ErrorEvent{reply_to: reply_to} = event
def error(
{:error, reason},
[%ErrorEvent{strategy: "default", reply_to: reply_to} | _],
_failure_context
) do
send(reply_to, {:error, :stopping})
{:stop, reason}
end

def error({:error, _reason}, %ErrorEvent{strategy: "retry"} = event, failure_context) do
%ErrorEvent{reply_to: reply_to} = event
def error(
{:error, _reason},
[%ErrorEvent{strategy: "retry", reply_to: reply_to} | _],
failure_context
) do
%FailureContext{context: context} = failure_context

context = Map.update(context, :failures, 1, fn failures -> failures + 1 end)
Expand Down

0 comments on commit 622551e

Please sign in to comment.