diff --git a/jellyfin-api/api/jellyfin-api.api b/jellyfin-api/api/jellyfin-api.api index af49b857a..c6f4d1231 100644 --- a/jellyfin-api/api/jellyfin-api.api +++ b/jellyfin-api/api/jellyfin-api.api @@ -1187,6 +1187,7 @@ public final class org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession : org/jel public fun (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V public fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun getState ()Lkotlinx/coroutines/flow/StateFlow; public fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -1207,6 +1208,7 @@ public final class org/jellyfin/sdk/api/sockets/SocketInstance { public abstract interface class org/jellyfin/sdk/api/sockets/SocketInstanceConnection { public abstract fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun getState ()Lkotlinx/coroutines/flow/StateFlow; public abstract fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt index 7cc6e76ab..cf838e4a6 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.plus import kotlinx.coroutines.sync.Mutex @@ -19,6 +20,7 @@ import org.jellyfin.sdk.api.client.util.UrlBuilder import org.jellyfin.sdk.api.sockets.exception.SocketStoppedException import org.jellyfin.sdk.api.sockets.helper.KeepAliveHelper import org.jellyfin.sdk.api.sockets.helper.ListenerHelper +import org.jellyfin.sdk.api.sockets.helper.ReconnectHelper import org.jellyfin.sdk.api.sockets.listener.SocketListener import org.jellyfin.sdk.api.sockets.listener.SocketListenerDefinition import org.jellyfin.sdk.model.socket.ForceKeepAliveMessage @@ -54,6 +56,7 @@ public class SocketInstance internal constructor( private var connection: SocketInstanceConnection? = null private var connectionScope: CoroutineScope? = null + private val reconnectHelper = ReconnectHelper(coroutineScope, ::reconnect) private val incomingMessages = Channel() private val outgoingMessages = Channel() @@ -112,11 +115,14 @@ public class SocketInstance internal constructor( // Remove listeners that don't want credential changes if (credentialsChanged) listenerHelper.reportCredentialChangedReconnect() - val connected = connection != null // TODO make smarter (handle disconnect etc.) + val connected = connection != null when { // Stop if there's no listeners - listenerHelper.listeners.isEmpty() -> connection?.disconnect() + listenerHelper.listeners.isEmpty() -> { + reconnectHelper.reset() + connection?.disconnect() + } // Reconnect when credentials changed or not connected credentialsChanged || !connected -> reconnect() // Update subscriptions when not reconnecting or disconnecting @@ -168,6 +174,7 @@ public class SocketInstance internal constructor( connection?.disconnect() connectionScope?.cancel() + reconnectHelper.notifyReconnect() // No base url set. The app might want to set it later and call [updateCredentials] if (baseUrl == null) { @@ -200,16 +207,30 @@ public class SocketInstance internal constructor( if (!connected) { _state.value = SocketInstanceState.ERROR + reconnectHelper.scheduleReconnect(error = true) return null } scope.launch { for (message in incomingMessages) forwardMessage(message) } scope.launch { for (message in outgoingMessages) send(message) } + scope.launch { + // Wait for first state change in the SocketInstanceConnection + val disconnectState = state.first { it != SocketInstanceState.CONNECTED } + + // Only act if the SocketInstance state is CONNECTED + if (_state.value == SocketInstanceState.CONNECTED) { + val scheduled = reconnectHelper.scheduleReconnect(disconnectState == SocketInstanceState.ERROR) + + _state.value = if (!scheduled) SocketInstanceState.ERROR + else SocketInstanceState.DISCONNECTED + } + } listenerHelper.activeSubscriptions.clear() updateSubscriptions() _state.value = SocketInstanceState.CONNECTED + reconnectHelper.notifyConnected() return this } @@ -223,6 +244,7 @@ public class SocketInstance internal constructor( connection?.disconnect() connectionScope?.cancel() listenerHelper.reset() + reconnectHelper.reset() incomingMessages.close() outgoingMessages.close() coroutineScope.cancel() diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt index 9893790f1..0ccb1aa9c 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt @@ -1,9 +1,17 @@ package org.jellyfin.sdk.api.sockets +import kotlinx.coroutines.flow.StateFlow + /** * Reusable WebSocket connection. Constructed using [SocketConnectionFactory]. */ public interface SocketInstanceConnection { + /** + * State of the connection. Requires at least the [SocketInstanceState.ERROR] and [SocketInstanceState.DISCONNECTED] + * states to be implemented. + */ + public val state: StateFlow + /** * Connect to [url]. If there is an existing connection it will be automatically closed. After the connection is * initialized the messageListener supplied via the factory will be called until [disconnect] is called or the diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/helper/ReconnectHelper.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/helper/ReconnectHelper.kt index dd9f0f84a..d72429b50 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/helper/ReconnectHelper.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/helper/ReconnectHelper.kt @@ -1,9 +1,61 @@ package org.jellyfin.sdk.api.sockets.helper -internal class ReconnectHelper { - fun reportConnect() {} - fun reportDisconnect(){} - fun reset() {} +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import mu.KotlinLogging +import kotlin.time.Duration.Companion.seconds - fun shouldReconnectNow() {} +private val logger = KotlinLogging.logger {} + +internal class ReconnectHelper( + private val coroutineScope: CoroutineScope, + private val reconnect: suspend () -> Unit, +) { + private var reconnectJob: Job? = null + private var attempts = 0 + + fun reset() { + logger.debug { "Resetting" } + reconnectJob?.cancel() + attempts = 0 + } + + fun notifyReconnect() { + reconnectJob?.cancel() + attempts++ + logger.debug { "Notified about reconnect, attempts=${attempts}" } + } + + fun notifyConnected() { + attempts = 0 + logger.debug { "Notified about connect, attempts reset" } + } + + fun scheduleReconnect(error: Boolean = false): Boolean { + reconnectJob?.cancel() + + if (attempts > RETRY_ATTEMPTS) { + logger.debug { "Reconnect schedule failed: exceeded maximum retry attempts" } + return false + } + + reconnectJob = coroutineScope.launch { + val retryAfter = if (error) RETRY_ERROR else RETRY_NORMAL + + logger.info { "Reconnect scheduled in $retryAfter (error=$error)" } + + delay(retryAfter) + reconnect() + } + + return true + } + + companion object { + val RETRY_NORMAL = 5.seconds + val RETRY_ERROR = 30.seconds + const val RETRY_ATTEMPTS = 5 + } } diff --git a/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt b/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt index 53ecd61f4..82e6092a6 100644 --- a/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt +++ b/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt @@ -3,6 +3,8 @@ package org.jellyfin.sdk.api.sockets import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import mu.KotlinLogging @@ -36,12 +38,13 @@ public class OkHttpWebsocketSession( writeTimeout(clientOptions.socketTimeout, TimeUnit.MILLISECONDS) }.build() private var webSocket: WebSocket? = null - private val state = MutableStateFlow(SocketInstanceState.DISCONNECTED) + private val _state = MutableStateFlow(SocketInstanceState.DISCONNECTED) + public override val state: StateFlow = _state.asStateFlow() private val listener = object : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { logger.info { "WebSocket has opened" } - state.value = SocketInstanceState.CONNECTED + _state.value = SocketInstanceState.CONNECTED } override fun onMessage(webSocket: WebSocket, text: String) { @@ -58,13 +61,13 @@ public class OkHttpWebsocketSession( override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { logger.info { "WebSocket is closing, code=$code, reason=$reason" } - state.value = SocketInstanceState.DISCONNECTED + _state.value = SocketInstanceState.DISCONNECTED } @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun onClosed(closedWebSocket: WebSocket, code: Int, reason: String) { logger.info { "WebSocket has closed, code=$code, reason=$reason" } - state.value = SocketInstanceState.DISCONNECTED + _state.value = SocketInstanceState.DISCONNECTED if (webSocket == closedWebSocket) webSocket = null } @@ -72,7 +75,7 @@ public class OkHttpWebsocketSession( @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun onFailure(failedWebSocket: WebSocket, t: Throwable, response: Response?) { logger.warn(t) { "WebSocket has failed" } - state.value = SocketInstanceState.ERROR + _state.value = SocketInstanceState.ERROR if (webSocket == failedWebSocket) webSocket = null } @@ -85,7 +88,7 @@ public class OkHttpWebsocketSession( url(url) }.build() - state.value = SocketInstanceState.CONNECTING + _state.value = SocketInstanceState.CONNECTING webSocket = client.newWebSocket(request, listener) return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED @@ -115,7 +118,7 @@ public class OkHttpWebsocketSession( } override suspend fun disconnect() { - state.value = SocketInstanceState.DISCONNECTED + _state.value = SocketInstanceState.DISCONNECTED webSocket?.close(CLOSE_REASON_NORMAL, null) webSocket = null }