Skip to content

Commit

Permalink
timeout, and change scope in rtmpSender
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Sep 29, 2024
1 parent 8e1804d commit 98bec9d
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ import com.pedro.rtmp.utils.socket.RtmpSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withTimeoutOrNull
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

Expand All @@ -63,7 +66,7 @@ class RtmpSender(
private var cacheSize = 200

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private var scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
@Volatile
private var queue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(cacheSize)
private var audioFramesSent: Long = 0
Expand Down Expand Up @@ -120,7 +123,7 @@ class RtmpSender(
videoPacket.createFlvPacket(h264Buffer, info) { flvPacket ->
val result = queue.trySend(flvPacket)
if (!result) {
Log.i(TAG, "Video frame discarded")
Log.i(TAG, "Video frame discarded, threadEnabled: ${job?.isActive}")
droppedVideoFrames++
}
}
Expand All @@ -132,7 +135,7 @@ class RtmpSender(
audioPacket.createFlvPacket(aacBuffer, info) { flvPacket ->
val result = queue.trySend(flvPacket)
if (!result) {
Log.i(TAG, "Audio frame discarded")
Log.i(TAG, "Audio frame discarded, threadEnabled: ${job?.isActive}")
droppedAudioFrames++
}
}
Expand All @@ -143,6 +146,7 @@ class RtmpSender(
bitrateManager.reset()
queue.clear()
running = true
scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
job = scope.launch {
var bytesSend = 0L
val bitrateTask = async {
Expand All @@ -161,25 +165,28 @@ class RtmpSender(
if (flvPacket == null) {
Log.i(TAG, "Skipping iteration, frame null")
} else {
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
//Discard packet if take more than 0,5s to send to avoid block coroutine
withTimeoutOrNull(500) {
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
}
}
}
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
}
}
}
bytesSend += size
}
bytesSend += size
}
}.exceptionOrNull()
if (error != null) {
Expand Down

0 comments on commit 98bec9d

Please sign in to comment.