Skip to content

Commit

Permalink
test reusing subscription ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 18, 2024
1 parent 9bd4b19 commit d8ac9cf
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct {

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
activeSubscriptionIds map[uint32]bool

// stream will batch and flush out messages every 10 ms.
ticker *time.Ticker
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewFullNodeStreamingManager(
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
activeSubscriptionIds: make(map[uint32]bool),

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
Expand Down Expand Up @@ -170,6 +170,18 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
}
}

// getNextAvailableSubscriptionId returns next available subscription id. Assumes the
// lock has been acquired.
func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
sm.logger.Info(fmt.Sprintf("getting next sub id, actives: %+v", sm.activeSubscriptionIds))
id := uint32(0)
for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] {
id = id + uint32(1)
}
sm.logger.Info(fmt.Sprintf("done getting next sub id, id: %+v", id))
return id
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
Expand All @@ -188,8 +200,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()

subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
Expand All @@ -204,7 +219,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}
for _, subaccountId := range sIds {
Expand All @@ -215,7 +230,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append(
sm.subaccountIdToSubscriptionIdMapping[subaccountId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}

Expand All @@ -228,7 +243,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
sm.nextSubscriptionId++
sm.activeSubscriptionIds[subscription.subscriptionId] = true
sm.logger.Info(fmt.Sprintf("updated active map: %+v", sm.activeSubscriptionIds))
sm.EmitMetrics()
sm.Unlock()

Expand Down Expand Up @@ -280,6 +296,10 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)
delete(sm.activeSubscriptionIds, subscriptionIdToRemove)
sm.logger.Info(
fmt.Sprintf("Removed subscription id %+v updated map %+v", subscriptionIdToRemove, sm.activeSubscriptionIds),
)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
Expand Down

0 comments on commit d8ac9cf

Please sign in to comment.