diff --git a/jellyfin-api/api/jellyfin-api.api b/jellyfin-api/api/jellyfin-api.api index c682a37bc..af49b857a 100644 --- a/jellyfin-api/api/jellyfin-api.api +++ b/jellyfin-api/api/jellyfin-api.api @@ -1184,13 +1184,14 @@ public final class org/jellyfin/sdk/api/sockets/ListenerRegistrationExtensionsKt } public final class org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession : org/jellyfin/sdk/api/sockets/SocketInstanceConnection { - public fun (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V + public fun (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V public fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class org/jellyfin/sdk/api/sockets/SocketConnectionFactory { - public abstract fun create (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun create (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class org/jellyfin/sdk/api/sockets/SocketInstance { @@ -1206,6 +1207,7 @@ public final class org/jellyfin/sdk/api/sockets/SocketInstance { public abstract interface class org/jellyfin/sdk/api/sockets/SocketInstanceConnection { public abstract fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class org/jellyfin/sdk/api/sockets/SocketInstanceState : java/lang/Enum { diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketConnectionFactory.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketConnectionFactory.kt index 0871fd58e..ae9f4dcfc 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketConnectionFactory.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketConnectionFactory.kt @@ -8,7 +8,6 @@ public fun interface SocketConnectionFactory { public suspend fun create( clientOptions: HttpClientOptions, incomingMessageChannel: Channel, - outgoingMessageChannel: Channel, coroutineContext: CoroutineContext, ): SocketInstanceConnection } diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt index 259c55523..7cc6e76ab 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstance.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch +import kotlinx.coroutines.plus import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import mu.KotlinLogging @@ -52,13 +53,13 @@ public class SocketInstance internal constructor( public val state: StateFlow = _state private var connection: SocketInstanceConnection? = null + private var connectionScope: CoroutineScope? = null private val incomingMessages = Channel() private val outgoingMessages = Channel() private val listenerHelper = ListenerHelper() private val keepAliveHelper = KeepAliveHelper(coroutineScope) - private var messageForwardJob: Job? = null private val updateConnectionStateMutex = Mutex() /** @@ -166,6 +167,7 @@ public class SocketInstance internal constructor( _state.value = SocketInstanceState.CONNECTING connection?.disconnect() + connectionScope?.cancel() // No base url set. The app might want to set it later and call [updateCredentials] if (baseUrl == null) { @@ -174,34 +176,41 @@ public class SocketInstance internal constructor( return } - connection = socketConnectionFactory.create( - api.httpClientOptions, - incomingMessages, - outgoingMessages, - coroutineContext, - ).apply { - val connected = connect(UrlBuilder.buildUrl( - baseUrl = requireNotNull(baseUrl), - pathTemplate = SOCKET_URL, - queryParameters = mapOf( - QUERY_DEVICE_ID to deviceInfo.id, - QUERY_ACCESS_TOKEN to accessToken, - ) - ).replace(Regex("^http"), "ws")) - - if (connected) { - messageForwardJob?.cancel() - messageForwardJob = coroutineScope.launch { - for (message in incomingMessages) forwardMessage(message) - } - listenerHelper.activeSubscriptions.clear() - updateSubscriptions() - _state.value = SocketInstanceState.CONNECTED - } else { - _state.value = SocketInstanceState.ERROR - // TODO retry: failed connecting - } + // Create connection + val scope = coroutineScope + Job() + connectionScope = scope + scope.launch { + connection = socketConnectionFactory.create( + api.httpClientOptions, + incomingMessages, + coroutineContext, + ).connectAndBind(this) + } + } + + private suspend fun SocketInstanceConnection.connectAndBind(scope: CoroutineScope): SocketInstanceConnection? { + val connected = connect(UrlBuilder.buildUrl( + baseUrl = requireNotNull(baseUrl), + pathTemplate = SOCKET_URL, + queryParameters = mapOf( + QUERY_DEVICE_ID to deviceInfo.id, + QUERY_ACCESS_TOKEN to accessToken, + ) + ).replace(Regex("^http"), "ws")) + + if (!connected) { + _state.value = SocketInstanceState.ERROR + return null } + + scope.launch { for (message in incomingMessages) forwardMessage(message) } + scope.launch { for (message in outgoingMessages) send(message) } + + listenerHelper.activeSubscriptions.clear() + updateSubscriptions() + _state.value = SocketInstanceState.CONNECTED + + return this } /** @@ -212,7 +221,7 @@ public class SocketInstance internal constructor( logger.info { "Stopping socket instance" } connection?.disconnect() - messageForwardJob?.cancel() + connectionScope?.cancel() listenerHelper.reset() incomingMessages.close() outgoingMessages.close() diff --git a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt index 93f3ded83..9893790f1 100644 --- a/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt +++ b/jellyfin-api/src/commonMain/kotlin/org/jellyfin/sdk/api/sockets/SocketInstanceConnection.kt @@ -15,6 +15,14 @@ public interface SocketInstanceConnection { */ public suspend fun connect(url: String): Boolean + /** + * Send a message to this connection. Messages might still fail to send due to network issues or other reasons when + * the returned value is true. + * + * @return false when failed to send/queue. true when it's likely the message will be sent. + */ + public suspend fun send(message: String): Boolean + /** * Disconnect the connection. Will do nothing when there is no connection. * diff --git a/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt b/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt index 4dec68097..53ecd61f4 100644 --- a/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt +++ b/jellyfin-api/src/jvmMain/kotlin/org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession.kt @@ -1,7 +1,6 @@ package org.jellyfin.sdk.api.sockets import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.first @@ -22,7 +21,6 @@ private val logger = KotlinLogging.logger {} public class OkHttpWebsocketSession( clientOptions: HttpClientOptions, private val incomingMessageChannel: Channel, - private val outgoingMessageChannel: Channel, context: CoroutineContext, ) : SocketInstanceConnection { private companion object { @@ -38,7 +36,6 @@ public class OkHttpWebsocketSession( writeTimeout(clientOptions.socketTimeout, TimeUnit.MILLISECONDS) }.build() private var webSocket: WebSocket? = null - private var messageForwardJob: Job? = null private val state = MutableStateFlow(SocketInstanceState.DISCONNECTED) private val listener = object : WebSocketListener() { @@ -62,9 +59,6 @@ public class OkHttpWebsocketSession( override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { logger.info { "WebSocket is closing, code=$code, reason=$reason" } state.value = SocketInstanceState.DISCONNECTED - - messageForwardJob?.cancel() - messageForwardJob = null } @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") @@ -80,9 +74,6 @@ public class OkHttpWebsocketSession( logger.warn(t) { "WebSocket has failed" } state.value = SocketInstanceState.ERROR - // When onFailure is called, the onClosing and onClosed functions are not called - messageForwardJob?.cancel() - messageForwardJob = null if (webSocket == failedWebSocket) webSocket = null } } @@ -97,15 +88,30 @@ public class OkHttpWebsocketSession( state.value = SocketInstanceState.CONNECTING webSocket = client.newWebSocket(request, listener) - messageForwardJob = coroutineScope.launch { - for (message in outgoingMessageChannel) { - logger.info { "Sending (raw) message $message" } + return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED + } - webSocket?.send(message) - } + override suspend fun send(message: String): Boolean { + logger.info { "Sending (raw) message $message" } + + // Invalid state + if (state.value != SocketInstanceState.CONNECTED) { + logger.warn { "Unable to send message: invalid state (state=${state.value})" } + return false } - return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED + val ws = webSocket + + // No existing socket + if (ws == null) { + logger.warn { "Unable to send message: webSocket is null" } + return false + } + + val sent = ws.send(message) + if (!sent) logger.warn { "Unable to send message: OkHttp returned false" } + + return sent } override suspend fun disconnect() {