Skip to content

Commit

Permalink
Add retry logic to SocketInstance (#373)
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsvanvelzen authored Apr 9, 2022
1 parent f4d3d3d commit 44818c0
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 14 deletions.
2 changes: 2 additions & 0 deletions jellyfin-api/api/jellyfin-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,7 @@ public final class org/jellyfin/sdk/api/sockets/OkHttpWebsocketSession : org/jel
public fun <init> (Lorg/jellyfin/sdk/api/client/HttpClientOptions;Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/CoroutineContext;)V
public fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getState ()Lkotlinx/coroutines/flow/StateFlow;
public fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand All @@ -1207,6 +1208,7 @@ public final class org/jellyfin/sdk/api/sockets/SocketInstance {
public abstract interface class org/jellyfin/sdk/api/sockets/SocketInstanceConnection {
public abstract fun connect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getState ()Lkotlinx/coroutines/flow/StateFlow;
public abstract fun send (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
Expand All @@ -19,6 +20,7 @@ import org.jellyfin.sdk.api.client.util.UrlBuilder
import org.jellyfin.sdk.api.sockets.exception.SocketStoppedException
import org.jellyfin.sdk.api.sockets.helper.KeepAliveHelper
import org.jellyfin.sdk.api.sockets.helper.ListenerHelper
import org.jellyfin.sdk.api.sockets.helper.ReconnectHelper
import org.jellyfin.sdk.api.sockets.listener.SocketListener
import org.jellyfin.sdk.api.sockets.listener.SocketListenerDefinition
import org.jellyfin.sdk.model.socket.ForceKeepAliveMessage
Expand Down Expand Up @@ -54,6 +56,7 @@ public class SocketInstance internal constructor(

private var connection: SocketInstanceConnection? = null
private var connectionScope: CoroutineScope? = null
private val reconnectHelper = ReconnectHelper(coroutineScope, ::reconnect)
private val incomingMessages = Channel<String>()
private val outgoingMessages = Channel<String>()

Expand Down Expand Up @@ -112,11 +115,14 @@ public class SocketInstance internal constructor(
// Remove listeners that don't want credential changes
if (credentialsChanged) listenerHelper.reportCredentialChangedReconnect()

val connected = connection != null // TODO make smarter (handle disconnect etc.)
val connected = connection != null

when {
// Stop if there's no listeners
listenerHelper.listeners.isEmpty() -> connection?.disconnect()
listenerHelper.listeners.isEmpty() -> {
reconnectHelper.reset()
connection?.disconnect()
}
// Reconnect when credentials changed or not connected
credentialsChanged || !connected -> reconnect()
// Update subscriptions when not reconnecting or disconnecting
Expand Down Expand Up @@ -168,6 +174,7 @@ public class SocketInstance internal constructor(

connection?.disconnect()
connectionScope?.cancel()
reconnectHelper.notifyReconnect()

// No base url set. The app might want to set it later and call [updateCredentials]
if (baseUrl == null) {
Expand Down Expand Up @@ -200,16 +207,30 @@ public class SocketInstance internal constructor(

if (!connected) {
_state.value = SocketInstanceState.ERROR
reconnectHelper.scheduleReconnect(error = true)
return null
}

scope.launch { for (message in incomingMessages) forwardMessage(message) }
scope.launch { for (message in outgoingMessages) send(message) }
scope.launch {
// Wait for first state change in the SocketInstanceConnection
val disconnectState = state.first { it != SocketInstanceState.CONNECTED }

// Only act if the SocketInstance state is CONNECTED
if (_state.value == SocketInstanceState.CONNECTED) {
val scheduled = reconnectHelper.scheduleReconnect(disconnectState == SocketInstanceState.ERROR)

_state.value = if (!scheduled) SocketInstanceState.ERROR
else SocketInstanceState.DISCONNECTED
}
}

listenerHelper.activeSubscriptions.clear()
updateSubscriptions()
_state.value = SocketInstanceState.CONNECTED

reconnectHelper.notifyConnected()
return this
}

Expand All @@ -223,6 +244,7 @@ public class SocketInstance internal constructor(
connection?.disconnect()
connectionScope?.cancel()
listenerHelper.reset()
reconnectHelper.reset()
incomingMessages.close()
outgoingMessages.close()
coroutineScope.cancel()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package org.jellyfin.sdk.api.sockets

import kotlinx.coroutines.flow.StateFlow

/**
* Reusable WebSocket connection. Constructed using [SocketConnectionFactory].
*/
public interface SocketInstanceConnection {
/**
* State of the connection. Requires at least the [SocketInstanceState.ERROR] and [SocketInstanceState.DISCONNECTED]
* states to be implemented.
*/
public val state: StateFlow<SocketInstanceState>

/**
* Connect to [url]. If there is an existing connection it will be automatically closed. After the connection is
* initialized the messageListener supplied via the factory will be called until [disconnect] is called or the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,61 @@
package org.jellyfin.sdk.api.sockets.helper

internal class ReconnectHelper {
fun reportConnect() {}
fun reportDisconnect(){}
fun reset() {}
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import kotlin.time.Duration.Companion.seconds

fun shouldReconnectNow() {}
private val logger = KotlinLogging.logger {}

internal class ReconnectHelper(
private val coroutineScope: CoroutineScope,
private val reconnect: suspend () -> Unit,
) {
private var reconnectJob: Job? = null
private var attempts = 0

fun reset() {
logger.debug { "Resetting" }
reconnectJob?.cancel()
attempts = 0
}

fun notifyReconnect() {
reconnectJob?.cancel()
attempts++
logger.debug { "Notified about reconnect, attempts=${attempts}" }
}

fun notifyConnected() {
attempts = 0
logger.debug { "Notified about connect, attempts reset" }
}

fun scheduleReconnect(error: Boolean = false): Boolean {
reconnectJob?.cancel()

if (attempts > RETRY_ATTEMPTS) {
logger.debug { "Reconnect schedule failed: exceeded maximum retry attempts" }
return false
}

reconnectJob = coroutineScope.launch {
val retryAfter = if (error) RETRY_ERROR else RETRY_NORMAL

logger.info { "Reconnect scheduled in $retryAfter (error=$error)" }

delay(retryAfter)
reconnect()
}

return true
}

companion object {
val RETRY_NORMAL = 5.seconds
val RETRY_ERROR = 30.seconds
const val RETRY_ATTEMPTS = 5
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package org.jellyfin.sdk.api.sockets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import mu.KotlinLogging
Expand Down Expand Up @@ -36,12 +38,13 @@ public class OkHttpWebsocketSession(
writeTimeout(clientOptions.socketTimeout, TimeUnit.MILLISECONDS)
}.build()
private var webSocket: WebSocket? = null
private val state = MutableStateFlow(SocketInstanceState.DISCONNECTED)
private val _state = MutableStateFlow(SocketInstanceState.DISCONNECTED)
public override val state: StateFlow<SocketInstanceState> = _state.asStateFlow()

private val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
logger.info { "WebSocket has opened" }
state.value = SocketInstanceState.CONNECTED
_state.value = SocketInstanceState.CONNECTED
}

override fun onMessage(webSocket: WebSocket, text: String) {
Expand All @@ -58,21 +61,21 @@ public class OkHttpWebsocketSession(

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
logger.info { "WebSocket is closing, code=$code, reason=$reason" }
state.value = SocketInstanceState.DISCONNECTED
_state.value = SocketInstanceState.DISCONNECTED
}

@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun onClosed(closedWebSocket: WebSocket, code: Int, reason: String) {
logger.info { "WebSocket has closed, code=$code, reason=$reason" }
state.value = SocketInstanceState.DISCONNECTED
_state.value = SocketInstanceState.DISCONNECTED

if (webSocket == closedWebSocket) webSocket = null
}

@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun onFailure(failedWebSocket: WebSocket, t: Throwable, response: Response?) {
logger.warn(t) { "WebSocket has failed" }
state.value = SocketInstanceState.ERROR
_state.value = SocketInstanceState.ERROR

if (webSocket == failedWebSocket) webSocket = null
}
Expand All @@ -85,7 +88,7 @@ public class OkHttpWebsocketSession(
url(url)
}.build()

state.value = SocketInstanceState.CONNECTING
_state.value = SocketInstanceState.CONNECTING
webSocket = client.newWebSocket(request, listener)

return state.first { it != SocketInstanceState.CONNECTING } == SocketInstanceState.CONNECTED
Expand Down Expand Up @@ -115,7 +118,7 @@ public class OkHttpWebsocketSession(
}

override suspend fun disconnect() {
state.value = SocketInstanceState.DISCONNECTED
_state.value = SocketInstanceState.DISCONNECTED
webSocket?.close(CLOSE_REASON_NORMAL, null)
webSocket = null
}
Expand Down

0 comments on commit 44818c0

Please sign in to comment.