Skip to content

Commit

Permalink
fix propagate valid error message
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Sep 24, 2024
1 parent af0dc61 commit ea83e4e
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 20 deletions.
4 changes: 4 additions & 0 deletions common/src/main/java/com/pedro/common/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ fun String.getIndexes(char: Char): Array<Int> {
val indexes = mutableListOf<Int>()
forEachIndexed { index, c -> if (c == char) indexes.add(index) }
return indexes.toTypedArray()
}

fun Throwable.validMessage(): String {
return (message ?: "").ifEmpty { javaClass.simpleName }
}
10 changes: 6 additions & 4 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import io.ktor.utils.io.writeStringUtf8
import java.io.IOException
import java.net.ConnectException
import java.security.SecureRandom
import javax.net.ssl.TrustManager
import kotlin.coroutines.coroutineContext
Expand Down Expand Up @@ -112,7 +112,8 @@ class TcpStreamSocket(
}

suspend fun read(): Int {
return input?.readByte()?.toInt() ?: throw IOException("read with socket closed")
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readByte().toInt()
}

suspend fun readUInt16(): Int {
Expand All @@ -132,11 +133,12 @@ class TcpStreamSocket(
}

suspend fun readUntil(b: ByteArray) {
return input?.readFully(b) ?: throw IOException("read with socket closed")
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readFully(b)
}

suspend fun readLine(): String? {
val input = input ?: throw IOException("read with socket closed")
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readUTF8Line()
}

Expand Down
9 changes: 6 additions & 3 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import android.media.MediaCodec
import android.util.Log
import com.pedro.common.AudioCodec
import com.pedro.common.ConnectChecker
import com.pedro.common.ConnectionFailed
import com.pedro.common.TimeUtils
import com.pedro.common.UrlParser
import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.validMessage
import com.pedro.rtmp.amf.AmfVersion
import com.pedro.rtmp.rtmp.message.*
import com.pedro.rtmp.rtmp.message.command.Command
Expand Down Expand Up @@ -269,7 +271,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
if (error != null) {
Log.e(TAG, "connection error", error)
onMainThread {
connectChecker.onConnectionFailed("Error configure stream, ${error.message}")
connectChecker.onConnectionFailed("Error configure stream, ${error.validMessage()}")
}
return@launch
}
Expand All @@ -281,6 +283,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
while (scope.isActive && isStreaming) {
val error = runCatching {
if (isAlive()) {
delay(2000)
//ignore packet after connect if tunneled to avoid spam idle
if (!tunneled) handleMessages()
} else {
Expand All @@ -290,7 +293,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
scope.cancel()
}
}.exceptionOrNull()
if (error != null && error !is SocketTimeoutException) {
if (ConnectionFailed.parse(error?.validMessage() ?: "") != ConnectionFailed.TIMEOUT) {
scope.cancel()
}
}
Expand Down Expand Up @@ -491,7 +494,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
}
}

