Skip to content

Commit

Permalink
FNS polish - metrics, max msg size, default flag values (#2517)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 18, 2024
1 parent b357592 commit ca6870d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 18 deletions.
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000
DefaultGrpcStreamingMaxBatchSize = 10000
DefaultGrpcStreamingMaxChannelBufferSize = 10000
DefaultWebsocketStreamingEnabled = false
DefaultWebsocketStreamingPort = 9092
DefaultFullNodeStreamingSnapshotInterval = 0
Expand Down
16 changes: 8 additions & 8 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingEnabled: false,
},
},
Expand All @@ -100,8 +100,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingEnabled: true,
WebsocketStreamingPort: 8989,
},
Expand All @@ -119,9 +119,9 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxChannelBufferSize: 2000,
GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingPort: 8989,
},
},
Expand Down Expand Up @@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
expectedGrpcStreamingBatchSize: 10000,
expectedGrpcStreamingMaxChannelBufferSize: 10000,
expectedWebsocketEnabled: false,
expectedWebsocketPort: 9092,
expectedFullNodeStreamingSnapshotInterval: 0,
Expand Down
4 changes: 2 additions & 2 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
Expand All @@ -82,6 +81,7 @@ const (
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"
SubscriptionId = "subscription_id"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
31 changes: 25 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
}

func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.AddSample(
metrics.GrpcStreamNumUpdatesBuffered,
float32(len(sm.streamUpdateCache)),
)
Expand All @@ -162,9 +162,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
float32(len(sm.orderbookSubscriptions)),
)
for _, subscription := range sm.orderbookSubscriptions {
metrics.AddSample(
metrics.AddSampleWithLabels(
metrics.GrpcSubscriptionChannelLength,
float32(len(subscription.updatesChannel)),
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
}
}
Expand Down Expand Up @@ -234,9 +235,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
err = subscription.messageSender.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Expand Down Expand Up @@ -372,9 +374,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
return
}

metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)),
)

select {
case subscription.updatesChannel <- streamUpdates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
sm.logger.Error(
fmt.Sprintf(
"Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
Expand All @@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
return
}

metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
1,
)

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
Expand Down Expand Up @@ -705,9 +720,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(

sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds)

sm.EmitMetrics()
// Remove all subscriptions and wipe the buffer if buffer overflows.
sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
}

// AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache.
Expand All @@ -726,8 +741,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(

sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds)

sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
sm.RemoveSubscriptionsAndClearBufferIfFull()
}

// RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows.
Expand All @@ -743,6 +758,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull(
}
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
sm.EmitMetrics()
}
}

Expand Down Expand Up @@ -778,13 +794,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)),
)
select {
case subscription.updatesChannel <- updates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
idsToRemove = append(idsToRemove, id)
}
}
Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/ws/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close()

// Set ws max message size to 10 mb.
conn.SetReadLimit(10 * 1024 * 1024)

// Parse clobPairIds from query parameters
clobPairIds, err := parseClobPairIds(r)
if err != nil {
Expand Down

0 comments on commit ca6870d

Please sign in to comment.