Skip to content

Commit

Permalink
Handle subscription init timeout gracefully (#1915)
Browse files Browse the repository at this point in the history
### 📝 Description

When using Spring Server client tries to initiate subscription, but
timeout occurs before sending `connection_init` message Kotlin coroutine
fails with exception:
```
2024-01-22T11:14:05.757+02:00  INFO 20308 --- [           main] c.p.ApplicationKt   : Started ApplicationKt in 2.86 seconds (process running for 3.29)
Exception in thread "DefaultDispatcher-worker-2" java.util.NoSuchElementException: No value received via onNext for awaitFirst
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$2$1.onComplete(Await.kt:282)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:726)
	at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:281)
	at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
	at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@2f9aebd2, Dispatchers.Default]
```

This small change fixes the problem, I've also added a test to cover
this specific case.
  • Loading branch information
pdambrauskas authored Jan 25, 2024
1 parent e6ab991 commit 1d413a4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCO
import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.flux
import org.springframework.web.reactive.socket.CloseStatus
import org.springframework.web.reactive.socket.WebSocketHandler
Expand Down Expand Up @@ -53,7 +53,7 @@ class SubscriptionWebSocketHandler(
)

override suspend fun closeSession(session: WebSocketSession, reason: GraphQLSubscriptionStatus) {
session.close(CloseStatus(reason.code, reason.reason)).awaitFirst()
session.close(CloseStatus(reason.code, reason.reason)).awaitFirstOrNull()
}

override suspend fun sendSubscriptionMessage(session: WebSocketSession, message: String): WebSocketMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package com.expediagroup.graphql.server.spring.subscriptions

import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.mockk.every
import io.mockk.mockk
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.assertDoesNotThrow
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

class SubscriptionWebSocketHandlerTest {

Expand All @@ -35,4 +41,16 @@ class SubscriptionWebSocketHandlerTest {
val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
assertEquals(expected = listOf(GRAPHQL_WS_PROTOCOL), actual = handler.subProtocols)
}

@Test
fun `verify default subscription handler handles init timeout gracefully`() = runTest {
val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
val session = mockk<WebSocketSession>()

every { session.close(any()) } returns Mono.empty()

assertDoesNotThrow {
handler.closeSession(session, GraphQLSubscriptionStatus.CONNECTION_INIT_TIMEOUT)
}
}
}

0 comments on commit 1d413a4

Please sign in to comment.