Skip to content

Commit

Permalink
[HACK] get some form of block height working
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 13, 2024
1 parent f50ea03 commit cae35bc
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 1 deletion.
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,7 @@ func (app *App) PreBlocker(ctx sdk.Context, _ *abci.RequestFinalizeBlock) (*sdk.
// BeginBlocker application updates every begin block
func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {
ctx = ctx.WithExecMode(lib.ExecModeBeginBlock)
app.GrpcStreamingManager.SetBlockHeight(uint32(ctx.BlockHeight()))

// Update the proposer address in the logger for the panic logging middleware.
proposerAddr := sdk.ConsAddress(ctx.BlockHeader().ProposerAddress)
Expand Down
13 changes: 12 additions & 1 deletion protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type GrpcStreamingManagerImpl struct {
done chan bool
// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate

blockHeight uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -58,7 +60,7 @@ func NewGrpcStreamingManager(flushIntervalMs uint32) *GrpcStreamingManagerImpl {
select {
case <-grpcStreamingManager.ticker.C:
// fix this with values
grpcStreamingManager.FlushStreamUpdates(0, 0)
grpcStreamingManager.internalFlushStreamUpdates()
case <-grpcStreamingManager.done:
return
}
Expand All @@ -76,6 +78,15 @@ func (sm *GrpcStreamingManagerImpl) Stop() {
sm.done <- true
}

func (sm *GrpcStreamingManagerImpl) internalFlushStreamUpdates() {
// temporary
sm.FlushStreamUpdates(sm.blockHeight, 0)
}

func (sm *GrpcStreamingManagerImpl) SetBlockHeight(blockHeight uint32) {
sm.blockHeight = blockHeight
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) SetBlockHeight(
blockHeight uint32,
) {
}

func (sm *NoopGrpcStreamingManager) FlushStreamUpdates(
blockHeight uint32,
execMode sdk.ExecMode,
Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type GrpcStreamingManager interface {
execMode sdk.ExecMode,
)
Stop()
SetBlockHeight(
blockHeight uint32,
)
FlushStreamUpdates(
blockHeight uint32,
execMode sdk.ExecMode,
Expand Down

0 comments on commit cae35bc

Please sign in to comment.