diff --git a/CHANGELOG.md b/CHANGELOG.md index e4f84af..c8d0e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,15 +1,29 @@ # Changelog +## Next release + +### Enhancements + +- Use an UPSERT SQL query to insert or update the projection version ([#52](https://github.com/commanded/commanded-ecto-projections/pull/52)). + ## 1.3.0 +### Enhancements + - Fix Elixir 1.14 compilation warnings ([#45](https://github.com/commanded/commanded-ecto-projections/pull/45)). +--- + ## 1.2.1 +### Enhancements + - Allow exceptions to be rescued by Commanded's event handler ([#37](https://github.com/commanded/commanded-ecto-projections/pull/37)). ## 1.2.0 +### Enhancements + - Support runtime projector names ([#32](https://github.com/commanded/commanded-ecto-projections/pull/32)). - Support `schema_prefix/2` function ([#33](https://github.com/commanded/commanded-ecto-projections/pull/33)). diff --git a/config/test.exs b/config/test.exs index fbde7dd..b500053 100644 --- a/config/test.exs +++ b/config/test.exs @@ -13,5 +13,5 @@ config :commanded_ecto_projections, Commanded.Projections.Repo, config :ex_unit, capture_log: true -# Print only warnings and errors during test +# Print only warning and above log messages during tests config :logger, :console, level: :warning, format: "[$level] $message\n" diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index 6caa822..878f361 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -51,7 +51,6 @@ defmodule Commanded.Projections.Ecto do use Ecto.Schema use Commanded.Event.Handler, @handler_opts - import Ecto.Changeset import Ecto.Query import unquote(__MODULE__) @@ -59,37 +58,40 @@ defmodule Commanded.Projections.Ecto do projection_name = Map.fetch!(metadata, :handler_name) event_number = Map.fetch!(metadata, :event_number) - changeset = - %ProjectionVersion{projection_name: projection_name} - |> ProjectionVersion.changeset(%{last_seen_event_number: event_number}) + projection_version = %ProjectionVersion{ + projection_name: projection_name, + last_seen_event_number: event_number + } prefix = schema_prefix(event, metadata) + # Query to update an existing projection version with the last seen event number with + # a check to ensure that the event has not already been projected. + update_projection_version = + from(pv in ProjectionVersion, + where: + pv.projection_name == ^projection_name and pv.last_seen_event_number < ^event_number, + update: [set: [last_seen_event_number: ^event_number]] + ) + multi = Ecto.Multi.new() - |> Ecto.Multi.run(:verify_projection_version, fn repo, _changes -> - version = - case repo.get(ProjectionVersion, projection_name, prefix: prefix) do - nil -> - repo.insert!( - %ProjectionVersion{ - projection_name: projection_name, - last_seen_event_number: 0 - }, - prefix: prefix - ) - - version -> - version - end - - if version.last_seen_event_number < event_number do - {:ok, %{version: version}} - else - {:error, :already_seen_event} + |> Ecto.Multi.run(:track_projection_version, fn repo, _changes -> + try do + repo.insert(projection_version, + prefix: prefix, + on_conflict: update_projection_version, + conflict_target: [:projection_name] + ) + rescue + exception in Ecto.StaleEntryError -> + # Attempted to insert a projection version for an already seen event + {:error, :already_seen_event} + + exception -> + reraise exception, __STACKTRACE__ end end) - |> Ecto.Multi.update(:projection_version, changeset, prefix: prefix) with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]), {:ok, changes} <- transaction(multi) do @@ -99,7 +101,7 @@ defmodule Commanded.Projections.Ecto do :ok end else - {:error, :verify_projection_version, :already_seen_event, _changes} -> :ok + {:error, :track_projection_version, :already_seen_event, _changes} -> :ok {:error, _stage, error, _changes} -> {:error, error} {:error, _error} = reply -> reply end @@ -202,11 +204,8 @@ defmodule Commanded.Projections.Ecto do quote do defmodule ProjectionVersion do @moduledoc false - use Ecto.Schema - import Ecto.Changeset - @primary_key {:projection_name, :string, []} schema "projection_versions" do @@ -214,12 +213,6 @@ defmodule Commanded.Projections.Ecto do timestamps(type: :naive_datetime_usec) end - - @required_fields ~w(last_seen_event_number)a - - def changeset(model, params \\ :invalid) do - cast(model, params, @required_fields) - end end end end diff --git a/mix.exs b/mix.exs index 041e15c..83f38b2 100644 --- a/mix.exs +++ b/mix.exs @@ -37,13 +37,13 @@ defmodule Commanded.Projections.Ecto.Mixfile do {:postgrex, ">= 0.0.0", only: :test}, # Optional dependencies - {:jason, "~> 1.3", optional: true}, + {:jason, "~> 1.4", optional: true}, # Test & build tooling - {:dialyxir, "~> 1.2", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:mix_test_watch, "~> 1.1", only: :dev, runtime: false}, - {:mox, "~> 1.0", only: :test} + {:mox, "~> 1.1", only: :test} ] end diff --git a/test/projections/ecto_projection_test.exs b/test/projections/ecto_projection_test.exs index a7211ed..7c76e27 100644 --- a/test/projections/ecto_projection_test.exs +++ b/test/projections/ecto_projection_test.exs @@ -68,6 +68,64 @@ defmodule Commanded.Projections.EctoProjectionTest do assert_seen_event("Projector", 3) end + test "should prevent first event being projected more than once" do + tasks = + Enum.map(1..5, fn _index -> + Task.async(Projector, :handle, [ + %AnEvent{name: "Event1"}, + %{handler_name: "Projector", event_number: 1} + ]) + end) + + results = Task.await_many(tasks) + + assert Enum.uniq(results) == [:ok] + + assert_projections(Projection, ["Event1"]) + assert_seen_event("Projector", 1) + end + + test "should prevent an event being projected more than once" do + Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1}) + Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2}) + + tasks = + Enum.map(1..5, fn _index -> + Task.async(Projector, :handle, [ + %AnEvent{name: "Event3"}, + %{handler_name: "Projector", event_number: 3} + ]) + end) + + results = Task.await_many(tasks) + + assert Enum.uniq(results) == [:ok] + + assert_projections(Projection, ["Event1", "Event2", "Event3"]) + assert_seen_event("Projector", 3) + end + + test "should prevent an event being projected more than once after an ignored event" do + Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1}) + Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2}) + Projector.handle(%IgnoredEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 3}) + + tasks = + Enum.map(1..5, fn _index -> + Task.async(Projector, :handle, [ + %AnEvent{name: "Event4"}, + %{handler_name: "Projector", event_number: 4} + ]) + end) + + results = Task.await_many(tasks) + + assert Enum.uniq(results) == [:ok] + + assert_projections(Projection, ["Event1", "Event2", "Event4"]) + assert_seen_event("Projector", 4) + end + test "should return an error on failure" do assert {:error, :failure} == Projector.handle(%ErrorEvent{}, %{handler_name: "Projector", event_number: 1}) diff --git a/test/projections/runtime_config_projector_test.exs b/test/projections/runtime_config_projector_test.exs index 76f852c..a74801f 100644 --- a/test/projections/runtime_config_projector_test.exs +++ b/test/projections/runtime_config_projector_test.exs @@ -1,17 +1,19 @@ defmodule Commanded.Projections.RuntimeConfigProjectorTest do use ExUnit.Case - import Commanded.Projections.ProjectionAssertions - + alias Commanded.EventStore.Adapters.Mock, as: MockEventStore alias Commanded.EventStore.RecordedEvent alias Commanded.Projections.Events.AnEvent - alias Commanded.Projections.Projection - alias Commanded.Projections.Repo - alias Commanded.Projections.RuntimeConfigProjector + alias Commanded.Projections.{Projection, ProjectionAssertions, Repo, RuntimeConfigProjector} alias Commanded.UUID + import Mox + import ProjectionAssertions + + setup [:set_mox_global, :stub_event_store, :verify_on_exit!] + setup do - start_supervised!(TestApplication) + start_supervised!({TestApplication, event_store: [adapter: MockEventStore]}) Ecto.Adapters.SQL.Sandbox.checkout(Repo) end @@ -54,4 +56,19 @@ defmodule Commanded.Projections.RuntimeConfigProjectorTest do defp send_events(projector, events) do send(projector, {:events, events}) end + + defp stub_event_store(_context) do + stub(MockEventStore, :ack_event, fn _adapter_meta, _pid, _event -> :ok end) + + stub(MockEventStore, :child_spec, fn _application, _config -> + {:ok, [], %{}} + end) + + stub(MockEventStore, :subscribe_to, fn + _event_store, :all, _handler_name, _handler, _subscribe_from, _opts -> + {:ok, self()} + end) + + :ok + end end