Skip to content

Commit

Permalink
Handle failed subscription payloads (#7866)
Browse files Browse the repository at this point in the history
Currently, the error handling for subscription events is not
well-defined in the GraphQL spec, but that doesn't mean we shouldn't
handle them! The existing behavior is that an error thrown from a
subscription's generator will go uncaught and crash the whole server.

For a transient failure, it may be preferable for consumers that we
simply return an error response and continue waiting for more data from
the iterator, in case the producer recovers and resumes producing valid
data. However, Node's AsyncGenerator terminates once an error is thrown,
even if you manually loop calling `iterator.next()`.

This change wraps the iterator consumption in a `try/catch` and closes
the subscription when an error is encountered. Propagating the error up
to the subscriber will allow them to decide if they need to resubscribe
or not, in the case of a transient error.
  • Loading branch information
tninesling authored Apr 5, 2024
1 parent 5cf2928 commit 5f335a5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-ladybugs-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@apollo/server": patch
---

Catch errors thrown by subscription generators, and gracefully clean up the subscription instead of crashing.
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,62 @@ describe('SubscriptionCallbackPlugin', () => {
`);
});

it('sends a `complete` with errors when a subscription throws an error', async () => {
const server = await startSubscriptionServer({ logger });

mockRouterCheckResponse();
mockRouterCheckResponse();
mockRouterCompleteResponse({
errors: [{ message: "The subscription generator didn't catch this!" }],
});

const result = await server.executeHTTPGraphQLRequest(
buildHTTPGraphQLRequest({
body: {
query: `#graphql
subscription {
throwsError
}
`,
extensions: {
subscription: {
callbackUrl: 'http://mock-router-url.com',
subscriptionId: '1234-cats',
verifier: 'my-verifier-token',
},
},
},
}),
);
expect(result.status).toEqual(200);

jest.advanceTimersByTime(5000);
await server.stop();

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
"SubscriptionManager: Heartbeat received response for ID: 1234-cats",
"SubscriptionManager: Heartbeat request successful, ID: 1234-cats",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? describe.skip : describe)(
'error handling',
() => {
Expand Down Expand Up @@ -1979,6 +2035,7 @@ async function startSubscriptionServer(
type Subscription {
count: Int
terminatesSuccessfully: Boolean
throwsError: Int
}
`,
resolvers: {
Expand Down Expand Up @@ -2011,6 +2068,19 @@ async function startSubscriptionServer(
},
}),
},
throwsError: {
subscribe: () => ({
[Symbol.asyncIterator]() {
return {
next: () => {
throw new Error(
"The subscription generator didn't catch this!",
);
},
};
},
}),
},
},
},
...opts,
Expand Down
69 changes: 39 additions & 30 deletions packages/server/src/plugin/subscriptionCallback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,39 +557,48 @@ class SubscriptionManager {
cancelled: false,
async startConsumingSubscription() {
self.logger?.debug(`Listening to graphql-js subscription`, id);
for await (const payload of subscription) {
if (this.cancelled) {
self.logger?.debug(
`Subscription already cancelled, ignoring current and future payloads`,
id,
);
// It's already been cancelled - something else has already handled
// sending the `complete` request so we don't want to `break` here
// and send it again after the loop.
return;
}
try {
for await (const payload of subscription) {
if (this.cancelled) {
self.logger?.debug(
`Subscription already cancelled, ignoring current and future payloads`,
id,
);
// It's already been cancelled - something else has already handled
// sending the `complete` request so we don't want to `break` here
// and send it again after the loop.
return;
}

try {
await self.retryFetch({
url: callbackUrl,
action: 'next',
id,
verifier,
payload,
});
} catch (e) {
const originalError = ensureError(e);
self.logger?.error(
`\`next\` request failed, terminating subscription: ${originalError.message}`,
id,
);
self.terminateSubscription(id, callbackUrl);
try {
await self.retryFetch({
url: callbackUrl,
action: 'next',
id,
verifier,
payload,
});
} catch (e) {
const originalError = ensureError(e);
self.logger?.error(
`\`next\` request failed, terminating subscription: ${originalError.message}`,
id,
);
self.terminateSubscription(id, callbackUrl);
}
}
// The subscription ended without errors, send the `complete` request to
// the router
self.logger?.debug(`Subscription completed without errors`, id);
await this.completeSubscription();
} catch (e) {
const error = ensureGraphQLError(e);
self.logger?.error(
`Generator threw an error, terminating subscription: ${error.message}`,
id,
);
this.completeSubscription([error]);
}
// The subscription ended without errors, send the `complete` request to
// the router
self.logger?.debug(`Subscription completed without errors`, id);
await this.completeSubscription();
},
async completeSubscription(errors?: readonly GraphQLError[]) {
if (this.cancelled) return;
Expand Down

0 comments on commit 5f335a5

Please sign in to comment.