Skip to content

Commit

Permalink
Batching support (#3)
Browse files Browse the repository at this point in the history
* Pull in initial work by @davydog187.

* Redefine how acknowledgement works

Skeleton code for batched handling, to be tested

Basic skeleton test

Ensure that no conflicting options/handlers are configured

Move the module compilation test into describe block

Add code to do error handling on batches (untested)

Add tests for batching error handling

Allow ack_events to take a list of events and acknowledge last event

batch handler telemetry

error handling fixes

Update support batch handler logic

Add batch handler telemetry tests

Align naming of batch handler test module

Update retry logic

Update tests

Add configuration test for ensuring batch_size and concurrency not test at same time

Fix handle_batch comparison and add test case

Add tests for handle_batch state and update last_event_seen logic

Remove list implementation for InMemory#ack_event/3

Minor update for event_handler_batch_state_test

Document handle_batch/2 for state update

Add more event handler test coverage

- upcast test for handle_batch/2
- batch_reset_event_handler_test

Do not retry if :skip is received

Commanded.options() -> Commanded.Application.options()

Allow skipping events for batched handler

Include commanded#493 and commanded#489 in CHANGELOG

Include commanded#493 and commanded#489 in CHANGELOG

Release v1.4.0

bugfix: retry command executing when the aggregator is down right before the execution

chore: improve typespec for router dispatch resp

Include commanded#494 in CHANGELOG

Remove duplicate event apply when receiving missed events published to aggregate's event stream

Update CHANGELOG

Use Erlang v25.0.4 and Elixir v1.14.0-otp-25

Require at least Elixir v1.10

Fix typespec typo in Commanded.Application

Use `:test` Mix env for GitHub workflow

To catch dialyzer and credo errors in test files.

Release v1.4.1

retry remaining batch when skipping event

only retry for batch

Reduce compile-time dependencies

Reformat

Cleanup some TODO comments that have been implemented

Remove unused code in test

Make Credo happy

Update docs for :skip return in error callback

Filter any already seen events from handle_batch

Update docs on event given to batch error hander

* Drop support for {:error, reason, event}

Format

Fix dialyzer error

* Update docs

* Use delegate_event_to_handler & make confirm_receipt be more generic

* Do not retry :skip events

---------

Co-authored-by: Dave Lucia <[email protected]>
Co-authored-by: Cees de Groot <[email protected]>
  • Loading branch information
3 people authored Sep 20, 2024
1 parent 3c084bd commit 5c50d3c
Show file tree
Hide file tree
Showing 17 changed files with 1,195 additions and 52 deletions.
407 changes: 356 additions & 51 deletions lib/commanded/event/handler.ex

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion lib/commanded/event_store/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ defmodule Commanded.EventStore.Adapter do

@doc """
Acknowledge receipt and successful processing of the given event received from
a subscription to an event stream.
a subscription to an event stream. Note that if the event is part of a batch,
all events that precede this event in the batch are considered to be acknowledged
as well.
TODO Batching: this holds true for the PostgreSQL implementation, needs to be added to in_memory and
verified for EventStoreDB.
"""
@callback ack_event(adapter_meta, pid, RecordedEvent.t()) :: :ok

Expand Down
2 changes: 2 additions & 0 deletions lib/commanded/event_store/adapters/in_memory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Commanded.EventStore.Adapters.InMemory do
@behaviour Commanded.EventStore.Adapter

use GenServer
require Logger

defmodule State do
@moduledoc false
Expand Down Expand Up @@ -502,6 +503,7 @@ defmodule Commanded.EventStore.Adapters.InMemory do

defp ack_persistent_subscription_by_pid(%State{} = state, %RecordedEvent{} = event, pid) do
%RecordedEvent{event_number: event_number} = event
Logger.debug(fn -> "Acknowleding event ##{event_number}" end)

update_persistent_subscription(state, pid, fn %PersistentSubscription{} = subscription ->
subscription = PersistentSubscription.ack(subscription, event_number)
Expand Down
101 changes: 101 additions & 0 deletions test/event/event_handler_batch_state_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Commanded.Event.EventHandlerBatchStateTest do
use Commanded.MockEventStoreCase

alias Commanded.Event.StatefulBatchedEventHandler
alias Commanded.Helpers.EventFactory

defmodule AnEvent do
@derive Jason.Encoder
defstruct [:reply_to, :increment]
end

describe "batched event handler state" do
test "initially set in `init/1` function" do
handler = start_supervised!(StatefulBatchedEventHandler)

event = %AnEvent{reply_to: reply_to(), increment: true}
send_events_to_handler(handler, [event])

assert_receive {:batch, [{^event, metadata}]}
assert match?(%{state: 0}, metadata)
end

test "initially set as runtime option" do
handler = start_supervised!({StatefulBatchedEventHandler, state: 1})

event = %AnEvent{reply_to: reply_to(), increment: true}
send_events_to_handler(handler, [event])

assert_receive {:batch, [{^event, metadata}]}
assert match?(%{state: 1}, metadata)
end

test "updated by returning `{:ok, new_state}` from `handle_batch/2` function" do
handler = start_supervised!(StatefulBatchedEventHandler)

event1 = %AnEvent{reply_to: reply_to(), increment: true}
event2 = %AnEvent{reply_to: reply_to(), increment: true}
event3 = %AnEvent{reply_to: reply_to(), increment: true}

send_events_to_handler(handler, [event1, event2])
assert_receive {:batch, [{^event1, metadata1}, {^event2, metadata2}]}
assert match?(%{state: 0}, metadata1)
assert match?(%{state: 0}, metadata2)

send_events_to_handler(handler, [event3], 3)
assert_receive {:batch, [{^event3, metadata3}]}
assert match?(%{state: 2}, metadata3)
end

test "not updated when returning `:ok` from `handle_batch/2` function" do
handler = start_supervised!(StatefulBatchedEventHandler)

event1 = %AnEvent{reply_to: reply_to(), increment: false}
event2 = %AnEvent{reply_to: reply_to(), increment: false}

send_events_to_handler(handler, [event1])
assert_receive {:batch, [{^event1, metadata}]}
assert match?(%{state: 0}, metadata)

send_events_to_handler(handler, [event2], 2)
assert_receive {:batch, [{^event2, metadata}]}
assert match?(%{state: 0}, metadata)
end

test "state is reset when process restarts" do
opts = [state: 10]
handler = start_supervised!({StatefulBatchedEventHandler, opts})

event1 = %AnEvent{reply_to: reply_to(), increment: true}
event2 = %AnEvent{reply_to: reply_to(), increment: true}

send_events_to_handler(handler, [event1])
assert_receive {:batch, [{^event1, metadata}]}
assert match?(%{state: 10}, metadata)

send_events_to_handler(handler, [event2], 2)
assert_receive {:batch, [{^event2, metadata}]}
assert match?(%{state: 11}, metadata)

%{id: id} = StatefulBatchedEventHandler.child_spec(state: 10)

stop_supervised!(id)

handler = start_supervised!({StatefulBatchedEventHandler, opts})

event3 = %AnEvent{reply_to: reply_to(), increment: true}
send_events_to_handler(handler, [event3], 3)

assert_receive {:batch, [{^event3, metadata}]}
assert match?(%{state: 10}, metadata)
end
end

defp reply_to, do: :erlang.pid_to_list(self())

defp send_events_to_handler(handler, events, initial_event_number \\ 1) do
recorded_events = EventFactory.map_to_recorded_events(events, initial_event_number)

send(handler, {:events, recorded_events})
end
end
59 changes: 59 additions & 0 deletions test/event/event_handler_batch_subscribe_to_stream_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule Commanded.Event.EventHandlerBatchSubscribeToStreamTest do
use ExUnit.Case

alias Commanded.{DefaultApp, EventStore}
alias Commanded.Event.Mapper

defmodule AnEvent do
@derive Jason.Encoder
defstruct [:stream_uuid, :reply_to]
end

defmodule SingleStreamBatchEventHandler do
use Commanded.Event.Handler,
application: DefaultApp,
name: __MODULE__,
subscribe_to: "stream2",
batch_size: 3

def handle_batch([{%AnEvent{stream_uuid: stream_uuid, reply_to: reply_to}, _metadata} | _rest]) do
pid = :erlang.list_to_pid(reply_to)
Process.send(pid, {:event, stream_uuid}, [])

:ok
end
end

describe "single stream batch event handler" do
setup do
start_supervised!(DefaultApp)
start_supervised!(SingleStreamBatchEventHandler)

:ok
end

test "should be only be notified of events appended to subscribed stream" do
append_events_to_stream("stream1", 3)
append_events_to_stream("stream2", 3)
append_events_to_stream("stream3", 3)

assert_receive {:event, "stream2"}
assert_receive {:event, "stream2"}
assert_receive {:event, "stream2"}
refute_receive {:event, _stream_uuid}
end
end

defp append_events_to_stream(stream_uuid, count) do
reply_to = :erlang.pid_to_list(self())

events =
1..count
|> Enum.map(fn _i ->
%AnEvent{reply_to: reply_to, stream_uuid: stream_uuid}
end)
|> Mapper.map_to_event_data()

EventStore.append_to_stream(DefaultApp, stream_uuid, :any_version, events)
end
end
138 changes: 138 additions & 0 deletions test/event/event_handler_batch_telemetry_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
defmodule Commanded.Event.EventHandlerBatchTelemetryTest do
use ExUnit.Case

alias Commanded.Application.Config
alias Commanded.Event.Handler
alias Commanded.EventStore.Subscription

# Test support code
alias Commanded.Helpers.EventFactory
alias Commanded.Event.{BatchHandler, ErrorHandlingBatchHandler}
alias Commanded.Event.EventHandlerBatchTelemetryTest.MockAdapter
alias Commanded.Event.ReplyEvent

setup do
attach_telemetry()
:ok
end

describe "batch event handler telemetry" do
test "should emit `[:commanded, :event, :batch, :stop]` telemetry from :ok" do
events = [
%ReplyEvent{reply_to: self(), value: 1},
%ReplyEvent{reply_to: self(), value: 2}
]

metadata = %{"key" => "value"}
recorded_events = EventFactory.map_to_recorded_events(events, 1, metadata: metadata)
state = setup_state(BatchHandler)

{:noreply, _state} = Handler.handle_info({:events, recorded_events}, state)

assert_receive {[:commanded, :event, :batch, :start], _measurements, _metadata}

assert_receive {[:commanded, :event, :batch, :stop], _measurements, metadata}
assert metadata.event_count == 2

refute_received {[:commanded, :event, :batch, :exception], _measurements, _metadata}
end

test "should emit `[:commanded, :event, :batch, :stop]` telemetry from :error" do
events = [
%ReplyEvent{reply_to: self(), value: :error},
%ReplyEvent{reply_to: self(), value: :error}
]

metadata = %{"key" => "value"}
recorded_events = EventFactory.map_to_recorded_events(events, 1, metadata: metadata)
state = setup_state(BatchHandler)

{:stop, :bad_value, _state} = Handler.handle_info({:events, recorded_events}, state)

assert_receive {[:commanded, :event, :batch, :start], _measurements, _metadata}
assert_receive {[:commanded, :event, :batch, :stop], _measurements, _metadata}
refute_received {[:commanded, :event, :batch, :exception], _measurements, _metadata}
end

test "should include count of successfully processed events when erroring on a specific event" do
events = [
%ReplyEvent{reply_to: self(), value: 1},
%ReplyEvent{reply_to: self(), value: 2},
%ReplyEvent{reply_to: self(), value: :error},
%ReplyEvent{reply_to: self(), value: 4}
]

metadata = %{"key" => "value"}
recorded_events = EventFactory.map_to_recorded_events(events, 1, metadata: metadata)
state = setup_state(BatchHandler)

Handler.handle_info({:events, recorded_events}, state)

assert_receive {[:commanded, :event, :batch, :start], _measurements, metadata}
assert metadata.event_count == 4

assert_receive {[:commanded, :event, :batch, :stop], _measurements, metadata}
assert metadata.event_count == 4
end

test "should emit `[:commanded, :event, :batch, :exception]` telemetry from thrown exception" do
events = [
%ReplyEvent{reply_to: self(), value: :raise},
%ReplyEvent{reply_to: self(), value: :raise}
]

metadata = %{"key" => "value"}
recorded_events = EventFactory.map_to_recorded_events(events, 1, metadata: metadata)
state = setup_state(ErrorHandlingBatchHandler)

{:stop, _, _} =
Handler.handle_info({:events, recorded_events}, state)

assert_receive {[:commanded, :event, :batch, :start], _measurements, _metadata}
refute_received {[:commanded, :event, :batch, :stop], _measurements, _metadata}
assert_receive {[:commanded, :event, :batch, :exception], _measurements, _metadata}
end
end

defp setup_state(handler_module) do
Config.associate(self(), __MODULE__, event_store: {MockAdapter, nil})

%Handler{
subscription:
struct(Subscription,
application: __MODULE__,
subscription_pid: self()
),
handler_callback: :batch,
handler_module: handler_module,
consistency: :eventual,
last_seen_event: 0
}
end

defp attach_telemetry do
:telemetry.attach_many(
"test-handler",
[
[:commanded, :event, :batch, :start],
[:commanded, :event, :batch, :stop],
[:commanded, :event, :batch, :exception]
],
fn event_name, measurements, metadata, reply_to ->
send(reply_to, {event_name, measurements, metadata})
end,
self()
)

on_exit(fn ->
:telemetry.detach("test-handler")
end)
end

defmodule MockAdapter do
def ack_event(nil, subscription_pid, event) do
send(subscription_pid, {:acked, event})
:ok
end
end
end
Loading

0 comments on commit 5c50d3c

Please sign in to comment.