diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/AsyncEnrichedDexApi.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/AsyncEnrichedDexApi.scala index 9e9dcc6c3c..24273a5585 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/AsyncEnrichedDexApi.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/AsyncEnrichedDexApi.scala @@ -444,6 +444,28 @@ class AsyncEnrichedDexApi(apiKey: String, host: => InetSocketAddress)(implicit e .headers(headers) } + override def addCustomFeeAssets(assets: Set[Asset], headers: Map[String, String]): AsyncEnrichedDexApi.R[HttpMessage] = mk { + basicRequest + .post(uri"$apiUri/matcher/settings/custom-fee-assets") + .followRedirects(false) + .headers(headers) + .body(Json.stringify(Json.toJson(HttpCustomFeeAssets(assets)))) + } + + override def addCustomFeeAssets(assets: Set[Asset]): AsyncEnrichedDexApi.R[HttpMessage] = + addCustomFeeAssets(assets, apiKeyHeaders) + + override def deleteCustomFeeAssets(assets: Set[Asset], headers: Map[String, String]): AsyncEnrichedDexApi.R[HttpMessage] = mk { + basicRequest + .delete(uri"$apiUri/matcher/settings/custom-fee-assets") + .followRedirects(false) + .headers(headers) + .body(Json.stringify(Json.toJson(HttpCustomFeeAssets(assets)))) + } + + override def deleteCustomFeeAssets(assets: Set[Asset]): AsyncEnrichedDexApi.R[HttpMessage] = + deleteCustomFeeAssets(assets, apiKeyHeaders) + override def deleteOrderBookWithKey(amountAsset: String, priceAsset: String, headers: Map[String, String]): R[HttpMessage] = mk { basicRequest .delete(uri"$apiUri/matcher/orderbook/$amountAsset/$priceAsset") diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala index e339b4d821..207eeba83c 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala @@ -199,6 +199,11 @@ trait DexApi[F[_]] { def cancelAllInOrderBookWithKey(amountAsset: String, priceAsset: String, headers: Map[String, String]): F[HttpMessage] def cancelAllInOrderBookWithKey(assetPair: AssetPair): F[HttpMessage] + def addCustomFeeAssets(assets: Set[Asset], headers: Map[String, String]): F[HttpMessage] + def addCustomFeeAssets(assets: Set[Asset]): F[HttpMessage] + def deleteCustomFeeAssets(assets: Set[Asset], headers: Map[String, String]): F[HttpMessage] + def deleteCustomFeeAssets(assets: Set[Asset]): F[HttpMessage] + def upsertAssetRate(assetId: String, rate: Double, headers: Map[String, String] = Map.empty): F[HttpMessage] def upsertAssetRate(asset: Asset, rate: Double): F[HttpMessage] def upsertAssetRate(asset: Asset, rate: String): F[HttpMessage] diff --git a/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderCompositeFeeTestSuite.scala b/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderCompositeFeeTestSuite.scala index bebc822bde..e5e9a324eb 100644 --- a/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderCompositeFeeTestSuite.scala +++ b/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderCompositeFeeTestSuite.scala @@ -1,8 +1,10 @@ package com.wavesplatform.it.sync.orders import com.typesafe.config.{Config, ConfigFactory} +import com.wavesplatform.dex.api.http.entities.HttpCalculatedFeeResponse.CalculatedFee import com.wavesplatform.dex.api.http.entities.HttpOrderStatus import com.wavesplatform.dex.api.http.entities.HttpOrderStatus.Status +import com.wavesplatform.dex.domain.asset.Asset import com.wavesplatform.dex.domain.order.OrderType import com.wavesplatform.dex.error.FeeNotEnough @@ -81,6 +83,67 @@ final class OrderCompositeFeeTestSuite extends OrderFeeBaseTestSuite { aliceBalance2 - aliceBalance1 shouldBe 5.waves - baseFee / 2 bobBalance2 - bobBalance1 shouldBe -5.waves - baseFee } + + "add custom fee assets, but not use it until offset" in { + dex1.api.addCustomFeeAssets(Set(Asset.Waves, btc, usd)) + + dex1.api.calculateFee(wavesUsdPair, OrderType.SELL, 10.waves, 25.usd) + .base.value shouldBe CalculatedFee(Asset.Waves, baseFee) + + dex1.api.calculateFee(wavesBtcPair, OrderType.SELL, 10.waves, 1.btc) + .base.value shouldBe CalculatedFee(Asset.Waves, 0.2.waves) + + dex1.api.place(mkOrder(alice, wavesUsdnPair, OrderType.SELL, 20.waves, 5.usdn, matcherFee = baseFee)) // just for offset to be reached + dex1.api.place(mkOrder(alice, wavesUsdnPair, OrderType.SELL, 20.waves, 6.usdn, matcherFee = baseFee)) // just for offset to be reached + + dex1.api.place(mkOrder(alice, wavesBtcPair, OrderType.SELL, 10.waves, 1.btc, matcherFee = 0.04.waves)) + dex1.api.place(mkOrder(alice, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd, matcherFee = 0.04.waves)) + + dex1.api.getOrderHistoryByPKWithSig(alice, activeOnly = Some(true)).foreach { item => + dex1.api.cancelOneOrderWithKey(item.id, Some(alice.publicKey)) + } + + dex1.api.calculateFee(wavesBtcPair, OrderType.SELL, 10.waves, 1.btc) + .base.value shouldBe CalculatedFee(Asset.Waves, 400000L) + + dex1.api.calculateFee(wavesUsdPair, OrderType.SELL, 10.waves, 1.usd) + .base.value shouldBe CalculatedFee(Asset.Waves, 400000L) + + dex1.api.deleteCustomFeeAssets(Set(usd)) + + dex1.api.place(mkOrder(alice, wavesBtcPair, OrderType.SELL, 10.waves, 1.btc, matcherFee = 0.04.waves)) + dex1.api.place(mkOrder(alice, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd, matcherFee = baseFee)) + + dex1.api.calculateFee(wavesBtcPair, OrderType.SELL, 10.waves, 1.btc) + .base.value shouldBe CalculatedFee(Asset.Waves, 400000L) + + dex1.api.calculateFee(wavesUsdPair, OrderType.SELL, 10.waves, 1.usd) + .base.value shouldBe CalculatedFee(Asset.Waves, baseFee) + + } + + "read all custom assets action from levelDB at start" in { + (1 to 20).foreach { i => + val order = mkOrder(alice, wavesUsdnPair, OrderType.SELL, 1.waves, (5 + i).usdn, matcherFee = baseFee) + dex1.api.place(order) + dex1.api.cancelOrderById(order) + } + + dex1.api.saveSnapshots + + dex1.restart() + + dex1.api.place(mkOrder(alice, wavesBtcPair, OrderType.SELL, 10.waves, 1.btc, matcherFee = 0.04.waves)) + dex1.api.place(mkOrder(alice, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd, matcherFee = baseFee)) + + dex1.api.calculateFee(wavesBtcPair, OrderType.SELL, 10.waves, 1.btc) + .base.value shouldBe CalculatedFee(Asset.Waves, 400000L) + + dex1.api.calculateFee(wavesUsdPair, OrderType.SELL, 10.waves, 1.usd) + .base.value shouldBe CalculatedFee(Asset.Waves, baseFee) + + } + } private def multiplyAmountByDouble(a: Long, d: Double): Long = @@ -89,12 +152,12 @@ final class OrderCompositeFeeTestSuite extends OrderFeeBaseTestSuite { override protected def dexInitialSuiteConfig: Config = ConfigFactory.parseString( s""" |waves.dex { - | price-assets = [ "$UsdId", "$BtcId", "WAVES" ] + | price-assets = [ "$UsdId", "$UsdnId", "$BtcId", "WAVES" ] | allowed-order-versions = [1, 2, 3] | order-fee.-1 { | mode = composite | composite { - | custom { + | custom-pairs { | "WAVES-$BtcId": { | mode = percent | percent { @@ -111,17 +174,50 @@ final class OrderCompositeFeeTestSuite extends OrderFeeBaseTestSuite { | base-taker-fee = $baseFee | } | } + | discount { + | asset = "$WctId" + | value = 0.1 + | } | } | } + | order-fee.11 { + | mode = composite + | composite { + | dynamic-custom-assets = true + | custom-pairs {} + | custom-assets { + | assets = [] + | settings { + | mode = "fixed" + | fixed { + | asset = "WAVES" + | min-fee = 400000 + | } + | } + | } + | default { + | mode = dynamic + | dynamic { + | base-maker-fee = $baseFee + | base-taker-fee = $baseFee + | } + | } + | discount { + | asset = "$WctId" + | value = 0.1 + | } + | } + | } |} """.stripMargin ) override protected def beforeAll(): Unit = { wavesNode1.start() - broadcastAndAwait(IssueUsdTx, IssueBtcTx) + broadcastAndAwait(IssueUsdTx, IssueBtcTx, IssueUsdnTx, IssueWctTx) broadcastAndAwait(mkTransfer(bob, alice, 100.btc, btc)) dex1.start() + upsertAssetRate((usdn, 1.0), (wct, 1.0)) } } diff --git a/dex-pb/src/main/protobuf/dex_order.proto b/dex-pb/src/main/protobuf/dex_order.proto index a62b0b4872..1d88e71e33 100644 --- a/dex-pb/src/main/protobuf/dex_order.proto +++ b/dex-pb/src/main/protobuf/dex_order.proto @@ -67,6 +67,14 @@ message CancelOrder { bytes owner = 3; }; +message AddFeeCustomAssets { + repeated bytes asset_id = 1; +}; + +message DeleteFeeCustomAssets { + repeated bytes asset_id = 1; +}; + message ValidatedCommand { AssetPair asset_pair = 1; bytes kamon_ctx = 2; @@ -76,5 +84,7 @@ message ValidatedCommand { CancelOrder cancel_order = 5; google.protobuf.Empty cancel_all_orders = 6; google.protobuf.Empty delete_order_book = 7; + AddFeeCustomAssets add_fee_custom_assets = 8; + DeleteFeeCustomAssets delete_fee_custom_assets = 9; } }; diff --git a/dex/src/main/resources/application.conf b/dex/src/main/resources/application.conf index 04d8d1128a..1306f47d06 100644 --- a/dex/src/main/resources/application.conf +++ b/dex/src/main/resources/application.conf @@ -273,7 +273,9 @@ waves.dex { } composite { - custom { + dynamic-custom-assets = false # use dynamic assets from kafka and levelDB for custom-assets section + + custom-pairs { DWgwcZTMhSvnyYCoWLRUXXSH1RSkzThXLJhww9gwkqdn-25FEqEjRkqK6yCkiT7Lz6SAYz7gUFCtxfCChnrVFD5AT { mode = "percent" percent { @@ -296,7 +298,7 @@ waves.dex { # WAVES or Issued asset ID in base 58 assets = [] - # will be applied to all valid pair combinations from the 'assets' field + # will be applied to all valid pair combinations from the 'assets' field or from kafka and levelDB, if dynamic-custom-assets set to true settings { mode = "percent" percent { diff --git a/dex/src/main/scala/com/wavesplatform/dex/Application.scala b/dex/src/main/scala/com/wavesplatform/dex/Application.scala index d75d94d070..6a5248792e 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/Application.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/Application.scala @@ -33,6 +33,7 @@ import com.wavesplatform.dex.api.routes.ApiRoute import com.wavesplatform.dex.api.ws.actors.{WsExternalClientDirectoryActor, WsInternalBroadcastActor} import com.wavesplatform.dex.api.ws.routes.MatcherWebSocketRoute import com.wavesplatform.dex.app._ +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.{AssetsActionForOffset, CustomAssetFeeState} import com.wavesplatform.dex.caches.{MatchingRulesCache, OrderFeeSettingsCache, RateCache} import com.wavesplatform.dex.db._ import com.wavesplatform.dex.db.leveldb.{openDb, LevelDb} @@ -141,6 +142,7 @@ class Application(settings: MatcherSettings, config: Config)(implicit val actorS private val assetPairsDb = AssetPairsDb.levelDb(commonLevelDb) private val orderBookSnapshotDb = OrderBookSnapshotDb.levelDb(snapshotsLevelDb) private val orderDb = OrderDb.levelDb(settings.orderDb, db, levelDbEcMap) + private val customFeeAssetsDb = CustomFeeAssetsDb.levelDb(commonLevelDb) private val assetsCache = AssetsCache.from(AssetsDb.levelDb(commonLevelDb)) @@ -211,8 +213,14 @@ class Application(settings: MatcherSettings, config: Config)(implicit val actorS private val pairBuilder = new AssetPairBuilder(settings, getAndCacheDescription, settings.blacklistedAssets) + private val allFeeAssetsActions = Await.result(customFeeAssetsDb.all(), 1.minute) + + private val cfaState = allFeeAssetsActions.foldLeft(CustomAssetFeeState.empty) { + case (state, action) => state.applyAssetsActionForOffset(action) + } + private val orderFeeSettingsCache = - new OrderFeeSettingsCache(settings.orderFee.view.mapValues(_(pairBuilder.quickValidateAssetPair(_).isRight)).toMap) + new OrderFeeSettingsCache(settings.orderFee.view.mapValues(_(pairBuilder.quickValidateAssetPair(_).isRight)).toMap, cfaState) private val exchangeTxStorage = ExchangeTxStorage.levelDB(commonLevelDb) @@ -506,8 +514,21 @@ class Application(settings: MatcherSettings, config: Config)(implicit val actorS currentOffset = () => lastProcessedOffset ) + private val customFeeAssetsRoute = new CustomAssetsFeeRoute(apiKeyHashes, orderFeeSettingsCache, storeCommand) + private val v0HttpRoute = - Seq(infoRoute, ratesRoute, debugRoute, marketsRoute, historyRoute, placeRoute, cancelRoute, balancesRoute, transactionsRoute) + Seq( + infoRoute, + ratesRoute, + debugRoute, + marketsRoute, + historyRoute, + placeRoute, + cancelRoute, + balancesRoute, + transactionsRoute, + customFeeAssetsRoute + ) private val v1HttpRoute = Seq(OrderBookRoute( assetPairBuilder = pairBuilder, @@ -666,37 +687,77 @@ class Application(settings: MatcherSettings, config: Config)(implicit val actorS val loadAssets = eventAssets.traverse(getAndCacheDescription).value loadAssets.flatMap { _ => - val assetPairs: Set[AssetPair] = xs - .map { validatedCommandWithMeta => - lazy val handleValidatedCommandWithMeta = { - log.debug(s"Consumed $validatedCommandWithMeta") - orderBookDirectoryActorRef ! validatedCommandWithMeta - lastProcessedOffset = validatedCommandWithMeta.offset - validatedCommandWithMeta.command.assetPair + val (pairs, future) = xs.foldLeft((Set.empty[AssetPair], Future.unit)) { + case ((pairs, future), validatedCommand) => + log.debug(s"Consumed $validatedCommand") + validatedCommand.command match { + case cmd: OrderBookValidatedCommand => + lazy val handleCommand = { + orderBookDirectoryActorRef ! OrderBookDirectoryActor.ApplyValidatedCommandWithPair( + validatedCommand.offset, + validatedCommand.timestamp, + cmd + ) + lastProcessedOffset = validatedCommand.offset + cmd.assetPair + } + + val newFuture = future.map { _ => + cmd.maybeCtx.fold(handleCommand) { ctx => + if (status == MatcherStatus.Working) { + val parentSpan = ctx.get(kamon.trace.Span.Key) + val span = + Kamon.spanBuilder(s"consumedValidatedCommandWithMeta") + .asChildOf(parentSpan) + .traceId(parentSpan.trace.id) + .tag(ctx.tags) + .samplingDecision(KamonTraceUtils.Sample) + .doNotTrackMetrics() + .start() + Kamon.runWithSpan[AssetPair](span)(handleCommand) + } else + handleCommand + } + () + } + (pairs + cmd.assetPair, newFuture) + case cmd: ValidatedCommand.AddCustomAssetToFee => + val action = AssetsActionForOffset(assets = cmd.assets, offset = validatedCommand.offset, isAdded = true) + val newFuture = customFeeAssetsDb.save(action).tap(_.onComplete { + case Failure(ex) => + log.error("Error while event processing occurred: ", ex) + forceStopApplication(EventProcessingError) + case Success(_) => + orderFeeSettingsCache.applyAssetsActionForOffset(action) + lastProcessedOffset = action.offset + }) + (pairs, newFuture) + + case cmd: ValidatedCommand.DeleteCustomAssetToFee => + val action = AssetsActionForOffset(assets = cmd.assets, offset = validatedCommand.offset, isAdded = false) + val newFuture = customFeeAssetsDb.save(action).tap(_.onComplete { + case Failure(ex) => + log.error("Error while event processing occurred: ", ex) + forceStopApplication(EventProcessingError) + case Success(_) => + orderFeeSettingsCache.applyAssetsActionForOffset(action) + lastProcessedOffset = action.offset + }) + (pairs, newFuture) } + } - validatedCommandWithMeta.command.maybeCtx.fold(handleValidatedCommandWithMeta) { ctx => - if (status == MatcherStatus.Working) { - val parentSpan = ctx.get(kamon.trace.Span.Key) - val span = - Kamon.spanBuilder(s"consumedValidatedCommandWithMeta") - .asChildOf(parentSpan) - .traceId(parentSpan.trace.id) - .tag(ctx.tags) - .samplingDecision(KamonTraceUtils.Sample) - .doNotTrackMetrics() - .start() - Kamon.runWithSpan[AssetPair](span)(handleValidatedCommandWithMeta) - } else - handleValidatedCommandWithMeta - } + if (pairs.nonEmpty) + future.flatMap { _ => + orderBookDirectoryActorRef + .ask(OrderBookDirectoryActor.PingAll(pairs))(processConsumedTimeout) + .recover { + case NonFatal(e) => + log.error("PingAll is timed out!", e) + }.map(_ => ()) } - .to(Set) + else future - orderBookDirectoryActorRef - .ask(OrderBookDirectoryActor.PingAll(assetPairs))(processConsumedTimeout) - .recover { case NonFatal(e) => log.error("PingAll is timed out!", e) } - .map(_ => ()) } andThen { case Failure(ex) => log.error("Error while event processing occurred: ", ex) diff --git a/dex/src/main/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActor.scala b/dex/src/main/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActor.scala index 7932f5570f..8df6171194 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActor.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActor.scala @@ -16,7 +16,7 @@ import com.wavesplatform.dex.error import com.wavesplatform.dex.error.MatcherError import com.wavesplatform.dex.grpc.integration.dto.BriefAssetDescription import com.wavesplatform.dex.queue.ValidatedCommandWithMeta.{Offset => EventOffset} -import com.wavesplatform.dex.queue.{ValidatedCommand, ValidatedCommandWithMeta} +import com.wavesplatform.dex.queue.{OrderBookValidatedCommand, ValidatedCommand} import com.wavesplatform.dex.settings.MatcherSettings import kamon.Kamon import scorex.utils._ @@ -147,7 +147,7 @@ class OrderBookDirectoryActor( private def createSnapshotFor(orderbook: ActorRef, offset: EventOffset): Unit = orderbook ! SaveSnapshot(offset) - private def createSnapshotFor(offset: ValidatedCommandWithMeta.Offset): Unit = + private def createSnapshotFor(offset: EventOffset): Unit = snapshotsState.requiredSnapshot(offset).foreach { case (assetPair, updatedSnapshotState) => orderBook(assetPair) match { case Some(Right(actorRef)) => @@ -171,7 +171,7 @@ class OrderBookDirectoryActor( case GetSnapshotOffsets => sender() ! SnapshotOffsetsResponse(snapshotsState.snapshotOffsets) // DEX-1192 docs/places-and-cancels.md - case request: ValidatedCommandWithMeta => + case request: ApplyValidatedCommandWithPair => val currentLastProcessedNr = math.max(request.offset, lastProcessedNr) request.command match { case ValidatedCommand.DeleteOrderBook(assetPair, _) => @@ -386,6 +386,8 @@ object OrderBookDirectoryActor { case object GetSnapshotOffsets case class SnapshotOffsetsResponse(offsets: Map[AssetPair, Option[EventOffset]]) + final case class ApplyValidatedCommandWithPair(offset: EventOffset, timestamp: Long, command: OrderBookValidatedCommand) + case class MatcherRecovered(oldestEventNr: Long) case object Shutdown diff --git a/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/OrderBookActor.scala b/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/OrderBookActor.scala index ab60b5984d..32b9e446c1 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/OrderBookActor.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/OrderBookActor.scala @@ -7,7 +7,7 @@ import cats.data.NonEmptyList import cats.instances.option.catsStdInstancesForOption import cats.syntax.apply._ import cats.syntax.option._ -import com.wavesplatform.dex.actors.OrderBookDirectoryActor.SaveSnapshot +import com.wavesplatform.dex.actors.OrderBookDirectoryActor.{ApplyValidatedCommandWithPair, SaveSnapshot} import com.wavesplatform.dex.actors.address.AddressActor import com.wavesplatform.dex.actors.events.OrderEventsCoordinatorActor import com.wavesplatform.dex.actors.orderbook.OrderBookActor._ @@ -22,8 +22,8 @@ import com.wavesplatform.dex.metrics.TimerExt import com.wavesplatform.dex.model.Events._ import com.wavesplatform.dex.model.OrderBook.OrderBookUpdates import com.wavesplatform.dex.model._ +import com.wavesplatform.dex.queue.ValidatedCommand import com.wavesplatform.dex.queue.ValidatedCommandWithMeta.Offset -import com.wavesplatform.dex.queue.{ValidatedCommand, ValidatedCommandWithMeta} import com.wavesplatform.dex.settings.{DenormalizedMatchingRule, MatchingRule, OrderRestrictionsSettings} import com.wavesplatform.dex.time.Time import kamon.Kamon @@ -53,9 +53,9 @@ class OrderBookActor( private var aggregatedRef: typed.ActorRef[AggregatedOrderBookActor.InputMessage] = _ - private var savingSnapshot = Option.empty[ValidatedCommandWithMeta.Offset] - private var lastSavedSnapshotOffset = Option.empty[ValidatedCommandWithMeta.Offset] - private var lastProcessedOffset = Option.empty[ValidatedCommandWithMeta.Offset] + private var savingSnapshot = Option.empty[Offset] + private var lastSavedSnapshotOffset = Option.empty[Offset] + private var lastProcessedOffset = Option.empty[Offset] private val addTimer = Kamon.timer("matcher.orderbook.add").withTag("pair", assetPair.toString) private val cancelTimer = Kamon.timer("matcher.orderbook.cancel").withTag("pair", assetPair.toString) @@ -63,7 +63,7 @@ class OrderBookActor( private var actualRule: MatchingRule = normalizeMatchingRule(matchingRules.head) - private def actualizeRules(offset: ValidatedCommandWithMeta.Offset): Unit = { + private def actualizeRules(offset: Offset): Unit = { val actualRules = DenormalizedMatchingRule.skipOutdated(offset, matchingRules) if (matchingRules.head != actualRules.head) { matchingRules = actualRules @@ -127,7 +127,7 @@ class OrderBookActor( private def working: Receive = { // DEX-1192 docs/places-and-cancels.md - case request: ValidatedCommandWithMeta => + case request: ApplyValidatedCommandWithPair => actualizeRules(request.offset) lastProcessedOffset match { case Some(lastProcessed) if request.offset <= lastProcessed => // Already processed @@ -202,7 +202,7 @@ class OrderBookActor( } } - private def onCancelOrder(command: ValidatedCommandWithMeta, cancelCommand: ValidatedCommand.CancelOrder): Unit = cancelTimer.measure { + private def onCancelOrder(command: ApplyValidatedCommandWithPair, cancelCommand: ValidatedCommand.CancelOrder): Unit = cancelTimer.measure { orderBook.cancel(cancelCommand.orderId, toReason(cancelCommand.source), command.timestamp) match { case (updatedOrderBook, Some(cancelEvent), levelChanges) => log.trace(s"Applied $command") @@ -228,7 +228,7 @@ class OrderBookActor( } } - private def onAddOrder(command: ValidatedCommandWithMeta, acceptedOrder: AcceptedOrder): Unit = addTimer.measure { + private def onAddOrder(command: ApplyValidatedCommandWithPair, acceptedOrder: AcceptedOrder): Unit = addTimer.measure { log.trace(s"Applied $command, trying to match ...") process( command.timestamp, @@ -248,7 +248,7 @@ class OrderBookActor( super.preRestart(reason, message) } - private def saveSnapshotAt(globalEventNr: ValidatedCommandWithMeta.Offset): Unit = { + private def saveSnapshotAt(globalEventNr: Offset): Unit = { val saveSnapshot = (lastSavedSnapshotOffset, lastProcessedOffset).tupled.forall { case (saved, processed) => saved < processed } val toSave = if (saveSnapshot) Some(orderBook.snapshot) else None diff --git a/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpCustomFeeAssets.scala b/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpCustomFeeAssets.scala new file mode 100644 index 0000000000..c979d4e70d --- /dev/null +++ b/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpCustomFeeAssets.scala @@ -0,0 +1,11 @@ +package com.wavesplatform.dex.api.http.entities + +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.domain.asset.Asset._ +import play.api.libs.json.{Json, OFormat} + +final case class HttpCustomFeeAssets(assets: Set[Asset]) + +object HttpCustomFeeAssets { + implicit val httpCustomFeeAssetsFormat: OFormat[HttpCustomFeeAssets] = Json.format +} diff --git a/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpOrderFeeMode.scala b/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpOrderFeeMode.scala index 72a74b5877..c4f52e7604 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpOrderFeeMode.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/api/http/entities/HttpOrderFeeMode.scala @@ -130,7 +130,7 @@ object HttpOrderFeeMode { case x: OrderFeeSettings.DynamicSettings => FeeModeDynamic(x.maxBaseFee + matcherAccountFee) case OrderFeeSettings.FixedSettings(assetId, minFee) => FeeModeFixed(assetId, minFee) case OrderFeeSettings.PercentSettings(assetType, minFee, minFeeInWaves) => FeeModePercent(assetType, minFee, minFeeInWaves) - case cs @ OrderFeeSettings.CompositeSettings(default, _, _, discount, _) => + case cs @ OrderFeeSettings.CompositeSettings(default, _, _, _, discount, _) => FeeModeComposite( fromSettings(default, matcherAccountFee), cs.getAllCustomFeeSettings.view.mapValues(fromSettings(_, matcherAccountFee)).toMap, diff --git a/dex/src/main/scala/com/wavesplatform/dex/api/http/routes/v0/CustomAssetsFeeRoute.scala b/dex/src/main/scala/com/wavesplatform/dex/api/http/routes/v0/CustomAssetsFeeRoute.scala new file mode 100644 index 0000000000..2725242532 --- /dev/null +++ b/dex/src/main/scala/com/wavesplatform/dex/api/http/routes/v0/CustomAssetsFeeRoute.scala @@ -0,0 +1,117 @@ +package com.wavesplatform.dex.api.http.routes.v0 + +import akka.http.scaladsl.server.{Route, StandardRoute} +import akka.pattern.CircuitBreakerOpenException +import akka.stream.Materializer +import cats.syntax.either._ +import com.wavesplatform.dex._ +import com.wavesplatform.dex.api.http.SwaggerDocService +import com.wavesplatform.dex.api.http.directives.HttpKamonDirectives._ +import com.wavesplatform.dex.api.http.entities._ +import com.wavesplatform.dex.api.routes.{ApiRoute, AuthRoute} +import com.wavesplatform.dex.caches.OrderFeeSettingsCache +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.domain.utils.ScorexLogging +import com.wavesplatform.dex.error.MatcherError +import com.wavesplatform.dex.queue.MatcherQueue.StoreValidatedCommand +import com.wavesplatform.dex.queue.ValidatedCommand +import io.swagger.annotations._ + +import javax.ws.rs.Path +import scala.concurrent.{ExecutionContext, TimeoutException} +import scala.util.{Failure, Success} + +@Path("/matcher") +@Api() +final class CustomAssetsFeeRoute( + override val apiKeyHashes: List[Array[Byte]], + orderFeeSettingsCache: OrderFeeSettingsCache, + store: StoreValidatedCommand +)(implicit mat: Materializer) + extends ApiRoute + with AuthRoute + with ScorexLogging { + + implicit private val executionContext: ExecutionContext = mat.executionContext + + override lazy val route: Route = + pathPrefix("matcher" / "settings" / "custom-fee-assets")(addCustomFeeAssets ~ deleteCustomFeeAssets) + + @Path("/settings/custom-fee-assets#addCustomFeeAssets") + @ApiOperation( + value = "Add Custom Fee Assets", + notes = "Add Custom Fee Assets", + httpMethod = "POST", + authorizations = Array(new Authorization(SwaggerDocService.apiKeyDefinitionName)), + tags = Array("custom-fee-assets"), + response = classOf[HttpMessage] + ) + def addCustomFeeAssets: Route = + (pathEndOrSingleSlash & post & withAuth) { + entity(as[HttpCustomFeeAssets]) { customFeeAssets => + withMetricsAndTraces("addCustomFeeAssets") { + handleCustomAssetEvent( + customFeeAssets.assets, + "addCustomFeeAssets", + asset => !orderFeeSettingsCache.containsCustomFeeAsset(asset), + assets => ValidatedCommand.AddCustomAssetToFee(assets) + ) + } + } + } + + @Path("/settings/custom-fee-assets#deleteCustomFeeAssets") + @ApiOperation( + value = "Delete Custom Fee Assets", + notes = "Delete Custom Fee Assets", + httpMethod = "DELETE", + authorizations = Array(new Authorization(SwaggerDocService.apiKeyDefinitionName)), + tags = Array("custom-fee-assets"), + response = classOf[HttpMessage] + ) + def deleteCustomFeeAssets: Route = + (pathEndOrSingleSlash & delete & withAuth) { + entity(as[HttpCustomFeeAssets]) { customFeeAssets => + withMetricsAndTraces("deleteCustomFeeAssets") { + handleCustomAssetEvent( + customFeeAssets.assets, + "deleteCustomFeeAssets", + asset => orderFeeSettingsCache.containsCustomFeeAsset(asset), + assets => ValidatedCommand.DeleteCustomAssetToFee(assets) + ) + } + } + } + + private def handleCustomAssetEvent( + assets: Set[Asset], + actionName: String, + predicate: Asset => Boolean, + cmdConstructor: Set[Asset] => ValidatedCommand + ): StandardRoute = { + val assetsToHandle = assets.filter(predicate) + if (assetsToHandle.isEmpty) + complete(HttpMessage(s"There is no assets to do $actionName")) + else { + val cmd = cmdConstructor(assetsToHandle) + complete( + store(cmd).transform { + case Success(None) => Success(error.FeatureDisabled.asLeft[HttpMessage]) + case Success(_) => Success(HttpMessage(s"Successfully saved command for $actionName - $assetsToHandle").asRight[MatcherError]) + case Failure(e) => + val prefix = s"Store failed for $actionName - $assetsToHandle" + log.warn( + e match { + case _: TimeoutException => s"$prefix: timeout during storing $actionName - $assetsToHandle" + case _: CircuitBreakerOpenException => s"$prefix: fail fast due to circuit breaker" + case _ => prefix + }, + e + ) + Success(error.CanNotPersistEvent.asLeft[HttpMessage]) + } + ) + } + } + +} diff --git a/dex/src/main/scala/com/wavesplatform/dex/caches/OrderFeeSettingsCache.scala b/dex/src/main/scala/com/wavesplatform/dex/caches/OrderFeeSettingsCache.scala index ed38370212..d8e745f274 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/caches/OrderFeeSettingsCache.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/caches/OrderFeeSettingsCache.scala @@ -1,21 +1,92 @@ package com.wavesplatform.dex.caches +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.{AssetsActionForOffset, CustomAssetFeeState} +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.domain.utils.ScorexLogging +import com.wavesplatform.dex.queue.ValidatedCommandWithMeta.Offset import com.wavesplatform.dex.settings.OrderFeeSettings +import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.TreeMap -class OrderFeeSettingsCache(orderFeeSettingsMap: Map[Long, OrderFeeSettings]) { +class OrderFeeSettingsCache( + orderFeeSettingsMap: Map[Offset, OrderFeeSettings], + initialAssetsState: CustomAssetFeeState = CustomAssetFeeState.empty +) extends ScorexLogging { + + log.debug(s"Initial assets state $initialAssetsState") + + private val customAssetsState = new AtomicReference[CustomAssetFeeState](initialAssetsState) private val allOrderFeeSettings = { if (orderFeeSettingsMap.isEmpty) throw new IllegalArgumentException("Order fee settings should contain at least 1 value!") - TreeMap.empty[Long, OrderFeeSettings] ++ orderFeeSettingsMap + TreeMap.empty[Offset, OrderFeeSettings] ++ orderFeeSettingsMap + } + + def applyAssetsActionForOffset(actionForOffset: AssetsActionForOffset): Unit = { + log.info(s"Applying $actionForOffset") + customAssetsState.updateAndGet(s => s.applyAssetsActionForOffset(actionForOffset)) } - def getSettingsForOffset(offset: Long): OrderFeeSettings = + def containsCustomFeeAsset(asset: Asset): Boolean = customAssetsState.get().getForLatestOffset().contains(asset) + + def getSettingsForOffset(offset: Offset): OrderFeeSettings = allOrderFeeSettings .takeWhile { case (o, _) => o <= offset } .lastOption - .map(_._2) + .map(_._2.withDynamicCustomAssets(customAssetsState.get().getAssetsForOffset(offset))) .getOrElse(throw new IllegalStateException(s"Order fee settings are not set for offset $offset")) } + +object OrderFeeSettingsCache { + + final case class AssetsActionForOffset(offset: Offset, assets: Set[Asset], isAdded: Boolean) + + final case class CustomAssetFeeState(assetFeeState: Map[Offset, Set[Asset]]) { + + private val assetsMapKeys = assetFeeState.keySet.toSeq.sorted + + val latestAssetOffsetOpt: Option[Offset] = assetsMapKeys.lastOption + + def getAssetsForOffset(offset: Offset): Set[Asset] = + assetsMapKeys + .takeWhile(_ < offset) + .lastOption + .flatMap(assetFeeState.get) + .getOrElse(Set.empty) // because it means there is nothing in map + + def applyAssetsActionForOffset(actionForOffset: AssetsActionForOffset): CustomAssetFeeState = + if (actionForOffset.isAdded) addAssets(actionForOffset.offset, actionForOffset.assets) + else removeAssets(actionForOffset.offset, actionForOffset.assets) + + def addAssets(offset: Offset, assets: Set[Asset]): CustomAssetFeeState = + latestAssetOffsetOpt.fold(CustomAssetFeeState(Map(offset -> assets))) { lastOffset => + val prevState = assetFeeState.getOrElse(lastOffset, Set.empty) + val newAssets = prevState ++ assets + CustomAssetFeeState(assetFeeState + (offset -> newAssets)) + } + + def removeAssets(offset: Offset, assets: Set[Asset]): CustomAssetFeeState = + latestAssetOffsetOpt.fold(CustomAssetFeeState(Map.empty)) { lastOffset => + assetFeeState.get(lastOffset) match { + case Some(prevState) => + val newAssets = prevState -- assets + CustomAssetFeeState(assetFeeState + (offset -> newAssets)) + case None => + CustomAssetFeeState(Map.empty) + } + } + + def getForLatestOffset(): Set[Asset] = + latestAssetOffsetOpt.flatMap(assetFeeState.get).getOrElse(Set.empty) + + } + + object CustomAssetFeeState { + + val empty: CustomAssetFeeState = CustomAssetFeeState(Map.empty[Offset, Set[Asset]]) + + } + +} diff --git a/dex/src/main/scala/com/wavesplatform/dex/db/CustomFeeAssetsDb.scala b/dex/src/main/scala/com/wavesplatform/dex/db/CustomFeeAssetsDb.scala new file mode 100644 index 0000000000..37ef7ae674 --- /dev/null +++ b/dex/src/main/scala/com/wavesplatform/dex/db/CustomFeeAssetsDb.scala @@ -0,0 +1,46 @@ +package com.wavesplatform.dex.db + +import com.google.common.primitives.Longs +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.AssetsActionForOffset +import com.wavesplatform.dex.db.leveldb.LevelDb +import com.wavesplatform.dex.meta.getSimpleName +import com.wavesplatform.dex.tool.OnComplete + +trait CustomFeeAssetsDb[F[_]] { + def all(): F[Set[AssetsActionForOffset]] + def save(action: AssetsActionForOffset): F[Unit] +} + +object CustomFeeAssetsDb { + + private val cls = getSimpleName(this) + + def levelDb[F[_]: OnComplete](levelDb: LevelDb[F]): CustomFeeAssetsDb[F] = new CustomFeeAssetsDb[F] { + + def all(): F[Set[AssetsActionForOffset]] = + measureDb(cls, "all") { + levelDb.readOnly { ro => + val r = Set.newBuilder[AssetsActionForOffset] + + ro.iterateOver(DbKeys.FeeAssetsPrefix) { dbEntry => + val offset = Longs.fromByteArray(dbEntry.getKey.drop(2)) + val parsedData = DbKeys.customFeeAsset(offset).parse(dbEntry.getValue) + parsedData.foreach(v => r.addOne(v)) + } + + r.result() + } + } + + def save(action: AssetsActionForOffset): F[Unit] = + measureDb(cls, "save") { + levelDb.readWrite { rw => + val k = DbKeys.customFeeAsset(action.offset) + if (!rw.has(k)) + rw.put(k, Some(action)) + } + } + + } + +} diff --git a/dex/src/main/scala/com/wavesplatform/dex/db/DbKeys.scala b/dex/src/main/scala/com/wavesplatform/dex/db/DbKeys.scala index ebb4482225..714191634b 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/db/DbKeys.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/db/DbKeys.scala @@ -3,10 +3,11 @@ package com.wavesplatform.dex.db import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import com.google.common.primitives.{Ints, Longs, Shorts} +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.AssetsActionForOffset import com.wavesplatform.dex.db.leveldb.Key import com.wavesplatform.dex.domain.account.Address -import com.wavesplatform.dex.domain.asset.Asset.IssuedAsset -import com.wavesplatform.dex.domain.asset.AssetPair +import com.wavesplatform.dex.domain.asset.Asset.{AssetIdLength, IssuedAsset} +import com.wavesplatform.dex.domain.asset.{Asset, AssetPair} import com.wavesplatform.dex.domain.bytes.ByteStr import com.wavesplatform.dex.domain.order.Order import com.wavesplatform.dex.domain.transaction.ExchangeTransaction @@ -145,6 +146,39 @@ object DbKeys { } ) - private def encodeBoolean(value: Boolean): Byte = if (value) 1 else 0 + val FeeAssetsPrefix: Short = 28 + + def customFeeAsset(offset: Long): Key[Option[AssetsActionForOffset]] = + Key.opt( + "matcher-custom-fee-assets", + bytes(FeeAssetsPrefix, Longs.toByteArray(offset)), + bytes => { + val bb = ByteBuffer.wrap(bytes) + val assetsSize = bb.getInt + val assets = (1 to assetsSize).foldLeft(Set.empty[Asset]) { + case (acc, _) => + val byte = bb.get() + val asset = + if (byte == (1: Byte)) { + val assetIdByteArray = new Array[Byte](AssetIdLength) + bb.get(assetIdByteArray) + IssuedAsset(ByteStr.fromByteArray(assetIdByteArray)) + } else + Asset.Waves + acc + asset + } + val isAdded = bb.get == 1 + + AssetsActionForOffset(offset, assets, isAdded) + }, + x => { + val assetsSizeByte = Ints.toByteArray(x.assets.size) + val assetsBytes = x.assets.map(_.byteRepr).reduce(_ ++ _) + val isAdded = encodeBoolean(x.isAdded) + assetsSizeByte ++ assetsBytes ++ Array(isAdded) + } + ) + + private def encodeBoolean(value: Boolean): Byte = if (value) 1: Byte else 0: Byte } diff --git a/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommand.scala b/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommand.scala index c7eb0eca71..dd4e2b19bf 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommand.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommand.scala @@ -17,6 +17,10 @@ import kamon.context.Context import scala.util.hashing.MurmurHash3.productHash sealed trait ValidatedCommand extends Product with Serializable { + def maybeCtx: Option[Context] +} + +sealed trait OrderBookValidatedCommand extends ValidatedCommand { def assetPair: AssetPair def maybeCtx: Option[Context] } @@ -26,7 +30,7 @@ object ValidatedCommand { final case class PlaceOrder( limitOrder: LimitOrder, maybeCtx: Option[Context] = Some(Kamon.currentContext()) - ) extends ValidatedCommand { + ) extends OrderBookValidatedCommand { override def assetPair: AssetPair = limitOrder.order.assetPair override def toString: String = s"PlaceOrder(${limitOrder.order.idStr()})" override def hashCode(): Int = productHash(Tuple1(limitOrder)) @@ -42,7 +46,7 @@ object ValidatedCommand { final case class PlaceMarketOrder( marketOrder: MarketOrder, maybeCtx: Option[Context] = Some(Kamon.currentContext()) - ) extends ValidatedCommand { + ) extends OrderBookValidatedCommand { override def assetPair: AssetPair = marketOrder.order.assetPair override def toString: String = @@ -64,7 +68,7 @@ object ValidatedCommand { source: Source, maybeOwner: Option[Address], maybeCtx: Option[Context] = Some(Kamon.currentContext()) - ) extends ValidatedCommand { + ) extends OrderBookValidatedCommand { override def toString: String = s"CancelOrder($orderId, ${assetPair.key}, $source, $maybeOwner)" override def hashCode(): Int = productHash((assetPair, orderId, source, maybeOwner)) @@ -77,7 +81,8 @@ object ValidatedCommand { } - final case class DeleteOrderBook(assetPair: AssetPair, maybeCtx: Option[Context] = Some(Kamon.currentContext())) extends ValidatedCommand { + final case class DeleteOrderBook(assetPair: AssetPair, maybeCtx: Option[Context] = Some(Kamon.currentContext())) + extends OrderBookValidatedCommand { override def toString: String = s"DeleteOrderBook(${assetPair.key})" override def hashCode(): Int = productHash(Tuple1(assetPair)) @@ -89,7 +94,8 @@ object ValidatedCommand { } - final case class CancelAllOrders(assetPair: AssetPair, maybeCtx: Option[Context] = Some(Kamon.currentContext())) extends ValidatedCommand { + final case class CancelAllOrders(assetPair: AssetPair, maybeCtx: Option[Context] = Some(Kamon.currentContext())) + extends OrderBookValidatedCommand { override def toString: String = s"CancelAllOrders(${assetPair.key})" override def hashCode(): Int = productHash(Tuple1(assetPair)) @@ -101,6 +107,14 @@ object ValidatedCommand { } + final case class AddCustomAssetToFee(assets: Set[Asset]) extends ValidatedCommand { + override def maybeCtx: Option[Context] = None + } + + final case class DeleteCustomAssetToFee(assets: Set[Asset]) extends ValidatedCommand { + override def maybeCtx: Option[Context] = None + } + implicit final class ValidatedCommandOps(val self: ValidatedCommand) extends AnyVal { def assets: Set[Asset] = self match { @@ -109,6 +123,8 @@ object ValidatedCommand { case x: CancelOrder => x.assetPair.assets case x: DeleteOrderBook => x.assetPair.assets case x: CancelAllOrders => x.assetPair.assets + case x: AddCustomAssetToFee => x.assets + case x: DeleteCustomAssetToFee => x.assets } } diff --git a/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommandPbConversions.scala b/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommandPbConversions.scala index 11f5c2c513..efbbda0ffd 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommandPbConversions.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/queue/ValidatedCommandPbConversions.scala @@ -21,6 +21,8 @@ import com.wavesplatform.dex.protobuf.order.{ Order => PbOrder, PlaceLimitOrder => PbPlaceLimitOrder, PlaceMarketOrder => PbPlaceMarketOrder, + AddFeeCustomAssets => PbAddFeeCustomAssets, + DeleteFeeCustomAssets => PbDeleteFeeCustomAssets, ValidatedCommand => PbValidatedCommand } // format: on @@ -70,65 +72,94 @@ object ValidatedCommandPbConversions { val pbAssetPair = writeToPbAssetPair(assetPair) val cmd = PbValidatedCommand.Command.DeleteOrderBook(PbEmpty()) PbValidatedCommand(pbAssetPair.some, writeCtx(maybeCtx), cmd) + + case ValidatedCommand.AddCustomAssetToFee(assets) => + val cmd = PbValidatedCommand.Command.AddFeeCustomAssets(PbAddFeeCustomAssets(assets.map(writeToPbAsset).toSeq)) + PbValidatedCommand(assetPair = None, command = cmd) + + case ValidatedCommand.DeleteCustomAssetToFee(assets) => + val cmd = PbValidatedCommand.Command.DeleteFeeCustomAssets(PbDeleteFeeCustomAssets(assets.map(writeToPbAsset).toSeq)) + PbValidatedCommand(assetPair = None, command = cmd) } } def fromPb(pbValidatedCommand: PbValidatedCommand): Either[ValidationError, ValidatedCommand] = - pbValidatedCommand.assetPair.toRight(GenericError("Asset pair is empty")).map(readPbAssetPair).flatMap { assetPair => - val maybeCtx = Either.catchNonFatal(KamonTraceUtils.readCtx(pbValidatedCommand.kamonCtx.toVanilla)).toOption - - val pbCmd = pbValidatedCommand.command - if (pbCmd.isPlaceLimitOrder) - for { - pbPlaceLimitOrder <- pbCmd.placeLimitOrder.toRight(GenericError("Place limit order is empty")) - pbOrder <- pbPlaceLimitOrder.order.toRight(GenericError("Order is empty")) - order <- readPbOrder(pbOrder) - } yield ValidatedCommand.PlaceOrder( - limitOrder = LimitOrder( - o = order, - percentMinFee = pbPlaceLimitOrder.percentMinFee.map(_.value), - percentConstMinFee = pbPlaceLimitOrder.percentConstMinFee.map(_.value) - ), - maybeCtx = maybeCtx - ) - else if (pbCmd.isPlaceMarketOrder) - for { - pbPlaceMarketOrder <- pbCmd.placeMarketOrder.toRight(GenericError("Place market order is empty")) - pbOrder <- pbPlaceMarketOrder.order.toRight(GenericError("Order is empty")) - order <- readPbOrder(pbOrder) - } yield ValidatedCommand.PlaceMarketOrder( - marketOrder = MarketOrder( - o = order, - availableForSpending = pbPlaceMarketOrder.availableForSpending, - percentMinFee = pbPlaceMarketOrder.percentMinFee.map(_.value), - percentConstMinFee = pbPlaceMarketOrder.percentConstMinFee.map(_.value) - ), - maybeCtx = maybeCtx - ) - else if (pbCmd.isCancelOrder) - for { - pbCancelOrder <- pbCmd.cancelOrder.toRight(GenericError("Cancel order is empty")) - orderId = pbCancelOrder.orderId.toByteArray - source <- pbCancelOrder.source match { - case PbCancelOrder.Source.NOT_TRACKED => Right(Source.NotTracked) - case PbCancelOrder.Source.REQUEST => Right(Source.Request) - case PbCancelOrder.Source.EXPIRATION => Right(Source.Expiration) - case PbCancelOrder.Source.BALANCE_TRACKING => Right(Source.BalanceTracking) - case PbCancelOrder.Source.Unrecognized(v) => Left(GenericError(s"Unknown source type: $v")) - } - owner <- { - if (pbCancelOrder.owner.isEmpty) - Right(Option.empty) - else - Address.fromBytes(pbCancelOrder.owner.toVanilla).map(_.some) - } - } yield ValidatedCommand.CancelOrder(assetPair, orderId, source, owner, maybeCtx) - else if (pbCmd.isDeleteOrderBook) - Right(ValidatedCommand.DeleteOrderBook(assetPair, maybeCtx)) - else - Right(ValidatedCommand.CancelAllOrders(assetPair, maybeCtx)) + pbValidatedCommand.assetPair.map(readPbAssetPair) match { + case Some(assetPair) => + val maybeCtx = Either.catchNonFatal(KamonTraceUtils.readCtx(pbValidatedCommand.kamonCtx.toVanilla)).toOption + tryParseCommandWithPair(pbValidatedCommand, assetPair, maybeCtx) + case None => + tryParseCommand(pbValidatedCommand) } + private def tryParseCommand(pbValidatedCommand: PbValidatedCommand): Either[GenericError, ValidatedCommand] = { + val pbCmd = pbValidatedCommand.command + pbCmd.addFeeCustomAssets + .map(cmd => ValidatedCommand.AddCustomAssetToFee(cmd.assetId.map(_.toVanillaAsset).toSet)) + .orElse(pbCmd.deleteFeeCustomAssets + .map(cmd => ValidatedCommand.DeleteCustomAssetToFee(cmd.assetId.map(_.toVanillaAsset).toSet))).toRight(GenericError( + "Command without pair must be either AddFeeCustomAssets or DeleteFeeCustomAssets" + )) + } + + private def tryParseCommandWithPair( + pbValidatedCommand: PbValidatedCommand, + assetPair: AssetPair, + maybeCtx: Option[Context] + ): Either[ValidationError, OrderBookValidatedCommand] = { + val pbCmd = pbValidatedCommand.command + if (pbCmd.isPlaceLimitOrder) + for { + pbPlaceLimitOrder <- pbCmd.placeLimitOrder.toRight(GenericError("Place limit order is empty")) + pbOrder <- pbPlaceLimitOrder.order.toRight(GenericError("Order is empty")) + order <- readPbOrder(pbOrder) + } yield ValidatedCommand.PlaceOrder( + limitOrder = LimitOrder( + o = order, + percentMinFee = pbPlaceLimitOrder.percentMinFee.map(_.value), + percentConstMinFee = pbPlaceLimitOrder.percentConstMinFee.map(_.value) + ), + maybeCtx = maybeCtx + ) + else if (pbCmd.isPlaceMarketOrder) + for { + pbPlaceMarketOrder <- pbCmd.placeMarketOrder.toRight(GenericError("Place market order is empty")) + pbOrder <- pbPlaceMarketOrder.order.toRight(GenericError("Order is empty")) + order <- readPbOrder(pbOrder) + } yield ValidatedCommand.PlaceMarketOrder( + marketOrder = MarketOrder( + o = order, + availableForSpending = pbPlaceMarketOrder.availableForSpending, + percentMinFee = pbPlaceMarketOrder.percentMinFee.map(_.value), + percentConstMinFee = pbPlaceMarketOrder.percentConstMinFee.map(_.value) + ), + maybeCtx = maybeCtx + ) + else if (pbCmd.isCancelOrder) + for { + pbCancelOrder <- pbCmd.cancelOrder.toRight(GenericError("Cancel order is empty")) + orderId = pbCancelOrder.orderId.toByteArray + source <- pbCancelOrder.source match { + case PbCancelOrder.Source.NOT_TRACKED => Right(Source.NotTracked) + case PbCancelOrder.Source.REQUEST => Right(Source.Request) + case PbCancelOrder.Source.EXPIRATION => Right(Source.Expiration) + case PbCancelOrder.Source.BALANCE_TRACKING => Right(Source.BalanceTracking) + case PbCancelOrder.Source.Unrecognized(v) => Left(GenericError(s"Unknown source type: $v")) + } + owner <- { + if (pbCancelOrder.owner.isEmpty) + Right(Option.empty) + else + Address.fromBytes(pbCancelOrder.owner.toVanilla).map(_.some) + } + } yield ValidatedCommand.CancelOrder(assetPair, orderId, source, owner, maybeCtx) + else if (pbCmd.isDeleteOrderBook) + Right(ValidatedCommand.DeleteOrderBook(assetPair, maybeCtx)) + else if (pbCmd.isCancelAllOrders) + Right(ValidatedCommand.CancelAllOrders(assetPair, maybeCtx)) + else Left(GenericError(s"Command with asset pair $assetPair, but not one of them")) + } + private def writeToPbOrder(order: Order): PbOrder = PbOrder( chainId = AddressScheme.current.chainId.toInt, diff --git a/dex/src/main/scala/com/wavesplatform/dex/settings/OrderFeeSettings.scala b/dex/src/main/scala/com/wavesplatform/dex/settings/OrderFeeSettings.scala index 1a73ef0b0e..06cb469cf3 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/settings/OrderFeeSettings.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/settings/OrderFeeSettings.scala @@ -14,7 +14,11 @@ import pureconfig.configurable.genericMapReader import pureconfig.error.CannotConvert import pureconfig.generic.semiauto -sealed trait OrderFeeSettings extends Product with Serializable +sealed trait OrderFeeSettings extends Product with Serializable { + + def withDynamicCustomAssets(assets: Set[Asset]): OrderFeeSettings = this + +} object OrderFeeSettings { @@ -86,21 +90,27 @@ object OrderFeeSettings { final case class CompositeSettings( default: OrderFeeSettings, - custom: Map[AssetPair, OrderFeeSettings] = Map.empty, + dynamicCustomAssets: Boolean = false, + customPairs: Map[AssetPair, OrderFeeSettings] = Map.empty, customAssets: Option[CompositeSettings.CustomAssetsSettings] = None, discount: Option[CompositeSettings.DiscountAssetSettings] = None, zeroFeeAccounts: Set[PublicKey] = Set.empty ) extends OrderFeeSettings { + override def withDynamicCustomAssets(assets: Set[Asset]): OrderFeeSettings = + if (dynamicCustomAssets) + copy(customAssets = customAssets.map(_.copy(assets = assets))) + else this + def getOrderFeeSettings(assetPair: AssetPair): OrderFeeSettings = - custom.get(assetPair).orElse { + customPairs.get(assetPair).orElse { customAssets.flatMap(_.getSettings(assetPair)) }.getOrElse(default) def getAllCustomFeeSettings: Map[AssetPair, OrderFeeSettings] = customAssets.fold(Map.empty[AssetPair, OrderFeeSettings])( _.settingsMap - ) ++ custom // "custom" has higher priority than "customAssets" + ) ++ customPairs // "customPairs" has higher priority than "customAssets" } @@ -119,6 +129,8 @@ object OrderFeeSettings { def getSettings(assetPair: AssetPair): Option[OrderFeeSettings] = settingsMap.get(assetPair) + override def toString: String = s"CustomAssetsSettings(a=$assets, settings=$settings, customPairs=$customPairs)" + } object CustomAssetsSettings { @@ -169,19 +181,21 @@ object OrderFeeSettings { type PartialCompositeSettings = AssetPairQuickValidator => CompositeSettings - implicit val partialCompositeConfigReader: ConfigReader[PartialCompositeSettings] = ConfigReader.forProduct5[ + implicit val partialCompositeConfigReader: ConfigReader[PartialCompositeSettings] = ConfigReader.forProduct6[ PartialCompositeSettings, OrderFeeSettings, + Option[Boolean], Option[Map[AssetPair, OrderFeeSettings]], Option[PartialCustomAssetSettings], Option[CompositeSettings.DiscountAssetSettings], Option[Set[PublicKey]] - ]("default", "custom", "custom-assets", "discount", "zero-fee-accounts") { - case (default, custom, customAssets, discount, zeroFeeAccounts) => + ]("default", "dynamic-custom-assets", "custom-pairs", "custom-assets", "discount", "zero-fee-accounts") { + case (default, dynamicCustomAssets, customPairs, customAssets, discount, zeroFeeAccounts) => pairValidator: AssetPairQuickValidator => CompositeSettings( default, - custom.getOrElse(Map.empty), + dynamicCustomAssets.getOrElse(false), + customPairs.getOrElse(Map.empty), customAssets.map(_(pairValidator)), discount, zeroFeeAccounts.getOrElse(Set.empty) diff --git a/dex/src/main/scala/com/wavesplatform/dex/tool/ConfigChecker.scala b/dex/src/main/scala/com/wavesplatform/dex/tool/ConfigChecker.scala index 7a422ef1c3..bda26a9cce 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/tool/ConfigChecker.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/tool/ConfigChecker.scala @@ -186,7 +186,8 @@ sealed trait ConfigWriters { ConfigValueFactory.fromMap( Map( "default" -> orderFeeWriter.to(settings.default), - "custom" -> genericMapWriter[AssetPair, OrderFeeSettings](assetPairToString).to(settings.custom), + "dynamic-custom-assets" -> settings.dynamicCustomAssets, + "custom-pairs" -> genericMapWriter[AssetPair, OrderFeeSettings](assetPairToString).to(settings.customPairs), "custom-assets" -> implicitly[ConfigWriter[Option[CompositeSettings.CustomAssetsSettings]]].to(settings.customAssets), "discount" -> implicitly[ConfigWriter[Option[CompositeSettings.DiscountAssetSettings]]].to(settings.discount), "zero-fee-accounts" -> implicitly[ConfigWriter[Set[PublicKey]]].to(settings.zeroFeeAccounts) diff --git a/dex/src/test/scala/com/wavesplatform/dex/MatcherSpecBase.scala b/dex/src/test/scala/com/wavesplatform/dex/MatcherSpecBase.scala index 67f13ba189..354b4b7627 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/MatcherSpecBase.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/MatcherSpecBase.scala @@ -3,6 +3,7 @@ package com.wavesplatform.dex import com.google.common.base.Charsets import com.google.common.primitives.{Bytes, Ints} import com.softwaremill.diffx.{Derived, Diff} +import com.wavesplatform.dex.actors.OrderBookDirectoryActor.ApplyValidatedCommandWithPair import com.wavesplatform.dex.api.ws.protocol.WsError import com.wavesplatform.dex.api.ws.entities.WsMatchTransactionInfo import com.wavesplatform.dex.asset.DoubleOps @@ -23,7 +24,7 @@ import com.wavesplatform.dex.grpc.integration.dto.BriefAssetDescription import com.wavesplatform.dex.model.Events.{OrderCanceled, OrderExecuted} import com.wavesplatform.dex.model.{BuyLimitOrder, LimitOrder, OrderValidator, SellLimitOrder, _} import com.wavesplatform.dex.queue.ValidatedCommand.{CancelOrder, DeleteOrderBook, PlaceMarketOrder, PlaceOrder} -import com.wavesplatform.dex.queue.{ValidatedCommand, ValidatedCommandWithMeta} +import com.wavesplatform.dex.queue.{OrderBookValidatedCommand, ValidatedCommand} import com.wavesplatform.dex.settings.OrderFeeSettings._ import com.wavesplatform.dex.settings.{loadConfig, AssetType, MatcherSettings, OrderFeeSettings} import com.wavesplatform.dex.test.matchers.DiffMatcherWithImplicits @@ -116,18 +117,18 @@ trait MatcherSpecBase protected def toNormalized(value: Long): Long = value * Order.PriceConstant - protected def wrapCommand(n: Long, command: ValidatedCommand): ValidatedCommandWithMeta = - ValidatedCommandWithMeta(n, System.currentTimeMillis(), command) + protected def wrapCommand(n: Long, command: OrderBookValidatedCommand): ApplyValidatedCommandWithPair = + ApplyValidatedCommandWithPair(n, System.currentTimeMillis(), command) - protected def wrapCommand(command: ValidatedCommand): ValidatedCommandWithMeta = - ValidatedCommandWithMeta(seqNr.incrementAndGet(), System.currentTimeMillis(), command) + protected def wrapCommand(command: OrderBookValidatedCommand): ApplyValidatedCommandWithPair = + ApplyValidatedCommandWithPair(seqNr.incrementAndGet(), System.currentTimeMillis(), command) - protected def wrapLimitOrder(x: Order): ValidatedCommandWithMeta = wrapLimitOrder(seqNr.incrementAndGet(), x) + protected def wrapLimitOrder(x: Order): ApplyValidatedCommandWithPair = wrapLimitOrder(seqNr.incrementAndGet(), x) - protected def wrapLimitOrder(n: Long, x: Order): ValidatedCommandWithMeta = + protected def wrapLimitOrder(n: Long, x: Order): ApplyValidatedCommandWithPair = wrapCommand(n, ValidatedCommand.PlaceOrder(LimitOrder(x, None, None))) - protected def wrapMarketOrder(mo: MarketOrder): ValidatedCommandWithMeta = + protected def wrapMarketOrder(mo: MarketOrder): ApplyValidatedCommandWithPair = wrapCommand(ValidatedCommand.PlaceMarketOrder(mo)) protected def getSpentAmountWithFee(order: Order): Long = { diff --git a/dex/src/test/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActorSpecification.scala b/dex/src/test/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActorSpecification.scala index a9520fa814..b97350c9b4 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActorSpecification.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/actors/OrderBookDirectoryActorSpecification.scala @@ -7,7 +7,7 @@ import cats.Id import cats.data.NonEmptyList import cats.implicits.catsSyntaxEitherId import com.wavesplatform.dex.MatcherSpecBase -import com.wavesplatform.dex.actors.OrderBookDirectoryActor.{GetMarkets, MarketData, SaveSnapshot} +import com.wavesplatform.dex.actors.OrderBookDirectoryActor.{ApplyValidatedCommandWithPair, GetMarkets, MarketData, SaveSnapshot} import com.wavesplatform.dex.actors.OrderBookDirectoryActorSpecification.{DeletingActor, FailAtStartActor, NothingDoActor, _} import com.wavesplatform.dex.actors.orderbook.OrderBookActor.{OrderBookRecovered, OrderBookSnapshotUpdateCompleted} import com.wavesplatform.dex.actors.orderbook.OrderBookSnapshotStoreActor.{Message, Response} @@ -242,7 +242,7 @@ class OrderBookDirectoryActorSpecification ) val probe = TestProbe() - probe.send(actor, ValidatedCommandWithMeta(10L, 0L, ValidatedCommand.DeleteOrderBook(pair))) + probe.send(actor, ApplyValidatedCommandWithPair(10L, 0L, ValidatedCommand.DeleteOrderBook(pair))) eventually { probe.send(actor, GetMarkets) @@ -367,7 +367,7 @@ class OrderBookDirectoryActorSpecification ) val probe = TestProbe() - probe.send(actor, ValidatedCommandWithMeta(10L, 0L, ValidatedCommand.DeleteOrderBook(pair))) + probe.send(actor, ApplyValidatedCommandWithPair(10L, 0L, ValidatedCommand.DeleteOrderBook(pair))) withClue("Removed from snapshots rotation") { eventually { @@ -437,7 +437,7 @@ class OrderBookDirectoryActorSpecification private var nr = -1L override def receive: Receive = { - case x: ValidatedCommandWithMeta if x.offset > nr => nr = x.offset + case x: ApplyValidatedCommandWithPair if x.offset > nr => nr = x.offset case SaveSnapshot(globalNr) => val event = OrderBookSnapshotUpdateCompleted(assetPair, Some(globalNr)) context.system.scheduler.scheduleOnce(200.millis) { @@ -573,7 +573,7 @@ object OrderBookDirectoryActorSpecification { private class DeletingActor(owner: ActorRef, assetPair: AssetPair, startOffset: Option[Long] = None) extends RecoveringActor(owner, assetPair, startOffset) { override def receive: Receive = handleDelete orElse super.receive - private def handleDelete: Receive = { case ValidatedCommandWithMeta(_, _, _: ValidatedCommand.DeleteOrderBook) => context.stop(self) } + private def handleDelete: Receive = { case ApplyValidatedCommandWithPair(_, _, _: ValidatedCommand.DeleteOrderBook) => context.stop(self) } } private def emptySnapshotStoreActor(implicit actorSystem: ActorSystem): ActorRef = { diff --git a/dex/src/test/scala/com/wavesplatform/dex/api/http/routes/MatcherApiRouteSpec.scala b/dex/src/test/scala/com/wavesplatform/dex/api/http/routes/MatcherApiRouteSpec.scala index 279040b125..28aed76efd 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/api/http/routes/MatcherApiRouteSpec.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/api/http/routes/MatcherApiRouteSpec.scala @@ -30,7 +30,8 @@ import com.wavesplatform.dex.api.http.routes.v0._ import com.wavesplatform.dex.api.http.{entities, OrderBookHttpInfo} import com.wavesplatform.dex.api.ws.actors.WsExternalClientDirectoryActor import com.wavesplatform.dex.app.MatcherStatus -import com.wavesplatform.dex.caches.RateCache +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.CustomAssetFeeState +import com.wavesplatform.dex.caches.{OrderFeeSettingsCache, RateCache} import com.wavesplatform.dex.db._ import com.wavesplatform.dex.domain.account.{Address, AddressScheme, KeyPair, PublicKey} import com.wavesplatform.dex.domain.asset.Asset.{IssuedAsset, Waves} @@ -54,6 +55,7 @@ import com.wavesplatform.dex.model.{LimitOrder, OrderInfo, OrderStatus, _} import com.wavesplatform.dex.queue.{ValidatedCommand, ValidatedCommandWithMeta} import com.wavesplatform.dex.settings.OrderFeeSettings.{CompositeSettings, DynamicSettings, PercentSettings} import com.wavesplatform.dex.settings.{AssetType, MatcherSettings, OrderFeeSettings, OrderRestrictionsSettings} +import monix.execution.atomic.AtomicLong import org.scalamock.scalatest.PathMockFactory import org.scalatest.concurrent.Eventually import play.api.libs.json.{JsArray, JsString, Json, JsonFacade => _} @@ -63,6 +65,7 @@ import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicReference import scala.concurrent.Future import scala.concurrent.duration.DurationInt +import scala.util.chaining._ import scala.util.Random class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase with PathMockFactory with Eventually with WithDb { @@ -75,6 +78,10 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit apiKeys(Random.nextInt(apiKeys.length)) ) + private val asset1: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("DWgwcZTMhSvnyYCoWLRUXXSH1RSkzThXLJhww9gwkqdn").get) + private val asset2: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("2GBgdhqMjUPqreqPziXvZFSmDiQVrxNuGxR1z7ZVsm4Z").get) + private val asset3: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("Euz5HtYcj3nVTZxppA7wdabwTe5BzHFiu4QG1EJtzeUx").get) + private val matcherKeyPair = KeyPair("matcher".getBytes("utf-8")) private val smartAsset = arbitraryIssuedAssetGen.sample.get @@ -147,7 +154,7 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit private val simpleCompositeSettings = CompositeSettings( default = DynamicSettings(baseMakerFee = 350000, baseTakerFee = 350000), - custom = Map( + customPairs = Map( assetPair1 -> PercentSettings(AssetType.Amount, minFee = 0.01, minFeeInWaves = 1000), assetPair2 -> PercentSettings(AssetType.Amount, minFee = 0.02, minFeeInWaves = 1000) ) @@ -155,7 +162,7 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit private val complexCompositeSettings = CompositeSettings( default = DynamicSettings(baseMakerFee = 350000, baseTakerFee = 350000), - custom = Map( + customPairs = Map( assetPair1 -> PercentSettings(AssetType.Amount, minFee = 0.01, minFeeInWaves = 1000), assetPair2 -> PercentSettings(AssetType.Amount, minFee = 0.02, minFeeInWaves = 1000) ), @@ -350,6 +357,152 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit ) } + routePath("/matcher/settings/custom-fee-assets") - { + + "X-Api-Key is required for adding" in test( + route => + Post( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1, asset2)) + ) ~> route ~> check { + status shouldEqual StatusCodes.Forbidden + }, + apiKeys = apiKeys + ) + + "X-Api-Key is required for deleting" in test( + route => + Delete( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1, asset2)) + ) ~> route ~> check { + status shouldEqual StatusCodes.Forbidden + }, + apiKeys = apiKeys + ) + + "return OK when trying to new add assets" in test( + route => + Post( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1, asset2)) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("Successfully saved command") shouldBe true + resp.message.contains("addCustomFeeAssets") shouldBe true + resp.message.contains(asset1.toString) shouldBe true + resp.message.contains(asset2.toString) shouldBe true + }, + apiKeys = apiKeys + ) + + "return only new added assets" in test( + route => + Post( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset2, asset3)) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("Successfully saved command") shouldBe true + resp.message.contains("addCustomFeeAssets") shouldBe true + resp.message.contains(asset3.toString) shouldBe true + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset1.toString) shouldBe false + }, + customAssetsFeeLastState = Set(asset1, asset2), + apiKeys = apiKeys + ) + + "return special message if no assets were added" in test( + route => + Post( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1, asset2)) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("There is no assets to do") shouldBe true + resp.message.contains("addCustomFeeAssets") shouldBe true + resp.message.contains(asset3.toString) shouldBe false + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset1.toString) shouldBe false + }, + customAssetsFeeLastState = Set(asset1, asset2), + apiKeys = apiKeys + ) + + "return special message if assets are empty" in test( + route => + Post( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set.empty) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("There is no assets to do") shouldBe true + resp.message.contains("addCustomFeeAssets") shouldBe true + resp.message.contains(asset3.toString) shouldBe false + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset1.toString) shouldBe false + }, + apiKeys = apiKeys + ) + + "return message when trying to delete not existing assets" in test( + route => + Delete( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1)) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("There is no assets to do") shouldBe true + resp.message.contains("deleteCustomFeeAssets") shouldBe true + resp.message.contains(asset1.toString) shouldBe false + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset3.toString) shouldBe false + }, + apiKeys = apiKeys + ) + + "return message when trying to delete empty assets" in test( + route => + Delete( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set.empty) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("There is no assets to do") shouldBe true + resp.message.contains("deleteCustomFeeAssets") shouldBe true + resp.message.contains(asset1.toString) shouldBe false + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset3.toString) shouldBe false + }, + apiKeys = apiKeys + ) + + "return assets that were deleted for success remove" in test( + route => + Delete( + routePath("/settings/custom-fee-assets"), + HttpCustomFeeAssets(Set(asset1, asset3)) + ).withHeaders(apiKeyHeader()) ~> route ~> check { + status shouldEqual StatusCodes.OK + val resp = responseAs[HttpMessage] + resp.message.contains("Successfully saved command") shouldBe true + resp.message.contains("deleteCustomFeeAssets") shouldBe true + resp.message.contains(asset3.toString) shouldBe true + resp.message.contains(asset2.toString) shouldBe false + resp.message.contains(asset1.toString) shouldBe true + }, + customAssetsFeeLastState = Set(asset1, asset2, asset3), + apiKeys = apiKeys + ) + } + // getAssetRates routePath("/settings/rates") - { "returns available asset rates for fee" in test { route => @@ -1342,7 +1495,8 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit f: Route => U, apiKeys: List[String] = List.empty, maybeRateCache: Option[RateCache] = None, - feeSettings: OrderFeeSettings = DynamicSettings.symmetric(matcherFee) + feeSettings: OrderFeeSettings = DynamicSettings.symmetric(matcherFee), + customAssetsFeeLastState: Set[Asset] = Set.empty ): U = { val rateCache = maybeRateCache.getOrElse(RateCache(TestRateDb()).futureValue) @@ -1352,129 +1506,11 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit val apdb = AssetPairsDb.levelDb(asyncLevelDb) apdb.add(blackListedOrder.assetPair) - val addressActor = TestProbe("address") - addressActor.setAutoPilot { (sender: ActorRef, msg: Any) => - val response = msg match { - case AddressDirectoryActor.Command.ForwardMessage(forwardAddress, msg) => - msg match { - case AddressActor.Query.GetReservedBalance => AddressActor.Reply.GetBalance(Map(Waves -> 350L)) - case PlaceOrder(x, _) => - if (x.order.id() == okOrder.id()) AddressActor.Event.OrderAccepted(x.order) else error.OrderDuplicate(x.order.id()) - - case AddressActor.Query.GetOrdersStatuses(_, _) => - AddressActor.Reply.GetOrderStatuses(List(okOrder.id() -> OrderInfo.v5(LimitOrder(okOrder, None, None), OrderStatus.Accepted))) - - case AddressActor.Query.GetOrderStatus(orderId) => - if (orderId == okOrder.id()) AddressActor.Reply.GetOrderStatus(OrderStatus.Accepted) - else Status.Failure(new RuntimeException(s"Unknown order $orderId")) - - case AddressActor.Command.CancelOrder(orderId, Source.Request) => - def handleCancelOrder() = - if (orderId == okOrder.id() || orderId == orderToCancel.id()) AddressActor.Event.OrderCanceled(orderId) - else error.OrderNotFound(orderId) - - odb.get(orderId).futureValue match { - case None => handleCancelOrder() - case Some(order) if order.sender.toAddress == forwardAddress => handleCancelOrder() - case _ => error.OrderNotFound(orderId) - } - - case x @ AddressActor.Command.CancelAllOrders(pair, _, Source.Request) => - if (pair.contains(badOrder.assetPair)) error.AddressIsBlacklisted(badOrder.sender) - else if (pair.forall(_ == okOrder.assetPair)) - AddressActor.Event.BatchCancelCompleted( - Map( - okOrder.id() -> Right(AddressActor.Event.OrderCanceled(okOrder.id())), - badOrder.id() -> Left(error.CanNotPersistEvent) - ) - ) - else Status.Failure(new RuntimeException(s"Can't handle $x")) - - case AddressActor.Command.CancelOrders(ids, Source.Request) => - AddressActor.Event.BatchCancelCompleted( - ids.map { id => - id -> (if (id == orderToCancel.id()) Right(AddressActor.Event.OrderCanceled(okOrder.id())) else Left(error.CanNotPersistEvent)) - }.toMap - ) - - case GetTradableBalance(xs) => AddressActor.Reply.GetBalance(xs.map(_ -> 100L).toMap) - - case _: AddressActor.Query.GetOrderStatusInfo => - AddressActor.Reply.GetOrdersStatusInfo(OrderInfo.v5(LimitOrder(orderToCancel, None, None), OrderStatus.Accepted).some) + val addressActor = mkAddressActorTestProbe(odb) - case x => Status.Failure(new RuntimeException(s"Unknown command: $x")) - } + val orderBookDirectoryActor = mkOrderBookDirectoryActorTestProbe() - case x => Status.Failure(new RuntimeException(s"Unknown message: $x")) - } - - sender ! response - TestActor.KeepRunning - } - - val orderBookDirectoryActor = TestProbe("matcher") - orderBookDirectoryActor.setAutoPilot { (sender: ActorRef, msg: Any) => - msg match { - case GetSnapshotOffsets => - sender ! SnapshotOffsetsResponse( - Map( - AssetPair(Waves, priceAsset) -> Some(100L), - smartWavesPair -> Some(120L), - AssetPair(smartAsset, priceAsset) -> None - ) - ) - - case GetMarkets => - sender ! List( - MarketData( - pair = okOrder.assetPair, - amountAssetName = amountAssetDesc.name, - priceAssetName = priceAssetDesc.name, - created = System.currentTimeMillis(), - amountAssetInfo = Some(AssetInfo(amountAssetDesc.decimals)), - priceAssetInfo = Some(AssetInfo(priceAssetDesc.decimals)) - ) - ) - case _ => - } - - TestActor.KeepRunning - } - - val orderBookActor = TestProbe("orderBook") - orderBookActor.setAutoPilot { (sender: ActorRef, msg: Any) => - msg match { - case request: AggregatedOrderBookActor.Query.GetHttpView => - val assetPairDecimals = request.format match { - case Denormalized => Some(smartAssetDesc.decimals -> 8) - case _ => None - } - - val entity = - HttpOrderBook( - 0L, - smartWavesPair, - smartWavesAggregatedSnapshot.bids, - smartWavesAggregatedSnapshot.asks, - assetPairDecimals - ) - - val httpResponse = - HttpResponse( - entity = HttpEntity( - ContentTypes.`application/json`, - HttpOrderBook.toJson(entity) - ) - ) - - request.client ! httpResponse - - case request: AggregatedOrderBookActor.Query.GetMarketStatus => request.client ! smartWavesMarketStatus - case _ => - } - - TestActor.KeepRunning - } + val orderBookActor = mkOrderBookActorTestProbe() val exchangeTxStorage = ExchangeTxStorage.levelDB(asyncLevelDb) exchangeTxStorage.put( @@ -1508,30 +1544,7 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit else liftFutureAsync(Future.failed(new IllegalArgumentException(s"No information about $x"))) ) - val blacklistedAssets = - Set(blackListedOrder.assetPair.amountAsset, blackListedOrder.assetPair.priceAsset).foldLeft(Set.empty[IssuedAsset]) { (acc, elem) => - elem match { - case asset: IssuedAsset => acc + asset - case Asset.Waves => acc - } - } - val blacklistedPriceAsset = blackListedOrder.assetPair.priceAsset match { - case priceAsset: IssuedAsset => Some(priceAsset) - case Asset.Waves => None - } - val pairBuilder = new AssetPairBuilder( - settings, - { - case `smartAsset` => liftValueAsync[BriefAssetDescription](smartAssetDesc) - case x - if x == okOrder.assetPair.amountAsset || x == badOrder.assetPair.amountAsset || x == unknownAsset || x == blackListedOrder.assetPair.amountAsset => - liftValueAsync[BriefAssetDescription](amountAssetDesc) - case x if x == okOrder.assetPair.priceAsset || x == badOrder.assetPair.priceAsset || blacklistedPriceAsset.contains(x) => - liftValueAsync[BriefAssetDescription](priceAssetDesc) - case x => liftErrorAsync[BriefAssetDescription](error.AssetNotFound(x)) - }, - blacklistedAssets - ) + val pairBuilder = mkAssetPairBuilder() val placeRoute = new PlaceRoute( settings.actorResponseTimeout, @@ -1614,6 +1627,9 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit () => MatcherStatus.Working, apiKeys map crypto.secureHash ) + + val atomicLong = AtomicLong(0L) + val feeSettingsCache = new OrderFeeSettingsCache(Map(0L -> feeSettings), CustomAssetFeeState(Map(0L -> customAssetsFeeLastState))) val infoRoute = new MatcherInfoRoute( matcherKeyPair.publicKey, settings, @@ -1622,10 +1638,19 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit apiKeys map crypto.secureHash, rateCache, () => Future.successful(Set(1, 2, 3)), - () => feeSettings, + () => feeSettingsCache.getSettingsForOffset(1L), () => -1L ) + val feeAssetsRoute = new CustomAssetsFeeRoute( + apiKeys map crypto.secureHash, + feeSettingsCache, + cmd => { + val offset = atomicLong.incrementAndGet() + Future.successful(Some(ValidatedCommandWithMeta(offset = offset, timestamp = System.currentTimeMillis(), cmd))) + } + ) + val routes = Seq( infoRoute.route, ratesRoute.route, @@ -1635,10 +1660,170 @@ class MatcherApiRouteSpec extends RouteSpec("/matcher") with MatcherSpecBase wit placeRoute.route, cancelRoute.route, balancesRoute.route, - transactionsRoute.route + transactionsRoute.route, + feeAssetsRoute.route ) f(concat(routes: _*)) } + private def mkAssetPairBuilder() = { + val blacklistedAssets = + Set(blackListedOrder.assetPair.amountAsset, blackListedOrder.assetPair.priceAsset).foldLeft(Set.empty[IssuedAsset]) { (acc, elem) => + elem match { + case asset: IssuedAsset => acc + asset + case Asset.Waves => acc + } + } + val blacklistedPriceAsset = blackListedOrder.assetPair.priceAsset match { + case priceAsset: IssuedAsset => Some(priceAsset) + case Asset.Waves => None + } + + new AssetPairBuilder( + settings, + { + case `smartAsset` => liftValueAsync[BriefAssetDescription](smartAssetDesc) + case x + if x == okOrder.assetPair.amountAsset || x == badOrder.assetPair.amountAsset || x == unknownAsset || x == blackListedOrder.assetPair.amountAsset => + liftValueAsync[BriefAssetDescription](amountAssetDesc) + case x if x == okOrder.assetPair.priceAsset || x == badOrder.assetPair.priceAsset || blacklistedPriceAsset.contains(x) => + liftValueAsync[BriefAssetDescription](priceAssetDesc) + case x => liftErrorAsync[BriefAssetDescription](error.AssetNotFound(x)) + }, + blacklistedAssets + ) + } + + private def mkOrderBookActorTestProbe() = + TestProbe("orderBook").tap { + _.setAutoPilot { (_: ActorRef, msg: Any) => + msg match { + case request: AggregatedOrderBookActor.Query.GetHttpView => + val assetPairDecimals = request.format match { + case Denormalized => Some(smartAssetDesc.decimals -> 8) + case _ => None + } + + val entity = + HttpOrderBook( + 0L, + smartWavesPair, + smartWavesAggregatedSnapshot.bids, + smartWavesAggregatedSnapshot.asks, + assetPairDecimals + ) + + val httpResponse = + HttpResponse( + entity = HttpEntity( + ContentTypes.`application/json`, + HttpOrderBook.toJson(entity) + ) + ) + + request.client ! httpResponse + + case request: AggregatedOrderBookActor.Query.GetMarketStatus => request.client ! smartWavesMarketStatus + case _ => + } + + TestActor.KeepRunning + } + } + + private def mkOrderBookDirectoryActorTestProbe() = + TestProbe("matcher").tap { + _.setAutoPilot { (sender: ActorRef, msg: Any) => + msg match { + case GetSnapshotOffsets => + sender ! SnapshotOffsetsResponse( + Map( + AssetPair(Waves, priceAsset) -> Some(100L), + smartWavesPair -> Some(120L), + AssetPair(smartAsset, priceAsset) -> None + ) + ) + + case GetMarkets => + sender ! List( + MarketData( + pair = okOrder.assetPair, + amountAssetName = amountAssetDesc.name, + priceAssetName = priceAssetDesc.name, + created = System.currentTimeMillis(), + amountAssetInfo = Some(AssetInfo(amountAssetDesc.decimals)), + priceAssetInfo = Some(AssetInfo(priceAssetDesc.decimals)) + ) + ) + case _ => + } + + TestActor.KeepRunning + } + } + + private def mkAddressActorTestProbe(odb: OrderDb[Future]) = + TestProbe("address").tap { + _.setAutoPilot { (sender: ActorRef, msg: Any) => + val response = msg match { + case AddressDirectoryActor.Command.ForwardMessage(forwardAddress, msg) => + msg match { + case AddressActor.Query.GetReservedBalance => AddressActor.Reply.GetBalance(Map(Waves -> 350L)) + case PlaceOrder(x, _) => + if (x.order.id() == okOrder.id()) AddressActor.Event.OrderAccepted(x.order) else error.OrderDuplicate(x.order.id()) + + case AddressActor.Query.GetOrdersStatuses(_, _) => + AddressActor.Reply.GetOrderStatuses(List(okOrder.id() -> OrderInfo.v5(LimitOrder(okOrder, None, None), OrderStatus.Accepted))) + + case AddressActor.Query.GetOrderStatus(orderId) => + if (orderId == okOrder.id()) AddressActor.Reply.GetOrderStatus(OrderStatus.Accepted) + else Status.Failure(new RuntimeException(s"Unknown order $orderId")) + + case AddressActor.Command.CancelOrder(orderId, Source.Request) => + def handleCancelOrder() = + if (orderId == okOrder.id() || orderId == orderToCancel.id()) AddressActor.Event.OrderCanceled(orderId) + else error.OrderNotFound(orderId) + + odb.get(orderId).futureValue match { + case None => handleCancelOrder() + case Some(order) if order.sender.toAddress == forwardAddress => handleCancelOrder() + case _ => error.OrderNotFound(orderId) + } + + case x @ AddressActor.Command.CancelAllOrders(pair, _, Source.Request) => + if (pair.contains(badOrder.assetPair)) error.AddressIsBlacklisted(badOrder.sender) + else if (pair.forall(_ == okOrder.assetPair)) + AddressActor.Event.BatchCancelCompleted( + Map( + okOrder.id() -> Right(AddressActor.Event.OrderCanceled(okOrder.id())), + badOrder.id() -> Left(error.CanNotPersistEvent) + ) + ) + else Status.Failure(new RuntimeException(s"Can't handle $x")) + + case AddressActor.Command.CancelOrders(ids, Source.Request) => + AddressActor.Event.BatchCancelCompleted( + ids.map { id => + id -> (if (id == orderToCancel.id()) Right(AddressActor.Event.OrderCanceled(okOrder.id())) + else Left(error.CanNotPersistEvent)) + }.toMap + ) + + case GetTradableBalance(xs) => AddressActor.Reply.GetBalance(xs.map(_ -> 100L).toMap) + + case _: AddressActor.Query.GetOrderStatusInfo => + AddressActor.Reply.GetOrdersStatusInfo(OrderInfo.v5(LimitOrder(orderToCancel, None, None), OrderStatus.Accepted).some) + + case x => Status.Failure(new RuntimeException(s"Unknown command: $x")) + } + + case x => Status.Failure(new RuntimeException(s"Unknown message: $x")) + } + + sender ! response + TestActor.KeepRunning + } + } + } diff --git a/dex/src/test/scala/com/wavesplatform/dex/caches/CustomAssetFeeStateSpec.scala b/dex/src/test/scala/com/wavesplatform/dex/caches/CustomAssetFeeStateSpec.scala new file mode 100644 index 0000000000..4a3acd9a30 --- /dev/null +++ b/dex/src/test/scala/com/wavesplatform/dex/caches/CustomAssetFeeStateSpec.scala @@ -0,0 +1,77 @@ +package com.wavesplatform.dex.caches + +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.{AssetsActionForOffset, CustomAssetFeeState} +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.domain.bytes.ByteStr +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +final class CustomAssetFeeStateSpec extends AnyWordSpecLike with Matchers { + + private val asset1: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("DWgwcZTMhSvnyYCoWLRUXXSH1RSkzThXLJhww9gwkqdn").get) + private val asset2: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("2GBgdhqMjUPqreqPziXvZFSmDiQVrxNuGxR1z7ZVsm4Z").get) + private val asset3: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("Euz5HtYcj3nVTZxppA7wdabwTe5BzHFiu4QG1EJtzeUx").get) + private val asset4: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("5UYBPpq4WoU5n4MwpFkgJnW3Fq4B1u3ukpK33ik4QerR").get) + private val asset5: Asset = Asset.IssuedAsset(ByteStr.decodeBase58("8zUYbdB8Q6mDhpcXYv52ji8ycfj4SDX4gJXS7YY3dA4R").get) + + private val defaultAssetMap = Map( + 5L -> Set(asset1), + 10L -> Set(asset1, asset2), + 11L -> Set(asset1, asset2, asset3, asset4), + 15L -> Set(asset2, asset3, asset4), + 16L -> Set(asset2, asset3, asset4, asset5), + 18L -> Set(asset2, asset4, asset5) + ) + + "CustomAssetFeeState" should { + + "return empty Set[Asset] if there is no actions" in { + val state = CustomAssetFeeState.empty + state.getForLatestOffset() shouldBe empty + state.latestAssetOffsetOpt shouldBe empty + (0 to 10).map { i => + state.getAssetsForOffset(i + (i * 5)) shouldBe empty + } + } + + "return proper state for offset" in { + val state = CustomAssetFeeState(defaultAssetMap) + checkAccordingToMap(state) + } + + "apply AssetsActionForOffset" in { + val actions = Seq( + AssetsActionForOffset(5L, Set(asset1), isAdded = true), + AssetsActionForOffset(10L, Set(asset2), isAdded = true), + AssetsActionForOffset(11L, Set(asset3, asset4), isAdded = true), + AssetsActionForOffset(15L, Set(asset1), isAdded = false), + AssetsActionForOffset(16L, Set(asset5), isAdded = true), + AssetsActionForOffset(18L, Set(asset3), isAdded = false) + ) + val state = actions.foldLeft(CustomAssetFeeState.empty) { + case (acc, elem) => acc.applyAssetsActionForOffset(elem) + } + checkAccordingToMap(state) + } + + } + + private def checkAccordingToMap(state: CustomAssetFeeState) = { + state.getForLatestOffset() shouldBe Set(asset2, asset4, asset5) + state.latestAssetOffsetOpt shouldBe Some(18L) + + state.getAssetsForOffset(0) shouldBe empty + state.getAssetsForOffset(5) shouldBe empty + state.getAssetsForOffset(6) shouldBe Set(asset1) + state.getAssetsForOffset(10) shouldBe Set(asset1) + state.getAssetsForOffset(11) shouldBe Set(asset1, asset2) + state.getAssetsForOffset(12) shouldBe Set(asset1, asset2, asset3, asset4) + state.getAssetsForOffset(15) shouldBe Set(asset1, asset2, asset3, asset4) + state.getAssetsForOffset(16) shouldBe Set(asset2, asset3, asset4) + state.getAssetsForOffset(17) shouldBe Set(asset2, asset3, asset4, asset5) + state.getAssetsForOffset(19) shouldBe Set(asset2, asset4, asset5) + state.getAssetsForOffset(20) shouldBe Set(asset2, asset4, asset5) + state.getAssetsForOffset(25) shouldBe Set(asset2, asset4, asset5) + } + +} diff --git a/dex/src/test/scala/com/wavesplatform/dex/db/AssetsDbSpec.scala b/dex/src/test/scala/com/wavesplatform/dex/db/AssetsDbSpec.scala index 3883adc4cb..b120d2d824 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/db/AssetsDbSpec.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/db/AssetsDbSpec.scala @@ -12,7 +12,7 @@ import org.scalatestplus.scalacheck.{ScalaCheckPropertyChecks => PropertyChecks} class AssetsDbSpec extends AnyFreeSpec with Matchers with AssetDescriptionGen with WithDb with PropertyChecks with NoShrink { "AssetsDb.levelDb implementation" - { - "stores and reads all assets" in forAll(Gen.mapOf(assertDescriptionGen)) { assets => + "stores and reads all assets" in forAll(Gen.mapOf(assetDescriptionGen)) { assets => test { adb: AssetsDb[Id] => assets.foreach(Function.tupled(adb.put)) assets.foreach { diff --git a/dex/src/test/scala/com/wavesplatform/dex/db/CustomAssetsDbSpec.scala b/dex/src/test/scala/com/wavesplatform/dex/db/CustomAssetsDbSpec.scala new file mode 100644 index 0000000000..85f72691d1 --- /dev/null +++ b/dex/src/test/scala/com/wavesplatform/dex/db/CustomAssetsDbSpec.scala @@ -0,0 +1,59 @@ +package com.wavesplatform.dex.db + +import cats.Id +import com.wavesplatform.dex.caches.OrderFeeSettingsCache.AssetsActionForOffset +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.MatcherSpecBase +import monix.execution.atomic.AtomicLong +import org.scalacheck.Gen +import org.scalatest.freespec.AnyFreeSpec +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.{ScalaCheckPropertyChecks => PropertyChecks} + +class CustomAssetsDbSpec extends AnyFreeSpec with Matchers with WithDb with MatcherSpecBase with PropertyChecks { + + private val offsetLong = AtomicLong(0L) + + private val assetActionGen = + for { + assetsNumber <- Gen.choose[Int](1, 40) + assets <- Gen.listOfN(assetsNumber, issuedAssetGen(1.toByte)) + isAdd <- Gen.oneOf(true, false) + } yield AssetsActionForOffset(offsetLong.getAndIncrement(), assets.toSet, isAdd) + + "CustomFeeAssetsDb.levelDb" - { + + "save actions only once and read it" in forAll(assetActionGen) { action => + test { adb => + + adb.save(action) + val all = adb.all() + all.size shouldBe 1 + all.headOption.get shouldBe action + + adb.save(action.copy(isAdded = !action.isAdded, assets = action.assets + Asset.Waves)) + val all2 = adb.all() + all2.size shouldBe 1 + all2.headOption.get shouldBe action + + } + } + + "save and read all values" in forAll(Gen.listOfN(50, assetActionGen)) { actions => + test { adb => + + actions.foreach(adb.save) + val all = adb.all() + + all.size shouldBe actions.size + all.foreach { action => + actions.find(_.offset == action.offset).get shouldBe action + } + } + } + + } + + private def test(f: CustomFeeAssetsDb[Id] => Any): Any = tempLevelDb(db => f(CustomFeeAssetsDb.levelDb(db))) + +} diff --git a/dex/src/test/scala/com/wavesplatform/dex/gen/AssetDescriptionGen.scala b/dex/src/test/scala/com/wavesplatform/dex/gen/AssetDescriptionGen.scala index c6ed23d8cf..24206de4cb 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/gen/AssetDescriptionGen.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/gen/AssetDescriptionGen.scala @@ -9,9 +9,9 @@ import org.scalatest.Suite trait AssetDescriptionGen extends MatcherSpecBase { _: Suite => protected def assertDescriptionsGen(n: Int): Gen[Map[Asset.IssuedAsset, BriefAssetDescription]] = - Gen.containerOfN[Seq, (Asset.IssuedAsset, BriefAssetDescription)](n, assertDescriptionGen).map(_.toMap) + Gen.containerOfN[Seq, (Asset.IssuedAsset, BriefAssetDescription)](n, assetDescriptionGen).map(_.toMap) - protected val assertDescriptionGen: Gen[(Asset.IssuedAsset, BriefAssetDescription)] = for { + protected val assetDescriptionGen: Gen[(Asset.IssuedAsset, BriefAssetDescription)] = for { asset <- issuedAssetGen(1.toByte) name <- Arbitrary.arbString.arbitrary decimals <- Gen.choose(0, 8) diff --git a/dex/src/test/scala/com/wavesplatform/dex/settings/BaseSettingsSpecification.scala b/dex/src/test/scala/com/wavesplatform/dex/settings/BaseSettingsSpecification.scala index 8aa27eb079..e353aba58e 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/settings/BaseSettingsSpecification.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/settings/BaseSettingsSpecification.scala @@ -347,7 +347,7 @@ class BaseSettingsSpecification extends AnyFlatSpec { | -4: { | mode = "composite" | composite { - | custom { + | custom-pairs { | DWgwcZTMhSvnyYCoWLRUXXSH1RSkzThXLJhww9gwkqdn-25FEqEjRkqK6yCkiT7Lz6SAYz7gUFCtxfCChnrVFD5AT { | mode = "percent" | percent { diff --git a/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherFeeSettingsSpecification.scala b/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherFeeSettingsSpecification.scala index 5b41370be9..baa3381932 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherFeeSettingsSpecification.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherFeeSettingsSpecification.scala @@ -36,7 +36,7 @@ final class MatcherFeeSettingsSpecification extends BaseSettingsSpecification wi | 5: { | mode = composite | composite { - | custom { + | custom-pairs { | $asset1-WAVES { | mode = "percent" | percent { @@ -204,7 +204,7 @@ final class MatcherFeeSettingsSpecification extends BaseSettingsSpecification wi ).settingsMap.keySet compositeSettings.default shouldBe DynamicSettings(baseMakerFee = 350000, baseTakerFee = 350000) - compositeSettings.custom shouldBe Map( + compositeSettings.customPairs shouldBe Map( assetPair1Waves -> PercentSettings(AssetType.Amount, minFee = 0.01, minFeeInWaves = 1000), assetPair2Waves -> PercentSettings(AssetType.Amount, minFee = 0.01, minFeeInWaves = 1000) )