Skip to content

Commit

Permalink
avoid reset rtmp packets
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Oct 12, 2024
1 parent a36a4cf commit c626938
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
27 changes: 25 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.pedro.rtmp.utils.socket.RtmpSocket
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.*
import java.nio.ByteBuffer

/**
* Created by pedro on 21/04/21.
Expand Down Expand Up @@ -64,12 +65,28 @@ abstract class CommandsManager {
protected var width = 640
protected var height = 480
var fps = 30
protected var sampleRate = 44100
protected var isStereo = true
var sampleRate = 44100
protected set
var isStereo = true
protected set
var videoCodec = VideoCodec.H264
var audioCodec = AudioCodec.AAC
//Avoid write a packet in middle of other.
private val writeSync = Mutex(locked = false)
var sps: ByteBuffer? = null
private set
var pps: ByteBuffer? = null
private set
var vps: ByteBuffer? = null
private set

fun videoInfoReady(): Boolean {
return when (videoCodec) {
VideoCodec.H264 -> sps != null && pps != null
VideoCodec.H265 -> sps != null && pps != null && vps != null
VideoCodec.AV1 -> sps != null
}
}

fun setVideoResolution(width: Int, height: Int) {
this.width = width
Expand All @@ -81,6 +98,12 @@ abstract class CommandsManager {
this.isStereo = isStereo
}

fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) {
this.sps = sps
this.pps = pps
this.vps = vps
}

fun setAuth(user: String?, password: String?) {
this.user = user
this.password = password
Expand Down
23 changes: 21 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.withTimeoutOrNull
import java.io.*
import java.net.*
Expand Down Expand Up @@ -96,6 +97,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
get() = rtmpSender.getSentAudioFrames()
val sentVideoFrames: Long
get() = rtmpSender.getSentVideoFrames()
private var mutex = Mutex(locked = true)

/**
* Add certificates for TLS connection
Expand Down Expand Up @@ -177,12 +179,12 @@ class RtmpClient(private val connectChecker: ConnectChecker) {

fun setAudioInfo(sampleRate: Int, isStereo: Boolean) {
commandsManager.setAudioInfo(sampleRate, isStereo)
rtmpSender.setAudioInfo(sampleRate, isStereo)
}

fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) {
Log.i(TAG, "send sps and pps")
rtmpSender.setVideoInfo(sps, pps, vps)
commandsManager.setVideoInfo(sps, pps, vps)
if (mutex.isLocked) runCatching { mutex.unlock() }
}

fun setVideoResolution(width: Int, height: Int) {
Expand Down Expand Up @@ -254,6 +256,22 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
if (user != null && password != null) setAuthorization(user, password)

val error = runCatching {
if (!commandsManager.audioDisabled) {
rtmpSender.setAudioInfo(commandsManager.sampleRate, commandsManager.isStereo)
}
if (!commandsManager.videoDisabled) {
if (!commandsManager.videoInfoReady()) {
Log.i(TAG, "waiting for sps and pps")
withTimeoutOrNull(5000) { mutex.lock() }
if (!commandsManager.videoInfoReady()) {
onMainThread {
connectChecker.onConnectionFailed("sps or pps is null")
}
return@launch
}
}
rtmpSender.setVideoInfo(commandsManager.sps!!, commandsManager.pps, commandsManager.vps)
}
if (!establishConnection()) {
onMainThread {
connectChecker.onConnectionFailed("Handshake failed")
Expand Down Expand Up @@ -538,6 +556,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
jobRetry = null
scopeRetry.cancel()
scopeRetry = CoroutineScope(Dispatchers.IO)
mutex = Mutex(true)
}
job?.cancelAndJoin()
job = null
Expand Down

0 comments on commit c626938

Please sign in to comment.