diff --git a/protocol/lib/collections.go b/protocol/lib/collections.go index 5f57e7ff42..b2a5f11b70 100644 --- a/protocol/lib/collections.go +++ b/protocol/lib/collections.go @@ -106,3 +106,15 @@ func MergeAllMapsMustHaveDistinctKeys[K comparable, V any](maps ...map[K]V) map[ } return combinedMap } + +// MergeMaps merges all the maps into a single map. +// Does not require maps to have distinct keys. +func MergeMaps[K comparable, V any](maps ...map[K]V) map[K]V { + combinedMap := make(map[K]V) + for _, m := range maps { + for k, v := range m { + combinedMap[k] = v + } + } + return combinedMap +} diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 46d8d3df9c..09607d76e0 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -59,6 +59,7 @@ const ( // Full node grpc FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index 7313f46942..7c7c24ef16 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -91,6 +91,24 @@ func (_m *MemClob) DeleverageSubaccount(ctx types.Context, subaccountId subaccou return r0, r1 } +// GenerateStreamOrderbookFill provides a mock function with given fields: ctx, clobMatch, takerOrder, makerOrders +func (_m *MemClob) GenerateStreamOrderbookFill(ctx types.Context, clobMatch clobtypes.ClobMatch, takerOrder clobtypes.MatchableOrder, makerOrders []clobtypes.Order) clobtypes.StreamOrderbookFill { + ret := _m.Called(ctx, clobMatch, takerOrder, makerOrders) + + if len(ret) == 0 { + panic("no return value specified for GenerateStreamOrderbookFill") + } + + var r0 clobtypes.StreamOrderbookFill + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobMatch, clobtypes.MatchableOrder, []clobtypes.Order) clobtypes.StreamOrderbookFill); ok { + r0 = rf(ctx, clobMatch, takerOrder, makerOrders) + } else { + r0 = ret.Get(0).(clobtypes.StreamOrderbookFill) + } + + return r0 +} + // GetCancelOrder provides a mock function with given fields: ctx, orderId func (_m *MemClob) GetCancelOrder(ctx types.Context, orderId clobtypes.OrderId) (uint32, bool) { ret := _m.Called(ctx, orderId) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 2952491da6..8703449d7b 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -382,6 +382,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookFillUpdates provides a mock function with given fields: ctx, orderbookFills +func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFills []clobtypes.StreamOrderbookFill) { + _m.Called(ctx, orderbookFills) +} + // SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { _m.Called(ctx, offchainUpdates, snapshot) diff --git a/protocol/scripts/genesis/sample_pregenesis.json b/protocol/scripts/genesis/sample_pregenesis.json index 58f7b7d594..c870e8edb2 100644 --- a/protocol/scripts/genesis/sample_pregenesis.json +++ b/protocol/scripts/genesis/sample_pregenesis.json @@ -1,4 +1,5 @@ { + "app_hash": null, "app_name": "dydxprotocold", "app_state": { "assets": { @@ -524,6 +525,7 @@ } } }, + "consensus": null, "crisis": { "constant_fee": { "amount": "1000000000000000000", @@ -758,6 +760,7 @@ }, "gov": { "constitution": "", + "deposit_params": null, "deposits": [], "params": { "burn_proposal_deposit_prevote": false, @@ -789,7 +792,9 @@ }, "proposals": [], "starting_proposal_id": "1", - "votes": [] + "tally_params": null, + "votes": [], + "voting_params": null }, "govplus": {}, "ibc": { @@ -854,6 +859,7 @@ "port": "icahost" } }, + "params": null, "perpetuals": { "liquidity_tiers": [ { @@ -1782,7 +1788,7 @@ ] } }, - "app_version": "4.0.0-dev0-22-gd31fa077", + "app_version": "4.1.1-dev0-1-gf7106453", "chain_id": "dydx-sample-1", "consensus": { "params": { diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 5aa7cad587..a8ce6ad1f2 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -149,6 +149,71 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } } +// SendOrderbookFillUpdates groups fills by their clob pair ids and +// sends messages to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookFillsLatency, + time.Now(), + ) + + // Group fills by clob pair id. + updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + for _, orderbookFill := range orderbookFills { + // Fetch the clob pair id from the first order in `OrderBookMatchFill`. + // We can assume there must be an order, and that all orders share the same + // clob pair id. + clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{} + } + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ + OrderFill: &orderbookFill, + }, + } + updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) + } + + sm.Lock() + defer sm.Unlock() + + // Send updates to subscribers. + idsToRemove := make([]uint32, 0) + for id, subscription := range sm.orderbookSubscriptions { + streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := updatesByClobPairId[clobPairId]; ok { + streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) + } + } + + if len(streamUpdatesForSubscription) > 0 { + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdatesForSubscription, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } + } + } + + // Clean up subscriptions that have been closed. + // If a Send update has failed for any clob pair id, the whole subscription will be removed. + for _, id := range idsToRemove { + delete(sm.orderbookSubscriptions, id) + } +} + // GetUninitializedClobPairIds returns the clob pair ids that have not been initialized. func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 { sm.Lock() diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 3de85deaec..424871b4c3 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -35,6 +35,14 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( ) { } +func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, +) { +} + func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 5d0d9f1ab4..9b5af0c093 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -22,4 +22,10 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, + ) } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 1c22339c21..1887c495c7 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -510,3 +510,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates( snapshot bool, ) { } + +func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []types.StreamOrderbookFill, +) { +} diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index a22606f18a..2ee3cb7a38 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -166,45 +166,6 @@ func PrepareCheckState( offchainUpdates, ) - // For orders that are filled in the last block, send an orderbook update to the grpc streams. - if keeper.GetGrpcStreamingManager().Enabled() { - allUpdates := types.NewOffchainUpdates() - orderIdsToSend := make(map[types.OrderId]bool) - - // Send an update for reverted local operations. - for _, operation := range localValidatorOperationsQueue { - if match := operation.GetMatch(); match != nil { - // For normal order matches, we send an update for the taker and maker orders. - if matchedOrders := match.GetMatchOrders(); matchedOrders != nil { - orderIdsToSend[matchedOrders.TakerOrderId] = true - for _, fill := range matchedOrders.Fills { - orderIdsToSend[fill.MakerOrderId] = true - } - } - // For liquidation matches, we send an update for the maker orders. - if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil { - for _, fill := range matchedLiquidation.Fills { - orderIdsToSend[fill.MakerOrderId] = true - } - } - } - } - - // Send an update for orders that were proposed. - for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { - orderIdsToSend[orderId] = true - } - - // Send update. - for orderId := range orderIdsToSend { - if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { - orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) - } - } - keeper.SendOrderbookUpdates(ctx, allUpdates, false) - } - // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 7d0dd63de2..b15b5d20d9 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -256,3 +256,19 @@ func (k Keeper) SendOrderbookUpdates( ctx.ExecMode(), ) } + +// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager. +func (k Keeper) SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []types.StreamOrderbookFill, +) { + if len(orderbookFills) == 0 { + return + } + k.GetGrpcStreamingManager().SendOrderbookFillUpdates( + ctx, + orderbookFills, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) +} diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 3df48b71fe..55a58beaf0 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -249,6 +249,19 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) { []byte(types.OrderAmountFilledKeyPrefix), ) memStore.Delete(orderId.ToStateKey()) + + // If grpc stream is on, zero out the fill amount. + if k.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + 0, // Total filled quantums is zero because it's been pruned from state. + ); success { + allUpdates.AddUpdateMessage(orderId, message) + } + k.SendOrderbookUpdates(ctx, allUpdates, false) + } } // PruneStateFillAmountsForShortTermOrders prunes Short-Term order fill amounts from state that are pruneable @@ -259,27 +272,5 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight()) // Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`. - prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight) - - // Send an orderbook update for each pruned order for grpc streams. - // This is needed because short term orders are pruned in PrepareCheckState using - // keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount - // pruning when there's replacement. - // Long-term fix would be to add logic to keep them in sync. - // TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync. - if k.GetGrpcStreamingManager().Enabled() { - allUpdates := types.NewOffchainUpdates() - for _, orderId := range prunedOrderIds { - if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { - if message, success := off_chain_updates.CreateOrderUpdateMessage( - ctx, - orderId, - 0, // Total filled quantums is zero because it's been pruned from state. - ); success { - allUpdates.AddUpdateMessage(orderId, message) - } - } - } - k.SendOrderbookUpdates(ctx, allUpdates, false) - } + k.PruneOrdersForBlockHeight(ctx, blockHeight) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 1d0445a7c9..ecbd0a28a4 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -19,6 +19,25 @@ import ( satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) +// fetchOrdersInvolvedInOpQueue fetches all OrderIds involved in an operations +// queue's matches + short term order placements and returns them as a set. +func fetchOrdersInvolvedInOpQueue( + operations []types.InternalOperation, +) (orderIdSet map[types.OrderId]struct{}) { + orderIdSet = make(map[types.OrderId]struct{}) + for _, operation := range operations { + if shortTermOrderPlacement := operation.GetShortTermOrderPlacement(); shortTermOrderPlacement != nil { + orderId := shortTermOrderPlacement.GetOrder().OrderId + orderIdSet[orderId] = struct{}{} + } + if clobMatch := operation.GetMatch(); clobMatch != nil { + orderIdSetForClobMatch := clobMatch.GetAllOrderIds() + orderIdSet = lib.MergeMaps(orderIdSet, orderIdSetForClobMatch) + } + } + return orderIdSet +} + // ProcessProposerOperations updates on-chain state given an []OperationRaw operations queue // representing matches that occurred in the previous block. It performs validation on an operations // queue. If all validation passes, the operations queue is written to state. @@ -38,6 +57,26 @@ func (k Keeper) ProcessProposerOperations( return errorsmod.Wrapf(types.ErrInvalidMsgProposedOperations, "Error: %+v", err) } + // If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream. + // This must be sent out to account for checkState being discarded and deliverState being used. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) + orderIdsFromProposed := fetchOrdersInvolvedInOpQueue( + operations, + ) + orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( + localValidatorOperationsQueue, + ) + orderIdSetToUpdate := lib.MergeMaps(orderIdsFromLocal, orderIdsFromProposed) + + allUpdates := types.NewOffchainUpdates() + for orderId := range orderIdSetToUpdate { + orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + k.SendOrderbookUpdates(ctx, allUpdates, false) + } + log.DebugLog(ctx, "Processing operations queue", log.OperationsQueue, types.GetInternalOperationsQueueTextString(operations)) @@ -459,6 +498,7 @@ func (k Keeper) PersistMatchOrdersToState( } } + makerOrders := make([]types.Order, 0) makerFills := matchOrders.GetFills() for _, makerFill := range makerFills { // Fetch the maker order from either short term orders or state. @@ -472,6 +512,7 @@ func (k Keeper) PersistMatchOrdersToState( MakerOrder: &makerOrder, FillAmount: satypes.BaseQuantums(makerFill.GetFillAmount()), } + makerOrders = append(makerOrders, makerOrder) _, _, _, _, err = k.ProcessSingleMatch(ctx, &matchWithOrders) if err != nil { @@ -515,6 +556,26 @@ func (k Keeper) PersistMatchOrdersToState( ) } + // if GRPC streaming is on, emit a generated clob match to stream. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( + ctx, + types.ClobMatch{ + Match: &types.ClobMatch_MatchOrders{ + MatchOrders: matchOrders, + }, + }, + &takerOrder, + makerOrders, + ) + k.SendOrderbookFillUpdates( + ctx, + []types.StreamOrderbookFill{ + streamOrderbookFill, + }, + ) + } + return nil } @@ -544,12 +605,14 @@ func (k Keeper) PersistMatchLiquidationToState( return err } + makerOrders := make([]types.Order, 0) for _, fill := range matchLiquidation.GetFills() { // Fetch the maker order from either short term orders or state. makerOrder, err := k.FetchOrderFromOrderId(ctx, fill.MakerOrderId, ordersMap) if err != nil { return err } + makerOrders = append(makerOrders, makerOrder) matchWithOrders := types.MatchWithOrders{ MakerOrder: &makerOrder, @@ -601,6 +664,26 @@ func (k Keeper) PersistMatchLiquidationToState( matchLiquidation.Liquidated, matchLiquidation.PerpetualId, ) + + // if GRPC streaming is on, emit a generated clob match to stream. + if streamingManager := k.GetGrpcStreamingManager(); streamingManager.Enabled() { + streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( + ctx, + types.ClobMatch{ + Match: &types.ClobMatch_MatchPerpetualLiquidation{ + MatchPerpetualLiquidation: matchLiquidation, + }, + }, + takerOrder, + makerOrders, + ) + k.SendOrderbookFillUpdates( + ctx, + []types.StreamOrderbookFill{ + streamOrderbookFill, + }, + ) + } return nil } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 87c1d3b652..f42e17ba96 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -375,7 +375,20 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches( } // Add the new matches to the operations queue. - m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders) + internalOperation := m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders) + // If orderbook updates are on, send an orderbook update with the fill to grpc streams. + if m.generateOrderbookUpdates { + // Collect all maker orders. + makerOrders := lib.MapSlice( + makerFillWithOrders, + func(mfwo types.MakerFillWithOrder) types.Order { + return mfwo.Order + }, + ) + clobMatch := internalOperation.GetMatch() + orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders) + m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill}) + } // Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism. allSubaccounts := lib.GetSortedKeys[satypes.SortedSubaccountIds](subaccountTotalMatchedQuantums) @@ -857,6 +870,20 @@ func (m *MemClobPriceTimePriority) matchOrder( ) offchainUpdates.Append(matchOffchainUpdates) writeCache() + } else { + // If state was not written to, re-send grpc stream updates for all orders + // involved in the match to "reset" fill amounts. + allUpdates := types.NewOffchainUpdates() + if !order.IsLiquidation() { + normalOrder := order.MustGetOrder() + updates := m.GetOrderbookUpdatesForOrderUpdate(ctx, normalOrder.OrderId) + allUpdates.Append(updates) + } + for _, fill := range newMakerFills { + updates := m.GetOrderbookUpdatesForOrderUpdate(ctx, fill.MakerOrderId) + allUpdates.Append(updates) + } + m.clobKeeper.SendOrderbookUpdates(ctx, allUpdates, false) } return takerOrderStatus, offchainUpdates, makerOrdersToRemove, matchingErr @@ -1982,12 +2009,6 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder panic("Total filled size of maker order greater than the order size") } - // Send an orderbook update for the order's new total filled amount. - if m.generateOrderbookUpdates { - orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) - m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) - } - // If the order is fully filled, remove it from the orderbook. // Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since // the order was matched. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 4bce8ec772..3b2dc19a2e 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -9,6 +9,35 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) +// GenerateStreamOrderbookFill wraps a clob match into the `StreamOrderbookFill` +// data structure which provides prices and fill amounts alongside clob match. +func (m *MemClobPriceTimePriority) GenerateStreamOrderbookFill( + ctx sdk.Context, + clobMatch types.ClobMatch, + takerOrder types.MatchableOrder, + makerOrders []types.Order, +) types.StreamOrderbookFill { + fillAmounts := []uint64{} + + for _, makerOrder := range makerOrders { + fillAmount := m.GetOrderFilledAmount(ctx, makerOrder.OrderId) + fillAmounts = append(fillAmounts, uint64(fillAmount)) + } + // If taker order is not a liquidation order, has to be a regular + // taker order. Add the taker order to the orders array. + if !takerOrder.IsLiquidation() { + order := takerOrder.MustGetOrder() + makerOrders = append(makerOrders, order) + fillAmount := m.GetOrderFilledAmount(ctx, order.OrderId) + fillAmounts = append(fillAmounts, uint64(fillAmount)) + } + return types.StreamOrderbookFill{ + ClobMatch: &clobMatch, + Orders: makerOrders, + FillAmounts: fillAmounts, + } +} + // GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot. // This is used by the gRPC streaming server to send the orderbook snapshot to the client. func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( diff --git a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go index b15df32321..f47af710c5 100644 --- a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go +++ b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go @@ -253,7 +253,7 @@ func TestPurgeInvalidMemclobState(t *testing.T) { mockMemClobKeeper := &mocks.MemClobKeeper{} memclob.SetClobKeeper(mockMemClobKeeper) mockMemClobKeeper.On("Logger", mock.Anything).Return(log.NewNopLogger()).Maybe() - mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() for _, operation := range tc.placedOperations { switch operation.Operation.(type) { @@ -265,7 +265,7 @@ func TestPurgeInvalidMemclobState(t *testing.T) { false, satypes.BaseQuantums(0), uint32(0), - ).Times(4) + ).Times(5) mockMemClobKeeper.On("AddOrderToOrderbookCollatCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)).Once() diff --git a/protocol/x/clob/memclob/memclob_remove_order_test.go b/protocol/x/clob/memclob/memclob_remove_order_test.go index 15c1d88803..affd4452f1 100644 --- a/protocol/x/clob/memclob/memclob_remove_order_test.go +++ b/protocol/x/clob/memclob/memclob_remove_order_test.go @@ -330,7 +330,7 @@ func TestRemoveOrderIfFilled(t *testing.T) { memClobKeeper.On("AddOrderToOrderbookCollatCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)) memClobKeeper.On("ValidateSubaccountEquityTierLimitForNewOrder", mock.Anything, mock.Anything).Return(nil) - memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() // Set initial fill amount to `0` for all orders. initialCall := memClobKeeper.On("GetOrderFillAmount", mock.Anything, mock.Anything). diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 48195776ee..0bd6da4604 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -116,4 +116,8 @@ type MemClobKeeper interface { offchainUpdates *OffchainUpdates, snapshot bool, ) + SendOrderbookFillUpdates( + ctx sdk.Context, + orderbookFills []StreamOrderbookFill, + ) } diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index 3196320e39..4938fc915a 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -149,4 +149,10 @@ type MemClob interface { ctx sdk.Context, orderId OrderId, ) (offchainUpdates *OffchainUpdates) + GenerateStreamOrderbookFill( + ctx sdk.Context, + clobMatch ClobMatch, + takerOrder MatchableOrder, + makerOrders []Order, + ) StreamOrderbookFill } diff --git a/protocol/x/clob/types/message_clob_match.go b/protocol/x/clob/types/message_clob_match.go index 989cb307a6..47593e05ee 100644 --- a/protocol/x/clob/types/message_clob_match.go +++ b/protocol/x/clob/types/message_clob_match.go @@ -22,3 +22,21 @@ func NewClobMatchFromMatchPerpetualLiquidation( }, } } + +// GetAllOrderIds returns a set of orderIds involved in a ClobMatch. +// It assumes the ClobMatch is valid (no duplicate orderIds in fills) +func (clobMatch *ClobMatch) GetAllOrderIds() (orderIds map[OrderId]struct{}) { + orderIds = make(map[OrderId]struct{}) + if matchOrders := clobMatch.GetMatchOrders(); matchOrders != nil { + orderIds[matchOrders.GetTakerOrderId()] = struct{}{} + for _, makerFill := range matchOrders.GetFills() { + orderIds[makerFill.GetMakerOrderId()] = struct{}{} + } + } + if matchOrders := clobMatch.GetMatchPerpetualLiquidation(); matchOrders != nil { + for _, makerFill := range matchOrders.GetFills() { + orderIds[makerFill.GetMakerOrderId()] = struct{}{} + } + } + return orderIds +} diff --git a/protocol/x/clob/types/operations_to_propose.go b/protocol/x/clob/types/operations_to_propose.go index b1ed771bec..e135987178 100644 --- a/protocol/x/clob/types/operations_to_propose.go +++ b/protocol/x/clob/types/operations_to_propose.go @@ -183,7 +183,7 @@ func (o *OperationsToPropose) MustAddStatefulOrderPlacementToOperationsQueue( func (o *OperationsToPropose) MustAddMatchToOperationsQueue( takerMatchableOrder MatchableOrder, makerFillsWithOrders []MakerFillWithOrder, -) { +) InternalOperation { makerFills := lib.MapSlice( makerFillsWithOrders, func(mfwo MakerFillWithOrder) MakerFill { @@ -234,6 +234,7 @@ func (o *OperationsToPropose) MustAddMatchToOperationsQueue( o.OperationsQueue, matchOperation, ) + return matchOperation } // AddZeroFillDeleveragingToOperationsQueue adds a zero-fill deleveraging match operation to the