From 1d413a4f2a5ca17ca4f71e021e698d0cfb6027a9 Mon Sep 17 00:00:00 2001
From: Paulius
Date: Thu, 25 Jan 2024 07:14:16 +0200
Subject: [PATCH] Handle subscription init timeout gracefully (#1915)
### :pencil: Description
When using Spring Server client tries to initiate subscription, but
timeout occurs before sending `connection_init` message Kotlin coroutine
fails with exception:
```
2024-01-22T11:14:05.757+02:00 INFO 20308 --- [ main] c.p.ApplicationKt : Started ApplicationKt in 2.86 seconds (process running for 3.29)
Exception in thread "DefaultDispatcher-worker-2" java.util.NoSuchElementException: No value received via onNext for awaitFirst
at kotlinx.coroutines.reactive.AwaitKt$awaitOne$2$1.onComplete(Await.kt:282)
at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:726)
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:281)
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@2f9aebd2, Dispatchers.Default]
```
This small change fixes the problem, I've also added a test to cover
this specific case.
---
.../SubscriptionWebSocketHandler.kt | 4 ++--
.../SubscriptionWebSocketHandlerTest.kt | 18 ++++++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt
index de566cfb9c..b4d3e55452 100644
--- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt
+++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt
@@ -21,7 +21,7 @@ import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCO
import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
import com.fasterxml.jackson.databind.ObjectMapper
-import kotlinx.coroutines.reactive.awaitFirst
+import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.flux
import org.springframework.web.reactive.socket.CloseStatus
import org.springframework.web.reactive.socket.WebSocketHandler
@@ -53,7 +53,7 @@ class SubscriptionWebSocketHandler(
)
override suspend fun closeSession(session: WebSocketSession, reason: GraphQLSubscriptionStatus) {
- session.close(CloseStatus(reason.code, reason.reason)).awaitFirst()
+ session.close(CloseStatus(reason.code, reason.reason)).awaitFirstOrNull()
}
override suspend fun sendSubscriptionMessage(session: WebSocketSession, message: String): WebSocketMessage =
diff --git a/servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerTest.kt b/servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerTest.kt
index e8662f52b3..8ad5168ddf 100644
--- a/servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerTest.kt
+++ b/servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerTest.kt
@@ -17,10 +17,16 @@
package com.expediagroup.graphql.server.spring.subscriptions
import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL
+import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import io.mockk.every
import io.mockk.mockk
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.springframework.web.reactive.socket.WebSocketSession
+import reactor.core.publisher.Mono
class SubscriptionWebSocketHandlerTest {
@@ -35,4 +41,16 @@ class SubscriptionWebSocketHandlerTest {
val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
assertEquals(expected = listOf(GRAPHQL_WS_PROTOCOL), actual = handler.subProtocols)
}
+
+ @Test
+ fun `verify default subscription handler handles init timeout gracefully`() = runTest {
+ val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
+ val session = mockk()
+
+ every { session.close(any()) } returns Mono.empty()
+
+ assertDoesNotThrow {
+ handler.closeSession(session, GraphQLSubscriptionStatus.CONNECTION_INIT_TIMEOUT)
+ }
+ }
}