Skip to content

Commit

Permalink
Rewrite/simplify internal WebSocket message sending (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsvanvelzen authored Apr 9, 2022
1 parent 34a4be7 commit f4d3d3d
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 47 deletions.
6 changes: 4 additions & 2 deletions jellyfin-api/api/jellyfin-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -1184,13 +1184,14 @@ public final class org/jellyfin/sdk/api/sockets/ListenerRegistrationExtensionsKt
}

public final class org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession : org/jellyfin/sdk/api/sockets/SocketInstanceConnection {
public fun <init> (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V
public fun <init> (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V
public fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class org/jellyfin/sdk/api/sockets/SocketConnectionFactory {
public abstract fun create (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun create (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/jellyfin/sdk/api/sockets/SocketInstance {
Expand All @@ -1206,6 +1207,7 @@ public final class org/jellyfin/sdk/api/sockets/SocketInstance {
public abstract interface class org/jellyfin/sdk/api/sockets/SocketInstanceConnection {
public abstract fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/jellyfin/sdk/api/sockets/SocketInstanceState : java/lang/Enum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public fun interface SocketConnectionFactory {
public suspend fun create(
clientOptions: HttpClientOptions,
incomingMessageChannel: Channel<String>,
outgoingMessageChannel: Channel<String>,
coroutineContext: CoroutineContext,
): SocketInstanceConnection
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KotlinLogging
Expand Down Expand Up @@ -52,13 +53,13 @@ public class SocketInstance internal constructor(
public val state: StateFlow<SocketInstanceState> = _state

private var connection: SocketInstanceConnection? = null
private var connectionScope: CoroutineScope? = null
private val incomingMessages = Channel<String>()
private val outgoingMessages = Channel<String>()

private val listenerHelper = ListenerHelper()
private val keepAliveHelper = KeepAliveHelper(coroutineScope)

private var messageForwardJob: Job? = null
private val updateConnectionStateMutex = Mutex()

/**
Expand Down Expand Up @@ -166,6 +167,7 @@ public class SocketInstance internal constructor(
_state.value = SocketInstanceState.CONNECTING

connection?.disconnect()
connectionScope?.cancel()

// No base url set. The app might want to set it later and call [updateCredentials]
if (baseUrl == null) {
Expand All @@ -174,34 +176,41 @@ public class SocketInstance internal constructor(
return
}

connection = socketConnectionFactory.create(
api.httpClientOptions,
incomingMessages,
outgoingMessages,
coroutineContext,
).apply {
val connected = connect(UrlBuilder.buildUrl(
baseUrl = requireNotNull(baseUrl),
pathTemplate = SOCKET_URL,
queryParameters = mapOf(
QUERY_DEVICE_ID to deviceInfo.id,
QUERY_ACCESS_TOKEN to accessToken,
)
).replace(Regex("^http"), "ws"))

if (connected) {
messageForwardJob?.cancel()
messageForwardJob = coroutineScope.launch {
for (message in incomingMessages) forwardMessage(message)
}
listenerHelper.activeSubscriptions.clear()
updateSubscriptions()
_state.value = SocketInstanceState.CONNECTED
} else {
_state.value = SocketInstanceState.ERROR
// TODO retry: failed connecting
}
// Create connection
val scope = coroutineScope + Job()
connectionScope = scope
scope.launch {
connection = socketConnectionFactory.create(
api.httpClientOptions,
incomingMessages,
coroutineContext,
).connectAndBind(this)
}
}

private suspend fun SocketInstanceConnection.connectAndBind(scope: CoroutineScope): SocketInstanceConnection? {
val connected = connect(UrlBuilder.buildUrl(
baseUrl = requireNotNull(baseUrl),
pathTemplate = SOCKET_URL,
queryParameters = mapOf(
QUERY_DEVICE_ID to deviceInfo.id,
QUERY_ACCESS_TOKEN to accessToken,
)
).replace(Regex("^http"), "ws"))

if (!connected) {
_state.value = SocketInstanceState.ERROR
return null
}

scope.launch { for (message in incomingMessages) forwardMessage(message) }
scope.launch { for (message in outgoingMessages) send(message) }

listenerHelper.activeSubscriptions.clear()
updateSubscriptions()
_state.value = SocketInstanceState.CONNECTED

return this
}

/**
Expand All @@ -212,7 +221,7 @@ public class SocketInstance internal constructor(
logger.info { "Stopping socket instance" }

connection?.disconnect()
messageForwardJob?.cancel()
connectionScope?.cancel()
listenerHelper.reset()
incomingMessages.close()
outgoingMessages.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public interface SocketInstanceConnection {
*/
public suspend fun connect(url: String): Boolean

/**
* Send a message to this connection. Messages might still fail to send due to network issues or other reasons when
* the returned value is true.
*
* @return false when failed to send/queue. true when it's likely the message will be sent.
*/
public suspend fun send(message: String): Boolean

/**
* Disconnect the connection. Will do nothing when there is no connection.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.jellyfin.sdk.api.sockets

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
Expand All @@ -22,7 +21,6 @@ private val logger = KotlinLogging.logger {}
public class OkHttpWebsocketSession(
clientOptions: HttpClientOptions,
private val incomingMessageChannel: Channel<String>,
private val outgoingMessageChannel: Channel<String>,
context: CoroutineContext,
) : SocketInstanceConnection {
private companion object {
Expand All @@ -38,7 +36,6 @@ public class OkHttpWebsocketSession(
writeTimeout(clientOptions.socketTimeout, TimeUnit.MILLISECONDS)
}.build()
private var webSocket: WebSocket? = null
private var messageForwardJob: Job? = null
private val state = MutableStateFlow(SocketInstanceState.DISCONNECTED)

private val listener = object : WebSocketListener() {
Expand All @@ -62,9 +59,6 @@ public class OkHttpWebsocketSession(
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
logger.info { "WebSocket is closing, code=$code, reason=$reason" }
state.value = SocketInstanceState.DISCONNECTED

messageForwardJob?.cancel()
messageForwardJob = null
}

@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
Expand All @@ -80,9 +74,6 @@ public class OkHttpWebsocketSession(
logger.warn(t) { "WebSocket has failed" }
state.value = SocketInstanceState.ERROR

// When onFailure is called, the onClosing and onClosed functions are not called
messageForwardJob?.cancel()
messageForwardJob = null
if (webSocket == failedWebSocket) webSocket = null
}
}
Expand All @@ -97,15 +88,30 @@ public class OkHttpWebsocketSession(
state.value = SocketInstanceState.CONNECTING
webSocket = client.newWebSocket(request, listener)

messageForwardJob = coroutineScope.launch {
for (message in outgoingMessageChannel) {
logger.info { "Sending (raw) message $message" }
return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED
}

webSocket?.send(message)
}
override suspend fun send(message: String): Boolean {
logger.info { "Sending (raw) message $message" }

// Invalid state
if (state.value != SocketInstanceState.CONNECTED) {
logger.warn { "Unable to send message: invalid state (state=${state.value})" }
return false
}

return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED
val ws = webSocket

// No existing socket
if (ws == null) {
logger.warn { "Unable to send message: webSocket is null" }
return false
}

val sent = ws.send(message)
if (!sent) logger.warn { "Unable to send message: OkHttp returned false" }

return sent
}

override suspend fun disconnect() {
Expand Down

0 comments on commit f4d3d3d

Please sign in to comment.