From cae35bcb0cb4b9b608e6d441f03945242e92ba6d Mon Sep 17 00:00:00 2001 From: Jonathan Fung Date: Fri, 10 May 2024 16:37:34 -0400 Subject: [PATCH] [HACK] get some form of block height working --- protocol/app/app.go | 1 + protocol/streaming/grpc/grpc_streaming_manager.go | 13 ++++++++++++- protocol/streaming/grpc/noop_streaming_manager.go | 5 +++++ protocol/streaming/grpc/types/manager.go | 3 +++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index f792204355..2554b28d77 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1486,6 +1486,7 @@ func (app *App) PreBlocker(ctx sdk.Context, _ *abci.RequestFinalizeBlock) (*sdk. // BeginBlocker application updates every begin block func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) { ctx = ctx.WithExecMode(lib.ExecModeBeginBlock) + app.GrpcStreamingManager.SetBlockHeight(uint32(ctx.BlockHeight())) // Update the proposer address in the logger for the panic logging middleware. proposerAddr := sdk.ConsAddress(ctx.BlockHeader().ProposerAddress) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 01b527ba5f..c9fd3ed56a 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -28,6 +28,8 @@ type GrpcStreamingManagerImpl struct { done chan bool // map of clob pair id to stream updates. streamUpdateCache map[uint32][]clobtypes.StreamUpdate + + blockHeight uint32 } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -58,7 +60,7 @@ func NewGrpcStreamingManager(flushIntervalMs uint32) *GrpcStreamingManagerImpl { select { case <-grpcStreamingManager.ticker.C: // fix this with values - grpcStreamingManager.FlushStreamUpdates(0, 0) + grpcStreamingManager.internalFlushStreamUpdates() case <-grpcStreamingManager.done: return } @@ -76,6 +78,15 @@ func (sm *GrpcStreamingManagerImpl) Stop() { sm.done <- true } +func (sm *GrpcStreamingManagerImpl) internalFlushStreamUpdates() { + // temporary + sm.FlushStreamUpdates(sm.blockHeight, 0) +} + +func (sm *GrpcStreamingManagerImpl) SetBlockHeight(blockHeight uint32) { + sm.blockHeight = blockHeight +} + // Subscribe subscribes to the orderbook updates stream. func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 615fdf0c4d..1bb30dd77f 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -50,6 +50,11 @@ func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { func (sm *NoopGrpcStreamingManager) Stop() { } +func (sm *NoopGrpcStreamingManager) SetBlockHeight( + blockHeight uint32, +) { +} + func (sm *NoopGrpcStreamingManager) FlushStreamUpdates( blockHeight uint32, execMode sdk.ExecMode, diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 0cd16162ba..dffe5e59e1 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -29,6 +29,9 @@ type GrpcStreamingManager interface { execMode sdk.ExecMode, ) Stop() + SetBlockHeight( + blockHeight uint32, + ) FlushStreamUpdates( blockHeight uint32, execMode sdk.ExecMode,