Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use no buffered socket and flush withcontext #1585

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MicrophoneManager {
private AudioPostProcessEffect audioPostProcessEffect;
protected HandlerThread handlerThread;
protected CustomAudioEffect customAudioEffect = new NoAudioEffect();
private long timeStamp = 0;

public MicrophoneManager(GetMicrophoneData getMicrophoneData) {
this.getMicrophoneData = getMicrophoneData;
Expand Down Expand Up @@ -162,6 +163,7 @@ public boolean createInternalMicrophone(AudioPlaybackCaptureConfiguration config
* Start record and get data
*/
public synchronized void start() {
timeStamp = 0;
init();
handlerThread = new HandlerThread(TAG);
handlerThread.start();
Expand Down Expand Up @@ -211,8 +213,10 @@ public boolean isMuted() {
* @return Object with size and PCM buffer data
*/
protected Frame read() {
long timeStamp = System.nanoTime() / 1000;
if (timeStamp == 0) timeStamp = System.nanoTime() / 1000;
int size = audioRecord.read(pcmBuffer, 0, pcmBuffer.length);
int channels = channel == AudioFormat.CHANNEL_IN_STEREO ? 2 : 1;
timeStamp += (long) size / sampleRate / channels;
if (size < 0) {
Log.e(TAG, "read error: " + size);
return null;
Expand Down Expand Up @@ -242,6 +246,7 @@ public synchronized void stop() {
if (audioPostProcessEffect != null) {
audioPostProcessEffect.release();
}
timeStamp = 0;
Log.i(TAG, "Microphone stopped");
}

Expand Down
2 changes: 2 additions & 0 deletions rtmp/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ afterEvaluate {
}

dependencies {
implementation("io.ktor:ktor-network:2.3.12")
implementation("io.ktor:ktor-network-tls:2.3.12")
implementation(libs.kotlinx.coroutines.android)
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.junit)
Expand Down
62 changes: 25 additions & 37 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendChunkSize(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
if (RtmpConfig.writeChunkSize != RtmpConfig.DEFAULT_CHUNK_SIZE) {
val chunkSize = SetChunkSize(RtmpConfig.writeChunkSize)
chunkSize.header.timeStamp = getCurrentTimestamp()
chunkSize.header.messageStreamId = streamId
chunkSize.writeHeader(output)
chunkSize.writeBody(output)
chunkSize.writeHeader(socket)
chunkSize.writeBody(socket)
socket.flush()
Log.i(TAG, "send $chunkSize")
} else {
Expand All @@ -111,25 +110,22 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendConnect(auth: String, socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
sendConnect(auth, output)
sendConnectImp(auth, socket)
socket.flush()
}
}

@Throws(IOException::class)
suspend fun createStream(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
createStream(output)
createStreamImp(socket)
socket.flush()
}
}

@Throws(IOException::class)
fun readMessageResponse(socket: RtmpSocket): RtmpMessage {
val input = socket.getInputStream()
val message = RtmpMessage.getRtmpMessage(input, readChunkSize, sessionHistory)
suspend fun readMessageResponse(socket: RtmpSocket): RtmpMessage {
val message = RtmpMessage.getRtmpMessage(socket, readChunkSize, sessionHistory)
sessionHistory.setReadHeader(message.header)
Log.i(TAG, "read $message")
bytesRead += message.header.getPacketLength()
Expand All @@ -139,38 +135,34 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendMetadata(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
sendMetadata(output)
sendMetadataImp(socket)
socket.flush()
}
}

@Throws(IOException::class)
suspend fun sendPublish(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
sendPublish(output)
sendPublishImp(socket)
socket.flush()
}
}

@Throws(IOException::class)
suspend fun sendWindowAcknowledgementSize(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
val windowAcknowledgementSize = WindowAcknowledgementSize(RtmpConfig.acknowledgementWindowSize, getCurrentTimestamp())
windowAcknowledgementSize.writeHeader(output)
windowAcknowledgementSize.writeBody(output)
windowAcknowledgementSize.writeHeader(socket)
windowAcknowledgementSize.writeBody(socket)
socket.flush()
}
}

suspend fun sendPong(event: Event, socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
val pong = UserControl(Type.PONG_REPLY, event)
pong.writeHeader(output)
pong.writeBody(output)
pong.writeHeader(socket)
pong.writeBody(socket)
socket.flush()
Log.i(TAG, "send pong")
}
Expand All @@ -179,8 +171,7 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendClose(socket: RtmpSocket) {
writeSync.withLock {
val output = socket.getOutStream()
sendClose(output)
sendCloseImp(socket)
socket.flush()
}
}
Expand All @@ -190,11 +181,10 @@ abstract class CommandsManager {
if (bytesRead >= RtmpConfig.acknowledgementWindowSize) {
acknowledgementSequence += bytesRead
bytesRead -= RtmpConfig.acknowledgementWindowSize
val output = socket.getOutStream()
val acknowledgement = Acknowledgement(acknowledgementSequence)
acknowledgement.writeHeader(output)
acknowledgement.writeBody(output)
output.flush()
acknowledgement.writeHeader(socket)
acknowledgement.writeBody(socket)
socket.flush()
Log.i(TAG, "send $acknowledgement")
}
}
Expand All @@ -203,13 +193,12 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendVideoPacket(flvPacket: FlvPacket, socket: RtmpSocket): Int {
writeSync.withLock {
val output = socket.getOutStream()
if (incrementalTs) {
flvPacket.timeStamp = ((TimeUtils.getCurrentTimeNano() / 1000 - startTs) / 1000)
}
val video = Video(flvPacket, streamId)
video.writeHeader(output)
video.writeBody(output)
video.writeHeader(socket)
video.writeBody(socket)
socket.flush(true)
return video.header.getPacketLength() //get packet size with header included to calculate bps
}
Expand All @@ -218,23 +207,22 @@ abstract class CommandsManager {
@Throws(IOException::class)
suspend fun sendAudioPacket(flvPacket: FlvPacket, socket: RtmpSocket): Int {
writeSync.withLock {
val output = socket.getOutStream()
if (incrementalTs) {
flvPacket.timeStamp = ((TimeUtils.getCurrentTimeNano() / 1000 - startTs) / 1000)
}
val audio = Audio(flvPacket, streamId)
audio.writeHeader(output)
audio.writeBody(output)
audio.writeHeader(socket)
audio.writeBody(socket)
socket.flush(true)
return audio.header.getPacketLength() //get packet size with header included to calculate bps
}
}

abstract fun sendConnect(auth: String, output: OutputStream)
abstract fun createStream(output: OutputStream)
abstract fun sendMetadata(output: OutputStream)
abstract fun sendPublish(output: OutputStream)
abstract fun sendClose(output: OutputStream)
abstract suspend fun sendConnectImp(auth: String, output: RtmpSocket)
abstract suspend fun createStreamImp(output: RtmpSocket)
abstract suspend fun sendMetadataImp(output: RtmpSocket)
abstract suspend fun sendPublishImp(output: RtmpSocket)
abstract suspend fun sendCloseImp(output: RtmpSocket)

fun reset() {
startTs = 0
Expand Down
11 changes: 6 additions & 5 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManagerAmf0.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import com.pedro.rtmp.rtmp.chunk.ChunkType
import com.pedro.rtmp.rtmp.message.BasicHeader
import com.pedro.rtmp.rtmp.message.command.CommandAmf0
import com.pedro.rtmp.rtmp.message.data.DataAmf0
import com.pedro.rtmp.utils.socket.RtmpSocket
import java.io.OutputStream

class CommandsManagerAmf0: CommandsManager() {
override fun sendConnect(auth: String, output: OutputStream) {
override suspend fun sendConnectImp(auth: String, output: RtmpSocket) {
val connect = CommandAmf0("connect", ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_CONNECTION.mark))
val connectInfo = AmfObject()
Expand Down Expand Up @@ -64,7 +65,7 @@ class CommandsManagerAmf0: CommandsManager() {
Log.i(TAG, "send $connect")
}

override fun createStream(output: OutputStream) {
override suspend fun createStreamImp(output: RtmpSocket) {
val releaseStream = CommandAmf0("releaseStream", ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
releaseStream.addData(AmfNull())
Expand Down Expand Up @@ -95,7 +96,7 @@ class CommandsManagerAmf0: CommandsManager() {
Log.i(TAG, "send $createStream")
}

override fun sendMetadata(output: OutputStream) {
override suspend fun sendMetadataImp(output: RtmpSocket) {
val name = "@setDataFrame"
val metadata = DataAmf0(name, getCurrentTimestamp(), streamId)
metadata.addData(AmfString("onMetaData"))
Expand Down Expand Up @@ -134,7 +135,7 @@ class CommandsManagerAmf0: CommandsManager() {
Log.i(TAG, "send $metadata")
}

override fun sendPublish(output: OutputStream) {
override suspend fun sendPublishImp(output: RtmpSocket) {
val name = "publish"
val publish = CommandAmf0(name, ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
Expand All @@ -148,7 +149,7 @@ class CommandsManagerAmf0: CommandsManager() {
Log.i(TAG, "send $publish")
}

override fun sendClose(output: OutputStream) {
override suspend fun sendCloseImp(output: RtmpSocket) {
val name = "closeStream"
val closeStream = CommandAmf0(name, ++commandId, getCurrentTimestamp(), streamId, BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
closeStream.addData(AmfNull())
Expand Down
11 changes: 6 additions & 5 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManagerAmf3.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import com.pedro.rtmp.rtmp.chunk.ChunkType
import com.pedro.rtmp.rtmp.message.BasicHeader
import com.pedro.rtmp.rtmp.message.command.CommandAmf3
import com.pedro.rtmp.rtmp.message.data.DataAmf3
import com.pedro.rtmp.utils.socket.RtmpSocket
import java.io.OutputStream

class CommandsManagerAmf3: CommandsManager() {
override fun sendConnect(auth: String, output: OutputStream) {
override suspend fun sendConnectImp(auth: String, output: RtmpSocket) {
val connect = CommandAmf3("connect", ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_CONNECTION.mark))
val connectInfo = Amf3Object()
Expand Down Expand Up @@ -63,7 +64,7 @@ class CommandsManagerAmf3: CommandsManager() {
Log.i(TAG, "send $connect")
}

override fun createStream(output: OutputStream) {
override suspend fun createStreamImp(output: RtmpSocket) {
val releaseStream = CommandAmf3("releaseStream", ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
releaseStream.addData(Amf3Null())
Expand Down Expand Up @@ -94,7 +95,7 @@ class CommandsManagerAmf3: CommandsManager() {
Log.i(TAG, "send $createStream")
}

override fun sendMetadata(output: OutputStream) {
override suspend fun sendMetadataImp(output: RtmpSocket) {
val name = "@setDataFrame"
val metadata = DataAmf3(name, getCurrentTimestamp(), streamId)
metadata.addData(Amf3String("onMetaData"))
Expand Down Expand Up @@ -125,7 +126,7 @@ class CommandsManagerAmf3: CommandsManager() {
Log.i(TAG, "send $metadata")
}

override fun sendPublish(output: OutputStream) {
override suspend fun sendPublishImp(output: RtmpSocket) {
val name = "publish"
val publish = CommandAmf3(name, ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
Expand All @@ -139,7 +140,7 @@ class CommandsManagerAmf3: CommandsManager() {
Log.i(TAG, "send $publish")
}

override fun sendClose(output: OutputStream) {
override suspend fun sendCloseImp(output: RtmpSocket) {
val name = "closeStream"
val closeStream = CommandAmf3(name, ++commandId, getCurrentTimestamp(), streamId, BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_STREAM.mark))
closeStream.addData(Amf3Null())
Expand Down
30 changes: 13 additions & 17 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/Handshake.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,31 +75,27 @@ class Handshake {
private var timestampC1 = 0

@Throws(IOException::class)
fun sendHandshake(socket: RtmpSocket): Boolean {
var output = socket.getOutStream()
writeC0(output)
val c1 = writeC1(output)
suspend fun sendHandshake(socket: RtmpSocket): Boolean {
writeC0(socket)
val c1 = writeC1(socket)
socket.flush()
var input = socket.getInputStream()
readS0(input)
val s1 = readS1(input)
output = socket.getOutStream()
writeC2(output, s1)
readS0(socket)
val s1 = readS1(socket)
writeC2(socket, s1)
socket.flush()
input = socket.getInputStream()
readS2(input, c1)
readS2(socket, c1)
return true
}

@Throws(IOException::class)
private fun writeC0(output: OutputStream) {
private suspend fun writeC0(output: RtmpSocket) {
Log.i(TAG, "writing C0")
output.write(protocolVersion)
Log.i(TAG, "C0 write successful")
}

@Throws(IOException::class)
private fun writeC1(output: OutputStream): ByteArray {
private suspend fun writeC1(output: RtmpSocket): ByteArray {
Log.i(TAG, "writing C1")
val c1 = ByteArray(handshakeSize)

Expand Down Expand Up @@ -131,14 +127,14 @@ class Handshake {
}

@Throws(IOException::class)
private fun writeC2(output: OutputStream, s1: ByteArray) {
private suspend fun writeC2(output: RtmpSocket, s1: ByteArray) {
Log.i(TAG, "writing C2")
output.write(s1)
Log.i(TAG, "C2 write successful")
}

@Throws(IOException::class)
private fun readS0(input: InputStream): ByteArray {
private suspend fun readS0(input: RtmpSocket): ByteArray {
Log.i(TAG, "reading S0")
val response = input.read()
if (response == protocolVersion || response == 72) {
Expand All @@ -150,7 +146,7 @@ class Handshake {
}

@Throws(IOException::class)
private fun readS1(input: InputStream): ByteArray {
private suspend fun readS1(input: RtmpSocket): ByteArray {
Log.i(TAG, "reading S1")
val s1 = ByteArray(handshakeSize)
input.readUntil(s1)
Expand All @@ -159,7 +155,7 @@ class Handshake {
}

@Throws(IOException::class)
private fun readS2(input: InputStream, c1: ByteArray): ByteArray {
private suspend fun readS2(input: RtmpSocket, c1: ByteArray): ByteArray {
Log.i(TAG, "reading S2")
val s2 = ByteArray(handshakeSize)
input.readUntil(s2)
Expand Down
4 changes: 2 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
}

@Throws(IOException::class)
private fun establishConnection(): Boolean {
private suspend fun establishConnection(): Boolean {
val socket = if (tunneled) {
TcpTunneledSocket(commandsManager.host, commandsManager.port, tlsEnabled)
} else {
Expand Down Expand Up @@ -491,7 +491,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
}
}

fun closeConnection() {
suspend fun closeConnection() {
socket?.close()
commandsManager.reset()
}
Expand Down
Loading
Loading