Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event store #421

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft

Event store #421

wants to merge 11 commits into from

Conversation

jlabedo
Copy link
Contributor

@jlabedo jlabedo commented Dec 13, 2024

Why is this change proposed?

Event store with resiliency, scalability and availability

Description of Changes

TODO

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@jlabedo jlabedo marked this pull request as draft December 13, 2024 18:24
before_transaction_id < pg_current_xact_id())
ORDER BY name
FOR SHARE
SQL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heredoc adjusts indentation based on the position of the enclosing element, so you can use normal indentation.

ON CONFLICT (event_type) DO
UPDATE SET counter = {$this->tableName}.counter + 1
WHERE {$this->tableName}.event_type = ?
SQL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing indent for SQL

"phpunit/phpunit": "^9.5",
"phpstan/phpstan": "^1.8",
"psr/container": "^2.0",
"symfony/event-dispatcher": "^7.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why from 7.2?

}

public function schemaUp(): void
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to migrate from Prooph event store?

foreach ($eventPage->events as $event) {
$projector->project($event);
}
$this->eventStore->ack($eventPage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack does not require a commit and opening a new transaction? And if it is in one transaction, then it will be very large in the number of statements. If the transaction is not used for projection run, it may lost ack changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumption is, it's wrapped in Ecotone's Dbal transaction. Right @jlabedo ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for now it's wrapped in Ecotone's Dbal transaction. But @lifinsky is right, if we are doing the catchup here (full reset) then it will be very large in the number of statements.
I see two options:

  1. We let the asynchronous runner wrapped in Ecotone's transaction but we need to first catchup the projection (projection:init command) before the async projection is triggered.
  2. We disable Ecotone's transaction on this endpoint an we manually manage small transactions there

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 2

}

#[MutatingEvents]
public function mutatingEvents(): array
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?

protected function apply(object $event): void
{
$this->mutatingEvents[] = $event;
$this->applyHandler($event);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At final stage the apply won't happen here, and will be part of Ecotone's Aggregate abstraction?

*/
public function runProjectionsWith(array $events): void;

public function addProjection(string $projectorName, string $state = "catchup"): void;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can $state be enum for more visibility where each state is used?

Ecotone is now locked on 8.1, so we are free to go

$this->connection->prepare("UPDATE {$this->streamTableName} SET version = ? WHERE stream_id = ?")->execute([$version, $eventStreamId->streamId]);
}

$this->runProjectionsWith($persistedEvents);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method right now just fetches all the projections and tries to trigger them.
Ecotone can deliver the bindings between interested events and projections, so we can just trigger the related ones.

Instead of duplicacting this logic, we can instead inject here a service, that we will reuse between Postgres, Mysql and In Memory, and just feed it from Events from Event Store.

Wdyt?


use Stringable;

readonly class StreamEventId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is a bit confusing, I thought it's a wrapper around stream id, but also is a version in the stream.

I would say StreamPosition would be more descriptive here, as then it make sense that it contains of stream id and the version.
Wdyt?

CREATE TABLE IF NOT EXISTS {$this->eventTableName}
(
id BIGSERIAL PRIMARY KEY,
transaction_id XID8 NOT NULL DEFAULT pg_current_xact_id(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
transaction_id XID8 NOT NULL DEFAULT pg_current_xact_id(),
transaction_number XID8 NOT NULL DEFAULT pg_current_xact_id(),

That would make it more clear too, as id does not indicate order by itself (e.g. uuid can be id)



// Execute missing events
$missingEventsLoop = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already projected events few lines before.

What is the use case for redoing that now?
I do think if something happened between projecting in line #356 and doing that extra commit for inline, then we can simply ignore that, to keep the code more easy to work with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get your statement.

We are projecting events in order by avoiding gaps on line 356. When there are no more events to catchup, we switch to inline, but events discarded by gaps are still not handled (and won't be handled by inline processing). This is the reason of this code: handling eventually missing events.

By the way, for now there is no lock on the events table, so there is a small period of time where events can be projected in wrong order: Events handled on line 398 can be projected after first "inline" events. It may be ok, but if it is not, we can add a brief lock on events table.

}

do {
$page = $this->readFromSubscription($projectorName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If EventPage will hold iterator which will call database to fetch the events then we do not even need to recall this. We can rename EventPage to EventStream, as it will hold all ability to fetch all related events for us.

Then we can simply do foreach($eventStream->events() as $event) without while

}
$this->eventStore->ack($eventPage);

if ($command->until !== null && $eventPage->endPosition->isBefore($command->until)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always want to project till the end, if we can catch something that just happened then why not, less delay for projection.
The other command then will simply skip.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the idea of this code: project until there are no more events to handle (You are right I am missing a check to continue while $eventPage->events is not empty)

But the idea behind this is also to force the runner to wait for new events that are still after an interleaved transaction (a gap).
Imagine we start a very long transaction appending events that this projection is not intersested in. Then a second transaction start, appends events related to this projection. The runner starts but won't see any events: there is a gap because of the first transaction still running.
I we stop the runner without waiting, the projection would be stale until another event triggers the projection.
That is the reason why we are stating that this trigger is not complete if it does not reach AT LEAST the position of the last event that triggered the projection.
Does it makes sense ?

foreach ($eventPage->events as $event) {
$projector->project($event);
}
$this->eventStore->ack($eventPage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumption is, it's wrapped in Ecotone's Dbal transaction. Right @jlabedo ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants