diff --git a/protocol/app/app.go b/protocol/app/app.go index 193f6f20df..98d862bbe0 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -221,6 +221,9 @@ import ( // Grpc Streaming streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" + + // Grpc Streaming + streamingclient "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" ) var ( @@ -343,6 +346,9 @@ type App struct { // Slinky oraclePrometheusServer *promserver.PrometheusServer oracleMetrics servicemetrics.Metrics + + // Grpc Streaming Test Client + GrpcStreamingTestClient *streamingclient.GrpcClient } // assertAppPreconditions assert invariants required for an application to start. @@ -1410,6 +1416,11 @@ func New( } app.initializeRateLimiters() + if app.GrpcStreamingManager.Enabled() { + app.GrpcStreamingTestClient = streamingclient.NewGrpcClient(appFlags, app.Logger()) + app.GrpcStreamingManager.SubscribeTestClient(app.GrpcStreamingTestClient) + } + // Report out app version and git commit. This will be run when validators restart. version := version.NewInfo() app.Logger().Info( @@ -1671,6 +1682,20 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) { if err := app.ModuleManager.PrepareCheckState(ctx); err != nil { panic(err) } + + // Comparing the local orderbook with memclob's orderbook. + if app.GrpcStreamingTestClient != nil { + app.ClobKeeper.CompareMemclobOrderbookWithLocalOrderbook( + ctx, + app.GrpcStreamingTestClient.GetOrderbook(0), + 0, + ) + app.ClobKeeper.CompareMemclobOrderbookWithLocalOrderbook( + ctx, + app.GrpcStreamingTestClient.GetOrderbook(1), + 1, + ) + } } // InitChainer application update at chain initialization. diff --git a/protocol/app/app_test.go b/protocol/app/app_test.go index 9bc21b9d88..a3b137bec5 100644 --- a/protocol/app/app_test.go +++ b/protocol/app/app_test.go @@ -106,6 +106,7 @@ func TestAppIsFullyInitialized(t *testing.T) { "BridgeClient", "SlinkyClient", "oraclePrometheusServer", + "GrpcStreamingTestClient", // Any default constructed type can be considered initialized if the default is what is // expected. getUninitializedStructFields relies on fields being the non-default and diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index aac6f81ee9..49b7cabb90 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -337,6 +337,26 @@ func (_m *MemClob) GetOrderRemainingAmount(ctx types.Context, order clobtypes.Or return r0, r1 } +// GetOrderbook provides a mock function with given fields: ctx, clobPairId +func (_m *MemClob) GetOrderbook(ctx types.Context, clobPairId clobtypes.ClobPairId) clobtypes.OrderbookInterface { + ret := _m.Called(ctx, clobPairId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbook") + } + + var r0 clobtypes.OrderbookInterface + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobPairId) clobtypes.OrderbookInterface); ok { + r0 = rf(ctx, clobPairId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clobtypes.OrderbookInterface) + } + } + + return r0 +} + // GetOrderbookUpdatesForOrderPlacement provides a mock function with given fields: ctx, order func (_m *MemClob) GetOrderbookUpdatesForOrderPlacement(ctx types.Context, order clobtypes.Order) *clobtypes.OffchainUpdates { ret := _m.Called(ctx, order) diff --git a/protocol/streaming/grpc/client/client.go b/protocol/streaming/grpc/client/client.go new file mode 100644 index 0000000000..113be59442 --- /dev/null +++ b/protocol/streaming/grpc/client/client.go @@ -0,0 +1,310 @@ +package client + +import ( + "sync" + + "cosmossdk.io/log" + + appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" + v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1" + v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" +) + +// Example client to consume data from a gRPC server. +type GrpcClient struct { + Logger log.Logger + Orderbook map[uint32]*LocalOrderbook +} + +type LocalOrderbook struct { + sync.Mutex + + OrderIdToOrder map[v1types.IndexerOrderId]v1types.IndexerOrder + Bids map[uint64][]v1types.IndexerOrder + Asks map[uint64][]v1types.IndexerOrder + FillAmounts map[v1types.IndexerOrderId]uint64 + + Logger log.Logger +} + +func NewGrpcClient(appflags appflags.Flags, logger log.Logger) *GrpcClient { + logger = logger.With("module", "grpc-example-client") + + client := &GrpcClient{ + Logger: logger, + Orderbook: make(map[uint32]*LocalOrderbook), + } + + // Subscribe to grpc orderbook updates. + // go func() { + // grpcClient := daemontypes.GrpcClientImpl{} + + // // Make a connection to the Cosmos gRPC query services. + // queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress) + // if err != nil { + // logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) + // return + // } + // defer func() { + // if err := grpcClient.CloseConnection(queryConn); err != nil { + // logger.Error("Failed to close gRPC connection", "error", err) + // } + // }() + + // clobQueryClient := clobtypes.NewQueryClient(queryConn) + // updateClient, err := clobQueryClient.StreamOrderbookUpdates( + // context.Background(), + // &clobtypes.StreamOrderbookUpdatesRequest{ + // ClobPairId: []uint32{0, 1}, + // }, + // ) + // if err != nil { + // logger.Error("Failed to stream orderbook updates", "error", err) + // return + // } + + // for { + // update, err := updateClient.Recv() + // if err != nil { + // logger.Error("Failed to receive orderbook update", "error", err) + // return + // } + + // logger.Info("Received orderbook update", "update", update) + // client.Update(update) + // } + // }() + return client +} + +// Read method +func (c *GrpcClient) GetOrderbookSnapshot(pairId uint32) *LocalOrderbook { + return c.GetOrderbook(pairId) +} + +// Write method for stream orderbook updates. +func (c *GrpcClient) Update(updates *clobtypes.StreamOrderbookUpdatesResponse) { + for _, update := range updates.GetUpdates() { + if orderUpdate := update.GetOrderbookUpdate(); orderUpdate != nil { + c.ProcessOrderbookUpdate(orderUpdate) + } + if orderFill := update.GetOrderFill(); orderFill != nil { + c.ProcessFill(orderFill) + } + } +} + +// Write method for order placement updates (indexer offchain events) +// Updates may be of the place typek, remove type, or update type. +func (c *GrpcClient) ProcessOrderbookUpdate(orderUpdate *clobtypes.StreamOrderbookUpdate) { + if orderUpdate.Snapshot { + c.Orderbook = make(map[uint32]*LocalOrderbook) + } + + for _, update := range orderUpdate.Updates { + if orderPlace := update.GetOrderPlace(); orderPlace != nil { + order := orderPlace.GetOrder() + orderbook := c.GetOrderbook(order.OrderId.ClobPairId) + orderbook.AddOrder(*order) + } + + if orderRemove := update.GetOrderRemove(); orderRemove != nil { + orderId := orderRemove.RemovedOrderId + orderbook := c.GetOrderbook(orderId.ClobPairId) + orderbook.RemoveOrder(*orderId) + } + + if orderUpdate := update.GetOrderUpdate(); orderUpdate != nil { + orderId := orderUpdate.OrderId + orderbook := c.GetOrderbook(orderId.ClobPairId) + orderbook.SetOrderFillAmount(orderId, orderUpdate.TotalFilledQuantums) + } + } +} + +// Write method for orderbook fills update. +// Fills are received whenever a match is emitted by the clob. +// Match can be either liquidation or a regular order match. +func (c *GrpcClient) ProcessFill(orderFill *clobtypes.StreamOrderbookFill) { + orderMap, fillAmountMap := orderListToMap(orderFill.Orders, orderFill.FillAmounts) + clobMatch := orderFill.ClobMatch + + if matchOrders := clobMatch.GetMatchOrders(); matchOrders != nil { + c.ProcessMatchOrders(matchOrders, orderMap, fillAmountMap) + } + + if matchPerpLiquidation := clobMatch.GetMatchPerpetualLiquidation(); matchPerpLiquidation != nil { + c.ProcessMatchPerpetualLiquidation(matchPerpLiquidation, orderMap, fillAmountMap) + } +} + +func (c *GrpcClient) ProcessMatchPerpetualLiquidation( + perpLiquidation *clobtypes.MatchPerpetualLiquidation, + orderMap map[clobtypes.OrderId]clobtypes.Order, + fillAmountMap map[clobtypes.OrderId]uint64, +) { + localOrderbook := c.Orderbook[perpLiquidation.ClobPairId] + for _, fill := range perpLiquidation.GetFills() { + makerOrder := orderMap[fill.MakerOrderId] + indexerMakerOrderId := v1.OrderIdToIndexerOrderId(makerOrder.OrderId) + localOrderbook.SetOrderFillAmount(&indexerMakerOrderId, fillAmountMap[makerOrder.OrderId]) + } +} + +func (c *GrpcClient) ProcessMatchOrders( + matchOrders *clobtypes.MatchOrders, + orderMap map[clobtypes.OrderId]clobtypes.Order, + fillAmountMap map[clobtypes.OrderId]uint64, +) { + takerOrderId := matchOrders.TakerOrderId + clobPairId := takerOrderId.GetClobPairId() + localOrderbook := c.Orderbook[clobPairId] + + indexerTakerOrder := v1.OrderIdToIndexerOrderId(takerOrderId) + localOrderbook.SetOrderFillAmount(&indexerTakerOrder, fillAmountMap[takerOrderId]) + + for _, fill := range matchOrders.Fills { + makerOrder := orderMap[fill.MakerOrderId] + indexerMakerOrder := v1.OrderIdToIndexerOrderId(makerOrder.OrderId) + localOrderbook.SetOrderFillAmount(&indexerMakerOrder, fillAmountMap[makerOrder.OrderId]) + } +} + +// orderListToMap generates a map from orderId to order and +// orderId to fill amount. Orders and fill amounts should be the same length. +func orderListToMap( + orders []clobtypes.Order, + fillAmounts []uint64, +) ( + orderMap map[clobtypes.OrderId]clobtypes.Order, + fillAmountMap map[clobtypes.OrderId]uint64, +) { + orderMap = make(map[clobtypes.OrderId]clobtypes.Order, 0) + fillAmountMap = make(map[clobtypes.OrderId]uint64, 0) + + for idx, order := range orders { + orderMap[order.OrderId] = order + fillAmountMap[order.OrderId] = fillAmounts[idx] + } + return orderMap, fillAmountMap +} + +func (c *GrpcClient) GetOrderbook(pairId uint32) *LocalOrderbook { + if _, ok := c.Orderbook[pairId]; !ok { + c.Orderbook[pairId] = &LocalOrderbook{ + OrderIdToOrder: make(map[v1types.IndexerOrderId]v1types.IndexerOrder), + Bids: make(map[uint64][]v1types.IndexerOrder), + Asks: make(map[uint64][]v1types.IndexerOrder), + FillAmounts: make(map[v1types.IndexerOrderId]uint64), + + Logger: c.Logger, + } + } + return c.Orderbook[pairId] +} + +// Add an order to the local orderbook. +func (l *LocalOrderbook) AddOrder(order v1types.IndexerOrder) { + l.Lock() + defer l.Unlock() + + if _, ok := l.OrderIdToOrder[order.OrderId]; ok { + l.Logger.Error("order already exists in orderbook") + } + + subticks := order.GetSubticks() + if order.Side == v1types.IndexerOrder_SIDE_BUY { + if _, ok := l.Bids[subticks]; !ok { + l.Bids[subticks] = make([]v1types.IndexerOrder, 0) + } + l.Bids[subticks] = append( + l.Bids[subticks], + order, + ) + } else { + if _, ok := l.Asks[subticks]; !ok { + l.Asks[subticks] = make([]v1types.IndexerOrder, 0) + } + l.Asks[subticks] = append( + l.Asks[subticks], + order, + ) + } + + l.OrderIdToOrder[order.OrderId] = order +} + +// Remove an order from local orderbook. +func (l *LocalOrderbook) RemoveOrder(orderId v1types.IndexerOrderId) { + l.Lock() + defer l.Unlock() + + if _, ok := l.OrderIdToOrder[orderId]; !ok { + l.Logger.Error("order not found in orderbook") + } + + order := l.OrderIdToOrder[orderId] + subticks := order.GetSubticks() + + if order.Side == v1types.IndexerOrder_SIDE_BUY { + for i, o := range l.Bids[subticks] { + if o.OrderId == order.OrderId { + l.Bids[subticks] = append( + l.Bids[subticks][:i], + l.Bids[subticks][i+1:]..., + ) + break + } + } + if len(l.Bids[subticks]) == 0 { + delete(l.Bids, subticks) + } + } else { + for i, o := range l.Asks[subticks] { + if o.OrderId == order.OrderId { + l.Asks[subticks] = append( + l.Asks[subticks][:i], + l.Asks[subticks][i+1:]..., + ) + break + } + } + if len(l.Asks[subticks]) == 0 { + delete(l.Asks, subticks) + } + } + + delete(l.OrderIdToOrder, orderId) +} + +// Update the fill amount for an order. +func (l *LocalOrderbook) SetOrderFillAmount( + orderId *v1types.IndexerOrderId, + fillAmount uint64, +) { + l.Lock() + defer l.Unlock() + + if fillAmount == 0 { + delete(l.FillAmounts, *orderId) + } else { + l.FillAmounts[*orderId] = fillAmount + } +} + +// Utility method to convert indexer order id to clob types order id. +// This example client uses indexer order ids on the backend. Currently +// there is no difference between indexer order id and clob order id. +func IndexerOrderIdToOrderId(idxOrderId v1types.IndexerOrderId) *clobtypes.OrderId { + return &clobtypes.OrderId{ + SubaccountId: satypes.SubaccountId{ + Owner: idxOrderId.SubaccountId.Owner, + Number: idxOrderId.SubaccountId.Number, + }, + ClientId: idxOrderId.ClientId, + OrderFlags: idxOrderId.OrderFlags, + ClobPairId: idxOrderId.ClobPairId, + } +} diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 54037813f9..6ee69500ad 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -10,6 +10,7 @@ import ( "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -52,6 +53,9 @@ type OrderbookSubscription struct { // Channel to buffer writes before the stream updatesChannel chan []clobtypes.StreamUpdate + + // Testing + client *client.GrpcClient } func NewGrpcStreamingManager( @@ -185,7 +189,20 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( return err } -// removeSubscription removes a subscription from the grpc streaming manager. +func (sm *GrpcStreamingManagerImpl) SubscribeTestClient(client *client.GrpcClient) { + subscription := &OrderbookSubscription{ + clobPairIds: []uint32{0, 1}, + client: client, + } + + sm.Lock() + defer sm.Unlock() + + sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription + sm.nextSubscriptionId++ +} + +// removeSubscription removes a subscription from the streaming manager. // The streaming manager's lock should already be acquired before calling this. func (sm *GrpcStreamingManagerImpl) removeSubscription( subscriptionIdToRemove uint32, @@ -253,17 +270,11 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot( metrics.GrpcAddToSubscriptionChannelCount, 1, ) - select { - case subscription.updatesChannel <- streamUpdates: - default: - sm.logger.Error( - fmt.Sprintf( - "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", - subscriptionId, - ), - ) - removeSubscription = true - } + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdates, + }, + ) } // Clean up subscriptions that have been closed. @@ -317,7 +328,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates))) + sm.Lock() + defer sm.Unlock() + + for _, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if updates, ok := updatesByClobPairId[clobPairId]; ok { + updatesToSend = append(updatesToSend, updates...) + } + } + + if len(updatesToSend) > 0 { + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + }, + ) + } + } } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -354,7 +383,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) + sm.Lock() + defer sm.Unlock() + + for _, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if updates, ok := updatesByClobPairId[clobPairId]; ok { + updatesToSend = append(updatesToSend, updates...) + } + } + + if len(updatesToSend) > 0 { + subscription.client.Update( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + }, + ) + } + } } func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index f5c61f0713..3ccfedc18b 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -2,6 +2,7 @@ package grpc import ( sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -42,6 +43,9 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( ) { } +func (sm *NoopGrpcStreamingManager) SubscribeTestClient(client *client.GrpcClient) { +} + func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 74b145985c..08b6cc5b81 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -2,6 +2,7 @@ package types import ( sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -31,6 +32,9 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + SubscribeTestClient( + client *client.GrpcClient, + ) SendOrderbookFillUpdates( ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, diff --git a/protocol/x/clob/keeper/grpc_stream_orderbook.go b/protocol/x/clob/keeper/grpc_stream_orderbook.go index 710a6ceec6..22b087feb5 100644 --- a/protocol/x/clob/keeper/grpc_stream_orderbook.go +++ b/protocol/x/clob/keeper/grpc_stream_orderbook.go @@ -1,7 +1,15 @@ package keeper import ( + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1" + v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + "github.com/dydxprotocol/v4-chain/protocol/lib" + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) func (k Keeper) StreamOrderbookUpdates( @@ -18,3 +26,243 @@ func (k Keeper) StreamOrderbookUpdates( <-ctx.Done() return nil } + +// Compare the aggregated size for each price level. +func (k Keeper) CompareMemclobOrderbookWithLocalOrderbook( + ctx sdk.Context, + localOrderbook *client.LocalOrderbook, + id types.ClobPairId, +) { + localOrderbook.Lock() + defer localOrderbook.Unlock() + + logger := k.Logger(ctx).With("module", "grpc-example-client").With("block", ctx.BlockHeight()).With("clob_pair_id", id) + + logger.Info("Comparing grpc orderbook with actual memclob orderbook!") + + orderbook := k.MemClob.GetOrderbook(ctx, id) + orderbookBids := orderbook.GetBids() + + // Compare bids. + bids := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbookBids) + + logger.Info("Comparing bids", "bids", bids) + if len(bids) != len(localOrderbook.Bids) { + logger.Error( + "Bids length mismatch", + "expected", len(bids), + "actual", len(localOrderbook.Bids), + ) + } + + for _, bid := range bids { + level := orderbookBids[bid] + + expectedAggregatedQuantity := uint64(0) + expectedOrders := make([]types.Order, 0) + expectedRemainingAmounts := make([]uint64, 0) + for levelOrder := level.LevelOrders.Front; levelOrder != nil; levelOrder = levelOrder.Next { + order := levelOrder.Value + _, filledAmount, _ := k.GetOrderFillAmount(ctx, order.Order.OrderId) + expectedAggregatedQuantity += (order.Order.Quantums - filledAmount.ToUint64()) + expectedOrders = append(expectedOrders, order.Order) + expectedRemainingAmounts = append(expectedRemainingAmounts, order.Order.Quantums-filledAmount.ToUint64()) + } + + actualAggregatedQuantity := uint64(0) + actualOrders := make([]v1types.IndexerOrder, 0) + actualRemainingAmounts := make([]uint64, 0) + for _, order := range localOrderbook.Bids[bid.ToUint64()] { + remainingAmount := order.Quantums - localOrderbook.FillAmounts[order.OrderId] + actualAggregatedQuantity += remainingAmount + actualOrders = append(actualOrders, order) + actualRemainingAmounts = append(actualRemainingAmounts, remainingAmount) + } + + // Compare the aggregated quantity as a basic sanity check. + if expectedAggregatedQuantity != actualAggregatedQuantity { + logger.Error( + "Aggregated quantity mismatch for bid level", + "price", bid, + "expected", expectedAggregatedQuantity, + "actual", actualAggregatedQuantity, + "expected_orders", expectedOrders, + "actual_orders", actualOrders, + "expected_remaining_amounts", expectedRemainingAmounts, + "actual_remaining_amounts", actualRemainingAmounts, + ) + } + + if len(expectedOrders) != len(actualOrders) { + logger.Error( + "Different number of orders at bid level", + "price", bid, + "expected", expectedOrders, + "actual", actualOrders, + ) + } else { + for i, expected := range expectedOrders { + if expected.OrderId.ClientId != actualOrders[i].OrderId.ClientId { + logger.Error( + "Different order at bid level", + "price", bid, + "expected", expected, + "actual", actualOrders[i], + ) + } + if expectedRemainingAmounts[i] != actualRemainingAmounts[i] { + logger.Error( + "Different remaining amount at bid level", + "price", bid, + "expected", expectedRemainingAmounts[i], + "actual", actualRemainingAmounts[i], + ) + } + } + } + } + + orderbookAsks := orderbook.GetAsks() + + // Compare asks. + asks := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbookAsks) + + logger.Info("Comparing asks", "asks", asks) + if len(asks) != len(localOrderbook.Asks) { + logger.Error( + "Asks length mismatch", + "expected", len(asks), + "actual", len(localOrderbook.Asks), + ) + } + + for _, ask := range asks { + level := orderbookAsks[ask] + + expectedAggregatedQuantity := uint64(0) + expectedOrders := make([]types.Order, 0) + expectedRemainingAmounts := make([]uint64, 0) + for levelOrder := level.LevelOrders.Front; levelOrder != nil; levelOrder = levelOrder.Next { + order := levelOrder.Value + _, filledAmount, _ := k.GetOrderFillAmount(ctx, order.Order.OrderId) + expectedAggregatedQuantity += (order.Order.Quantums - filledAmount.ToUint64()) + expectedOrders = append(expectedOrders, order.Order) + expectedRemainingAmounts = append(expectedRemainingAmounts, order.Order.Quantums-filledAmount.ToUint64()) + } + + actualAggregatedQuantity := uint64(0) + actualOrders := make([]v1types.IndexerOrder, 0) + actualRemainingAmounts := make([]uint64, 0) + for _, order := range localOrderbook.Asks[ask.ToUint64()] { + remainingAmount := order.Quantums - localOrderbook.FillAmounts[order.OrderId] + actualAggregatedQuantity += remainingAmount + actualOrders = append(actualOrders, order) + actualRemainingAmounts = append(actualRemainingAmounts, remainingAmount) + } + + // Compare the aggregated quantity as a basic sanity check. + if expectedAggregatedQuantity != actualAggregatedQuantity { + logger.Error( + "Aggregated quantity mismatch for ask level", + "price", ask, + "expected", expectedAggregatedQuantity, + "actual", actualAggregatedQuantity, + "expected_orders", expectedOrders, + "actual_orders", actualOrders, + "expected_remaining_amounts", expectedRemainingAmounts, + "actual_remaining_amounts", actualRemainingAmounts, + ) + } + + if len(expectedOrders) != len(actualOrders) { + logger.Error( + "Different number of orders at ask level", + "price", ask, + "expected", expectedOrders, + "actual", actualOrders, + ) + } else { + for i, expected := range expectedOrders { + if expected.OrderId.ClientId != actualOrders[i].OrderId.ClientId { + logger.Error( + "Different order at ask level", + "price", ask, + "expected", expected, + "actual", actualOrders[i], + ) + } + if expectedRemainingAmounts[i] != actualRemainingAmounts[i] { + logger.Error( + "Different remaining amount at ask level", + "price", ask, + "expected", expectedRemainingAmounts[i], + "actual", actualRemainingAmounts[i], + ) + } + } + } + } + + // Compare Fills in State with fills on the locally constructed orderbook from + // grpc stream. + numFailed := 0 + numPassed := 0 + allFillStates := k.GetAllOrderFillStates(ctx) + for _, fillState := range allFillStates { + orderFillAmount := fillState.FillAmount + orderId := fillState.OrderId + // Skip check for non-relevant clob pair id + if orderId.ClobPairId != uint32(id) { + continue + } + + indexerOrderId := v1.OrderIdToIndexerOrderId(orderId) + localOrderbookFillAmount := localOrderbook.FillAmounts[indexerOrderId] + + if orderFillAmount != localOrderbookFillAmount { + logger.Error( + "Fill Amount Mismatch", + "orderId", orderId.String(), + "state_fill_amt", orderFillAmount, + "local_fill_amt", localOrderbookFillAmount, + ) + numFailed += 1 + } else { + numPassed += 1 + } + } + + // Check if the locally constructed orderbook has extraneous order ids in the fill amounts + // when compared to state. + numInOrderbookButNotState := 0 + for indexerOrderId, localFillAmount := range localOrderbook.FillAmounts { + clobOrderId := types.OrderId{ + SubaccountId: satypes.SubaccountId{ + Owner: indexerOrderId.SubaccountId.Owner, + Number: indexerOrderId.SubaccountId.Number, + }, + ClientId: indexerOrderId.ClientId, + OrderFlags: indexerOrderId.OrderFlags, + ClobPairId: indexerOrderId.ClobPairId, + } + exists, _, _ := k.GetOrderFillAmount(ctx, clobOrderId) + if !exists { + numInOrderbookButNotState += 1 + logger.Error( + "Fill Amount Mismatch: exists in local orderbook but not in state", + "orderId", clobOrderId.String(), + "local_fill_amt", localFillAmount, + ) + } + } + + ratio := float32(numFailed) * 100 / float32(numPassed+numFailed) + logger.Error( + fmt.Sprintf("Fill amount comparison results: %.2f failed", ratio), + "failed", numFailed, + "passed", numPassed, + "in_orderbook_not_state", numInOrderbookButNotState, + ) + + logger.Info("Orderbook comparison done!") +} diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 5a4a88ce7d..25fca7b516 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -2438,3 +2438,7 @@ func (m *MemClobPriceTimePriority) resizeReduceOnlyMatchIfNecessary( maxMatchSize := lib.BigMin(absPositionSize, absNewMatchSize) return satypes.BaseQuantums(maxMatchSize.Uint64()) } + +func (m *MemClobPriceTimePriority) GetOrderbook(ctx sdk.Context, clobPairId types.ClobPairId) types.OrderbookInterface { + return m.orderbooks[clobPairId] +} diff --git a/protocol/x/clob/memclob/memclob_open_orders.go b/protocol/x/clob/memclob/memclob_open_orders.go index a1ef5ffbae..e9c13daa11 100644 --- a/protocol/x/clob/memclob/memclob_open_orders.go +++ b/protocol/x/clob/memclob/memclob_open_orders.go @@ -1,10 +1,11 @@ package memclob import ( - errorsmod "cosmossdk.io/errors" "fmt" "math" + errorsmod "cosmossdk.io/errors" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index 14cd3e38dc..94d774439d 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -159,4 +159,13 @@ type MemClob interface { takerOrder MatchableOrder, makerOrders []Order, ) StreamOrderbookFill + GetOrderbook( + ctx sdk.Context, + clobPairId ClobPairId, + ) OrderbookInterface +} + +type OrderbookInterface interface { + GetAsks() map[Subticks]*Level + GetBids() map[Subticks]*Level } diff --git a/protocol/x/clob/types/orderbook.go b/protocol/x/clob/types/orderbook.go index 9fa532d3fc..891e3c48a2 100644 --- a/protocol/x/clob/types/orderbook.go +++ b/protocol/x/clob/types/orderbook.go @@ -85,6 +85,16 @@ type Orderbook struct { TotalOpenOrders uint } +// GetAsks gets the asks +func (ob *Orderbook) GetAsks() map[Subticks]*Level { + return ob.Asks +} + +// GetBids gets the asks +func (ob *Orderbook) GetBids() map[Subticks]*Level { + return ob.Bids +} + // GetSide returns the Bid-side levels if `isBuy == true` otherwise, returns the Ask-side levels. func (ob *Orderbook) GetSide(isBuy bool) map[Subticks]*Level { if isBuy {