diff --git a/protocol/app/app.go b/protocol/app/app.go index 527db9b43f..f792204355 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -3,6 +3,7 @@ package app import ( "context" "encoding/json" + "fmt" "io" "math/big" "net/http" @@ -425,6 +426,9 @@ func New( if app.Server != nil { app.Server.Stop() } + if app.GrpcStreamingManager != nil { + app.GrpcStreamingManager.Stop() + } return nil }, ) @@ -1510,6 +1514,10 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) { } block := app.IndexerEventManager.ProduceBlock(ctx) app.IndexerEventManager.SendOnchainData(block) + + if app.GrpcStreamingManager.Enabled() { + app.GrpcStreamingManager.FlushStreamUpdates(uint32(ctx.BlockHeight()), ctx.ExecMode()) + } return response, err } @@ -1527,6 +1535,10 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) { if err := app.ModuleManager.PrepareCheckState(ctx); err != nil { panic(err) } + + if app.GrpcStreamingManager.Enabled() { + app.GrpcStreamingManager.FlushStreamUpdates(uint32(ctx.BlockHeight()), ctx.ExecMode()) + } } // InitChainer application update at chain initialization. @@ -1765,8 +1777,14 @@ func getGrpcStreamingManagerFromOptions( logger log.Logger, ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { - logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager() + flushIntervalMs := uint32(appFlags.GrpcStreamingFlushIntervalMs) + logger.Info( + fmt.Sprintf( + "GRPC streaming is enabled with flush interval %+v", + flushIntervalMs, + ), + ) + return streaming.NewGrpcStreamingManager(uint32(appFlags.GrpcStreamingFlushIntervalMs)) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index a900e81b40..329ab7260e 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -21,7 +21,8 @@ type Flags struct { GrpcEnable bool // Grpc Streaming - GrpcStreamingEnabled bool + GrpcStreamingEnabled bool + GrpcStreamingFlushIntervalMs uint16 } // List of CLI flags. @@ -36,7 +37,8 @@ const ( GrpcEnable = "grpc.enable" // Grpc Streaming - GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" ) // Default values. @@ -46,7 +48,8 @@ const ( DefaultNonValidatingFullNode = false DefaultDdErrorTrackingFormat = false - DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingFlushIntervalMs = 10 ) // AddFlagsToCmd adds flags to app initialization. @@ -80,6 +83,11 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultGrpcStreamingEnabled, "Whether to enable grpc streaming for full nodes", ) + cmd.Flags().Uint16( + GrpcStreamingFlushIntervalMs, + DefaultGrpcStreamingFlushIntervalMs, + "Interval on which to flush grpc stream updates", + ) } // Validate checks that the flags are valid. @@ -114,7 +122,8 @@ func GetFlagValuesFromOptions( GrpcAddress: config.DefaultGRPCAddress, GrpcEnable: true, - GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, } // Populate the flags if they exist. @@ -160,5 +169,11 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(GrpcStreamingFlushIntervalMs); option != nil { + if v, err := cast.ToUint16E(option); err == nil { + result.GrpcStreamingFlushIntervalMs = v + } + } + return result } diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index dd6f12db85..20896fc4d5 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -32,6 +32,9 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): { flagName: flags.GrpcStreamingEnabled, }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingFlushIntervalMs): { + flagName: flags.GrpcStreamingFlushIntervalMs, + }, } for name, tc := range tests { @@ -63,9 +66,10 @@ func TestValidate(t *testing.T) { }, "success - gRPC streaming enabled for validating nodes": { flags: flags.Flags{ - NonValidatingFullNode: false, - GrpcEnable: true, - GrpcStreamingEnabled: true, + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 15, }, }, "failure - gRPC disabled": { @@ -107,6 +111,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress string expectedGrpcEnable bool expectedGrpcStreamingEnable bool + expectedGrpcStreamingFlushMs uint16 }{ "Sets to default if unset": { expectedNonValidatingFullNodeFlag: false, @@ -115,15 +120,17 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress: "localhost:9090", expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, + expectedGrpcStreamingFlushMs: 10, }, "Sets values from options": { optsMap: map[string]any{ - flags.NonValidatingFullNodeFlag: true, - flags.DdAgentHost: "agentHostTest", - flags.DdTraceAgentPort: uint16(777), - flags.GrpcEnable: false, - flags.GrpcAddress: "localhost:9091", - flags.GrpcStreamingEnabled: "true", + flags.NonValidatingFullNodeFlag: true, + flags.DdAgentHost: "agentHostTest", + flags.DdTraceAgentPort: uint16(777), + flags.GrpcEnable: false, + flags.GrpcAddress: "localhost:9091", + flags.GrpcStreamingEnabled: "true", + flags.GrpcStreamingFlushIntervalMs: "15", }, expectedNonValidatingFullNodeFlag: true, expectedDdAgentHost: "agentHostTest", @@ -131,6 +138,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: false, expectedGrpcAddress: "localhost:9091", expectedGrpcStreamingEnable: true, + expectedGrpcStreamingFlushMs: 15, }, } @@ -168,6 +176,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) { tc.expectedGrpcAddress, flags.GrpcAddress, ) + require.Equal( + t, + tc.expectedGrpcAddress, + flags.GrpcAddress, + ) + require.Equal( + t, + tc.expectedGrpcStreamingEnable, + flags.GrpcStreamingEnabled, + ) + require.Equal( + t, + tc.expectedGrpcStreamingFlushMs, + flags.GrpcStreamingFlushIntervalMs, + ) }) } } diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 09607d76e0..385d3f122a 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -60,6 +60,7 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcFlushStreamUpdatesLatency = "grpc_flush_stream_updates_latency" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index a8ce6ad1f2..01b527ba5f 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -22,6 +22,12 @@ type GrpcStreamingManagerImpl struct { // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription nextSubscriptionId uint32 + + // grpc stream will batch and flush out messages every 100 ms. + ticker *time.Ticker + done chan bool + // map of clob pair id to stream updates. + streamUpdateCache map[uint32][]clobtypes.StreamUpdate } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -36,16 +42,40 @@ type OrderbookSubscription struct { srv clobtypes.Query_StreamOrderbookUpdatesServer } -func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { - return &GrpcStreamingManagerImpl{ +func NewGrpcStreamingManager(flushIntervalMs uint32) *GrpcStreamingManagerImpl { + grpcStreamingManager := &GrpcStreamingManagerImpl{ orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), + nextSubscriptionId: 0, + + ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), + done: make(chan bool), + streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate), } + + // Start the goroutine for pushing order updates through + go func() { + for { + select { + case <-grpcStreamingManager.ticker.C: + // fix this with values + grpcStreamingManager.FlushStreamUpdates(0, 0) + case <-grpcStreamingManager.done: + return + } + } + }() + + return grpcStreamingManager } func (sm *GrpcStreamingManagerImpl) Enabled() bool { return true } +func (sm *GrpcStreamingManagerImpl) Stop() { + sm.done <- true +} + // Subscribe subscribes to the orderbook updates stream. func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, @@ -87,6 +117,8 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( metrics.GrpcSendOrderbookUpdatesLatency, time.Now(), ) + sm.Lock() + defer sm.Unlock() // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) @@ -99,14 +131,33 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal messages to v1 updates. - v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1) for clobPairId, update := range updates { v1update, err := GetOffchainUpdatesV1(update) if err != nil { panic(err) } - v1updates[clobPairId] = v1update + streamUpdates := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1update, + Snapshot: snapshot, + }, + }, + } + sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates) } +} + +// FlushStreamUpdates flushes the update cache and sends them out to subscribers. +func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates( + blockHeight uint32, + execMode sdk.ExecMode, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcFlushStreamUpdatesLatency, + time.Now(), + ) sm.Lock() defer sm.Unlock() @@ -114,25 +165,17 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { - updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) + updatesToSend := make([]clobtypes.StreamUpdate, 0) for _, clobPairId := range subscription.clobPairIds { - if updates, ok := v1updates[clobPairId]; ok { + if updates, ok := sm.streamUpdateCache[clobPairId]; ok { updatesToSend = append(updatesToSend, updates...) } } if len(updatesToSend) > 0 { - streamUpdates := clobtypes.StreamUpdate{ - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: updatesToSend, - Snapshot: snapshot, - }, - }, - } if err := subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: []clobtypes.StreamUpdate{streamUpdates}, + Updates: updatesToSend, BlockHeight: blockHeight, ExecMode: uint32(execMode), }, @@ -147,6 +190,8 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( for _, id := range idsToRemove { delete(sm.orderbookSubscriptions, id) } + + sm.streamUpdateCache = make(map[uint32][]clobtypes.StreamUpdate) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -162,6 +207,8 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( metrics.GrpcSendOrderbookFillsLatency, time.Now(), ) + sm.Lock() + defer sm.Unlock() // Group fills by clob pair id. updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) @@ -178,39 +225,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( OrderFill: &orderbookFill, }, } - updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) - } - - sm.Lock() - defer sm.Unlock() - - // Send updates to subscribers. - idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } - } - - if len(streamUpdatesForSubscription) > 0 { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - } - } - } - - // Clean up subscriptions that have been closed. - // If a Send update has failed for any clob pair id, the whole subscription will be removed. - for _, id := range idsToRemove { - delete(sm.orderbookSubscriptions, id) + sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdate) } } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 424871b4c3..615fdf0c4d 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -46,3 +46,12 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } + +func (sm *NoopGrpcStreamingManager) Stop() { +} + +func (sm *NoopGrpcStreamingManager) FlushStreamUpdates( + blockHeight uint32, + execMode sdk.ExecMode, +) { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 9b5af0c093..0cd16162ba 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -28,4 +28,9 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + Stop() + FlushStreamUpdates( + blockHeight uint32, + execMode sdk.ExecMode, + ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index ecbd0a28a4..03f57841da 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -135,6 +135,10 @@ func (k Keeper) ProcessProposerOperations( operationsStats := types.StatMsgProposedOperations(rawOperations) operationsStats.EmitStats(metrics.DeliverTx) + if sm := k.GetGrpcStreamingManager(); sm.Enabled() { + sm.FlushStreamUpdates(uint32(ctx.BlockHeight()), ctx.ExecMode()) + } + return nil }