Skip to content

Commit

Permalink
sends only the number of requests comming from workers to debug monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
kjezek committed Sep 24, 2024
1 parent 5c737f7 commit 0f55025
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
17 changes: 12 additions & 5 deletions go/database/mpt/io/parallel_visit.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func visitAllWithConfig(
type response struct {
node mpt.Node
err error
main bool // response comes from the main thread
}
responses := map[mpt.NodeId]response{}
responsesMutex := sync.Mutex{}
Expand All @@ -113,7 +114,7 @@ func visitAllWithConfig(

requests.Add(request{nil, root})

prefetchNext := func(source nodeSource) {
prefetchNext := func(source nodeSource, main bool) {
// get the next job
requestsMutex.Lock()
req, present := requests.Pop()
Expand All @@ -128,7 +129,7 @@ func visitAllWithConfig(
node, err := source.get(req.id)

responsesMutex.Lock()
responses[req.id] = response{node, err}
responses[req.id] = response{node, err, main}
responsesMutex.Unlock()

// if there was a fetch error, stop the workers
Expand Down Expand Up @@ -221,7 +222,7 @@ func visitAllWithConfig(

// Do the actual prefetching in parallel.
for i := 0; i < batchSize; i++ {
prefetchNext(source)
prefetchNext(source, false)
}
}
}(i)
Expand Down Expand Up @@ -263,7 +264,13 @@ func visitAllWithConfig(
responsesMutex.Lock()
for {
if config.monitor != nil {
config.monitor(len(responses))
var numResponses int
for _, res := range responses {
if !res.main {
numResponses++
}
}
config.monitor(numResponses)
}
found := false
res, found = responses[cur]
Expand All @@ -275,7 +282,7 @@ func visitAllWithConfig(
// If the node is not yet available, join the workers
// in loading the next node.
responsesMutex.Unlock()
prefetchNext(source)
prefetchNext(source, true)
responsesMutex.Lock()
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/database/mpt/io/parallel_visit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestVisit_CanHandleSlowConsumer(t *testing.T) {
// throttling, the number of workers, the batch size, and the
// structure of the trie. The limit used here is a conservative
// upper bound which would get exceeded by a factor of 10 if the
// workers would not be throttled.
// workers were not throttled.
if got, limit := numResponses, 200; got > limit {
t.Errorf("expected at most %d responses, got %d", limit, got)
}
Expand Down

0 comments on commit 0f55025

Please sign in to comment.