-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(NODE-6258): add signal support to cursor APIs #4364
base: main
Are you sure you want to change the base?
Conversation
src/cursor/abstract_cursor.ts
Outdated
@@ -481,6 +495,7 @@ export abstract class AbstractCursor< | |||
} | |||
|
|||
yield document; | |||
throwIfAborted(this.signal); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should a line like this also be added before the await
above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, unless I'm mistaken I think generally we must always check the state before and after an await. I'm hoping to enumerate the possible state change (before we enter an await and after we resolve) in the tests in such a way that they will fail if I omit it 🤞🏻
src/utils.ts
Outdated
export function throwIfAborted(signal?: { aborted?: boolean; reason?: any }): void { | ||
if (signal?.aborted) { | ||
throw new MongoAbortedError('Operation was aborted', { cause: signal.reason }); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like it might be worth not wrapping the exception here, and just going with signal.reason
itself. I'm not 100% sure, it is convenient that with this, you could still see that the exception originates from a MongoDB driver method, but at the same time, I feel like there's a decent expectation that if you abort an operation through an AbortSignal
, then a) the reason
provided when aborting will be the actual exception your code sees and b) the default value for it will be an AbortError
instance. Additionally, that way it would be possible to adopt the standard throwIfAborted
function in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to reconcile when a reason
, an AbortError
instance, and, DOMException
named "AbortError" is thrown in Node.js and seeing how that can inform our API:
(edit, omitted reason
case is missing from screenshot, see below)
await fetch('http://google.com', { signal: AbortSignal.abort() })
Uncaught:
DOMException [AbortError]: This operation was aborted
at node:internal/deps/undici/undici:13178:13
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
at async REPL24:1:33
await fs.promises.readFile('/dev/zero', { signal: AbortSignal.abort() })
Uncaught AbortError: The operation was aborted
at checkAborted (node:internal/fs/promises:473:11)
at Object.readFile (node:internal/fs/promises:1236:3)
at REPL25:1:51 {
code: 'ABORT_ERR',
[cause]: DOMException [AbortError]: This operation was aborted
at new DOMException (node:internal/per_context/domexception:53:5)
at AbortSignal.abort (node:internal/abort_controller:205:14)
at REPL25:1:95
at REPL25:2:4
at ContextifyScript.runInThisContext (node:vm:136:12)
at REPLServer.defaultEval (node:repl:598:22)
at bound (node:domain:432:15)
at REPLServer.runBound [as eval] (node:domain:443:12)
at REPLServer.onLine (node:repl:927:10)
at REPLServer.emit (node:events:532:35)
- https://github.com/nodejs/node/blob/main/lib/internal/fs/promises.js#L472-L475
- https://github.com/nodejs/node/blob/main/lib/internal/abort_controller.js#L229-L232
The behavior is interestingly different depending on web-ish/node-ish origins. 🤔
An error thrown inside the driver has control flow implications for an operation, throwing the right (or wrong) value with the right properties and you can make something retry that normally wouldn't have. This encourages me to make sure that the error thrown is one we control so we don't accidentally misinterpret an abort.
Perhaps not very useful to the downstream user but it does feel like a debuggability loss to not have the stack trace to where the abort was detected show up in the stack trace.
I am comfortable with the idea that we would always need a wrapper for throwIfAborted (rather than adopting the standard) if it means consistent error behavior, but I don't think I'm sure what is the best developer experience here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose another part of this is that the user's signal may be handed over to an API we depend on (possibly someday, not in this PR, depending on need) and that API may turn the abort into a rejection using its own logic (either wrap, don't wrap, etc.) And unless we try catch and convert we're going to throw whatever decision is made inside the upstream API. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point, I wasn't aware that there's precedent for either behavior in standard-ish APIs.
I don't disagree with there being a bit of a loss of debuggability, but my gut feeling is still that it's best to stick to what the standard throwIfAborted()
method does. Feel free to just resolve if you feel differently!
The behavior is interestingly different depending on web-ish/node-ish origins. 🤔
That's because Web APIs are designed, but Node.js APIs are slapped together
d1b4562
to
9b76e10
Compare
d18a156
to
a2af7ed
Compare
c05a69d
to
a68b3cb
Compare
a68b3cb
to
d5ed413
Compare
d5ed413
to
ab02d53
Compare
src/cmap/connection.ts
Outdated
for await (const response of this.readMany({ | ||
timeoutContext: options.timeoutContext, | ||
signal: options.signal | ||
})) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(optional)
for await (const response of this.readMany({ | |
timeoutContext: options.timeoutContext, | |
signal: options.signal | |
})) { | |
for await (const response of this.readMany(options)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, done!
@@ -693,3 +693,42 @@ export function mergeTestMetadata( | |||
} | |||
}; | |||
} | |||
|
|||
export function findLast<T, S extends T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
export function findLast<T, S extends T>( | |
array.slice().reverse().find() |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clever!
expect(result).to.not.be.instanceOf(Error); | ||
}); | ||
|
||
it(`rejects ${cursorAPI.toString()} when aborted after start but before await`, async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it(`rejects ${cursorAPI.toString()} when aborted after start but before await`, async () => { | |
it(`aborts in-flight ${cursorAPI.toString()}`, async () => { |
this is basically what the test is testing, right? I don't think the previous test title was completely accurate - wouldn't this also satisfy the title but would likely not pass the test?
const willBeResultBlocked = /* await */ captureCursorAPIResult(cursor, cursorAPI, args);
await setTimeout(2_000);
controller.abort();
const result = await willBeResultBlocked;
expect(result).to.be.instanceOf(DOMException);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes correct, the reason the sleep there breaks the test is because the single iteration is fully completed and the promise from the cursor is already resolved.
This test is specifically covering that if I abort the signal after the cursor has handed me back a promise, the promise's state becomes rejected. Basically internally I've made it past the entry "throwIfAborted" check and I stopped at the first await
, when I set the signal to aborted, I still want to be sure the function throws when the microTask queue is run.
[Symbol.asyncIterator]: [] | ||
}; | ||
|
||
async function captureCursorAPIResult(cursor, cursorAPI, args) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async function captureCursorAPIResult(cursor, cursorAPI, args) { | |
async function iterateCursor(cursor, cursorAPI, args) { |
thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll do ya slightly better: iterateUntilDocumentOrError it matches the unified test operation (just glossing over the async generator boiler plate)
sinon.restore(); | ||
}); | ||
|
||
it(`rejects for-await on the next iteration`, async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this different from the table tests above, specifically the
it(`rejects ${cursorAPI.toString()} on the subsequent call`, async () => {
test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It occurred to me it was worth writing the actual syntax into a test to observe the behavior folks would get when using it. I guess what I am missing here is a count of how many times we looped to really make it specific to what the title states. While a bit redundant worth having to understand how someone would write the practical code to use this rather than how I have to manually use it to abstract the api I'm using above.
Broke these out into two tests since the killCursor was another subject anyway.
src/mongo_types.ts
Outdated
* ```js | ||
* const controller = new AbortController(); | ||
* const { signal } = controller; | ||
* req,on('close', () => controller.abort(new Error('Request aborted by user'))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* req,on('close', () => controller.abort(new Error('Request aborted by user'))); | |
* req.on('close', () => controller.abort(new Error('Request aborted by user'))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
src/cmap/connection.ts
Outdated
@@ -701,7 +705,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> { | |||
|
|||
if (this.socket.write(buffer)) return; | |||
|
|||
const drainEvent = once<void>(this.socket, 'drain'); | |||
const drainEvent = once<void>(this.socket, 'drain', { signal: options.signal }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const drainEvent = once<void>(this.socket, 'drain', { signal: options.signal }); | |
const drainEvent = once<void>(this.socket, 'drain', options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, done!
@@ -72,7 +72,8 @@ export class FindCursor<TSchema = any> extends ExplainableCursor<TSchema> { | |||
const options = { | |||
...this.findOptions, // NOTE: order matters here, we may need to refine this | |||
...this.cursorOptions, | |||
session | |||
session, | |||
signal: this.signal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
signal: this.signal |
I don't feel strongly - this isn't necessary because the FindOptions
holds the signal (that's how Aggregate works now actually). I'd either always add the signal (align AggregationCursor with FindCursor) or always rely on the options (remove this.signal here).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, a little problematic that signal isn't in the types for find and agg and yet it comes in on those types but I think we're better off with the Abortable non-inheritance way. I added explicit signal passing to the other cursors but wouldn't be an error to do it the other way either. If we validated / sanitized find/agg options in the future then this would protect against the signal option not making it through here.
@@ -346,11 +347,11 @@ export class Server extends TypedEventEmitter<ServerEvents> { | |||
) { | |||
await this.pool.reauthenticate(conn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like a miss for CSOT too - should we apply the signal to reauthentication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great question... OIDC currently does have a system of interruption for the callback workflow which I think is configurable for the non callback ones as well.
We will be checking the state of the signal when we reach conn.command() before we write the command so we'll still not perform unnecessary work but we will get blocked here, currently. At least a healthy re-authed connection will make it back into the pool, which seems good.
Seems like this would be a follow up to integrate both CSOT and abortSignal properly 🤔 unless the solution seems obvious to @durran @addaleax?
@@ -455,6 +468,8 @@ export abstract class AbstractCursor< | |||
} | |||
|
|||
async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> { | |||
this.signal?.throwIfAborted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we throw if aborted after the check for this.closed?
const cursor = ...;
await cursor.close();
signal.abort();
for await (const doc of cursor) {
} // throws with this implementation, we should likely instead just not iterate because the cursor is closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. The general practice I observed in Node (just as a point of reference) was to always check the signal's state upon entry to a function. I think this provides a good reliable consistency in behavior: an aborted signal always causes a thrown exception regardless of if you're actually performing an asynchronous action. It keeps it simple to always map aborted signal -> rejected promise.
Description
What is changing?
Is there new documentation needed for these changes?
Yes, API docs.
What is the motivation for this change?
AbortController/AbortSignal has become the defacto interruption mechanism for async operations. We're starting small by linking an abort signal to the lifetime of a cursor so that .next() / toArray() / for-await usage can be interrupted by external means.
Release Highlight
Partial AbortSignal support added! 🚥
A
signal
argument can now be passed to the following APIs:collection.find()
&collection.findOne()
collection.aggregate()
collection.listCollections()
collection.countDocuments()
db.command()
When aborted, the signal will interrupt the execution of each of each of these APIs. For the cursor-based APIs, this will be observed when attempting to consume from the cursor.
For example, the http request closing will make whatever iteration follows the abort throw the signal's
reason
:Double check the following
npm run check:lint
scripttype(NODE-xxxx)[!]: description
feat(NODE-1234)!: rewriting everything in coffeescript