Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid reset rtmp packets #1602

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.pedro.rtmp.utils.socket.RtmpSocket
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.*
import java.nio.ByteBuffer

/**
* Created by pedro on 21/04/21.
Expand Down Expand Up @@ -64,12 +65,28 @@ abstract class CommandsManager {
protected var width = 640
protected var height = 480
var fps = 30
protected var sampleRate = 44100
protected var isStereo = true
var sampleRate = 44100
protected set
var isStereo = true
protected set
var videoCodec = VideoCodec.H264
var audioCodec = AudioCodec.AAC
//Avoid write a packet in middle of other.
private val writeSync = Mutex(locked = false)
var sps: ByteBuffer? = null
private set
var pps: ByteBuffer? = null
private set
var vps: ByteBuffer? = null
private set

fun videoInfoReady(): Boolean {
return when (videoCodec) {
VideoCodec.H264 -> sps != null && pps != null
VideoCodec.H265 -> sps != null && pps != null && vps != null
VideoCodec.AV1 -> sps != null
}
}

fun setVideoResolution(width: Int, height: Int) {
this.width = width
Expand All @@ -81,6 +98,12 @@ abstract class CommandsManager {
this.isStereo = isStereo
}

fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) {
this.sps = sps
this.pps = pps
this.vps = vps
}

fun setAuth(user: String?, password: String?) {
this.user = user
this.password = password
Expand Down Expand Up @@ -224,7 +247,7 @@ abstract class CommandsManager {
abstract suspend fun sendPublishImp(socket: RtmpSocket)
abstract suspend fun sendCloseImp(socket: RtmpSocket)

fun reset() {
fun reset(clear: Boolean) {
startTs = 0
timestamp = 0
streamId = 0
Expand All @@ -233,5 +256,10 @@ abstract class CommandsManager {
sessionHistory.reset()
acknowledgementSequence = 0
bytesRead = 0
if (clear) {
sps = null
pps = null
vps = null
}
}
}
27 changes: 23 additions & 4 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.withTimeoutOrNull
import java.io.*
import java.net.*
Expand Down Expand Up @@ -96,6 +97,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
get() = rtmpSender.getSentAudioFrames()
val sentVideoFrames: Long
get() = rtmpSender.getSentVideoFrames()
private var mutex = Mutex(locked = true)

/**
* Add certificates for TLS connection
Expand Down Expand Up @@ -177,12 +179,12 @@ class RtmpClient(private val connectChecker: ConnectChecker) {

fun setAudioInfo(sampleRate: Int, isStereo: Boolean) {
commandsManager.setAudioInfo(sampleRate, isStereo)
rtmpSender.setAudioInfo(sampleRate, isStereo)
}

fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?) {
Log.i(TAG, "send sps and pps")
rtmpSender.setVideoInfo(sps, pps, vps)
commandsManager.setVideoInfo(sps, pps, vps)
if (mutex.isLocked) runCatching { mutex.unlock() }
}

fun setVideoResolution(width: Int, height: Int) {
Expand Down Expand Up @@ -254,6 +256,22 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
if (user != null && password != null) setAuthorization(user, password)

val error = runCatching {
if (!commandsManager.audioDisabled) {
rtmpSender.setAudioInfo(commandsManager.sampleRate, commandsManager.isStereo)
}
if (!commandsManager.videoDisabled) {
if (!commandsManager.videoInfoReady()) {
Log.i(TAG, "waiting for sps and pps")
withTimeoutOrNull(5000) { mutex.lock() }
if (!commandsManager.videoInfoReady()) {
onMainThread {
connectChecker.onConnectionFailed("sps or pps is null")
}
return@launch
}
}
rtmpSender.setVideoInfo(commandsManager.sps!!, commandsManager.pps, commandsManager.vps)
}
if (!establishConnection()) {
onMainThread {
connectChecker.onConnectionFailed("Handshake failed")
Expand Down Expand Up @@ -499,7 +517,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {

private suspend fun closeConnection() {
socket?.close()
commandsManager.reset()
commandsManager.reset(false)
}

@JvmOverloads
Expand Down Expand Up @@ -538,13 +556,14 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
jobRetry = null
scopeRetry.cancel()
scopeRetry = CoroutineScope(Dispatchers.IO)
mutex = Mutex(true)
}
job?.cancelAndJoin()
job = null
scope.cancel()
scope = CoroutineScope(Dispatchers.IO)
publishPermitted = false
commandsManager.reset()
commandsManager.reset(clear)
}

fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
Expand Down
Loading