Skip to content

Commit

Permalink
optimize tcp socket and fix send empty rtp packets
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Sep 29, 2024
1 parent 8e1804d commit 2511dfb
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 19 deletions.
24 changes: 12 additions & 12 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,15 @@ class TcpStreamSocket(
}

suspend fun writeUInt16(b: Int) {
output?.writeByte(b ushr 8)
output?.writeByte(b)
write(byteArrayOf((b ushr 8).toByte(), b.toByte()))
}

suspend fun writeUInt24(b: Int) {
output?.writeByte(b ushr 16)
output?.writeByte(b ushr 8)
output?.writeByte(b)
write(byteArrayOf((b ushr 16).toByte(), (b ushr 8).toByte(), b.toByte()))
}

suspend fun writeUInt32(b: Int) {
output?.writeByte(b ushr 24)
output?.writeByte(b ushr 16)
output?.writeByte(b ushr 8)
output?.writeByte(b)
write(byteArrayOf((b ushr 24).toByte(), (b ushr 16).toByte(), (b ushr 8).toByte(), b.toByte()))
}

suspend fun writeUInt32LittleEndian(b: Int) {
Expand All @@ -117,15 +111,21 @@ class TcpStreamSocket(
}

suspend fun readUInt16(): Int {
return read() and 0xff shl 8 or (read() and 0xff)
val b = ByteArray(2)
readUntil(b)
return b[0].toInt() and 0xff shl 8 or (b[1].toInt() and 0xff)
}

suspend fun readUInt24(): Int {
return read() and 0xff shl 16 or (read() and 0xff shl 8) or (read() and 0xff)
val b = ByteArray(3)
readUntil(b)
return b[0].toInt() and 0xff shl 16 or (b[1].toInt() and 0xff shl 8) or (b[2].toInt() and 0xff)
}

suspend fun readUInt32(): Int {
return read() and 0xff shl 24 or (read() and 0xff shl 16) or (read() and 0xff shl 8) or (read() and 0xff)
val b = ByteArray(4)
readUntil(b)
return b[0].toInt() and 0xff shl 24 or (b[1].toInt() and 0xff shl 16) or (b[2].toInt() and 0xff shl 8) or (b[3].toInt() and 0xff)
}

suspend fun readUInt32LittleEndian(): Int {
Expand Down
2 changes: 1 addition & 1 deletion rtsp/src/main/java/com/pedro/rtsp/rtp/packets/AacPacket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ class AacPacket(
sum += size
frames.add(rtpFrame)
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}
}
2 changes: 1 addition & 1 deletion rtsp/src/main/java/com/pedro/rtsp/rtp/packets/Av1Packet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class Av1Packet: BasePacket(
val rtpFrame = RtpFrame(buffer, rtpTs, buffer.size, channelIdentifier)
frames.add(rtpFrame)
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}

override fun reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ class G711Packet(
sum += size
frames.add(rtpFrame)
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class H264Packet(
} else {
Log.i(TAG, "waiting for keyframe")
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}

private fun setSpsPps(sps: ByteArray, pps: ByteArray) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class H265Packet: BasePacket(
header[2] = header[2] and 0x7F
}
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}

override fun reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ class OpusPacket(
sum += size
frames.add(rtpFrame)
}
callback(frames)
if (frames.isNotEmpty()) callback(frames)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@ abstract class BaseRtpSocket {
@Throws(IOException::class)
abstract suspend fun sendFrame(rtpFrame: RtpFrame)

abstract suspend fun flush()

abstract suspend fun close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class RtpSocketTcp : BaseRtpSocket() {
sendFrameTCP(rtpFrame)
}

override suspend fun flush() {
socket?.flush()
}

override suspend fun close() {}

@Throws(IOException::class)
Expand All @@ -48,6 +52,5 @@ class RtpSocketTcp : BaseRtpSocket() {
tcpHeader[3] = (len and 0xFF).toByte()
socket?.write(tcpHeader)
socket?.write(rtpFrame.buffer, 0, len)
socket?.flush()
}
}
2 changes: 2 additions & 0 deletions rtsp/src/main/java/com/pedro/rtsp/rtp/sockets/RtpSocketUdp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class RtpSocketUdp(
sendFrameUDP(rtpFrame)
}

override suspend fun flush() { }

override suspend fun close() {
videoSocket.close()
audioSocket.close()
Expand Down
1 change: 1 addition & 0 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class RtspSender(
if (isEnableLogs) Log.i(TAG, "wrote report")
}
}
rtpSocket?.flush()
if (isEnableLogs) {
val type = if (isVideo) "Video" else "Audio"
Log.i(TAG, "wrote $type packet, size $size")
Expand Down

0 comments on commit 2511dfb

Please sign in to comment.