Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: approve session proposal, receive session disconnect #1331

Merged
merged 6 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fun DependencyHandlerScope.bouncyCastle() {

fun DependencyHandlerScope.sqlCipher() {
"api"("net.zetetic:android-database-sqlcipher:$sqlCipherVersion")
"api"("app.cash.sqldelight:async-extensions:2.0.0")
}

fun DependencyHandlerScope.reLinker() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import com.walletconnect.android.pairing.handler.PairingControllerInterface
import org.koin.dsl.module

fun corePairingModule(pairing: PairingInterface, pairingController: PairingControllerInterface) = module {
single { PairingEngine(get(), get(), get(), get(), get(), get()) }
single {
PairingEngine(
selfMetaData = get(),
crypto = get(),
metadataRepository = get(),
pairingRepository = get(),
jsonRpcInteractor = get(),
logger = get()
)
}
single { pairing }
single { pairingController }
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,23 @@ internal class JsonRpcInteractor(
return onFailure(e)
}

if (jsonRpcHistory.setRequest(payload.id, topic, payload.method, requestJson)) {
val encryptedRequest = chaChaPolyCodec.encrypt(topic, requestJson, envelopeType, participants)

relay.publish(topic.value, encryptedRequest, params.toRelay()) { result ->
result.fold(
onSuccess = { onSuccess() },
onFailure = { error -> onFailure(error) }
)
try {
if (jsonRpcHistory.setRequest(payload.id, topic, payload.method, requestJson)) {
val encryptedRequest = chaChaPolyCodec.encrypt(topic, requestJson, envelopeType, participants)

relay.publish(topic.value, encryptedRequest, params.toRelay()) { result ->
result.fold(
onSuccess = { onSuccess() },
onFailure = { error ->
logger.error("JsonRpcInteractor: Cannot send the request, error: $error")
onFailure(error)
}
)
}
}
} catch (e: Exception) {
logger.error("JsonRpcInteractor: Cannot send the request, exception: $e")
return onFailure(e)
}
}

Expand Down Expand Up @@ -131,10 +139,14 @@ internal class JsonRpcInteractor(
jsonRpcHistory.updateRequestWithResponse(response.id, responseJson)
onSuccess()
},
onFailure = { error -> onFailure(error) }
onFailure = { error ->
logger.error("JsonRpcInteractor: Cannot send the response, error: $error")
onFailure(error)
}
)
}
} catch (e: Exception) {
logger.error("JsonRpcInteractor: Cannot send the response, exception: $e")
return onFailure(e)
}
}
Expand All @@ -151,10 +163,7 @@ internal class JsonRpcInteractor(
val result = JsonRpcResponse.JsonRpcResult(id = request.id, result = clientParams)

publishJsonRpcResponse(request.topic, irnParams, result, envelopeType = envelopeType, participants = participants,
onFailure = { error ->
logger.error("Cannot send the response, error: $error")
onFailure(error)
},
onFailure = { error -> onFailure(error) },
onSuccess = { onSuccess() }
)
}
Expand All @@ -172,10 +181,7 @@ internal class JsonRpcInteractor(
val result = JsonRpcResponse.JsonRpcResult(id = requestId, result = clientParams)

publishJsonRpcResponse(topic, irnParams, result, envelopeType = envelopeType, participants = participants,
onFailure = { error ->
logger.error("Cannot send the response, error: $error")
onFailure(error)
},
onFailure = { error -> onFailure(error) },
onSuccess = { onSuccess() }
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.walletconnect.android.internal.common.storage.pairing

import android.database.sqlite.SQLiteException
import app.cash.sqldelight.async.coroutines.awaitAsList
import com.walletconnect.android.internal.common.model.AppMetaData
import com.walletconnect.android.internal.common.model.Expiry
import com.walletconnect.android.internal.common.model.Pairing
Expand Down Expand Up @@ -35,7 +36,16 @@ class PairingStorageRepository(private val pairingQueries: PairingQueries) : Pai
override fun hasTopic(topic: Topic): Boolean = pairingQueries.hasTopic(topic = topic.value).executeAsOneOrNull() != null

@Throws(SQLiteException::class)
override fun getListOfPairings(): List<Pairing> = pairingQueries.getListOfPairing(mapper = this::toPairing).executeAsList()
override suspend fun getListOfPairings(): List<Pairing> = pairingQueries.getListOfPairing(mapper = this::toPairing).awaitAsList()

@Throws(SQLiteException::class)
override suspend fun getListOfInactivePairings(): List<Pairing> = pairingQueries.getListOfInactivePairings(mapper = this::toPairing).awaitAsList()

@Throws(SQLiteException::class)
override suspend fun getListOfActivePairings(): List<Pairing> = pairingQueries.getListOfActivePairings(mapper = this::toPairing).awaitAsList()

@Throws(SQLiteException::class)
override suspend fun getListOfInactivePairingsWithoutRequestReceived(): List<Pairing> = pairingQueries.getListOfInactivePairingsWithoutRequestReceived(mapper = this::toPairing).awaitAsList()

@Throws(SQLiteException::class)
override fun activatePairing(topic: Topic) = pairingQueries.activatePairing(expiry = activePairing, is_active = true, topic = topic.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ interface PairingStorageRepositoryInterface {

fun hasTopic(topic: Topic): Boolean

fun getListOfPairings(): List<Pairing>
suspend fun getListOfPairings(): List<Pairing>

suspend fun getListOfInactivePairings(): List<Pairing>

suspend fun getListOfActivePairings(): List<Pairing>

suspend fun getListOfInactivePairingsWithoutRequestReceived(): List<Pairing>

fun activatePairing(topic: Topic)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withTimeout
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -97,7 +98,8 @@ internal class PairingEngine(
init {
setOfRegisteredMethods.addAll(listOf(PairingJsonRpcMethod.WC_PAIRING_DELETE, PairingJsonRpcMethod.WC_PAIRING_PING))
resubscribeToPairingTopics()
pairingExpiryWatcher()
inactivePairingsExpiryWatcher()
activePairingsExpiryWatcher()
isPairingStateWatcher()
}

Expand All @@ -120,7 +122,7 @@ internal class PairingEngine(
val inactivePairing = Pairing(pairingTopic, relay, symmetricKey, registeredMethods, Expiry(inactivePairing))

return inactivePairing.runCatching {
logger.log("Pairing created successfully")
logger.log("Creating Pairing")
pairingRepository.insertPairing(this)
metadataRepository.upsertPeerMetadata(this.topic, selfMetaData, AppMetaDataType.SELF)
jsonRpcInteractor.subscribe(this.topic,
Expand Down Expand Up @@ -200,11 +202,12 @@ internal class PairingEngine(
val deleteParams = PairingParams.DeleteParams(6000, "User disconnected")
val pairingDelete = PairingRpc.PairingDelete(params = deleteParams)
val irnParams = IrnParams(Tags.PAIRING_DELETE, Ttl(dayInSeconds))
logger.log("Sending Pairing disconnect")
jsonRpcInteractor.publishJsonRpcRequest(Topic(topic), irnParams, pairingDelete,
onSuccess = {
scope.launch {
supervisorScope {
logger.log("Disconnect sent successfully")
logger.log("Pairing disconnect sent successfully")
pairingRepository.deletePairing(Topic(topic))
metadataRepository.deleteMetaData(Topic(topic))
jsonRpcInteractor.unsubscribe(Topic(topic))
Expand Down Expand Up @@ -232,7 +235,7 @@ internal class PairingEngine(
}
}

fun getPairings(): List<Pairing> = pairingRepository.getListOfPairings().filter { pairing -> pairing.isNotExpired() }
fun getPairings(): List<Pairing> = runBlocking { pairingRepository.getListOfPairings().filter { pairing -> pairing.isNotExpired() } }

fun register(vararg method: String) {
setOfRegisteredMethods.addAll(method)
Expand Down Expand Up @@ -266,7 +269,7 @@ internal class PairingEngine(
.onEach {
supervisorScope {
launch(Dispatchers.IO) {
resubscribeToPairingFlow()
resubscribeToPairing()
}
}

Expand All @@ -276,35 +279,56 @@ internal class PairingEngine(
}.launchIn(scope)
}

private fun pairingExpiryWatcher() {
flow {
while (true) {
emit(Unit)
delay(WATCHER_INTERVAL)
}
}.onEach {
pairingRepository
.getListOfPairings()
.onEach { pairing -> pairing.isNotExpired() }
}.launchIn(scope)
private fun inactivePairingsExpiryWatcher() {
repeatableFlow(WATCHER_INTERVAL)
.onEach {
try {
pairingRepository.getListOfInactivePairings()
.onEach { pairing ->
pairing.isNotExpired()
}
} catch (e: Exception) {
logger.error(e)
}
}.launchIn(scope)
}

private fun activePairingsExpiryWatcher() {
repeatableFlow(ACTIVE_PAIRINGS_WATCHER_INTERVAL)
.onEach {
try {
pairingRepository.getListOfActivePairings()
.onEach { pairing ->
pairing.isNotExpired()
}
} catch (e: Exception) {
logger.error(e)
}
}.launchIn(scope)
}


private fun isPairingStateWatcher() {
flow {
while (true) {
emit(Unit)
delay(WATCHER_INTERVAL)
}
}.onEach {
val inactivePairings = pairingRepository
.getListOfPairings()
.filter { pairing -> !pairing.isActive && !pairing.isProposalReceived }
if (inactivePairings.isNotEmpty()) {
_isPairingStateFlow.compareAndSet(expect = false, update = true)
} else {
_isPairingStateFlow.compareAndSet(expect = true, update = false)
}
}.launchIn(scope)
repeatableFlow(WATCHER_INTERVAL)
.onEach {
try {
val inactivePairings = pairingRepository.getListOfInactivePairingsWithoutRequestReceived()
if (inactivePairings.isNotEmpty()) {
_isPairingStateFlow.compareAndSet(expect = false, update = true)
} else {
_isPairingStateFlow.compareAndSet(expect = true, update = false)
}
} catch (e: Exception) {
logger.error(e)
}
}.launchIn(scope)
}

private fun repeatableFlow(interval: Long) = flow {
while (true) {
emit(Unit)
delay(interval)
}
}

private fun collectJsonRpcRequestsFlow(): Job =
Expand All @@ -317,9 +341,9 @@ internal class PairingEngine(
}
}.launchIn(scope)

private fun resubscribeToPairingFlow() {
private fun resubscribeToPairing() {
try {
val pairingTopics = pairingRepository.getListOfPairings().filter { pairing -> pairing.isNotExpired() }.map { pairing -> pairing.topic.value }
val pairingTopics = runBlocking { pairingRepository.getListOfPairings().filter { pairing -> pairing.isNotExpired() }.map { pairing -> pairing.topic.value } }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be blocking? I think we should think about going the other way and launch a coroutine in the init block and make this function a suspend function to run async. We're already launching coroutines inside of this function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. Made this function a suspend function.

jsonRpcInteractor.batchSubscribe(pairingTopics) { error -> scope.launch { internalErrorFlow.emit(SDKError(error)) } }
} catch (e: Exception) {
scope.launch { internalErrorFlow.emit(SDKError(e)) }
Expand Down Expand Up @@ -416,6 +440,7 @@ internal class PairingEngine(
pairingRepository.getPairingOrNullByTopic(Topic(topic))?.let { pairing -> return@let pairing.isNotExpired() } ?: false

companion object {
private const val WATCHER_INTERVAL = 3000L
private const val WATCHER_INTERVAL = 5000L
private const val ACTIVE_PAIRINGS_WATCHER_INTERVAL = 600000L //10mins
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ SELECT pd.topic, pd.expiry, pd.relay_protocol, pd.relay_data, pd.uri, pd.methods
FROM Pairing pd
LEFT JOIN MetaData mdd_peer ON pd.topic = mdd_peer.sequence_topic AND mdd_peer.type = "PEER";

getListOfInactivePairingsWithoutRequestReceived:
SELECT pd.topic, pd.expiry, pd.relay_protocol, pd.relay_data, pd.uri, pd.methods, pd.is_active, pd.is_proposal_received, mdd_peer.name, mdd_peer.description, mdd_peer.url, mdd_peer.icons, mdd_peer.native
FROM Pairing pd
LEFT JOIN MetaData mdd_peer ON pd.topic = mdd_peer.sequence_topic AND mdd_peer.type = "PEER"
WHERE pd.is_active = 0 AND pd.is_proposal_received = 0;

getListOfInactivePairings:
SELECT pd.topic, pd.expiry, pd.relay_protocol, pd.relay_data, pd.uri, pd.methods, pd.is_active, pd.is_proposal_received, mdd_peer.name, mdd_peer.description, mdd_peer.url, mdd_peer.icons, mdd_peer.native
FROM Pairing pd
LEFT JOIN MetaData mdd_peer ON pd.topic = mdd_peer.sequence_topic AND mdd_peer.type = "PEER"
WHERE pd.is_active = 0;

getListOfActivePairings:
SELECT pd.topic, pd.expiry, pd.relay_protocol, pd.relay_data, pd.uri, pd.methods, pd.is_active, pd.is_proposal_received, mdd_peer.name, mdd_peer.description, mdd_peer.url, mdd_peer.icons, mdd_peer.native
FROM Pairing pd
LEFT JOIN MetaData mdd_peer ON pd.topic = mdd_peer.sequence_topic AND mdd_peer.type = "PEER"
WHERE pd.is_active = 1;

getPairingByTopic:
SELECT pd.topic, pd.expiry, pd.relay_protocol, pd.relay_data, pd.uri, pd.methods, pd.is_active, pd.is_proposal_received, mdd_peer.name, mdd_peer.description, mdd_peer.url, mdd_peer.icons, mdd_peer.native
FROM Pairing pd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ internal fun engineModule() = module {
respondSessionRequestUseCase = get(),
sessionRequestUseCase = get(),
sessionUpdateUseCase = get(),
deleteRequestByIdUseCase = get()
deleteRequestByIdUseCase = get(),
logger = get(named(AndroidCommonDITags.LOGGER))
)
}
}
Loading
Loading