suspend fun closeConnection() {
private suspend fun closeConnection() {
socket?.close()
commandsManager.reset()
}
Expand Down
3 changes: 2 additions & 1 deletion rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.pedro.common.ConnectChecker
import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.trySend
import com.pedro.common.validMessage
import com.pedro.rtmp.flv.BasePacket
import com.pedro.rtmp.flv.FlvPacket
import com.pedro.rtmp.flv.FlvType
Expand Down Expand Up @@ -183,7 +184,7 @@ class RtmpSender(
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, " + error.message)
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
}
Log.e(TAG, "send error: ", error)
return@launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TcpTunneledSocket(private val host: String, private val port: Int, private
val bytes = socket.inputStream.readBytes()
if (bytes.size > 1) input = ByteArrayInputStream(bytes, 1, bytes.size)
val success = socket.responseCode == HttpURLConnection.HTTP_OK
if (!success) throw IOException("send packet failed: ${socket.responseMessage}")
if (!success) throw IOException("send packet failed: ${socket.responseMessage}, broken pipe")
} finally {
socket.disconnect()
}
Expand All @@ -96,7 +96,7 @@ class TcpTunneledSocket(private val host: String, private val port: Int, private
socket.connect()
val data = socket.inputStream.readBytes()
val success = socket.responseCode == HttpURLConnection.HTTP_OK
if (!success) throw IOException("receive packet failed: ${socket.responseMessage}")
if (!success) throw IOException("receive packet failed: ${socket.responseMessage}, broken pipe")
return data
} finally {
socket.disconnect()
Expand Down
7 changes: 4 additions & 3 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import android.media.MediaCodec
import android.util.Log
import com.pedro.common.AudioCodec
import com.pedro.common.ConnectChecker
import com.pedro.common.ConnectionFailed
import com.pedro.common.UrlParser
import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.validMessage
import com.pedro.rtsp.rtsp.commands.CommandsManager
import com.pedro.rtsp.rtsp.commands.Method
import com.pedro.rtsp.utils.RtpConstants
Expand All @@ -38,7 +40,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.withTimeoutOrNull
import java.io.*
import java.net.SocketTimeoutException
import java.net.URISyntaxException
import java.nio.ByteBuffer
import javax.net.ssl.TrustManager
Expand Down Expand Up @@ -340,7 +341,7 @@ class RtspClient(private val connectChecker: ConnectChecker) {
if (error != null) {
Log.e(TAG, "connection error", error)
onMainThread {
connectChecker.onConnectionFailed("Error configure stream, ${error.message}")
connectChecker.onConnectionFailed("Error configure stream, ${error.validMessage()}")
}
return@launch
}
Expand All @@ -367,7 +368,7 @@ class RtspClient(private val connectChecker: ConnectChecker) {
scope.cancel()
}
}.exceptionOrNull()
if (error != null && error !is SocketTimeoutException) {
if (ConnectionFailed.parse(error?.validMessage() ?: "") != ConnectionFailed.TIMEOUT) {
scope.cancel()
}
}
Expand Down
3 changes: 2 additions & 1 deletion rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.trySend
import com.pedro.common.validMessage
import com.pedro.rtsp.rtcp.BaseSenderReport
import com.pedro.rtsp.rtp.packets.*
import com.pedro.rtsp.rtp.sockets.BaseRtpSocket
Expand Down Expand Up @@ -195,7 +196,7 @@ class RtspSender(
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, ${error.message}")
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
}
Log.e(TAG, "send error: ", error)
return@launch
Expand Down
8 changes: 5 additions & 3 deletions srt/src/main/java/com/pedro/srt/srt/SrtClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import android.media.MediaCodec
import android.util.Log
import com.pedro.common.AudioCodec
import com.pedro.common.ConnectChecker
import com.pedro.common.ConnectionFailed
import com.pedro.common.UrlParser
import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.validMessage
import com.pedro.srt.srt.packets.ControlPacket
import com.pedro.srt.srt.packets.DataPacket
import com.pedro.srt.srt.packets.SrtPacket
Expand Down Expand Up @@ -51,7 +53,6 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import java.io.IOException
import java.net.SocketTimeoutException
import java.net.URISyntaxException
import java.nio.ByteBuffer

Expand Down Expand Up @@ -241,7 +242,7 @@ class SrtClient(private val connectChecker: ConnectChecker) {
if (error != null) {
Log.e(TAG, "connection error", error)
onMainThread {
connectChecker.onConnectionFailed("Error configure stream, ${error.message}")
connectChecker.onConnectionFailed("Error configure stream, ${error.validMessage()}")
}
return@launch
}
Expand Down Expand Up @@ -301,6 +302,7 @@ class SrtClient(private val connectChecker: ConnectChecker) {
while (scope.isActive && isStreaming) {
val error = runCatching {
if (isAlive()) {
delay(2000)
//ignore packet after connect if tunneled to avoid spam idle
handleMessages()
} else {
Expand All @@ -310,7 +312,7 @@ class SrtClient(private val connectChecker: ConnectChecker) {
scope.cancel()
}
}.exceptionOrNull()
if (error != null && error !is SocketTimeoutException) {
if (ConnectionFailed.parse(error?.validMessage() ?: "") != ConnectionFailed.TIMEOUT) {
scope.cancel()
}
}
Expand Down
3 changes: 2 additions & 1 deletion srt/src/main/java/com/pedro/srt/srt/SrtSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.onMainThread
import com.pedro.common.trySend
import com.pedro.common.validMessage
import com.pedro.srt.mpeg2ts.MpegTsPacket
import com.pedro.srt.mpeg2ts.MpegTsPacketizer
import com.pedro.srt.mpeg2ts.Pid
Expand Down Expand Up @@ -171,7 +172,7 @@ class SrtSender(
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, " + error.message)
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
}
Log.e(TAG, "send error: ", error)
return@launch
Expand Down
3 changes: 2 additions & 1 deletion udp/src/main/java/com/pedro/udp/UdpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.pedro.common.ConnectChecker
import com.pedro.common.UrlParser
import com.pedro.common.VideoCodec
import com.pedro.common.onMainThread
import com.pedro.common.validMessage
import com.pedro.udp.utils.UdpSocket
import com.pedro.udp.utils.UdpType
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -169,7 +170,7 @@ class UdpClient(private val connectChecker: ConnectChecker) {
if (error != null) {
Log.e(TAG, "connection error", error)
onMainThread {
connectChecker.onConnectionFailed("Error configure stream, ${error.message}")
connectChecker.onConnectionFailed("Error configure stream, ${error.validMessage()}")
}
return@launch
}
Expand Down
3 changes: 2 additions & 1 deletion udp/src/main/java/com/pedro/udp/UdpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.onMainThread
import com.pedro.common.trySend
import com.pedro.common.validMessage
import com.pedro.srt.mpeg2ts.MpegTsPacket
import com.pedro.srt.mpeg2ts.MpegTsPacketizer
import com.pedro.srt.mpeg2ts.Pid
Expand Down Expand Up @@ -171,7 +172,7 @@ class UdpSender(
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectChecker.onConnectionFailed("Error send packet, " + error.message)
connectChecker.onConnectionFailed("Error send packet, ${error.validMessage()}")
}
Log.e(TAG, "send error: ", error)
return@launch
Expand Down

0 comments on commit ea83e4e

Please sign in to comment.