Skip to content

Commit

Permalink
Merge pull request #1594 from pedroSG94/feature/packtizer-async
Browse files Browse the repository at this point in the history
Feature/packtizer async
  • Loading branch information
pedroSG94 authored Oct 1, 2024
2 parents de6ad6b + de0458f commit cdfeed3
Show file tree
Hide file tree
Showing 58 changed files with 752 additions and 1,103 deletions.
2 changes: 1 addition & 1 deletion common/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android">
<manifest>

</manifest>
16 changes: 15 additions & 1 deletion common/src/main/java/com/pedro/common/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package com.pedro.common
import android.hardware.camera2.CameraCharacteristics
import android.hardware.camera2.CaptureRequest
import android.media.MediaCodec
import android.media.MediaFormat
import android.os.Build
import android.os.Handler
import android.os.Looper
import androidx.annotation.RequiresApi
import com.pedro.common.frame.MediaFrame
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.UnsupportedEncodingException
Expand Down Expand Up @@ -60,7 +62,7 @@ fun ByteBuffer.toByteArray(): ByteArray {
}
}

fun ByteBuffer.removeInfo(info: MediaCodec.BufferInfo): ByteBuffer {
fun ByteBuffer.removeInfo(info: MediaFrame.Info): ByteBuffer {
try {
position(info.offset)
limit(info.size)
Expand Down Expand Up @@ -139,4 +141,16 @@ fun String.getIndexes(char: Char): Array<Int> {

fun Throwable.validMessage(): String {
return (message ?: "").ifEmpty { javaClass.simpleName }
}

fun MediaCodec.BufferInfo.toMediaFrameInfo() = MediaFrame.Info(offset, size, presentationTimeUs, isKeyframe())

fun ByteBuffer.clone(): ByteBuffer = ByteBuffer.wrap(toByteArray())

fun MediaFormat.getIntegerSafe(name: String): Int? {
return try { getInteger(name) } catch (e: Exception) { null }
}

fun MediaFormat.getLongSafe(name: String): Long? {
return try { getLong(name) } catch (e: Exception) { null }
}
147 changes: 147 additions & 0 deletions common/src/main/java/com/pedro/common/base/BaseSender.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.pedro.common.base

import android.util.Log
import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.frame.MediaFrame
import com.pedro.common.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

abstract class BaseSender(
protected val connectChecker: ConnectChecker,
protected val TAG: String
) {

@Volatile
protected var running = false
private var cacheSize = 200
@Volatile
protected var queue: BlockingQueue<MediaFrame> = LinkedBlockingQueue(cacheSize)
protected var audioFramesSent: Long = 0
protected var videoFramesSent: Long = 0
var droppedAudioFrames: Long = 0
protected set
var droppedVideoFrames: Long = 0
protected set
protected val bitrateManager: BitrateManager = BitrateManager(connectChecker)
protected var isEnableLogs = true
private var job: Job? = null
protected val scope = CoroutineScope(Dispatchers.IO)
@Volatile
protected var bytesSend = 0L

abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?)
abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean)
protected abstract suspend fun onRun()
protected abstract suspend fun stopImp(clear: Boolean = true)

fun sendMediaFrame(mediaFrame: MediaFrame) {
if (running && !queue.trySend(mediaFrame)) {
when (mediaFrame.type) {
MediaFrame.Type.VIDEO -> {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
}
MediaFrame.Type.AUDIO -> {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
}
}
}
}

fun start() {
bitrateManager.reset()
queue.clear()
running = true
scope.launch {
val bitrateTask = async {
while (scope.isActive && running) {
//bytes to bits
bitrateManager.calculateBitrate(bytesSend * 8)
bytesSend = 0
delay(timeMillis = 1000)
}
}
onRun()
}
}

suspend fun stop(clear: Boolean = true) {
running = false
stopImp(clear)
resetSentAudioFrames()
resetSentVideoFrames()
resetDroppedAudioFrames()
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
queue.clear()
}

@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * (percentUsed / 100f)
}

fun resizeCache(newSize: Int) {
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<MediaFrame> = LinkedBlockingQueue(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int = cacheSize

fun getItemsInCache(): Int = queue.size

fun clearCache() {
queue.clear()
}

fun getSentAudioFrames(): Long = audioFramesSent

fun getSentVideoFrames(): Long = videoFramesSent

fun resetSentAudioFrames() {
audioFramesSent = 0
}

fun resetSentVideoFrames() {
videoFramesSent = 0
}

fun resetDroppedAudioFrames() {
droppedAudioFrames = 0
}

fun resetDroppedVideoFrames() {
droppedVideoFrames = 0
}

fun setLogs(enable: Boolean) {
isEnableLogs = enable
}

fun setBitrateExponentialFactor(factor: Float) {
bitrateManager.exponentialFactor = factor
}

fun getBitrateExponentialFactor() = bitrateManager.exponentialFactor
}
20 changes: 20 additions & 0 deletions common/src/main/java/com/pedro/common/frame/MediaFrame.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.pedro.common.frame

import java.nio.ByteBuffer

data class MediaFrame(
val data: ByteBuffer,
val info: Info,
val type: Type
) {
data class Info(
val offset: Int,
val size: Int,
val timestamp: Long,
val isKeyFrame: Boolean
)

enum class Type {
VIDEO, AUDIO
}
}
9 changes: 2 additions & 7 deletions common/src/test/java/com/pedro/common/ExtensionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.pedro.common

import android.media.MediaCodec
import com.pedro.common.frame.MediaFrame
import org.junit.Assert.assertEquals
import org.junit.Test
import java.nio.ByteBuffer
Expand All @@ -29,14 +29,9 @@ class ExtensionTest {
@Test
fun `remove info`() {
val buffer = ByteBuffer.wrap(ByteArray(256) { 0x00 }.mapIndexed { index, byte -> index.toByte() }.toByteArray())
val info = MediaCodec.BufferInfo()
val offset = 4
val minusLimit = 2
info.presentationTimeUs = 0
info.offset = offset
info.size = buffer.remaining() - minusLimit
info.flags = 0

val info = MediaFrame.Info(4, buffer.remaining() - minusLimit, 0, false)
val result = buffer.removeInfo(info)
assertEquals(buffer.capacity() - offset - minusLimit, result.remaining())
assertEquals(offset.toByte(), result.get(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import android.media.MediaFormat;
import android.util.Log;

import com.pedro.common.ExtensionsKt;
import com.pedro.encoder.Frame;
import com.pedro.encoder.input.audio.GetMicrophoneData;
import com.pedro.encoder.utils.CodecUtil;
Expand Down Expand Up @@ -63,10 +64,14 @@ protected boolean extract(MediaExtractor audioExtractor) {
}
}
if (mediaFormat != null) {
channels = mediaFormat.getInteger(MediaFormat.KEY_CHANNEL_COUNT);
final Integer channels = ExtensionsKt.getIntegerSafe(mediaFormat, MediaFormat.KEY_CHANNEL_COUNT);
final Integer sampleRate = ExtensionsKt.getIntegerSafe(mediaFormat, MediaFormat.KEY_SAMPLE_RATE);
final Long duration = ExtensionsKt.getLongSafe(mediaFormat, MediaFormat.KEY_DURATION);
if (channels == null || sampleRate == null) return false;
this.channels = channels;
isStereo = channels >= 2;
sampleRate = mediaFormat.getInteger(MediaFormat.KEY_SAMPLE_RATE);
duration = mediaFormat.getLong(MediaFormat.KEY_DURATION);
this.sampleRate = sampleRate;
this.duration = duration != null ? duration : -1;
fixBuffer();
return true;
//audio decoder not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public boolean isLoopMode() {
}

public double getDuration() {
if (duration < 0) return duration; //fail to extract duration from file.
return duration / 10E5;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import android.os.Build;
import android.view.Surface;

import com.pedro.common.ExtensionsKt;
import java.nio.ByteBuffer;

/**
Expand Down Expand Up @@ -51,10 +52,15 @@ protected boolean extract(MediaExtractor videoExtractor) {
}
}
if (mediaFormat != null) {
width = mediaFormat.getInteger(MediaFormat.KEY_WIDTH);
height = mediaFormat.getInteger(MediaFormat.KEY_HEIGHT);
duration = mediaFormat.getLong(MediaFormat.KEY_DURATION);
fps = mediaFormat.getInteger(MediaFormat.KEY_FRAME_RATE);
final Integer width = ExtensionsKt.getIntegerSafe(mediaFormat, MediaFormat.KEY_WIDTH);
final Integer height = ExtensionsKt.getIntegerSafe(mediaFormat, MediaFormat.KEY_HEIGHT);
final Long duration = ExtensionsKt.getLongSafe(mediaFormat, MediaFormat.KEY_DURATION);
final Integer fps = ExtensionsKt.getIntegerSafe(mediaFormat, MediaFormat.KEY_FRAME_RATE);
if (width == null || height == null) return false;
this.width = width;
this.height = height;
this.duration = duration != null ? duration : -1;
this.fps = fps != null ? fps : 30;
return true;
//video decoder not supported
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,14 +619,14 @@ public double getAudioTime() {
}

/**
* @return return duration in seconds. 0 if no streaming
* @return return duration in seconds. 0 if no streaming, -1 if can't extract it from file
*/
public double getVideoDuration() {
return videoDecoder.getDuration();
}

/**
* @return return duration in seconds. 0 if no streaming
* @return return duration in seconds. 0 if no streaming, -1 if can't extract it from file
*/
public double getAudioDuration() {
return audioDecoder.getDuration();
Expand Down
4 changes: 2 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/flv/BasePacket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.pedro.rtmp.flv

import android.media.MediaCodec
import com.pedro.common.frame.MediaFrame
import java.nio.ByteBuffer

/**
* Created by pedro on 21/12/23.
*/
abstract class BasePacket {

abstract fun createFlvPacket(byteBuffer: ByteBuffer, info: MediaCodec.BufferInfo, callback: (FlvPacket) -> Unit)
abstract suspend fun createFlvPacket(byteBuffer: ByteBuffer, info: MediaFrame.Info, callback: suspend (FlvPacket) -> Unit)
abstract fun reset(resetInfo: Boolean = true)
}
10 changes: 5 additions & 5 deletions rtmp/src/main/java/com/pedro/rtmp/flv/audio/packet/AacPacket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.pedro.rtmp.flv.audio.packet

import android.media.MediaCodec
import com.pedro.common.frame.MediaFrame
import com.pedro.common.removeInfo
import com.pedro.rtmp.flv.BasePacket
import com.pedro.rtmp.flv.FlvPacket
Expand Down Expand Up @@ -56,10 +56,10 @@ class AacPacket: BasePacket() {
this.audioSize = audioSize
}

override fun createFlvPacket(
override suspend fun createFlvPacket(
byteBuffer: ByteBuffer,
info: MediaCodec.BufferInfo,
callback: (FlvPacket) -> Unit
info: MediaFrame.Info,
callback: suspend (FlvPacket) -> Unit
) {
val fixedBuffer = byteBuffer.removeInfo(info)
//header is 2 bytes length
Expand Down Expand Up @@ -87,7 +87,7 @@ class AacPacket: BasePacket() {
fixedBuffer.get(buffer, header.size, fixedBuffer.remaining())
}
System.arraycopy(header, 0, buffer, 0, header.size)
val ts = info.presentationTimeUs / 1000
val ts = info.timestamp / 1000
callback(FlvPacket(buffer, ts, buffer.size, FlvType.AUDIO))
}

Expand Down
Loading

0 comments on commit cdfeed3

Please sign in to comment.