diff --git a/common/src/main/java/com/pedro/common/base/BaseSender.kt b/common/src/main/java/com/pedro/common/base/BaseSender.kt index 95e6f97b2..8f05b3496 100644 --- a/common/src/main/java/com/pedro/common/base/BaseSender.kt +++ b/common/src/main/java/com/pedro/common/base/BaseSender.kt @@ -64,7 +64,7 @@ abstract class BaseSender( bitrateManager.reset() queue.clear() running = true - scope.launch { + job = scope.launch { val bitrateTask = async { while (scope.isActive && running) { //bytes to bits diff --git a/common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt b/common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt index ba64f76de..c85604d00 100644 --- a/common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt +++ b/common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt @@ -32,10 +32,11 @@ import io.ktor.utils.io.readUTF8Line import io.ktor.utils.io.writeByte import io.ktor.utils.io.writeFully import io.ktor.utils.io.writeStringUtf8 +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withTimeout import java.net.ConnectException import java.security.SecureRandom import javax.net.ssl.TrustManager -import kotlin.coroutines.coroutineContext /** * Created by pedro on 22/9/24. @@ -54,14 +55,12 @@ class TcpStreamSocket( override suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket { val builder = aSocket(selectorManager).tcp() val socket = if (secured) { - builder.connect(remoteAddress = InetSocketAddress(host, port)) { socketTimeout = timeout }.tls( - coroutineContext = coroutineContext - ) { + builder.connect(remoteAddress = InetSocketAddress(host, port)).tls(Dispatchers.Default) { trustManager = certificate random = SecureRandom() } } else { - builder.connect(host, port) { socketTimeout = timeout } + builder.connect(host, port) } input = socket.openReadChannel() output = socket.openWriteChannel(autoFlush = false) @@ -77,15 +76,15 @@ class TcpStreamSocket( output?.flush() } - suspend fun write(b: Int) { + suspend fun write(b: Int) = withTimeout(timeout) { output?.writeByte(b) } - suspend fun write(b: ByteArray) { + suspend fun write(b: ByteArray) = withTimeout(timeout) { output?.writeFully(b) } - suspend fun write(b: ByteArray, offset: Int, size: Int) { + suspend fun write(b: ByteArray, offset: Int, size: Int) = withTimeout(timeout) { output?.writeFully(b, offset, size) } @@ -105,9 +104,13 @@ class TcpStreamSocket( writeUInt32(Integer.reverseBytes(b)) } - suspend fun read(): Int { + suspend fun write(string: String) = withTimeout(timeout) { + output?.writeStringUtf8(string) + } + + suspend fun read(): Int = withTimeout(timeout) { val input = input ?: throw ConnectException("Read with socket closed, broken pipe") - return input.readByte().toInt() + input.readByte().toInt() } suspend fun readUInt16(): Int { @@ -132,17 +135,13 @@ class TcpStreamSocket( return Integer.reverseBytes(readUInt32()) } - suspend fun readUntil(b: ByteArray) { + suspend fun readUntil(b: ByteArray) = withTimeout(timeout) { val input = input ?: throw ConnectException("Read with socket closed, broken pipe") - return input.readFully(b) + input.readFully(b) } - suspend fun readLine(): String? { + suspend fun readLine(): String? = withTimeout(timeout) { val input = input ?: throw ConnectException("Read with socket closed, broken pipe") - return input.readUTF8Line() - } - - suspend fun write(string: String) { - output?.writeStringUtf8(string) + input.readUTF8Line() } } \ No newline at end of file