Skip to content

Commit

Permalink
batch updates + flush out changes at EndBlocker and PrepareCheckState…
Browse files Browse the repository at this point in the history
… and DeliverTx
  • Loading branch information
jonfung-dydx committed May 13, 2024
1 parent ba9e7f3 commit f50ea03
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 63 deletions.
22 changes: 20 additions & 2 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
Expand Down Expand Up @@ -425,6 +426,9 @@ func New(
if app.Server != nil {
app.Server.Stop()
}
if app.GrpcStreamingManager != nil {
app.GrpcStreamingManager.Stop()
}
return nil
},
)
Expand Down Expand Up @@ -1510,6 +1514,10 @@ func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) {
}
block := app.IndexerEventManager.ProduceBlock(ctx)
app.IndexerEventManager.SendOnchainData(block)

if app.GrpcStreamingManager.Enabled() {
app.GrpcStreamingManager.FlushStreamUpdates(uint32(ctx.BlockHeight()), ctx.ExecMode())
}
return response, err
}

Expand All @@ -1527,6 +1535,10 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) {
if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}

if app.GrpcStreamingManager.Enabled() {
app.GrpcStreamingManager.FlushStreamUpdates(uint32(ctx.BlockHeight()), ctx.ExecMode())
}
}

// InitChainer application update at chain initialization.
Expand Down Expand Up @@ -1765,8 +1777,14 @@ func getGrpcStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
flushIntervalMs := uint32(appFlags.GrpcStreamingFlushIntervalMs)
logger.Info(
fmt.Sprintf(
"GRPC streaming is enabled with flush interval %+v",
flushIntervalMs,
),
)
return streaming.NewGrpcStreamingManager(uint32(appFlags.GrpcStreamingFlushIntervalMs))
}
return streaming.NewNoopGrpcStreamingManager()
}
23 changes: 19 additions & 4 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Flags struct {
GrpcEnable bool

// Grpc Streaming
GrpcStreamingEnabled bool
GrpcStreamingEnabled bool
GrpcStreamingFlushIntervalMs uint16
}

// List of CLI flags.
Expand All @@ -36,7 +37,8 @@ const (
GrpcEnable = "grpc.enable"

// Grpc Streaming
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms"
)

// Default values.
Expand All @@ -46,7 +48,8 @@ const (
DefaultNonValidatingFullNode = false
DefaultDdErrorTrackingFormat = false

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 10
)

// AddFlagsToCmd adds flags to app initialization.
Expand Down Expand Up @@ -80,6 +83,11 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingEnabled,
"Whether to enable grpc streaming for full nodes",
)
cmd.Flags().Uint16(
GrpcStreamingFlushIntervalMs,
DefaultGrpcStreamingFlushIntervalMs,
"Interval on which to flush grpc stream updates",
)
}

// Validate checks that the flags are valid.
Expand Down Expand Up @@ -114,7 +122,8 @@ func GetFlagValuesFromOptions(
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,

GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs,
}

// Populate the flags if they exist.
Expand Down Expand Up @@ -160,5 +169,11 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(GrpcStreamingFlushIntervalMs); option != nil {
if v, err := cast.ToUint16E(option); err == nil {
result.GrpcStreamingFlushIntervalMs = v
}
}

return result
}
41 changes: 32 additions & 9 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): {
flagName: flags.GrpcStreamingEnabled,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingFlushIntervalMs): {
flagName: flags.GrpcStreamingFlushIntervalMs,
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -63,9 +66,10 @@ func TestValidate(t *testing.T) {
},
"success - gRPC streaming enabled for validating nodes": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 15,
},
},
"failure - gRPC disabled": {
Expand Down Expand Up @@ -107,6 +111,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress string
expectedGrpcEnable bool
expectedGrpcStreamingEnable bool
expectedGrpcStreamingFlushMs uint16
}{
"Sets to default if unset": {
expectedNonValidatingFullNodeFlag: false,
Expand All @@ -115,22 +120,25 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress: "localhost:9090",
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 10,
},
"Sets values from options": {
optsMap: map[string]any{
flags.NonValidatingFullNodeFlag: true,
flags.DdAgentHost: "agentHostTest",
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
flags.NonValidatingFullNodeFlag: true,
flags.DdAgentHost: "agentHostTest",
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
flags.GrpcStreamingFlushIntervalMs: "15",
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
expectedDdTraceAgentPort: 777,
expectedGrpcEnable: false,
expectedGrpcAddress: "localhost:9091",
expectedGrpcStreamingEnable: true,
expectedGrpcStreamingFlushMs: 15,
},
}

Expand Down Expand Up @@ -168,6 +176,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcAddress,
flags.GrpcAddress,
)
require.Equal(
t,
tc.expectedGrpcAddress,
flags.GrpcAddress,
)
require.Equal(
t,
tc.expectedGrpcStreamingEnable,
flags.GrpcStreamingEnabled,
)
require.Equal(
t,
tc.expectedGrpcStreamingFlushMs,
flags.GrpcStreamingFlushIntervalMs,
)
})
}
}
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcFlushStreamUpdatesLatency = "grpc_flush_stream_updates_latency"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
Loading

0 comments on commit f50ea03

Please sign in to comment.