diff --git a/protocol/app/app.go b/protocol/app/app.go index c637f8d755..9b16079e88 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1010,6 +1010,7 @@ func New( }, app.RevShareKeeper, &app.MarketMapKeeper, + app.FullNodeStreamingManager, ) pricesModule := pricesmodule.NewAppModule( appCodec, diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 18ef5fab1b..d2208b7a6a 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -74,6 +74,7 @@ const ( GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count" + GrpcSendPriceUpdateCount = "grpc_send_price_update_count" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 29717366fc..939c7d2979 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -197,7 +197,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( err error, ) { // Perform some basic validation on the request. - if len(clobPairIds) == 0 && len(subaccountIds) == 0 { + if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 { return types.ErrInvalidStreamingRequest } @@ -493,6 +493,33 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ) } +// SendPriceUpdates sends price updates to the subscribers. +func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate( + ctx sdk.Context, + priceUpdate pricestypes.StreamPriceUpdate, +) { + if !lib.IsDeliverTxMode(ctx) { + // If not `DeliverTx`, return since there is no optimistic price updates. + return + } + + metrics.IncrCounter( + metrics.GrpcSendPriceUpdateCount, + 1, + ) + + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_PriceUpdate{ + PriceUpdate: &priceUpdate, + }, + } + sm.finalizeBlockStager.StageFinalizeBlockEvent( + ctx, + &stagedEvent, + ) +} + // Retrieve all events staged during `FinalizeBlock`. func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( ctx sdk.Context, @@ -545,6 +572,14 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. return exists } +// TracksMarketId checks if a market id is being tracked by the streaming manager. +func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool { + sm.Lock() + defer sm.Unlock() + _, exists := sm.marketIdToSubscriptionIdMapping[marketId] + return exists +} + func getStreamUpdatesFromOffchainUpdates( v1updates []ocutypes.OffChainUpdateV1, blockHeight uint32, @@ -773,6 +808,31 @@ func getStreamUpdatesForSubaccountUpdates( return streamUpdates, subaccountIds } +func getStreamUpdatesForPriceUpdates( + priceUpdates []pricestypes.StreamPriceUpdate, + blockHeight uint32, + execMode sdk.ExecMode, +) ( + streamUpdates []clobtypes.StreamUpdate, + marketIds []uint32, +) { + // Group subaccount updates by subaccount id. + streamUpdates = make([]clobtypes.StreamUpdate, 0) + marketIds = make([]uint32, 0) + for _, priceUpdate := range priceUpdates { + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{ + PriceUpdate: &priceUpdate, + }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + } + streamUpdates = append(streamUpdates, streamUpdate) + marketIds = append(marketIds, priceUpdate.MarketId) + } + return streamUpdates, marketIds +} + // AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. // Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( @@ -976,6 +1036,27 @@ func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesBySubaccountWithLock( } } +// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache, +// and store corresponding market ids. +// This method requires the lock and assumes that the lock has already been +// acquired by the caller. +func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock( + streamUpdates []clobtypes.StreamUpdate, + marketIds []uint32, +) { + if len(streamUpdates) != len(marketIds) { + sm.logger.Error("Mismatch between stream updates and market IDs lengths") + return + } + sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...) + for _, marketId := range marketIds { + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.marketIdToSubscriptionIdMapping[marketId], + ) + } +} + // Grpc Streaming logic after consensus agrees on a block. // - Stream all events staged during `FinalizeBlock`. // - Stream orderbook updates to sync fills in local ops queue. @@ -989,7 +1070,8 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( finalizedFills, finalizedSubaccountUpdates, - finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + finalizedOrderbookUpdates, + finalizedPriceUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) sm.Lock() defer sm.Unlock() @@ -1032,6 +1114,14 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( ) sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) + // Finally, cache updates for finalized subaccount updates + priceStreamUpdates, marketIds := getStreamUpdatesForPriceUpdates( + finalizedPriceUpdates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + sm.cacheStreamUpdatesByMarketIdWithLock(priceStreamUpdates, marketIds) + // Emit all stream updates in a single batch. // Note we still have the lock, which is released right before function returns. sm.FlushStreamUpdatesWithLock() @@ -1045,6 +1135,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( finalizedFills []clobtypes.StreamOrderbookFill, finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate, + finalizedPriceUpdates []pricestypes.StreamPriceUpdate, ) { // Get onchain stream events stored in transient store. stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx) @@ -1062,6 +1153,8 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate: finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate) + case *clobtypes.StagedFinalizeBlockEvent_PriceUpdate: + finalizedPriceUpdates = append(finalizedPriceUpdates, *event.PriceUpdate) default: panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event)) } @@ -1076,7 +1169,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( float32(len(finalizedFills)), ) - return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates + return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates, finalizedPriceUpdates } func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 6d6dd41608..bb81af1b43 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -54,6 +54,10 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) return false } +func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool { + return false +} + func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams( getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate { @@ -90,6 +94,12 @@ func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate( ) { } +func (sm *NoopGrpcStreamingManager) SendPriceUpdate( + ctx sdk.Context, + priceUpdate pricestypes.StreamPriceUpdate, +) { +} + func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock( ctx sdk.Context, orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 028352a10b..41657e7301 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -52,10 +52,15 @@ type FullNodeStreamingManager interface { ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) + SendPriceUpdate( + ctx sdk.Context, + priceUpdate pricestypes.StreamPriceUpdate, + ) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent TracksSubaccountId(id satypes.SubaccountId) bool + TracksMarketId(marketId uint32) bool StreamBatchUpdatesAfterFinalizeBlock( ctx sdk.Context, orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, diff --git a/protocol/testutil/keeper/prices.go b/protocol/testutil/keeper/prices.go index 0022631dd1..d19468ecc2 100644 --- a/protocol/testutil/keeper/prices.go +++ b/protocol/testutil/keeper/prices.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming" revsharetypes "github.com/dydxprotocol/v4-chain/protocol/x/revshare/types" "github.com/cosmos/gogoproto/proto" @@ -124,6 +125,7 @@ func createPricesKeeper( }, revShareKeeper, marketMapKeeper, + streaming.NewNoopGrpcStreamingManager(), ) return k, storeKey, indexPriceCache, mockTimeProvider diff --git a/protocol/x/prices/keeper/keeper.go b/protocol/x/prices/keeper/keeper.go index 7d1a31960b..525a43c772 100644 --- a/protocol/x/prices/keeper/keeper.go +++ b/protocol/x/prices/keeper/keeper.go @@ -11,6 +11,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types" "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" ) @@ -24,6 +25,8 @@ type ( authorities map[string]struct{} RevShareKeeper types.RevShareKeeper MarketMapKeeper types.MarketMapKeeper + + streamingManager streamingtypes.FullNodeStreamingManager } ) @@ -38,6 +41,7 @@ func NewKeeper( authorities []string, revShareKeeper types.RevShareKeeper, marketMapKeeper types.MarketMapKeeper, + streamingManager streamingtypes.FullNodeStreamingManager, ) *Keeper { return &Keeper{ cdc: cdc, @@ -48,6 +52,7 @@ func NewKeeper( authorities: lib.UniqueSliceToSet(authorities), RevShareKeeper: revShareKeeper, MarketMapKeeper: marketMapKeeper, + streamingManager: streamingManager, } } @@ -66,3 +71,7 @@ func (k Keeper) HasAuthority(authority string) bool { _, ok := k.authorities[authority] return ok } + +func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager { + return k.streamingManager +} diff --git a/protocol/x/prices/keeper/market_price.go b/protocol/x/prices/keeper/market_price.go index 78490b0670..7031f89768 100644 --- a/protocol/x/prices/keeper/market_price.go +++ b/protocol/x/prices/keeper/market_price.go @@ -96,6 +96,20 @@ func (k Keeper) UpdateMarketPrices( pricefeedmetrics.GetLabelForMarketId(marketPrice.Id), }, ) + + // If GRPC streaming is on, emit a price update to stream. + if k.GetFullNodeStreamingManager().Enabled() { + if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) { + k.GetFullNodeStreamingManager().SendPriceUpdate( + ctx, + types.StreamPriceUpdate{ + MarketId: marketPrice.Id, + Price: marketPrice, + Snapshot: false, + }, + ) + } + } } // Generate indexer events.