Evented system framework
If available in Hex, the package can be installed
by adding esp_ex
to your list of dependencies in mix.exs
:
def deps do
[
{:esp_ex, "~> 0.1.0"}
]
end
Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/esp_ex.
- MessageStore.Postgres which can write and
read messages from the database in the specified format. It's used mainly for
writing, since the reading is performed transparently through
Consumer
andStore
- EventTransformer which allows to transform a message as it comes from the database into an elixir struct (an event) of your choice
- Projection which takes a list of events and create the specified entity by applying logic for each event
- StreamName holds the representation of a stream name by
converting from a raw string
"campaign:command-123"
to a struct following the conventions - Store takes a stream and an id, finds all the events on that stream and projects the requested entity, returning the final result
- Consumer.Postgres listen to any incoming event and
react by running a handler (a function which pattern matches on that event)
- Handler a module with a function
handle
which pattern matches on the event and does any kind of work (business logic)
- Handler a module with a function
- Entity a behaviour that should be implemented by entity modules, ensuring those can be initialized without any argument
- RawEvent database representation of a message in the
messages table
- RawEvent.Metadata represent a set of useful metadata attributes that can be stored in a message
- Event helpers to create a RawEvent from a custom struct
Assuming alias MessageStore.Postgres, as: MessageStore
in all the following
examples:
stream_name = EspEx.StreamName.new("person", "123")
raw_event = %EspEx.RawEvent{
type: "Created",
data: %{name: "Some Name"}
stream_name: stream_name
}
# Assuming the stream is empty
MessageStore.write!(raw_event) # => 0
# Assuming the stream has only 1 message
MessageStore.write!(raw_event, 0) # => 1
# Assuming the stream has 2 messages, so "version" is 1
MessageStore.write!(raw_event, 2) # => raises ExpectedVersionError
defmodule Person.Events do
use EventTransformer,
# optional, default to this module
events_module: __MODULE__
defmodule Created do
defstruct [:name]
end
end
stream_name = EspEx.StreamName.new("person", "123")
raw_event = %EspEx.RawEvent{
type: "Created",
data: %{name: "Some Name"}
stream_name: stream_name
}
Person.Events.to_event(raw_event) # => %Created{name: "Some Name"}
raw_event = Map.put(raw_event, :type, "Renamed")
Person.Events.to_event(raw_event) # => %EspEx.Event.Unknown{...}
You can customize the function to_event
however you want.
Assuming the modules from EventTransformer
defmodule Person do
defstruct [:name]
def new() do
%__MODULE__{name: "noname"}
end
end
defmodule Person.Projection do
use EspEx.Projection
def apply(%Person{} = person, %Person.Events.Created{} = event) do
Map.put(person, :name, event.name)
end
end
person = Person.new()
created = %Person.Events.Created{name: "jerry"}
person = Person.Projection.apply(person, created)
person.name # => "jerry"
Creates a stream name. A stream name is in the format:
category:type1+type2-ID
.
StreamName provides 2 very helpful constructors:
alias EspEx.StreamName
stream_name = StreamName.new("person", "123")
to_string(stream_name) # => "person-123"
stream_name = StreamName.new("person", "123", ["position", "command"])
# The types are always sorted
to_string(stream_name) # => "person:command+position-123"
stream_name = StreamName.from_string("person:command-123")
inspect(stream_name)
# => %StreamName{category: "person", identifier: "123", types: ["command"]}
Fetches all events from a stream and project them. Assume the module
defmodule Person.Store do
use EspEx.Store,
# required, an implementation of `MessageStore`
message_store: EspEx.MessageStore.Postgres,
# required, anything which responds to `new/0` and returns needed entity
# initialized
entity_builder: Person,
# required
event_transformer: Person.Events,
# required
projection: Person.Projection,
# required, you want this to be a category stream (stream without
# identifer)
stream_name: EspEx.StreamName.new("person")
end
created = %Person.Events.Created{name: "jerry"}
createdAgain = %Person.Events.Created{name: "francesco"}
stream_name = EspEx.StreamName.from_string("person-123")
alias EspEx.MessageStore.Postgres, as: MessageStore
raw_event = Event.to_raw_event(created, stream_name)
MessageStore.write!(raw_event)
raw_event = Event.to_raw_event(createdAgain, stream_name)
MessageStore.write!(raw_event)
{person, version} = Person.Store.fetch("123")
version # => 1
person # => %Person{name: "francesco"}
# Notice how it's not "jerry", since 2 events have been applied
Listen for incoming events and process them. Assume all the code in Store
defmodule Person.Consumer do
use EspEx.Consumer.Postgres,
# required
event_transformer: Person.Events,
# required
stream_name: EspEx.StreamName.new("person")
# optional, a string uniquely identifying this consumer. Defaults to module
# name
identifier: __MODULE__,
# optional, a module handling events for this module. Defaults to this
# module
handler: __MODULE__,
# optional, check the documentation, you should never need this
listen_opts: []
use EspEx.Handler
def handle(%Person.Events.Created{} = created, _, _) do
IO.puts("A person was created! Hello #{created.name}")
end
end
{:ok, consumer} = GenServer.start_link(Person.Consumer, nil)
created = %Person.Events.Created{name: "jerry"}
stream_name = EspEx.StreamName.from_string("person-123")
alias EspEx.MessageStore.Postgres, as: MessageStore
raw_event = Event.to_raw_event(created, stream_name)
MessageStore.write!(raw_event)
# When the consumer receives the message, it will print:
# A person was created! Hello jerry
Check Consumer.Postgres
This code is inspired and partially reimplements eventide, as such is also subject to the eventide LICENSE