From cb03bf7d897b8e835fbd53a6f46bd032be4e16e8 Mon Sep 17 00:00:00 2001 From: Stefano Franz Date: Fri, 3 Dec 2021 15:02:19 +0100 Subject: [PATCH] make loading more async --- freighter-tests/build.gradle | 2 +- .../testing/NullHolderOnObserverTest.kt | 10 +- .../SelectionUtilities.kt | 5 + .../memory/config/InMemorySelectionConfig.kt | 105 +-- .../memory/services/VaultWatcherService.kt | 642 ++++++++++-------- 5 files changed, 411 insertions(+), 353 deletions(-) diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle index 8c31e3bd..69a1a380 100644 --- a/freighter-tests/build.gradle +++ b/freighter-tests/build.gradle @@ -37,7 +37,7 @@ configurations { dependencies { freighterTestCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" - freighterTestCompile "freighter:freighter-testing-core-junit5:0.7.3-TEST-SNAPSHOT" + freighterTestCompile "freighter:freighter-testing-core-junit5:0.9.0-SNAPSHOT" freighterTestCompile project(":contracts") freighterTestCompile project(":workflows") diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt index deacd504..ef3bdc87 100644 --- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt +++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt @@ -67,12 +67,6 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.MS_SQL) } - @Test - @OracleTest - fun `tokens can be observed on node that does not know CI running oracle 12 r2`() { - runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.ORACLE_12_R2) - } - private fun runTokensOnNodeRunningDatabase(db: DeploymentMachineProvider.DatabaseType) { val randomString = generateRandomString() val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword) @@ -83,7 +77,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { .withCordapp(modernCiV1) .withCordapp(freighterHelperCordapp) .withDatabase(machineProvider.requestDatabase(db)) - ).withVersion(UnitOfDeployment.CORDA_4_6) + ).withVersion(UnitOfDeployment.CORDA_4_7) .deploy(deploymentContext) val node2 = SingleNodeDeployment( @@ -93,7 +87,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { .withCordapp(modernCiV1) .withCordapp(freighterHelperCordapp) .withDatabase(machineProvider.requestDatabase(db)) - ).withVersion(UnitOfDeployment.CORDA_4_6) + ).withVersion(UnitOfDeployment.CORDA_4_7) .deploy(deploymentContext) val nodeMachine1 = node1.getOrThrow().nodeMachines.single() diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt index 4cbea217..405cdfa2 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt @@ -19,6 +19,11 @@ internal fun sortByStateRefAscending(): Sort { return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))) } +internal fun sortByTimeStampAscending(): Sort { + val sortAttribute = SortAttribute.Standard(Sort.VaultStateAttribute.RECORDED_TIME) + return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))) +} + // Returns all held token amounts of a specified token with given issuer. // We need to discriminate on the token type as well as the symbol as different tokens might use the same symbols. @Suspendable diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt index cf8a1b65..79b814f5 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt @@ -9,63 +9,68 @@ import net.corda.core.cordapp.CordappConfigException import net.corda.core.node.ServiceHub import org.slf4j.LoggerFactory -const val CACHE_SIZE_DEFAULT = 1024 // TODO Return good default, for now it's not wired, it will be done in separate PR. +const val CACHE_SIZE_DEFAULT = 1024 +const val PAGE_SIZE_DEFAULT = 1024 -data class InMemorySelectionConfig @JvmOverloads constructor(val enabled: Boolean, - val indexingStrategies: List, - val cacheSize: Int = CACHE_SIZE_DEFAULT) : StateSelectionConfig { - companion object { - private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger") +data class InMemorySelectionConfig @JvmOverloads constructor( + val enabled: Boolean, + val indexingStrategies: List, + val cacheSize: Int = CACHE_SIZE_DEFAULT, + val pageSize: Int = 1000 +) : StateSelectionConfig { + companion object { + private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger") - @JvmStatic - fun parse(config: CordappConfig): InMemorySelectionConfig { - val enabled = if (!config.exists("stateSelection.inMemory.enabled")) { - logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this") - true - } else { - config.getBoolean("stateSelection.inMemory.enabled") - } - val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize") - ?: CACHE_SIZE_DEFAULT - val indexingType = try { - (config.get("stateSelection.inMemory.indexingStrategies") as List).map { VaultWatcherService.IndexingType.valueOf(it.toString()) } - } catch (e: CordappConfigException) { - logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() - } catch (e: ClassCastException) { - logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() - } - logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize") - return InMemorySelectionConfig(enabled, indexingType, cacheSize) - } + @JvmStatic + fun parse(config: CordappConfig): InMemorySelectionConfig { + val enabled = if (!config.exists("stateSelection.inMemory.enabled")) { + logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this") + true + } else { + config.getBoolean("stateSelection.inMemory.enabled") + } + val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize") + ?: CACHE_SIZE_DEFAULT + val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.cacheSize")?: PAGE_SIZE_DEFAULT + val indexingType = try { + (config.get("stateSelection.inMemory.indexingStrategies") as List).map { VaultWatcherService.IndexingType.valueOf(it.toString()) } + } catch (e: CordappConfigException) { + logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") + emptyList() + } catch (e: ClassCastException) { + logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") + emptyList() + } + logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize") + return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize) + } - fun defaultConfig(): InMemorySelectionConfig { - return InMemorySelectionConfig(true, emptyList()) - } - } + fun defaultConfig(): InMemorySelectionConfig { + return InMemorySelectionConfig(true, emptyList()) + } + } - @Suspendable - override fun toSelector(services: ServiceHub): LocalTokenSelector { - return try { - val vaultObserver = services.cordaService(VaultWatcherService::class.java) - LocalTokenSelector(services, vaultObserver, state = null) - } catch (e: IllegalArgumentException) { - throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.") - } - } + @Suspendable + override fun toSelector(services: ServiceHub): LocalTokenSelector { + return try { + val vaultObserver = services.cordaService(VaultWatcherService::class.java) + LocalTokenSelector(services, vaultObserver, state = null) + } catch (e: IllegalArgumentException) { + throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.") + } + } } // Helpers for configuration parsing. fun CordappConfig.getIntOrNull(path: String): Int? { - return try { - getInt(path) - } catch (e: CordappConfigException) { - if (exists(path)) { - throw IllegalArgumentException("Provide correct database selection configuration for config path: $path") - } else { - null - } - } + return try { + getInt(path) + } catch (e: CordappConfigException) { + if (exists(path)) { + throw IllegalArgumentException("Provide correct database selection configuration for config path: $path") + } else { + null + } + } } diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt index dad687c0..c6ef7398 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt @@ -4,18 +4,22 @@ import com.r3.corda.lib.tokens.contracts.states.FungibleToken import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType import com.r3.corda.lib.tokens.contracts.types.TokenType import com.r3.corda.lib.tokens.contracts.utilities.withoutIssuer -import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.InsufficientBalanceException import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException +import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.memory.internal.Holder import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey import com.r3.corda.lib.tokens.selection.sortByStateRefAscending +import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending +import io.github.classgraph.ClassGraph +import io.github.classgraph.ScanResult import net.corda.core.contracts.Amount import net.corda.core.contracts.StateAndRef import net.corda.core.internal.uncheckedCast import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.Vault +import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria @@ -26,312 +30,362 @@ import java.time.Duration import java.util.concurrent.* import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.function.Function +import java.util.function.Supplier import kotlin.concurrent.read +import kotlin.concurrent.thread import kotlin.concurrent.write -val UPDATER: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() val EMPTY_BUCKET = TokenBucket() const val PLACE_HOLDER: String = "THIS_IS_A_PLACE_HOLDER" @CordaService -class VaultWatcherService(private val tokenObserver: TokenObserver, - private val providedConfig: InMemorySelectionConfig) : SingletonSerializeAsToken() { - - private val __backingMap: ConcurrentMap, String> = ConcurrentHashMap() - private val __indexed: ConcurrentMap, ConcurrentMap> = ConcurrentHashMap( - providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap() }.toMap() - ) - - private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock() - - enum class IndexingType(val ownerType: Class) { - - EXTERNAL_ID(Holder.MappedIdentity::class.java), - PUBLIC_KEY(Holder.KeyIdentity::class.java); - - companion object { - fun fromHolder(holder: Class): IndexingType { - return when (holder) { - Holder.MappedIdentity::class.java -> { - EXTERNAL_ID - } - - Holder.KeyIdentity::class.java -> { - PUBLIC_KEY; - } - else -> throw IllegalArgumentException("Unknown Holder type: $holder") - } - } - } - - } - - constructor(appServiceHub: AppServiceHub) : this(getObservableFromAppServiceHub(appServiceHub), InMemorySelectionConfig.parse(appServiceHub.getAppContext().config)) - - companion object { - val LOG = contextLogger() - - private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver { - val config = appServiceHub.cordappProvider.getAppContext().config - val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config) - - if (!configOptions.enabled) { - LOG.info("Disabling inMemory token selection - refer to documentation on how to enable") - return TokenObserver(emptyList(), Observable.empty(), { _, _ -> - Holder.UnmappedIdentity() - }) - } - - val ownerProvider: (StateAndRef, IndexingType) -> Holder = { token, indexingType -> - when (indexingType) { - IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey) - IndexingType.EXTERNAL_ID -> { - val owningKey = token.state.data.holder.owningKey - lookupExternalIdFromKey(owningKey, appServiceHub) - } - } - } - - - val pageSize = 1000 - var currentPage = DEFAULT_PAGE_NUM - val (_, vaultObservable) = appServiceHub.vaultService.trackBy( - contractStateType = FungibleToken::class.java, - paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize), - criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL), - sorting = sortByStateRefAscending()) - - // we use the UPDATER thread for two reasons - // 1 this means we return the service before all states are loaded, and so do not hold up the node startup - // 2 because all updates to the cache (addition / removal) are also done via UPDATER, this means that until we have finished loading all updates are buffered preventing out of order updates - val asyncLoader = object : ((Vault.Update) -> Unit) -> Unit { - override fun invoke(callback: (Vault.Update) -> Unit) { - LOG.info("Starting async token loading from vault") - UPDATER.submit { - try { - var shouldLoop = true - while (shouldLoop) { - val newlyLoadedStates = appServiceHub.vaultService.queryBy( - contractStateType = FungibleToken::class.java, - paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize), - criteria = QueryCriteria.VaultQueryCriteria(), - sorting = sortByStateRefAscending() - ).states.toSet() - LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") - callback(Vault.Update(emptySet(), newlyLoadedStates)) - shouldLoop = newlyLoadedStates.isNotEmpty() - LOG.debug("shouldLoop=${shouldLoop}") - currentPage++ - } - LOG.info("finished token loading") - } catch (t: Throwable) { - LOG.error("Token Loading Failed due to: ", t) - } - } - } - } - return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader) - } - } - - init { - addTokensToCache(tokenObserver.initialValues) - tokenObserver.source.doOnError { - LOG.error("received error from observable", it) - } - tokenObserver.startLoading(::onVaultUpdate) - tokenObserver.source.subscribe(::onVaultUpdate) - } - - private fun processToken(token: StateAndRef, indexingType: IndexingType): TokenIndex { - val owner = tokenObserver.ownerProvider(token, indexingType) - val type = token.state.data.amount.token.tokenType.tokenClass - val typeId = token.state.data.amount.token.tokenType.tokenIdentifier - return TokenIndex(owner, type, typeId) - } - - private fun onVaultUpdate(t: Vault.Update) { - LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states") - try { - removeTokensFromCache(t.consumed) - addTokensToCache(t.produced) - } catch (t: Throwable) { - //we DO NOT want to kill the observable - as a single exception will terminate the feed - LOG.error("Failure during token cache update", t) - } - } - - private fun removeTokensFromCache(stateAndRefs: Collection>) { - indexViewCreationLock.read { - for (stateAndRef in stateAndRefs) { - val existingMark = __backingMap.remove(stateAndRef) - existingMark - ?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours") - for (key in __indexed.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(key)) - val indexedViewForHolder = __indexed[key] - indexedViewForHolder - ?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") - - val bucketForIndex: TokenBucket? = indexedViewForHolder?.get(index) - bucketForIndex?.remove(stateAndRef) - } - } - } - } - - private fun addTokensToCache(stateAndRefs: Collection>) { - indexViewCreationLock.read { - for (stateAndRef in stateAndRefs) { - val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER) - existingMark?.let { - LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours") - } - for (key in __indexed.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(key)) - val indexedViewForHolder = __indexed[key] - ?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") - val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { - TokenBucket() - } - bucketForIndex.add(stateAndRef) - } - } - } - } - - private fun getOrCreateIndexViewForHolderType(holderType: Class): ConcurrentMap { - return __indexed[holderType] ?: indexViewCreationLock.write { - __indexed[holderType] ?: generateNewIndexedView(holderType) - } - } - - private fun generateNewIndexedView(holderType: Class): ConcurrentMap { - val indexedViewForHolder: ConcurrentMap = ConcurrentHashMap() - for (stateAndRef in __backingMap.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(holderType)) - val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { - TokenBucket() - } - bucketForIndex.add(stateAndRef) - } - __indexed[holderType] = indexedViewForHolder - return indexedViewForHolder - } - - fun lockTokensExternal(list: List>, knownSelectionId: String) { - list.forEach { - __backingMap.replace(it, PLACE_HOLDER, knownSelectionId) - } - } - - fun selectTokens( - owner: Holder, - requiredAmount: Amount, - predicate: ((StateAndRef) -> Boolean) = { true }, - allowShortfall: Boolean = false, - autoUnlockDelay: Duration = Duration.ofMinutes(5), - selectionId: String - ): List> { - //we have to handle both cases - //1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType - //2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer. - val enrichedPredicate: AtomicReference<(StateAndRef) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) { - val issuer = (requiredAmount.token as IssuedTokenType).issuer - { token -> - predicate(token) && token.state.data.issuer == issuer - } - } else { - predicate - }) - - val lockedTokens = mutableListOf>() - val bucket: Iterable> = if (owner is Holder.TokenOnly) { - val currentPredicate = enrichedPredicate.get() - //why do we do this? It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault - //so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type - //we will revisit in future if this assumption turns out to be wrong - enrichedPredicate.set { - val stateTokenType = it.state.data.tokenType - currentPredicate(it) && - stateTokenType.fractionDigits == requiredAmount.token.fractionDigits && - requiredAmount.token.tokenClass == stateTokenType.tokenClass && - requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier - } - __backingMap.keys - } else { - val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass) - getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView) - } - - val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer() - var amountLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) - // this is the running total of soft locked tokens that we encounter until the target token amount is reached - var amountAlreadySoftLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) - val finalPredicate = enrichedPredicate.get() - for (tokenStateAndRef in bucket) { - // Does the token satisfy the (optional) predicate eg. issuer? - if (finalPredicate.invoke(tokenStateAndRef)) { - val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer()) - // if so, race to lock the token, expected oldValue = PLACE_HOLDER - if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) { - // we won the race to lock this token - lockedTokens.add(tokenStateAndRef) - amountLocked += tokenAmount - if (amountLocked >= requiredAmountWithoutIssuer) { - break - } - } else { - amountAlreadySoftLocked += tokenAmount - } - } - } - - if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) { - lockedTokens.forEach { - unlockToken(it, selectionId) - } - if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) { - throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.") - } else { - throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.") - } - } - - UPDATER.schedule({ - lockedTokens.forEach { - unlockToken(it, selectionId) - } - }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) - - return uncheckedCast(lockedTokens) - } - - fun unlockToken(it: StateAndRef, selectionId: String) { - __backingMap.replace(it, selectionId, PLACE_HOLDER) - } - - private fun getTokenBucket(idx: Holder, - tokenClass: Class<*>, - tokenIdentifier: String, - mapToSelectFrom: ConcurrentMap): TokenBucket { - return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET - } +class VaultWatcherService( + private val tokenObserver: TokenObserver, + private val providedConfig: InMemorySelectionConfig +) : SingletonSerializeAsToken() { + + private val __backingMap: ConcurrentMap, String> = ConcurrentHashMap() + private val __indexed: ConcurrentMap, ConcurrentMap> = ConcurrentHashMap( + providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap() }.toMap() + ) + + private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock() + private val UPDATER = tokenObserver.updaterThread + + enum class IndexingType(val ownerType: Class) { + + EXTERNAL_ID(Holder.MappedIdentity::class.java), + PUBLIC_KEY(Holder.KeyIdentity::class.java); + + companion object { + fun fromHolder(holder: Class): IndexingType { + return when (holder) { + Holder.MappedIdentity::class.java -> { + EXTERNAL_ID + } + + Holder.KeyIdentity::class.java -> { + PUBLIC_KEY + } + else -> throw IllegalArgumentException("Unknown Holder type: $holder") + } + } + } + + } + + constructor(appServiceHub: AppServiceHub) : this( + getObservableFromAppServiceHub(appServiceHub), + InMemorySelectionConfig.parse(appServiceHub.getAppContext().config) + ) + + companion object { + val LOG = contextLogger() + + private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver { + val updaterThread = Executors.newSingleThreadScheduledExecutor() + val config = appServiceHub.cordappProvider.getAppContext().config + val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config) + + if (!configOptions.enabled) { + LOG.info("Disabling inMemory token selection - refer to documentation on how to enable") + return TokenObserver(emptyList(), Observable.empty(), { _, _ -> + Holder.UnmappedIdentity() + }, updaterThread = updaterThread) + } + + val ownerProvider: (StateAndRef, IndexingType) -> Holder = { token, indexingType -> + when (indexingType) { + IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey) + IndexingType.EXTERNAL_ID -> { + val owningKey = token.state.data.holder.owningKey + lookupExternalIdFromKey(owningKey, appServiceHub) + } + } + } + + + val pageSize = configOptions.pageSize + var currentPage = DEFAULT_PAGE_NUM + val asyncLoader = object : ((Vault.Update) -> Unit) -> Unit { + override fun invoke(callback: (Vault.Update) -> Unit) { + LOG.info("Starting async token loading from vault") + + val classGraph = ClassGraph() + classGraph.enableClassInfo() + + val scanResultFuture = CompletableFuture.supplyAsync(Supplier { + classGraph.scan() + }, updaterThread) + + scanResultFuture.thenApplyAsync(Function { scanResult -> + val subclasses : Set> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName) + .map { it.name } + .map { Class.forName(it) as Class }.toSet() + + val enrichedClasses = (subclasses - setOf(FungibleToken::class.java)) + LOG.info("Enriching token query with types: $enrichedClasses") + thread { + LOG.info("Querying for tokens of types: $subclasses") + try { + var shouldLoop = true + while (shouldLoop) { + val newlyLoadedStates = appServiceHub.vaultService.queryBy( + paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize), + criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses), + sorting = sortByTimeStampAscending() + ).states.toSet() + callback(Vault.Update(emptySet(), newlyLoadedStates)) + LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") + shouldLoop = newlyLoadedStates.isNotEmpty() + LOG.debug("shouldLoop=${shouldLoop}") + currentPage++ + } + LOG.info("finished token loading") + } catch (t: Throwable) { + LOG.error("Token Loading Failed due to: ", t) + } + }.start() + }, updaterThread) + } + } + + val (_, vaultObservable) = appServiceHub.vaultService.trackBy( + contractStateType = FungibleToken::class.java, + paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1), + criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL), + sorting = sortByStateRefAscending() + ) + + + return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader, updaterThread) + } + } + + init { + addTokensToCache(tokenObserver.initialValues) + tokenObserver.source.doOnError { + LOG.error("received error from observable", it) + } + tokenObserver.startLoading(::onVaultUpdate) + tokenObserver.source.subscribe(::onVaultUpdate) + } + + private fun processToken(token: StateAndRef, indexingType: IndexingType): TokenIndex { + val owner = tokenObserver.ownerProvider(token, indexingType) + val type = token.state.data.amount.token.tokenType.tokenClass + val typeId = token.state.data.amount.token.tokenType.tokenIdentifier + return TokenIndex(owner, type, typeId) + } + + fun onVaultUpdate(t: Vault.Update) { + UPDATER.submit { + LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states") + try { + removeTokensFromCache(t.consumed) + addTokensToCache(t.produced) + } catch (t: Throwable) { + //we DO NOT want to kill the observable - as a single exception will terminate the feed + LOG.error("Failure during token cache update", t) + } + } + } + + private fun removeTokensFromCache(stateAndRefs: Collection>) { + indexViewCreationLock.read { + for (stateAndRef in stateAndRefs) { + val existingMark = __backingMap.remove(stateAndRef) + existingMark + ?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours") + for (key in __indexed.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(key)) + val indexedViewForHolder = __indexed[key] + indexedViewForHolder + ?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") + + val bucketForIndex: TokenBucket? = indexedViewForHolder?.get(index) + bucketForIndex?.remove(stateAndRef) + } + } + } + } + + private fun addTokensToCache(stateAndRefs: Collection>) { + indexViewCreationLock.read { + for (stateAndRef in stateAndRefs) { + if (stateAndRef.state.encumbrance != null){ + continue + } + val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER) + existingMark?.let { + LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours") + } + for (key in __indexed.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(key)) + val indexedViewForHolder = __indexed[key] + ?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") + val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { + TokenBucket() + } + bucketForIndex.add(stateAndRef) + } + } + } + } + + private fun getOrCreateIndexViewForHolderType(holderType: Class): ConcurrentMap { + return __indexed[holderType] ?: indexViewCreationLock.write { + __indexed[holderType] ?: generateNewIndexedView(holderType) + } + } + + private fun generateNewIndexedView(holderType: Class): ConcurrentMap { + val indexedViewForHolder: ConcurrentMap = ConcurrentHashMap() + for (stateAndRef in __backingMap.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(holderType)) + val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { + TokenBucket() + } + bucketForIndex.add(stateAndRef) + } + __indexed[holderType] = indexedViewForHolder + return indexedViewForHolder + } + + fun lockTokensExternal(list: List>, knownSelectionId: String, autoUnlockDelay: Duration? = null) { + list.forEach { + __backingMap.replace(it, PLACE_HOLDER, knownSelectionId) + } + + if (autoUnlockDelay != null) { + UPDATER.schedule({ + list.forEach { + unlockToken(it, knownSelectionId) + } + }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) + } + } + + fun selectTokens( + owner: Holder, + requiredAmount: Amount, + predicate: ((StateAndRef) -> Boolean) = { true }, + allowShortfall: Boolean = false, + autoUnlockDelay: Duration = Duration.ofMinutes(5), + selectionId: String + ): List> { + //we have to handle both cases + //1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType + //2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer. + val enrichedPredicate: AtomicReference<(StateAndRef) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) { + val issuer = (requiredAmount.token as IssuedTokenType).issuer + { token -> + predicate(token) && token.state.data.issuer == issuer + } + } else { + predicate + }) + + val lockedTokens = mutableListOf>() + val bucket: Iterable> = if (owner is Holder.TokenOnly) { + val currentPredicate = enrichedPredicate.get() + //why do we do this? It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault + //so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type + //we will revisit in future if this assumption turns out to be wrong + enrichedPredicate.set { + val stateTokenType = it.state.data.tokenType + currentPredicate(it) && + stateTokenType.fractionDigits == requiredAmount.token.fractionDigits && + requiredAmount.token.tokenClass == stateTokenType.tokenClass && + requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier + } + __backingMap.keys + } else { + val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass) + getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView) + } + + val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer() + var amountLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) + // this is the running total of soft locked tokens that we encounter until the target token amount is reached + var amountAlreadySoftLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) + val finalPredicate = enrichedPredicate.get() + for (tokenStateAndRef in bucket) { + // Does the token satisfy the (optional) predicate eg. issuer? + if (finalPredicate.invoke(tokenStateAndRef)) { + val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer()) + // if so, race to lock the token, expected oldValue = PLACE_HOLDER + if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) { + // we won the race to lock this token + lockedTokens.add(tokenStateAndRef) + amountLocked += tokenAmount + if (amountLocked >= requiredAmountWithoutIssuer) { + break + } + } else { + amountAlreadySoftLocked += tokenAmount + } + } + } + + if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) { + lockedTokens.forEach { + unlockToken(it, selectionId) + } + if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) { + throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.") + } else { + throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.") + } + } + + UPDATER.schedule({ + lockedTokens.forEach { + unlockToken(it, selectionId) + } + }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) + + return uncheckedCast(lockedTokens) + } + + fun unlockToken(it: StateAndRef, selectionId: String) { + __backingMap.replace(it, selectionId, PLACE_HOLDER) + } + + fun isTokenLocked(it: StateAndRef, lockId: String? = null): Boolean { + return if (lockId != null) { + __backingMap[it] == lockId + } else __backingMap[it] != PLACE_HOLDER + } + + private fun getTokenBucket( + idx: Holder, + tokenClass: Class<*>, + tokenIdentifier: String, + mapToSelectFrom: ConcurrentMap + ): TokenBucket { + return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET + } } -class TokenObserver(val initialValues: List>, - val source: Observable>, - val ownerProvider: ((StateAndRef, VaultWatcherService.IndexingType) -> Holder), - inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> }) { - - fun startLoading(loadingCallBack: (Vault.Update) -> Unit) { - asyncLoader(loadingCallBack) - } +class TokenObserver( + val initialValues: List>, + val source: Observable>, + val ownerProvider: ((StateAndRef, VaultWatcherService.IndexingType) -> Holder), + inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> }, + val updaterThread: ScheduledExecutorService +) { + + fun startLoading(loadingCallBack: (Vault.Update) -> Unit) { + asyncLoader(loadingCallBack) + } } -class TokenBucket(set: MutableSet> = ConcurrentHashMap, Boolean>().keySet(true)) : MutableSet> by set +class TokenBucket(set: MutableSet> = ConcurrentHashMap, Boolean>().keySet(true)) : + MutableSet> by set data class TokenIndex(val owner: Holder, val tokenClazz: Class<*>, val tokenIdentifier: String)