Skip to content

Commit

Permalink
update RTSP setSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Oct 26, 2024
1 parent 8aa5120 commit 18b122b
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 71 deletions.
2 changes: 1 addition & 1 deletion common/src/main/java/com/pedro/common/base/BaseSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class BaseSender(
protected set
var droppedVideoFrames: Long = 0
protected set
protected val bitrateManager: BitrateManager = BitrateManager(connectChecker)
private val bitrateManager: BitrateManager = BitrateManager(connectChecker)
protected var isEnableLogs = true
private var job: Job? = null
protected val scope = CoroutineScope(Dispatchers.IO)
Expand Down
55 changes: 4 additions & 51 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@
package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.network.tls.tls
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readByte
Expand All @@ -35,58 +28,18 @@ import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import java.net.ConnectException
import java.security.SecureRandom
import javax.net.ssl.TrustManager
import java.net.InetAddress

/**
* Created by pedro on 22/9/24.
*/
class TcpStreamSocket(
private val host: String,
private val port: Int,
private val secured: Boolean = false,
private val certificate: TrustManager? = null
): StreamSocket {
abstract class TcpStreamSocket: StreamSocket {

private val timeout = 5000L
private var input: ByteReadChannel? = null
private var output: ByteWriteChannel? = null
private var selectorManager = SelectorManager(Dispatchers.IO)
private var socket: ReadWriteSocket? = null
private var address: InetAddress? = null

override suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val builder = aSocket(selectorManager).tcp().connect(remoteAddress = InetSocketAddress(host, port))
val socket = if (secured) {
builder.tls(Dispatchers.Default) {
trustManager = certificate
random = SecureRandom()
}
} else builder
input = socket.openReadChannel()
output = socket.openWriteChannel(autoFlush = false)
address = java.net.InetSocketAddress(host, port).address
this.socket = socket
}

override suspend fun close() = withContext(Dispatchers.IO) {
try {
address = null
input = null
output = null
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

override fun isConnected(): Boolean = socket?.isClosed != true

override fun isReachable(): Boolean = address?.isReachable(5000) ?: false
protected var input: ByteReadChannel? = null
protected var output: ByteWriteChannel? = null
protected var selectorManager = SelectorManager(Dispatchers.IO)

suspend fun flush() {
output?.flush()
Expand Down
76 changes: 76 additions & 0 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocketImp.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.network.tls.tls
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.net.InetAddress
import java.security.SecureRandom
import javax.net.ssl.TrustManager

/**
* Created by pedro on 22/9/24.
*/
class TcpStreamSocketImp(
private val host: String,
private val port: Int,
private val secured: Boolean = false,
private val certificate: TrustManager? = null
): TcpStreamSocket() {

private var socket: ReadWriteSocket? = null
private var address: InetAddress? = null

override suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val builder = aSocket(selectorManager).tcp().connect(remoteAddress = InetSocketAddress(host, port))
val socket = if (secured) {
builder.tls(Dispatchers.Default) {
trustManager = certificate
random = SecureRandom()
}
} else builder
input = socket.openReadChannel()
output = socket.openWriteChannel(autoFlush = false)
address = java.net.InetSocketAddress(host, port).address
this.socket = socket
}

override suspend fun close() = withContext(Dispatchers.IO) {
try {
address = null
input = null
output = null
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

override fun isConnected(): Boolean = socket?.isClosed != true

override fun isReachable(): Boolean = address?.isReachable(5000) ?: false
}
4 changes: 2 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.pedro.rtmp.utils.socket

import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.socket.TcpStreamSocketImp
import javax.net.ssl.TrustManager

/**
Expand All @@ -26,7 +26,7 @@ class TcpSocket(
host: String, port: Int, secured: Boolean, certificates: TrustManager?
): RtmpSocket() {

private val socket = TcpStreamSocket(host, port, secured, certificates)
private val socket = TcpStreamSocketImp(host, port, secured, certificates)

override suspend fun flush(isPacket: Boolean) {
socket.flush()
Expand Down
Empty file added rtsp/.attach_pid87602
Empty file.
6 changes: 3 additions & 3 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.pedro.common.VideoCodec
import com.pedro.common.clone
import com.pedro.common.frame.MediaFrame
import com.pedro.common.onMainThread
import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.socket.TcpStreamSocketImp
import com.pedro.common.toMediaFrameInfo
import com.pedro.common.validMessage
import com.pedro.rtsp.rtsp.commands.CommandsManager
Expand Down Expand Up @@ -57,7 +57,7 @@ class RtspClient(private val connectChecker: ConnectChecker) {
private val validSchemes = arrayOf("rtsp", "rtsps")

//sockets objects
private var socket: TcpStreamSocket? = null
private var socket: TcpStreamSocketImp? = null
private var scope = CoroutineScope(Dispatchers.IO)
private var scopeRetry = CoroutineScope(Dispatchers.IO)
private var job: Job? = null
Expand Down Expand Up @@ -237,7 +237,7 @@ class RtspClient(private val connectChecker: ConnectChecker) {
}
rtspSender.setVideoInfo(commandsManager.sps!!, commandsManager.pps, commandsManager.vps)
}
val socket = TcpStreamSocket(host, port, tlsEnabled, certificates)
val socket = TcpStreamSocketImp(host, port, tlsEnabled, certificates)
this@RtspClient.socket = socket
socket.connect()
socket.write(commandsManager.createOptions())
Expand Down
19 changes: 9 additions & 10 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/commands/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.pedro.common.TimeUtils
import com.pedro.common.VideoCodec
import com.pedro.common.getMd5Hash
import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.socket.TcpStreamSocketImp
import com.pedro.rtsp.rtsp.Protocol
import com.pedro.rtsp.rtsp.commands.SdpBody.createAV1Body
import com.pedro.rtsp.rtsp.commands.SdpBody.createAacBody
Expand Down Expand Up @@ -53,6 +54,8 @@ open class CommandsManager {
private set
var pps: ByteBuffer? = null
private set
var vps: ByteBuffer? = null
private set
private var cSeq = 0
private var sessionId: String? = null
private val timeStamp: Long
Expand All @@ -70,9 +73,12 @@ open class CommandsManager {
val audioServerPorts = intArrayOf(5004, 5005)
val videoServerPorts = intArrayOf(5006, 5007)

//For H265
var vps: ByteBuffer? = null
private set
val spsString: String
get() = sps?.getData()?.encodeToString() ?: ""
val ppsString: String
get() = pps?.getData()?.encodeToString() ?: ""
val vpsString: String
get() = vps?.getData()?.encodeToString() ?: ""

//For auth
var user: String? = null
Expand Down Expand Up @@ -133,13 +139,6 @@ open class CommandsManager {
sessionId = null
}

private val spsString: String
get() = sps?.getData()?.encodeToString() ?: ""
private val ppsString: String
get() = pps?.getData()?.encodeToString() ?: ""
private val vpsString: String
get() = vps?.getData()?.encodeToString() ?: ""

private fun addHeaders(): String {
return "CSeq: ${++cSeq}\r\n" +
(if (sessionId.isNullOrEmpty()) "" else "Session: $sessionId\r\n") +
Expand Down
4 changes: 2 additions & 2 deletions rtsp/src/test/java/com/pedro/rtsp/rtcp/RtcpReportTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.pedro.rtsp.rtcp

import com.pedro.common.TimeUtils
import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.socket.TcpStreamSocketImp
import com.pedro.common.socket.UdpStreamSocket
import com.pedro.rtsp.Utils
import com.pedro.rtsp.rtsp.Protocol
Expand Down Expand Up @@ -46,7 +46,7 @@ class RtcpReportTest {
@Mock
private lateinit var udpSocket: UdpStreamSocket
@Mock
private lateinit var tcpSocket: TcpStreamSocket
private lateinit var tcpSocket: TcpStreamSocketImp

private val timeUtilsMocked = Mockito.mockStatic(TimeUtils::class.java)
private var fakeTime = 7502849023L
Expand Down
4 changes: 2 additions & 2 deletions rtsp/src/test/java/com/pedro/rtsp/rtp/RtpStreamSocketTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.pedro.rtsp.rtp

import com.pedro.common.socket.TcpStreamSocket
import com.pedro.common.socket.TcpStreamSocketImp
import com.pedro.common.socket.UdpStreamSocket
import com.pedro.rtsp.rtp.sockets.BaseRtpSocket
import com.pedro.rtsp.rtp.sockets.RtpSocketUdp
Expand All @@ -43,7 +43,7 @@ class RtpStreamSocketTest {
@Mock
private lateinit var udpSocket: UdpStreamSocket
@Mock
private lateinit var tcpSocket: TcpStreamSocket
private lateinit var tcpSocket: TcpStreamSocketImp

@Test
fun `GIVEN multiple video or audio rtp frames WHEN update rtcp tcp send THEN send only 1 of video and 1 of audio each 3 seconds`() = runTest {
Expand Down

0 comments on commit 18b122b

Please sign in to comment.