Skip to content

Commit

Permalink
flush updates before snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 committed Jun 20, 2024
1 parent 6d2d529 commit d25bb9e
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ func (sm *GrpcStreamingManagerImpl) Stop() {
sm.done <- true
}

// SendSnapshot groups updates by their clob pair ids and
// sends messages to the subscribers. It groups out updates differently
// and bypasses the buffer.
// SendSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) SendSnapshot(
offchainUpdates *clobtypes.OffchainUpdates,
subscriptionId uint32,
Expand Down Expand Up @@ -387,17 +387,22 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
sm.EmitMetrics()
}

// FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {
sm.Lock()
defer sm.Unlock()
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcFlushUpdatesLatency,
time.Now(),
)

sm.Lock()
defer sm.Unlock()

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
idsToRemove := make([]uint32, 0)
Expand Down Expand Up @@ -446,6 +451,10 @@ func (sm *GrpcStreamingManagerImpl) InitializeNewGrpcStreams(
sm.Lock()
defer sm.Unlock()

// Flush any pending updates before sending the snapshot to avoid
// race conditions with the snapshot.
sm.FlushStreamUpdatesWithLock()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)
for subscriptionId, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
Expand Down

0 comments on commit d25bb9e

Please sign in to comment.