Skip to content

Commit

Permalink
[CT-946] only send snapshots to uninitialized streams (backport #1738) (
Browse files Browse the repository at this point in the history
#1745)

Co-authored-by: jayy04 <[email protected]>
  • Loading branch information
mergify[bot] and jayy04 authored Jun 21, 2024
1 parent e154776 commit a3ea114
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 88 deletions.
142 changes: 70 additions & 72 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand Down Expand Up @@ -206,11 +205,12 @@ func (sm *GrpcStreamingManagerImpl) Stop() {
sm.done <- true
}

// SendSnapshot groups updates by their clob pair ids and
// sends messages to the subscribers. It groups out updates differently
// and bypasses the buffer.
// SendSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) SendSnapshot(
offchainUpdates *clobtypes.OffchainUpdates,
subscriptionId uint32,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand All @@ -220,74 +220,56 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot(
time.Now(),
)

// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
v1updates, err := GetOffchainUpdatesV1(offchainUpdates)
if err != nil {
panic(err)
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = v1updates
}

sm.Lock()
defer sm.Unlock()

idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
// Consolidate orderbook updates into a single `StreamUpdate`.
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
v1updates = append(v1updates, update...)
}
removeSubscription := false
if len(v1updates) > 0 {
subscription, ok := sm.orderbookSubscriptions[subscriptionId]
if !ok {
sm.logger.Error(
fmt.Sprintf(
"GRPC Streaming subscription id %+v not found. This should not happen.",
subscriptionId,
),
)
return
}

if len(v1updates) > 0 {
streamUpdates := []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: true,
},
streamUpdates := []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: true,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
}
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
}
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdates:
default:
sm.logger.Error(
fmt.Sprintf(
"GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
subscriptionId,
),
)
select {
case subscription.updatesChannel <- streamUpdates:
default:
sm.logger.Error(
fmt.Sprintf(
"GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
id,
),
)
idsToRemove = append(idsToRemove, subscription.subscriptionId)
}
removeSubscription = true
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
sm.removeSubscription(id)
if removeSubscription {
sm.removeSubscription(subscriptionId)
}
}

Expand Down Expand Up @@ -405,17 +387,22 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
sm.EmitMetrics()
}

// FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {
sm.Lock()
defer sm.Unlock()
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcFlushUpdatesLatency,
time.Now(),
)

sm.Lock()
defer sm.Unlock()

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
idsToRemove := make([]uint32, 0)
Expand Down Expand Up @@ -456,23 +443,34 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {
sm.EmitMetrics()
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
func (sm *GrpcStreamingManagerImpl) InitializeNewGrpcStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
sm.Lock()
defer sm.Unlock()

clobPairIds := make(map[uint32]bool)
for _, subscription := range sm.orderbookSubscriptions {
// Flush any pending updates before sending the snapshot to avoid
// race conditions with the snapshot.
sm.FlushStreamUpdatesWithLock()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)
for subscriptionId, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
for _, clobPairId := range subscription.clobPairIds {
clobPairIds[clobPairId] = true
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId))
}
allUpdates.Append(updatesByClobPairId[clobPairId])
}

sm.SendSnapshot(allUpdates, subscriptionId, blockHeight, execMode)
},
)
}

return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds)
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
Expand Down
8 changes: 6 additions & 2 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe(

func (sm *NoopGrpcStreamingManager) SendSnapshot(
updates *clobtypes.OffchainUpdates,
subscriptionId uint32,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand All @@ -49,8 +50,11 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
func (sm *NoopGrpcStreamingManager) InitializeNewGrpcStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) Stop() {
Expand Down
7 changes: 6 additions & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ type GrpcStreamingManager interface {
) (
err error,
)
GetUninitializedClobPairIds() []uint32
InitializeNewGrpcStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSnapshot(
offchainUpdates *clobtypes.OffchainUpdates,
subscriptionId uint32,
blockHeight uint32,
execMode sdk.ExecMode,
)
Expand Down
20 changes: 7 additions & 13 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,14 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
streamingManager := k.GetGrpcStreamingManager()
allUpdates := types.NewOffchainUpdates()

uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds()
for _, clobPairId := range uninitializedClobPairIds {
update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
types.ClobPairId(clobPairId),
)

allUpdates.Append(update)
}

streamingManager.SendSnapshot(
allUpdates,
streamingManager.InitializeNewGrpcStreams(
func(clobPairId types.ClobPairId) *types.OffchainUpdates {
return k.MemClob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
clobPairId,
)
},
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
Expand Down

0 comments on commit a3ea114

Please sign in to comment.