diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt index 8933d9538..f0016b446 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt @@ -32,6 +32,7 @@ import com.pedro.rtmp.utils.socket.RtmpSocket import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import java.io.* +import java.nio.ByteBuffer /** * Created by pedro on 21/04/21. @@ -64,12 +65,28 @@ abstract class CommandsManager { protected var width = 640 protected var height = 480 var fps = 30 - protected var sampleRate = 44100 - protected var isStereo = true + var sampleRate = 44100 + protected set + var isStereo = true + protected set var videoCodec = VideoCodec.H264 var audioCodec = AudioCodec.AAC //Avoid write a packet in middle of other. private val writeSync = Mutex(locked = false) + var sps: ByteBuffer? = null + private set + var pps: ByteBuffer? = null + private set + var vps: ByteBuffer? = null + private set + + fun videoInfoReady(): Boolean { + return when (videoCodec) { + VideoCodec.H264 -> sps != null && pps != null + VideoCodec.H265 -> sps != null && pps != null && vps != null + VideoCodec.AV1 -> sps != null + } + } fun setVideoResolution(width: Int, height: Int) { this.width = width @@ -81,6 +98,12 @@ abstract class CommandsManager { this.isStereo = isStereo } + fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) { + this.sps = sps + this.pps = pps + this.vps = vps + } + fun setAuth(user: String?, password: String?) { this.user = user this.password = password @@ -224,7 +247,7 @@ abstract class CommandsManager { abstract suspend fun sendPublishImp(socket: RtmpSocket) abstract suspend fun sendCloseImp(socket: RtmpSocket) - fun reset() { + fun reset(clear: Boolean) { startTs = 0 timestamp = 0 streamId = 0 @@ -233,5 +256,10 @@ abstract class CommandsManager { sessionHistory.reset() acknowledgementSequence = 0 bytesRead = 0 + if (clear) { + sps = null + pps = null + vps = null + } } } diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt index 566843226..f08de7859 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt @@ -47,6 +47,7 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.withTimeoutOrNull import java.io.* import java.net.* @@ -96,6 +97,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) { get() = rtmpSender.getSentAudioFrames() val sentVideoFrames: Long get() = rtmpSender.getSentVideoFrames() + private var mutex = Mutex(locked = true) /** * Add certificates for TLS connection @@ -177,12 +179,12 @@ class RtmpClient(private val connectChecker: ConnectChecker) { fun setAudioInfo(sampleRate: Int, isStereo: Boolean) { commandsManager.setAudioInfo(sampleRate, isStereo) - rtmpSender.setAudioInfo(sampleRate, isStereo) } fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) { Log.i(TAG, "send sps and pps") - rtmpSender.setVideoInfo(sps, pps, vps) + commandsManager.setVideoInfo(sps, pps, vps) + if (mutex.isLocked) runCatching { mutex.unlock() } } fun setVideoResolution(width: Int, height: Int) { @@ -254,6 +256,22 @@ class RtmpClient(private val connectChecker: ConnectChecker) { if (user != null && password != null) setAuthorization(user, password) val error = runCatching { + if (!commandsManager.audioDisabled) { + rtmpSender.setAudioInfo(commandsManager.sampleRate, commandsManager.isStereo) + } + if (!commandsManager.videoDisabled) { + if (!commandsManager.videoInfoReady()) { + Log.i(TAG, "waiting for sps and pps") + withTimeoutOrNull(5000) { mutex.lock() } + if (!commandsManager.videoInfoReady()) { + onMainThread { + connectChecker.onConnectionFailed("sps or pps is null") + } + return@launch + } + } + rtmpSender.setVideoInfo(commandsManager.sps!!, commandsManager.pps, commandsManager.vps) + } if (!establishConnection()) { onMainThread { connectChecker.onConnectionFailed("Handshake failed") @@ -499,7 +517,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) { private suspend fun closeConnection() { socket?.close() - commandsManager.reset() + commandsManager.reset(false) } @JvmOverloads @@ -538,13 +556,14 @@ class RtmpClient(private val connectChecker: ConnectChecker) { jobRetry = null scopeRetry.cancel() scopeRetry = CoroutineScope(Dispatchers.IO) + mutex = Mutex(true) } job?.cancelAndJoin() job = null scope.cancel() scope = CoroutineScope(Dispatchers.IO) publishPermitted = false - commandsManager.reset() + commandsManager.reset(clear) } fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {