Skip to content

Commit

Permalink
remove channel and use linkedqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Sep 29, 2023
1 parent d329d33 commit 0f62274
Showing 1 changed file with 45 additions and 46 deletions.
91 changes: 45 additions & 46 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

/**
* Created by pedro on 8/04/21.
Expand All @@ -53,13 +55,11 @@ class RtmpSender(
private var running = false

private var cacheSize = 200
@Volatile
private var itemsInQueue = 0

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private var queue = Channel<FlvPacket>(cacheSize)
private var queueFlow = queue.receiveAsFlow()
@Volatile
private var flvPacketBlockingQueue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(cacheSize)
private var audioFramesSent: Long = 0
private var videoFramesSent: Long = 0
var socket: RtmpSocket? = null
Expand Down Expand Up @@ -93,12 +93,11 @@ class RtmpSender(
}

private fun enqueueVideoFrame(flvPacket: FlvPacket) {
val result = queue.trySend(flvPacket)
if (!result.isSuccess) {
try {
flvPacketBlockingQueue.add(flvPacket)
} catch (e: IllegalStateException) {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
} else {
itemsInQueue++
}
}

Expand All @@ -119,63 +118,62 @@ class RtmpSender(
fun sendAudioFrame(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
aacPacket.createFlvAudioPacket(aacBuffer, info) { flvPacket ->
val result = queue.trySend(flvPacket)
if (!result.isSuccess) {
try {
flvPacketBlockingQueue.add(flvPacket)
} catch (e: IllegalStateException) {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
} else {
itemsInQueue++
}
}
}
}

fun start() {
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
running = true
job = scope.launch {
queueFlow.collect { flvPacket ->
itemsInQueue--
while (scope.isActive && running) {
val error = runCatching {
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
}
}
val flvPacket = runInterruptible {
flvPacketBlockingQueue.poll(1, TimeUnit.SECONDS)
}
if (flvPacket == null) {
Log.i(TAG, "Skipping iteration, frame null")
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
}
}
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
}
}
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectCheckerRtmp.onConnectionFailedRtmp("Error send packet, " + error.message)
}
Log.e(TAG, "send error: ", error)
return@collect
return@launch
}
}
}
}

suspend fun stop(clear: Boolean = true) {
running = false
queue.cancel()
itemsInQueue = 0
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
aacPacket.reset()
h264Packet.reset(clear)
h265Packet.reset(clear)
Expand All @@ -185,22 +183,23 @@ class RtmpSender(
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
flvPacketBlockingQueue.clear()
}

fun hasCongestion(): Boolean {
val size = cacheSize
val remaining = cacheSize - itemsInQueue
val size = flvPacketBlockingQueue.size.toFloat()
val remaining = flvPacketBlockingQueue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
}

fun resizeCache(newSize: Int) {
if (!scope.isActive) {
val tempQueue = Channel<FlvPacket>(newSize)
queue = tempQueue
queueFlow = queue.receiveAsFlow()
cacheSize = newSize
if (newSize < flvPacketBlockingQueue.size - flvPacketBlockingQueue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(newSize)
flvPacketBlockingQueue.drainTo(tempQueue)
flvPacketBlockingQueue = tempQueue
}

fun getCacheSize(): Int {
Expand Down

0 comments on commit 0f62274

Please sign in to comment.