Add a method to ShapeStream to abort long poll + reconnect + emit a new up-to-date with latest LSN #2216

Open
samwillis opened this issue Dec 31, 2024 · 5 comments

@samwillis
Contributor

samwillis commented Dec 31, 2024

Related to #2215, we will need something like a .refresh() method on ShapeStream that can be called to abort the current long poll, reconnect with live=false, and therefore emit a new up-to-date.

There is some nuance: this call only makes sense during long polling. I think it should be a no-op when there is no active long poll.
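A minimal sketch of these semantics, assuming a hypothetical `LongPollClient` (not ShapeStream's actual internals) that issues requests through an injected `fetchFn` and tracks the in-flight request with an `AbortController`. `refresh()` aborts only a pending live request and returns `false` otherwise; the next request is then forced to `live=false`.

```typescript
// Hypothetical sketch of the proposed refresh() semantics; not ShapeStream's real code.
// Only the `live` query parameter is modelled; offsets, handles, etc. are omitted.
type FetchFn = (url: string, init: { signal: AbortSignal }) => Promise<unknown>;

class LongPollClient {
  private controller: AbortController | null = null;
  private liveRequestPending = false;
  private forceNonLive = false; // set by refresh(), consumed by the next request

  constructor(
    private readonly baseUrl: string,
    private readonly fetchFn: FetchFn,
  ) {}

  /** Whether the next request will actually be live, given what the caller asked for. */
  nextIsLive(requestedLive: boolean): boolean {
    return requestedLive && !this.forceNonLive;
  }

  /** Issue one request; a live one can be aborted by refresh(). */
  async request(requestedLive: boolean): Promise<unknown> {
    const live = this.nextIsLive(requestedLive);
    this.forceNonLive = false;
    const controller = new AbortController();
    this.controller = controller;
    this.liveRequestPending = live;
    try {
      return await this.fetchFn(`${this.baseUrl}?live=${live}`, { signal: controller.signal });
    } finally {
      // Only clear state if no newer request has replaced this one.
      if (this.controller === controller) {
        this.controller = null;
        this.liveRequestPending = false;
      }
    }
  }

  /** Abort a pending long poll so the next request uses live=false. No-op otherwise. */
  refresh(): boolean {
    if (!this.liveRequestPending || this.controller === null) return false;
    this.forceNonLive = true;
    this.controller.abort();
    return true;
  }
}
```

In a consuming loop, the abort surfaces as a rejected request; reissuing `request(true)` then goes out as a non-live request, which should end in a fresh up-to-date.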

Higher-level APIs can wrap this to create "aligned" streams: when one stream receives an update and a timeout threshold has been exceeded without an up-to-date on its sibling streams, they call the refresh method on those siblings.
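One way the aligning wrapper could look, as a sketch: `StreamAligner`, `Refreshable`, and the `thresholdMs` parameter are hypothetical names, and time is passed in explicitly (`now`, in ms) rather than read from a clock so the behaviour is easy to follow.

```typescript
// Hypothetical: anything exposing the proposed refresh(); returns true if it aborted a long poll.
interface Refreshable {
  refresh(): boolean;
}

class StreamAligner {
  private lastUpToDate = new Map<string, number>();

  constructor(
    private readonly streams: Map<string, Refreshable>,
    private readonly thresholdMs: number,
  ) {
    // No stream has reported an up-to-date yet, so all start out stale.
    for (const id of streams.keys()) this.lastUpToDate.set(id, -Infinity);
  }

  /** Record that stream `id` emitted an up-to-date at time `now` (ms). */
  onUpToDate(id: string, now: number): void {
    this.lastUpToDate.set(id, now);
  }

  /** Stream `id` received data: refresh siblings whose last up-to-date is too old. */
  onData(id: string, now: number): string[] {
    const refreshed: string[] = [];
    for (const [siblingId, stream] of this.streams) {
      if (siblingId === id) continue;
      const last = this.lastUpToDate.get(siblingId) ?? -Infinity;
      if (now - last > this.thresholdMs && stream.refresh()) refreshed.push(siblingId);
    }
    return refreshed;
  }
}
```

For example, with a 100 ms threshold, data on `issues` at t=1000 would refresh a `comments` stream last up-to-date at t=800, but not a `users` stream last up-to-date at t=950.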

@KyleAMathews
Contributor

I still don't understand how this is helpful. A live request is guaranteed to receive data as soon as it exists. A non-live request has longer caching headers, so its response might even be further behind the tip of the stream...

I think a slight buffering window after receiving a new message (tuned by testing in the wild to collect, say, 99.99% of associated stream updates), along with resilience against missing messages (e.g. a join that temporarily fails), seems better?
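For comparison, a sketch of the buffering alternative, assuming a fixed window `windowMs` that opens on the first message and is flushed by polling. `BatchBuffer` is a hypothetical name; the window length is the parameter that would be tuned in the wild.

```typescript
// Hypothetical sketch: collect related updates for a short window, then release them together.
class BatchBuffer<T> {
  private pending: T[] = [];
  private windowStart: number | null = null;

  constructor(private readonly windowMs: number) {}

  /** Add a message; the first message into an empty buffer opens the window. */
  push(message: T, now: number): void {
    if (this.windowStart === null) this.windowStart = now;
    this.pending.push(message);
  }

  /** Return the buffered batch once the window has elapsed, otherwise null. */
  flushIfDue(now: number): T[] | null {
    if (this.windowStart === null || now - this.windowStart < this.windowMs) return null;
    const batch = this.pending;
    this.pending = [];
    this.windowStart = null;
    return batch;
  }
}
```

A message that arrives after its window has closed lands in the next batch, which is the missing-message case this approach would need to be resilient to.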

@samwillis
Contributor Author

samwillis commented Dec 31, 2024

@KyleAMathews I'm trying to find a way to integrate this into D2. Each stream needs to send a frontier message that gives a lower bound on the versions it may send in the future. This lower bound is then used in various ways by reduce, concat, and consolidate. A pending long poll sent a frontier at some point in the past, but I need it to send a new one, even when it has no data, whenever other streams have received data.

  • reduce emits the result of the computation at a specific version when it receives a frontier message.
  • concat and join combine two streams; when either input emits a frontier, they emit min(newFrontier, lastEmittedFrontier), essentially the lower bound of what they know about.
  • consolidate buffers messages until it receives a frontier, and then consolidates the buffered messages that have a version lower than the frontier.
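A rough sketch of the consolidate behaviour described above, assuming versions are plain integers (e.g. LSNs) and records are serialized strings with a multiplicity (+1 insert, -1 delete). This is not the d2ts API; `Consolidate` and `Update` are hypothetical.

```typescript
type Version = number; // assumption: totally ordered integer versions such as LSNs

interface Update {
  version: Version;
  data: string; // a serialized record (simplified)
  multiplicity: number; // +1 insert, -1 delete
}

class Consolidate {
  private buffer: Update[] = [];

  /** Buffer incoming updates until a frontier arrives. */
  onData(update: Update): void {
    this.buffer.push(update);
  }

  /**
   * A frontier promises no future updates with version < frontier,
   * so buffered updates below it are complete and can be emitted.
   */
  onFrontier(frontier: Version): Update[] {
    const ready = this.buffer.filter((u) => u.version < frontier);
    this.buffer = this.buffer.filter((u) => u.version >= frontier);

    // Sum the multiplicities of identical (version, data) pairs.
    const sums = new Map<string, Update>();
    for (const u of ready) {
      const key = `${u.version}\u0000${u.data}`;
      const existing = sums.get(key);
      if (existing) existing.multiplicity += u.multiplicity;
      else sums.set(key, { ...u });
    }
    // Drop records that cancelled out and emit in version order.
    return [...sums.values()]
      .filter((u) => u.multiplicity !== 0)
      .sort((a, b) => a.version - b.version);
  }
}
```

An insert and a delete of the same record at the same version cancel out, and updates at or above the frontier stay buffered until a later frontier passes them.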

Whenever you use a concat or join, you then want to consolidate the stream before sending it to a user or materialising it. If you haven't received a frontier from one side of the join, messages from the other side that have a higher version will not be emitted.
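The stall can be illustrated with a sketch of the frontier tracking in a two-input operator such as concat or join. It keeps the latest frontier from each input and emits their minimum, which is our reading of min(newFrontier, lastEmittedFrontier); `BinaryFrontier` is a hypothetical name, not a d2ts class.

```typescript
type Version = number;

class BinaryFrontier {
  private left: Version | null = null;
  private right: Version | null = null;
  private emitted: Version | null = null;

  /** Record a frontier from one input; return a new output frontier, or null if none. */
  onFrontier(side: "left" | "right", frontier: Version): Version | null {
    if (side === "left") this.left = frontier;
    else this.right = frontier;
    // Until both inputs have reported, there is no lower bound for one side.
    if (this.left === null || this.right === null) return null;
    const candidate = Math.min(this.left, this.right);
    // Only emit when the output frontier actually advances.
    if (this.emitted !== null && candidate <= this.emitted) return null;
    this.emitted = candidate;
    return candidate;
  }
}

const op = new BinaryFrontier();
op.onFrontier("left", 10); // null: right has never reported
op.onFrontier("left", 20); // null: still blocked on the right
op.onFrontier("right", 5); // 5
op.onFrontier("right", 30); // 20
```

The first two calls return `null`: while the right-hand input has never reported a frontier, nothing downstream (e.g. consolidate) can release the left-hand data, which is the situation an idle long poll creates.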

This PR in the d2ts repo (electric-sql/d2ts#11) has a working Electric -> D2 adapter, and an example that:

  • has issues, users, and comments shapes
  • outputs a stream that combines them into an issue stream with the users' details and a count of comments

However, it doesn't work, because the D2 pipeline blocks waiting for a frontier on each of the inputs.

You are correct that a timeout could work in theory. However, if we simulate a frontier based on a timeout and then receive a late message, it will throw an error and stop the stream, because a message has arrived with a lower version than the last frontier and cannot be processed. The only way around that is to change the message's version (max(simulatedLsn, messageLsn)), which could have strange side effects.
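The two options for a late message, sketched against a single input whose frontier has been advanced by a timeout. `FrontierCheckedInput` is hypothetical; `sendStrict` models the error, and `sendClamped` models rewriting the version to max(simulatedLsn, messageLsn).

```typescript
type Version = number;

class FrontierCheckedInput {
  private frontier: Version = -Infinity;

  /** Advance the frontier, e.g. a simulated one injected after a timeout. */
  advanceTo(frontier: Version): void {
    if (frontier > this.frontier) this.frontier = frontier;
  }

  /** Strict: a message below the frontier cannot be processed. */
  sendStrict(version: Version): Version {
    if (version < this.frontier) {
      throw new Error(`late message: version ${version} < frontier ${this.frontier}`);
    }
    return version;
  }

  /** Workaround: bump the message to max(simulatedLsn, messageLsn). */
  sendClamped(version: Version): Version {
    return Math.max(this.frontier, version);
  }
}
```

With the clamp, a change committed at LSN 90 would be reported at version 100, so it may no longer line up with rows from the same transaction on other shapes; that is one example of the side effects mentioned.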

If we want to offer any guarantee when joining streams (and with D2 we could; it's what it's designed for), we need a way to assert, at a given point in time, the lower bound of the LSN/version a stream could still receive, so that we can inject a frontier.
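If each up-to-date carried the latest LSN, as this issue proposes, injecting a frontier becomes a direct mapping. The sketch below assumes hypothetical message shapes (not Electric's wire format) and integer LSNs, so that an up-to-date at LSN `n` implies every later change has an LSN greater than `n`.

```typescript
// Hypothetical message shapes: not Electric's actual wire format.
type ElectricMessage =
  | { kind: "change"; lsn: number; key: string; value: string }
  | { kind: "up-to-date"; lsn: number };

type D2Event =
  | { kind: "data"; version: number; key: string; value: string }
  | { kind: "frontier"; version: number };

/** Map one stream's messages to D2 input events. */
function toD2Events(messages: ElectricMessage[]): D2Event[] {
  const events: D2Event[] = [];
  for (const m of messages) {
    if (m.kind === "change") {
      events.push({ kind: "data", version: m.lsn, key: m.key, value: m.value });
    } else {
      // Everything up to m.lsn has been delivered, so later changes carry a
      // larger LSN: m.lsn + 1 is a safe lower bound (integer LSNs assumed).
      events.push({ kind: "frontier", version: m.lsn + 1 });
    }
  }
  return events;
}
```

An idle stream blocked in a long poll produces no up-to-date here, which is where the proposed refresh comes in.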

There is an alternative: decide not to offer these guarantees. DBSP, for example, does away with all the versioning; it assumes that messages are received in order and at the correct time. We could either build a DBSP implementation or take the d2ts lib and strip out all the version/ordering machinery. That route is viable and simpler, but the downside is that we lose the ability to offer the transactional guarantee.

@KyleAMathews
Contributor

So what you really want is for every stream to output on every transaction, whether or not it has a change? Would that also solve it?

@samwillis
Contributor Author

That would also work, but on a busy database that's a lot of transactions. You could easily end up with long polling just becoming an endless loop of reconnects.

It ends up with a higher overhead than just aborting and reconnecting from the client side.

@KyleAMathews
Contributor

Right, sure; conceptually that's the goal. Just making sure I understand.
