Skip to content

Commit

Permalink
Orderbook Fills emission (#1448)
Browse files Browse the repository at this point in the history
* new protos

* fix prev use case

* proto lint

* emit stream fills when clob match appended to opqueue

* more explicit function signature

* uint32 -> 64

* deliverTx process opqueue clob match emissions

* bugfixes from feature branch to emit fill amounts correctly

* pr comments

* fix tests

* fixup sample pregenesis json
  • Loading branch information
jonfung-dydx committed May 13, 2024
1 parent f710645 commit ba9e7f3
Show file tree
Hide file tree
Showing 21 changed files with 332 additions and 75 deletions.
12 changes: 12 additions & 0 deletions protocol/lib/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,15 @@ func MergeAllMapsMustHaveDistinctKeys[K comparable, V any](maps ...map[K]V) map[
}
return combinedMap
}

// MergeMaps merges all the maps into a single map.
// Does not require maps to have distinct keys.
func MergeMaps[K comparable, V any](maps ...map[K]V) map[K]V {
combinedMap := make(map[K]V)
for _, m := range maps {
for k, v := range m {
combinedMap[k] = v
}
}
return combinedMap
}
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
18 changes: 18 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions protocol/scripts/genesis/sample_pregenesis.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"app_hash": null,
"app_name": "dydxprotocold",
"app_state": {
"assets": {
Expand Down Expand Up @@ -524,6 +525,7 @@
}
}
},
"consensus": null,
"crisis": {
"constant_fee": {
"amount": "1000000000000000000",
Expand Down Expand Up @@ -758,6 +760,7 @@
},
"gov": {
"constitution": "",
"deposit_params": null,
"deposits": [],
"params": {
"burn_proposal_deposit_prevote": false,
Expand Down Expand Up @@ -789,7 +792,9 @@
},
"proposals": [],
"starting_proposal_id": "1",
"votes": []
"tally_params": null,
"votes": [],
"voting_params": null
},
"govplus": {},
"ibc": {
Expand Down Expand Up @@ -854,6 +859,7 @@
"port": "icahost"
}
},
"params": null,
"perpetuals": {
"liquidity_tiers": [
{
Expand Down Expand Up @@ -1782,7 +1788,7 @@
]
}
},
"app_version": "4.0.0-dev0-22-gd31fa077",
"app_version": "4.1.1-dev0-1-gf7106453",
"chain_id": "dydx-sample-1",
"consensus": {
"params": {
Expand Down
65 changes: 65 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,71 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
for _, orderbookFill := range orderbookFills {
// Fetch the clob pair id from the first order in `OrderBookMatchFill`.
// We can assume there must be an order, and that all orders share the same
// clob pair id.
clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
}

if len(streamUpdatesForSubscription) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
Expand Down
8 changes: 8 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
6 changes: 6 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ type GrpcStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
)
}
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,3 +510,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates(
snapshot bool,
) {
}

func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
}
39 changes: 0 additions & 39 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,45 +166,6 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
orderIdsToSend := make(map[types.OrderId]bool)

// Send an update for reverted local operations.
for _, operation := range localValidatorOperationsQueue {
if match := operation.GetMatch(); match != nil {
// For normal order matches, we send an update for the taker and maker orders.
if matchedOrders := match.GetMatchOrders(); matchedOrders != nil {
orderIdsToSend[matchedOrders.TakerOrderId] = true
for _, fill := range matchedOrders.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
// For liquidation matches, we send an update for the maker orders.
if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil {
for _, fill := range matchedLiquidation.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
}
}

// Send an update for orders that were proposed.
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
orderIdsToSend[orderId] = true
}

// Send update.
for orderId := range orderIdsToSend {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
16 changes: 16 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,19 @@ func (k Keeper) SendOrderbookUpdates(
ctx.ExecMode(),
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
if len(orderbookFills) == 0 {
return
}
k.GetGrpcStreamingManager().SendOrderbookFillUpdates(
ctx,
orderbookFills,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
37 changes: 14 additions & 23 deletions protocol/x/clob/keeper/order_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,19 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) {
[]byte(types.OrderAmountFilledKeyPrefix),
)
memStore.Delete(orderId.ToStateKey())

// If grpc stream is on, zero out the fill amount.
if k.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
0, // Total filled quantums is zero because it's been pruned from state.
); success {
allUpdates.AddUpdateMessage(orderId, message)
}
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
}

// PruneStateFillAmountsForShortTermOrders prunes Short-Term order fill amounts from state that are pruneable
Expand All @@ -259,27 +272,5 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders(
blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight())

// Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`.
prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight)

// Send an orderbook update for each pruned order for grpc streams.
// This is needed because short term orders are pruned in PrepareCheckState using
// keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount
// pruning when there's replacement.
// Long-term fix would be to add logic to keep them in sync.
// TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync.
if k.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
for _, orderId := range prunedOrderIds {
if _, exists := k.MemClob.GetOrder(ctx, orderId); exists {
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
0, // Total filled quantums is zero because it's been pruned from state.
); success {
allUpdates.AddUpdateMessage(orderId, message)
}
}
}
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
k.PruneOrdersForBlockHeight(ctx, blockHeight)
}
Loading

0 comments on commit ba9e7f3

Please sign in to comment.