etcd: fix dereferencing issue in commit queue causing contention and change design to be more scalable #5513

Merged 3 commits on Aug 6, 2021
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.14.0.md
Expand Up @@ -107,6 +107,10 @@ you.
* [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
to make LND's payment throughput (and latency) better when using etcd.

* [More robust commit queue design](https://github.com/lightningnetwork/lnd/pull/5513)
to make it less likely that we retry etcd transactions and make the commit
queue more scalable.

## Performance improvements

* [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515)
Expand All @@ -119,6 +123,7 @@ current DNS seeds when in SigNet
mode](https://github.com/lightningnetwork/lnd/pull/5564).

# Contributors (Alphabetical Order)
* Andras Banki-Horvath
* ErikEk
* Martin Habovstiak
* Zero-1729
162 changes: 112 additions & 50 deletions kvdb/etcd/commit_queue.go
Expand Up @@ -3,14 +3,11 @@
package etcd

import (
"container/list"
"context"
"sync"
)

// commitQueueSize is the maximum number of commits we allow to queue up. All
// remaining commits will block on commitQueue.Add().
const commitQueueSize = 100

// commitQueue is a simple execution queue to manage conflicts for transactions
// and thereby reduce the number of times conflicting transactions need to be
// retried. When a new transaction is added to the queue, we first upgrade the
Expand All @@ -25,9 +22,18 @@ type commitQueue struct {
readerMap map[string]int
writerMap map[string]int

commitMutex sync.RWMutex
queue chan (func())
wg sync.WaitGroup
queue *list.List
queueMx sync.Mutex
queueCond *sync.Cond

shutdown chan struct{}
}

type commitQueueTxn struct {
commitLoop func()
blocked bool
rset []string
wset []string
}

// NewCommitQueue creates a new commit queue, with the passed abort context.
Expand All @@ -36,32 +42,37 @@ func NewCommitQueue(ctx context.Context) *commitQueue {
ctx: ctx,
readerMap: make(map[string]int),
writerMap: make(map[string]int),
queue: make(chan func(), commitQueueSize),
queue: list.New(),
shutdown: make(chan struct{}),
}
q.queueCond = sync.NewCond(&q.queueMx)

// Start the queue consumer loop.
q.wg.Add(1)
go q.mainLoop()

return q
}

// Wait waits for the queue to stop (after the queue context has been canceled).
func (c *commitQueue) Wait() {
c.wg.Wait()
// Stop signals the queue to stop after the queue context has been canceled and
// waits until the queue has stopped.
func (c *commitQueue) Stop() {
// Signal the queue's condition variable to ensure the mainLoop reliably
// unblocks to check for the exit condition.
c.queueCond.Signal()
Member

In the past we've found that at times this initial signal gets sort of "swallowed", causing us to add a loop around it, like in these areas:

Member

Don't we also need to cancel the main context, or send on some quit channel here?

Collaborator

Ah, I wasn't aware of this. I suggested removing the loop in a previous review cycle 😅

Collaborator Author

I tried to look up any references about this, but didn't succeed. Is this a known issue with Go condition variables? Maybe we did the loops because of other circumstances on our side?

Member

> Is this a known issue with Go condition variables? Maybe we did the loops because of other circumstances on our side?

Good question... we could just be using the API wrong, and then ended up settling on this workaround as it seemed to make a difference in practice. Otherwise, we would see behavior where things would never shut down without this type of loop added. I guess I view it as more of a defensive thing, but we should def try to repro it in the Go playground or something.

Member

FWIW, it doesn't seem to be causing any issues in the itests (which are actually all green on this PR!!!), so I guess let's just monitor it and see if it triggers any issue re shutdown.

Collaborator Author

sgtm

<-c.shutdown
}
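
The thread above leaves open whether the "swallowed" signal is a Go issue or a usage issue. What is documented for `sync.Cond` is that `Signal` wakes a goroutine only if one is already blocked in `Wait`; a signal sent at any other moment is not remembered. The following is a minimal sketch, not this PR's code, of one common way to make shutdown independent of that timing: the stop request is stored in a flag guarded by the condition variable's own mutex and is checked as part of the wait predicate. The type `stoppableQueue` and all of its fields and methods are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// stoppableQueue is a hypothetical, stripped-down stand-in for a commit
// queue. It models only the wait/stop handshake, not transaction handling.
type stoppableQueue struct {
	mu        sync.Mutex
	cond      *sync.Cond
	items     []int
	stopped   bool
	processed int
	done      chan struct{}
}

func newStoppableQueue() *stoppableQueue {
	q := &stoppableQueue{done: make(chan struct{})}
	q.cond = sync.NewCond(&q.mu)
	go q.loop()
	return q
}

func (q *stoppableQueue) loop() {
	defer close(q.done)

	for {
		q.mu.Lock()
		// The stop flag is part of the predicate and is read under the
		// same mutex that Stop holds while setting it. A stop request
		// made before this goroutine reaches Wait is therefore still
		// observed, even though the wakeup itself was not delivered.
		for len(q.items) == 0 && !q.stopped {
			q.cond.Wait()
		}
		if q.stopped {
			q.mu.Unlock()
			return
		}
		q.items = q.items[1:]
		q.processed++
		q.mu.Unlock()
	}
}

// Add queues an item and wakes the loop if it is waiting.
func (q *stoppableQueue) Add(item int) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
	q.cond.Signal()
}

// Stop records the stop request under the lock, wakes every waiter and
// blocks until the loop has exited. Items still queued are dropped.
func (q *stoppableQueue) Stop() {
	q.mu.Lock()
	q.stopped = true
	q.mu.Unlock()
	q.cond.Broadcast()
	<-q.done
}

func main() {
	q := newStoppableQueue()
	q.Add(1)
	q.Stop()
	fmt.Println("stopped cleanly")
}
```

In this sketch a single `Broadcast` is enough because correctness rests on the flag rather than on the wakeup arriving at the right moment. Whether the same reasoning explains the retry loops mentioned in the thread is not something the discussion settles.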

// Add increases lock counts and queues up tx commit closure for execution.
// Transactions that don't have any conflicts are executed immediately by
// "downgrading" the count mutex to allow concurrency.
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
c.mx.Lock()
blocked := false

// Mark as blocked if there's any writer changing any of the keys in
// the read set. Do not increment the reader counts yet as we'll need to
// use the original reader counts when scanning through the write set.
for key := range rset {
for _, key := range rset {
if c.writerMap[key] > 0 {
blocked = true
break
Expand All @@ -70,56 +81,45 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {

// Mark as blocked if there's any writer or reader for any of the keys
// in the write set.
for key := range wset {
for _, key := range wset {
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0

// Increment the writer count.
c.writerMap[key] += 1
}

// Finally we can increment the reader counts for keys in the read set.
for key := range rset {
for _, key := range rset {
c.readerMap[key] += 1
}

if blocked {
// Add the transaction to the queue if it conflicts with an already
// queued one.
c.mx.Unlock()
c.queueCond.L.Lock()
c.queue.PushBack(&commitQueueTxn{
commitLoop: commitLoop,
blocked: blocked,
rset: rset,
wset: wset,
})
c.queueCond.L.Unlock()

select {
case c.queue <- commitLoop:
case <-c.ctx.Done():
}
} else {
// To make sure we don't add a new tx to the queue that depends
// on this "unblocked" tx, grab the commitMutex before lifting
// the mutex guarding the lock maps.
c.commitMutex.RLock()
c.mx.Unlock()

// At this point we're safe to execute the "unblocked" tx, as
// we cannot execute blocked tx that may have been read from the
// queue until the commitMutex is held.
commitLoop()

c.commitMutex.RUnlock()
}
c.mx.Unlock()

c.queueCond.Signal()
}

// Done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) Done(rset readSet, wset writeSet) {
// done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) done(rset []string, wset []string) {
c.mx.Lock()
defer c.mx.Unlock()

for key := range rset {
for _, key := range rset {
c.readerMap[key] -= 1
if c.readerMap[key] == 0 {
delete(c.readerMap, key)
}
}

for key := range wset {
for _, key := range wset {
c.writerMap[key] -= 1
if c.writerMap[key] == 0 {
delete(c.writerMap, key)
Expand All @@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset readSet, wset writeSet) {
// dependencies. The queue ensures that the top element doesn't conflict with
// any other transactions and therefore can be executed freely.
func (c *commitQueue) mainLoop() {
defer c.wg.Done()
defer close(c.shutdown)

for {
// Wait until there are no unblocked transactions being
// executed, and for there to be at least one blocked
// transaction in our queue.
c.queueCond.L.Lock()
for c.queue.Front() == nil {
c.queueCond.Wait()

// Check the exit condition before looping again.
select {
case <-c.ctx.Done():
c.queueCond.L.Unlock()
return
default:
}
}

// Now collect all txns until we find the next blocking one.
// These shouldn't conflict (if the precollected read/write
// key sets don't grow), meaning we can safely commit them
// in parallel.
work := make([]*commitQueueTxn, 1)
e := c.queue.Front()
work[0] = c.queue.Remove(e).(*commitQueueTxn)

for {
e := c.queue.Front()
if e == nil {
break
}

next := e.Value.(*commitQueueTxn)
if !next.blocked {
work = append(work, next)
c.queue.Remove(e)
} else {
// We found the next blocking txn which means
// the block of work needs to be cut here.
break
}
}

c.queueCond.L.Unlock()

// Check if we need to exit before continuing.
select {
case top := <-c.queue:
// Execute the next blocked transaction. As it is
// the top element in the queue it means that it doesn't
// depend on any other transactions anymore.
c.commitMutex.Lock()
top()
c.commitMutex.Unlock()
case <-c.ctx.Done():
return
default:
}

var wg sync.WaitGroup
wg.Add(len(work))

// Fire up N goroutines where each will run its commit loop
// and then clean up the reader/writer maps.
for _, txn := range work {
go func(txn *commitQueueTxn) {
defer wg.Done()
txn.commitLoop()

// We can safely clean up here as done only
// holds the main mutex.
c.done(txn.rset, txn.wset)
}(txn)
}

wg.Wait()

// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
}
}
58 changes: 21 additions & 37 deletions kvdb/etcd/commit_queue_test.go
Expand Up @@ -17,7 +17,7 @@ import (
func TestCommitQueue(t *testing.T) {
// The duration of each commit.
const commitDuration = time.Millisecond * 500
const numCommits = 4
const numCommits = 5

var wg sync.WaitGroup
commits := make([]string, numCommits)
Expand All @@ -30,69 +30,53 @@ func TestCommitQueue(t *testing.T) {
// Update our log of commit order. Avoid blocking
// by preallocating the commit log and increasing
// the log index atomically.
i := atomic.AddInt32(&idx, 1)
commits[i] = tag

if sleep {
time.Sleep(commitDuration)
}
}
}

// Helper function to create a read set from the passed keys.
makeReadSet := func(keys []string) readSet {
rs := make(map[string]stmGet)

for _, key := range keys {
rs[key] = stmGet{}
}

return rs
}

// Helper function to create a write set from the passed keys.
makeWriteSet := func(keys []string) writeSet {
ws := make(map[string]stmPut)

for _, key := range keys {
ws[key] = stmPut{}
i := atomic.AddInt32(&idx, 1)
commits[i] = tag
}

return ws
}

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
q := NewCommitQueue(ctx)
defer q.Wait()
defer q.Stop()
defer cancel()

wg.Add(numCommits)
t1 := time.Now()

// Tx1: reads: key1, key2, writes: key3, conflict: none
// Tx1 (long): reads: key1, key2, writes: key3, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
q.Add(
commit("blocked1", false),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx3 (long): reads: key1, key2, writes: key4, conflict: none
q.Add(
commit("free", true),
[]string{"key1", "key2"},
[]string{"key4"},
)
// Tx3: reads: key1, key2, writes: key4, conflict: none
// Tx4 (long): reads: key1, key2, writes: none, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key1", "key2"},
[]string{},
)
// Tx5: reads: key2, writes: key4, conflict: Tx3
q.Add(
commit("blocked2", false),
makeReadSet([]string{"key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key2"},
[]string{"key4"},
)

// Wait for all commits.
Expand All @@ -109,7 +93,7 @@ func TestCommitQueue(t *testing.T) {
// before the blocking ones, and the blocking ones are executed in
// the order of addition.
require.Equal(t,
[]string{"free", "free", "blocked1", "blocked2"},
[]string{"free", "blocked1", "free", "free", "blocked2"},
commits,
)
}