Skip to content

Commit

Permalink
remove flow and channel on rtsp and srt
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Oct 5, 2023
1 parent 0f62274 commit 043945c
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 86 deletions.
29 changes: 14 additions & 15 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.pedro.rtmp.utils.BitrateManager
import com.pedro.rtmp.utils.ConnectCheckerRtmp
import com.pedro.rtmp.utils.onMainThread
import com.pedro.rtmp.utils.socket.RtmpSocket
import com.pedro.rtmp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -53,13 +54,12 @@ class RtmpSender(
private var h265Packet = H265Packet()
@Volatile
private var running = false

private var cacheSize = 200

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
@Volatile
private var flvPacketBlockingQueue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(cacheSize)
private var queue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(cacheSize)
private var audioFramesSent: Long = 0
private var videoFramesSent: Long = 0
var socket: RtmpSocket? = null
Expand Down Expand Up @@ -93,9 +93,8 @@ class RtmpSender(
}

private fun enqueueVideoFrame(flvPacket: FlvPacket) {
try {
flvPacketBlockingQueue.add(flvPacket)
} catch (e: IllegalStateException) {
val result = queue.trySend(flvPacket)
if (!result) {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
}
Expand All @@ -118,9 +117,8 @@ class RtmpSender(
fun sendAudioFrame(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
aacPacket.createFlvAudioPacket(aacBuffer, info) { flvPacket ->
try {
flvPacketBlockingQueue.add(flvPacket)
} catch (e: IllegalStateException) {
val result = queue.trySend(flvPacket)
if (!result) {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
}
Expand All @@ -129,12 +127,13 @@ class RtmpSender(
}

fun start() {
queue.clear()
running = true
job = scope.launch {
while (scope.isActive && running) {
val error = runCatching {
val flvPacket = runInterruptible {
flvPacketBlockingQueue.poll(1, TimeUnit.SECONDS)
queue.poll(1, TimeUnit.SECONDS)
}
if (flvPacket == null) {
Log.i(TAG, "Skipping iteration, frame null")
Expand Down Expand Up @@ -183,23 +182,23 @@ class RtmpSender(
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
flvPacketBlockingQueue.clear()
queue.clear()
}

fun hasCongestion(): Boolean {
val size = flvPacketBlockingQueue.size.toFloat()
val remaining = flvPacketBlockingQueue.remainingCapacity().toFloat()
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
}

fun resizeCache(newSize: Int) {
if (newSize < flvPacketBlockingQueue.size - flvPacketBlockingQueue.remainingCapacity()) {
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(newSize)
flvPacketBlockingQueue.drainTo(tempQueue)
flvPacketBlockingQueue = tempQueue
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int {
Expand Down
2 changes: 1 addition & 1 deletion rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ package com.pedro.rtmp.utils
*/
object RtmpConfig {
const val DEFAULT_CHUNK_SIZE = 128
var writeChunkSize = 4096
var writeChunkSize = DEFAULT_CHUNK_SIZE
var acknowledgementWindowSize = 0
}
10 changes: 10 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ import kotlinx.coroutines.withContext
import java.io.InputStream
import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue

/**
* Created by pedro on 20/04/21.
*/

inline infix fun <reified T: Any> BlockingQueue<T>.trySend(item: T): Boolean {
return try {
this.add(item)
true
} catch (e: IllegalStateException) {
false
}
}

suspend fun onMainThread(code: () -> Unit) {
withContext(Dispatchers.Main) {
code()
Expand Down
74 changes: 34 additions & 40 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.pedro.rtsp.utils.BitrateManager
import com.pedro.rtsp.utils.ConnectCheckerRtsp
import com.pedro.rtsp.utils.RtpConstants
import com.pedro.rtsp.utils.onMainThread
import com.pedro.rtsp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -34,6 +35,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.io.IOException
import java.io.OutputStream
import java.nio.ByteBuffer
Expand All @@ -54,14 +56,12 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
get() = 10 * 1024 * 1024 / RtpConstants.MTU
private var cacheSize = defaultCacheSize
@Volatile
private var itemsInQueue = 0
@Volatile
private var running = false

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private var queue = Channel<RtpFrame>(cacheSize)
private var queueFlow = queue.receiveAsFlow()
@Volatile
private var queue: BlockingQueue<RtpFrame> = LinkedBlockingQueue(cacheSize)

private var audioFramesSent: Long = 0
private var videoFramesSent: Long = 0
Expand Down Expand Up @@ -108,11 +108,9 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
if (running) {
videoPacket?.createAndSendPacket(h264Buffer, info) { rtpFrame ->
val result = queue.trySend(rtpFrame)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
} else {
itemsInQueue++
}
}
}
Expand All @@ -122,19 +120,16 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
if (running) {
aacPacket?.createAndSendPacket(aacBuffer, info) { rtpFrame ->
val result = queue.trySend(rtpFrame)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
} else {
itemsInQueue++
}
}
}
}

fun start() {
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
queue.clear()
running = true
job = scope.launch {
val ssrcVideo = Random().nextInt().toLong()
Expand All @@ -143,41 +138,41 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
videoPacket?.setSSRC(ssrcVideo)
aacPacket?.setSSRC(ssrcAudio)
val isTcp = rtpSocket is RtpSocketTcp
queueFlow.collect { rtpFrame ->
itemsInQueue--
while (scope.isActive && running) {
val error = runCatching {
rtpSocket?.sendFrame(rtpFrame, isEnableLogs)
//bytes to bits (4 is tcp header length)
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
bitrateManager.calculateBitrate(packetSize * 8.toLong())
if (rtpFrame.isVideoFrame()) {
videoFramesSent++
} else {
audioFramesSent++
val rtpFrame = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) {
if (rtpFrame != null) {
rtpSocket?.sendFrame(rtpFrame, isEnableLogs)
//bytes to bits (4 is tcp header length)
val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0
bitrateManager.calculateBitrate(reportSize * 8.toLong())
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
bitrateManager.calculateBitrate(packetSize * 8.toLong())
if (rtpFrame.isVideoFrame()) {
videoFramesSent++
} else {
audioFramesSent++
}
if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) {
//bytes to bits (4 is tcp header length)
val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0
bitrateManager.calculateBitrate(reportSize * 8.toLong())
}
}
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, " + error.message)
connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, ${error.message}")
}
Log.e(TAG, "send error: ", error)
return@collect
return@launch
}
}
}
}

suspend fun stop() {
running = false
queue.cancel()
itemsInQueue = 0
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
baseSenderReport?.reset()
baseSenderReport?.close()
rtpSocket?.close()
Expand All @@ -189,24 +184,23 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
queue.clear()
}

fun hasCongestion(): Boolean {
val size = cacheSize
val remaining = cacheSize - itemsInQueue
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
}

fun resizeCache(newSize: Int) {
if (!scope.isActive) {
val tempQueue = Channel<RtpFrame>(newSize)
queue = tempQueue
queueFlow = queue.receiveAsFlow()
cacheSize = newSize
} else {
throw RuntimeException("resize cache while streaming is not available")
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<RtpFrame> = LinkedBlockingQueue(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int {
Expand Down
11 changes: 11 additions & 0 deletions rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@
package com.pedro.rtsp.utils

import android.util.Base64
import com.pedro.rtsp.rtsp.RtpFrame
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue

inline infix fun <reified T: Any> BlockingQueue<T>.trySend(item: T): Boolean {
return try {
this.add(item)
true
} catch (e: IllegalStateException) {
false
}
}

fun ByteArray.encodeToString(flags: Int = Base64.NO_WRAP): String {
return Base64.encodeToString(this, flags)
Expand Down
1 change: 0 additions & 1 deletion srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class CommandsManager {
packetHandlingQueue.add(dataPacket)
dataPacket.write()
socket?.write(dataPacket)
Log.i(TAG, dataPacket.toString())
return dataPacket.getSize()
}
}
Expand Down
Loading

0 comments on commit 043945c

Please sign in to comment.