diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt index e19da3e57..3696f8984 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt @@ -28,6 +28,7 @@ import com.pedro.rtmp.utils.BitrateManager import com.pedro.rtmp.utils.ConnectCheckerRtmp import com.pedro.rtmp.utils.onMainThread import com.pedro.rtmp.utils.socket.RtmpSocket +import com.pedro.rtmp.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -53,13 +54,12 @@ class RtmpSender( private var h265Packet = H265Packet() @Volatile private var running = false - private var cacheSize = 200 private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) @Volatile - private var flvPacketBlockingQueue: BlockingQueue = LinkedBlockingQueue(cacheSize) + private var queue: BlockingQueue = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 var socket: RtmpSocket? = null @@ -93,9 +93,8 @@ class RtmpSender( } private fun enqueueVideoFrame(flvPacket: FlvPacket) { - try { - flvPacketBlockingQueue.add(flvPacket) - } catch (e: IllegalStateException) { + val result = queue.trySend(flvPacket) + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ } @@ -118,9 +117,8 @@ class RtmpSender( fun sendAudioFrame(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) { if (running) { aacPacket.createFlvAudioPacket(aacBuffer, info) { flvPacket -> - try { - flvPacketBlockingQueue.add(flvPacket) - } catch (e: IllegalStateException) { + val result = queue.trySend(flvPacket) + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ } @@ -129,12 +127,13 @@ class RtmpSender( } fun start() { + queue.clear() running = true job = scope.launch { while (scope.isActive && running) { val error = runCatching { val flvPacket = runInterruptible { - flvPacketBlockingQueue.poll(1, TimeUnit.SECONDS) + queue.poll(1, TimeUnit.SECONDS) } if (flvPacket == null) { Log.i(TAG, "Skipping iteration, frame null") @@ -183,23 +182,23 @@ class RtmpSender( resetDroppedVideoFrames() job?.cancelAndJoin() job = null - flvPacketBlockingQueue.clear() + queue.clear() } fun hasCongestion(): Boolean { - val size = flvPacketBlockingQueue.size.toFloat() - val remaining = flvPacketBlockingQueue.remainingCapacity().toFloat() + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (newSize < flvPacketBlockingQueue.size - flvPacketBlockingQueue.remainingCapacity()) { + if (newSize < queue.size - queue.remainingCapacity()) { throw RuntimeException("Can't fit current cache inside new cache size") } val tempQueue: BlockingQueue = LinkedBlockingQueue(newSize) - flvPacketBlockingQueue.drainTo(tempQueue) - flvPacketBlockingQueue = tempQueue + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt b/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt index 515eae292..db5474412 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt @@ -21,6 +21,6 @@ package com.pedro.rtmp.utils */ object RtmpConfig { const val DEFAULT_CHUNK_SIZE = 128 - var writeChunkSize = 4096 + var writeChunkSize = DEFAULT_CHUNK_SIZE var acknowledgementWindowSize = 0 } \ No newline at end of file diff --git a/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt b/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt index 14947703b..057e5e3b5 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt @@ -21,11 +21,21 @@ import kotlinx.coroutines.withContext import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue /** * Created by pedro on 20/04/21. */ +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} + suspend fun onMainThread(code: () -> Unit) { withContext(Dispatchers.Main) { code() diff --git a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt index 07675a0fe..96ce6891f 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt @@ -26,6 +26,7 @@ import com.pedro.rtsp.utils.BitrateManager import com.pedro.rtsp.utils.ConnectCheckerRtsp import com.pedro.rtsp.utils.RtpConstants import com.pedro.rtsp.utils.onMainThread +import com.pedro.rtsp.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -34,6 +35,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.runInterruptible import java.io.IOException import java.io.OutputStream import java.nio.ByteBuffer @@ -54,14 +56,12 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { get() = 10 * 1024 * 1024 / RtpConstants.MTU private var cacheSize = defaultCacheSize @Volatile - private var itemsInQueue = 0 - @Volatile private var running = false private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) - private var queue = Channel(cacheSize) - private var queueFlow = queue.receiveAsFlow() + @Volatile + private var queue: BlockingQueue = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 @@ -108,11 +108,9 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { if (running) { videoPacket?.createAndSendPacket(h264Buffer, info) { rtpFrame -> val result = queue.trySend(rtpFrame) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ - } else { - itemsInQueue++ } } } @@ -122,19 +120,16 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { if (running) { aacPacket?.createAndSendPacket(aacBuffer, info) { rtpFrame -> val result = queue.trySend(rtpFrame) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ - } else { - itemsInQueue++ } } } } fun start() { - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() + queue.clear() running = true job = scope.launch { val ssrcVideo = Random().nextInt().toLong() @@ -143,30 +138,34 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { videoPacket?.setSSRC(ssrcVideo) aacPacket?.setSSRC(ssrcAudio) val isTcp = rtpSocket is RtpSocketTcp - queueFlow.collect { rtpFrame -> - itemsInQueue-- + while (scope.isActive && running) { val error = runCatching { - rtpSocket?.sendFrame(rtpFrame, isEnableLogs) - //bytes to bits (4 is tcp header length) - val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length - bitrateManager.calculateBitrate(packetSize * 8.toLong()) - if (rtpFrame.isVideoFrame()) { - videoFramesSent++ - } else { - audioFramesSent++ + val rtpFrame = runInterruptible { + queue.poll(1, TimeUnit.SECONDS) } - if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) { + if (rtpFrame != null) { + rtpSocket?.sendFrame(rtpFrame, isEnableLogs) //bytes to bits (4 is tcp header length) - val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0 - bitrateManager.calculateBitrate(reportSize * 8.toLong()) + val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length + bitrateManager.calculateBitrate(packetSize * 8.toLong()) + if (rtpFrame.isVideoFrame()) { + videoFramesSent++ + } else { + audioFramesSent++ + } + if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) { + //bytes to bits (4 is tcp header length) + val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0 + bitrateManager.calculateBitrate(reportSize * 8.toLong()) + } } }.exceptionOrNull() if (error != null) { onMainThread { - connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, " + error.message) + connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, ${error.message}") } Log.e(TAG, "send error: ", error) - return@collect + return@launch } } } @@ -174,10 +173,6 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { suspend fun stop() { running = false - queue.cancel() - itemsInQueue = 0 - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() baseSenderReport?.reset() baseSenderReport?.close() rtpSocket?.close() @@ -189,24 +184,23 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { resetDroppedVideoFrames() job?.cancelAndJoin() job = null + queue.clear() } fun hasCongestion(): Boolean { - val size = cacheSize - val remaining = cacheSize - itemsInQueue + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (!scope.isActive) { - val tempQueue = Channel(newSize) - queue = tempQueue - queueFlow = queue.receiveAsFlow() - cacheSize = newSize - } else { - throw RuntimeException("resize cache while streaming is not available") + if (newSize < queue.size - queue.remainingCapacity()) { + throw RuntimeException("Can't fit current cache inside new cache size") } + val tempQueue: BlockingQueue = LinkedBlockingQueue(newSize) + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt b/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt index 7c574e17b..dbfb33890 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt @@ -17,9 +17,20 @@ package com.pedro.rtsp.utils import android.util.Base64 +import com.pedro.rtsp.rtsp.RtpFrame import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue + +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} fun ByteArray.encodeToString(flags: Int = Base64.NO_WRAP): String { return Base64.encodeToString(this, flags) diff --git a/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt b/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt index 9423a7dc9..c7e4d307f 100644 --- a/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt +++ b/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt @@ -97,7 +97,6 @@ class CommandsManager { packetHandlingQueue.add(dataPacket) dataPacket.write() socket?.write(dataPacket) - Log.i(TAG, dataPacket.toString()) return dataPacket.getSize() } } diff --git a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt index ecf438427..9b5b165d5 100644 --- a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt +++ b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt @@ -34,6 +34,7 @@ import com.pedro.srt.utils.BitrateManager import com.pedro.srt.utils.ConnectCheckerSrt import com.pedro.srt.utils.SrtSocket import com.pedro.srt.utils.onMainThread +import com.pedro.srt.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -42,7 +43,11 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.runInterruptible import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger /** @@ -66,15 +71,12 @@ class SrtSender( @Volatile private var running = false - - private var cacheSize = 60 - @Volatile - private var itemsInQueue = 0 + private var cacheSize = 200 private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) - private var queue = Channel>(cacheSize) - private var queueFlow = queue.receiveAsFlow() + @Volatile + private var queue: BlockingQueue> = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 var socket: SrtSocket? = null @@ -120,11 +122,9 @@ class SrtSender( checkSendInfo() h26XPacket.createAndSendPacket(h264Buffer, info) { mpegTsPackets -> val result = queue.trySend(mpegTsPackets) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ - } else { - itemsInQueue++ } } } @@ -135,21 +135,17 @@ class SrtSender( checkSendInfo() aacPacket.createAndSendPacket(aacBuffer, info) { mpegTsPackets -> val result = queue.trySend(mpegTsPackets) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ - } else { - itemsInQueue++ } } } } fun start() { + queue.clear() setTrackConfig(!commandsManager.videoDisabled, !commandsManager.audioDisabled) - - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() running = true job = scope.launch { //send config @@ -157,9 +153,11 @@ class SrtSender( MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE) } queue.trySend(psiPackets) - queueFlow.collect { mpegTsPackets -> - itemsInQueue-- + while (scope.isActive && running) { val error = runCatching { + val mpegTsPackets = runInterruptible { + queue.poll(1, TimeUnit.SECONDS) + } mpegTsPackets.forEach { mpegTsPacket -> var size = 0 size += commandsManager.writeData(mpegTsPacket, socket) @@ -175,7 +173,7 @@ class SrtSender( connectCheckerSrt.onConnectionFailedSrt("Error send packet, " + error.message) } Log.e(TAG, "send error: ", error) - return@collect + return@launch } } } @@ -207,10 +205,6 @@ class SrtSender( suspend fun stop() { running = false - queue.cancel() - itemsInQueue = 0 - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() psiManager.reset() service.clear() mpegTsPacketizer.reset() @@ -222,22 +216,23 @@ class SrtSender( resetDroppedVideoFrames() job?.cancelAndJoin() job = null + queue.clear() } fun hasCongestion(): Boolean { - val size = cacheSize - val remaining = cacheSize - itemsInQueue + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (!scope.isActive) { - val tempQueue = Channel>(newSize) - queue = tempQueue - queueFlow = queue.receiveAsFlow() - cacheSize = newSize + if (newSize < queue.size - queue.remainingCapacity()) { + throw RuntimeException("Can't fit current cache inside new cache size") } + val tempQueue: BlockingQueue> = LinkedBlockingQueue(newSize) + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/srt/src/main/java/com/pedro/srt/utils/Extensions.kt b/srt/src/main/java/com/pedro/srt/utils/Extensions.kt index 5ca960bf9..51c290293 100644 --- a/srt/src/main/java/com/pedro/srt/utils/Extensions.kt +++ b/srt/src/main/java/com/pedro/srt/utils/Extensions.kt @@ -21,6 +21,16 @@ import kotlinx.coroutines.withContext import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue + +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} fun ByteBuffer.toByteArray(): ByteArray { return if (this.hasArray() && !isDirect) {