Skip to content

Commit

Permalink
Merge pull request #1589 from pedroSG94/feature/ktor-migration
Browse files Browse the repository at this point in the history
Feature/ktor migration
  • Loading branch information
pedroSG94 authored Sep 24, 2024
2 parents a069fa6 + 1512feb commit 8e1804d
Show file tree
Hide file tree
Showing 66 changed files with 1,164 additions and 743 deletions.
2 changes: 2 additions & 0 deletions common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ afterEvaluate {
}

dependencies {
implementation(libs.ktor.network)
implementation(libs.ktor.network.tls)
implementation(libs.androidx.annotation)
implementation(libs.kotlinx.coroutines.android)
testImplementation(libs.kotlinx.coroutines.test)
Expand Down
51 changes: 51 additions & 0 deletions common/src/main/java/com/pedro/common/ConnectionFailed.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.common

/**
* Created by pedro on 23/9/24.
*/
enum class ConnectionFailed {
ENDPOINT_MALFORMED, TIMEOUT, REFUSED, CLOSED_BY_SERVER, NO_INTERNET, UNKNOWN;

companion object {
fun parse(reason: String): ConnectionFailed {
return if (
reason.contains("network is unreachable", ignoreCase = true) ||
reason.contains("software caused connection abort", ignoreCase = true) ||
reason.contains("no route to host", ignoreCase = true)
) {
NO_INTERNET
} else if (reason.contains("broken pipe", ignoreCase = true)) {
CLOSED_BY_SERVER
} else if (reason.contains("connection refused", ignoreCase = true)) {
REFUSED
} else if (reason.contains("endpoint malformed", ignoreCase = true)) {
ENDPOINT_MALFORMED
} else if (
reason.contains("timeout", ignoreCase = true) ||
reason.contains("timed out", ignoreCase = true)
) {
TIMEOUT
} else {
UNKNOWN
}
}
}
}
4 changes: 4 additions & 0 deletions common/src/main/java/com/pedro/common/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ fun String.getIndexes(char: Char): Array<Int> {
val indexes = mutableListOf<Int>()
forEachIndexed { index, c -> if (c == char) indexes.add(index) }
return indexes.toTypedArray()
}

fun Throwable.validMessage(): String {
return (message ?: "").ifEmpty { javaClass.simpleName }
}
63 changes: 63 additions & 0 deletions common/src/main/java/com/pedro/common/socket/StreamSocket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.isClosed
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.net.InetAddress
import java.net.InetSocketAddress

/**
* Created by pedro on 22/9/24.
*/
abstract class StreamSocket(
private val host: String,
private val port: Int
) {

private var selectorManager = SelectorManager(Dispatchers.IO)
protected var socket: ReadWriteSocket? = null
private var address: InetAddress? = null

abstract suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket
abstract suspend fun closeResources()

suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val socket = buildSocketConfigAndConnect(selectorManager)
address = InetSocketAddress(host, port).address
this.socket = socket
}

suspend fun close() = withContext(Dispatchers.IO) {
try {
address = null
closeResources()
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

fun isConnected(): Boolean = socket?.isClosed != true

fun isReachable(): Boolean = address?.isReachable(5000) ?: false
}
148 changes: 148 additions & 0 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.network.tls.tls
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readFully
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import io.ktor.utils.io.writeStringUtf8
import java.net.ConnectException
import java.security.SecureRandom
import javax.net.ssl.TrustManager
import kotlin.coroutines.coroutineContext

/**
* Created by pedro on 22/9/24.
*/
class TcpStreamSocket(
private val host: String,
private val port: Int,
private val secured: Boolean = false,
private val certificate: TrustManager? = null
): StreamSocket(host, port) {

private val timeout = 5000L
private var input: ByteReadChannel? = null
private var output: ByteWriteChannel? = null

override suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket {
val builder = aSocket(selectorManager).tcp()
val socket = if (secured) {
builder.connect(remoteAddress = InetSocketAddress(host, port)) { socketTimeout = timeout }.tls(
coroutineContext = coroutineContext
) {
trustManager = certificate
random = SecureRandom()
}
} else {
builder.connect(host, port) { socketTimeout = timeout }
}
input = socket.openReadChannel()
output = socket.openWriteChannel(autoFlush = false)
return socket
}

override suspend fun closeResources() {
input = null
output = null
}

suspend fun flush() {
output?.flush()
}

suspend fun write(b: Int) {
output?.writeByte(b)
}

suspend fun write(b: ByteArray) {
output?.writeFully(b)
}

suspend fun write(b: ByteArray, offset: Int, size: Int) {
output?.writeFully(b, offset, size)
}

suspend fun writeUInt16(b: Int) {
output?.writeByte(b ushr 8)
output?.writeByte(b)
}

suspend fun writeUInt24(b: Int) {
output?.writeByte(b ushr 16)
output?.writeByte(b ushr 8)
output?.writeByte(b)
}

suspend fun writeUInt32(b: Int) {
output?.writeByte(b ushr 24)
output?.writeByte(b ushr 16)
output?.writeByte(b ushr 8)
output?.writeByte(b)
}

suspend fun writeUInt32LittleEndian(b: Int) {
writeUInt32(Integer.reverseBytes(b))
}

suspend fun read(): Int {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readByte().toInt()
}

suspend fun readUInt16(): Int {
return read() and 0xff shl 8 or (read() and 0xff)
}

suspend fun readUInt24(): Int {
return read() and 0xff shl 16 or (read() and 0xff shl 8) or (read() and 0xff)
}

suspend fun readUInt32(): Int {
return read() and 0xff shl 24 or (read() and 0xff shl 16) or (read() and 0xff shl 8) or (read() and 0xff)
}

suspend fun readUInt32LittleEndian(): Int {
return Integer.reverseBytes(readUInt32())
}

suspend fun readUntil(b: ByteArray) {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readFully(b)
}

suspend fun readLine(): String? {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
return input.readUTF8Line()
}

suspend fun write(string: String) {
output?.writeStringUtf8(string)
}
}
70 changes: 70 additions & 0 deletions common/src/main/java/com/pedro/common/socket/UdpStreamSocket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.ConnectedDatagramSocket
import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readBytes

/**
* Created by pedro on 22/9/24.
*/
class UdpStreamSocket(
host: String,
port: Int,
private val sourcePort: Int? = null,
private val receiveSize: Int? = null,
private val broadcastMode: Boolean = false
): StreamSocket(host, port) {

private val address = InetSocketAddress(host, port)
private val udpSocket by lazy {
socket as ConnectedDatagramSocket
}

override suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket {
val builder = aSocket(selectorManager).udp()
val localAddress = if (sourcePort == null) null else InetSocketAddress("0.0.0.0", sourcePort)
return builder.connect(
remoteAddress = address,
localAddress = localAddress
) {
broadcast = broadcastMode
receiveBufferSize = receiveSize ?: 0
}
}

override suspend fun closeResources() { }

suspend fun readPacket(): ByteArray {
val packet = udpSocket.receive().packet
val length = packet.remaining.toInt()
return packet.readBytes().sliceArray(0 until length)
}

suspend fun writePacket(bytes: ByteArray) {
val datagram = Datagram(ByteReadPacket(bytes), address)
udpSocket.send(datagram)
}
}
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ annotation = "1.8.2"
coroutines = "1.9.0"
junit = "4.13.2"
mockito = "5.4.0"
ktor = "2.3.12"
uvcandroid = "1.0.7"

[libraries]
Expand All @@ -31,6 +32,8 @@ androidx-multidex = { module = "androidx.multidex:multidex", version.ref = "mult
junit = { module = "junit:junit", version.ref = "junit" }
kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "coroutines" }
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutines" }
ktor-network = { module = "io.ktor:ktor-network", version.ref = "ktor" }
ktor-network-tls = { module = "io.ktor:ktor-network-tls", version.ref = "ktor" }
mockito-kotlin = { module = "org.mockito.kotlin:mockito-kotlin", version.ref = "mockito" }
uvcandroid = { module = "com.herohan:UVCAndroid", version.ref = "uvcandroid" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GenericStreamClient(
/**
* Add certificates for TLS connection
*/
fun addCertificates(certificates: Array<TrustManager>?) {
fun addCertificates(certificates: TrustManager?) {
rtmpClient.addCertificates(certificates)
rtspClient.addCertificates(certificates)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RtmpStreamClient(
/**
* Add certificates for TLS connection
*/
fun addCertificates(certificates: Array<TrustManager>?) {
fun addCertificates(certificates: TrustManager?) {
rtmpClient.addCertificates(certificates)
}

Expand Down
Loading

0 comments on commit 8e1804d

Please sign in to comment.