diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index 6c45edf813..a8e2d26b26 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -107,6 +107,10 @@ you. * [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/) to make LNDs payment throughput (and latency) with better when using etcd. +* [More robust commit queue design](https://github.com/lightningnetwork/lnd/pull/5513) + to make it less likely that we retry etcd transactions and make the commit + queue more scalable. + ## Performance improvements * [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515) @@ -119,6 +123,7 @@ currnet DNS seeds when in SigNet mode](https://github.com/lightningnetwork/lnd/pull/5564). # Contributors (Alphabetical Order) +* Andras Banki-Horvath * ErikEk * Martin Habovstiak * Zero-1729 diff --git a/kvdb/etcd/commit_queue.go b/kvdb/etcd/commit_queue.go index f03845650c..138c08e68d 100644 --- a/kvdb/etcd/commit_queue.go +++ b/kvdb/etcd/commit_queue.go @@ -3,14 +3,11 @@ package etcd import ( + "container/list" "context" "sync" ) -// commitQueueSize is the maximum number of commits we let to queue up. All -// remaining commits will block on commitQueue.Add(). -const commitQueueSize = 100 - // commitQueue is a simple execution queue to manage conflicts for transactions // and thereby reduce the number of times conflicting transactions need to be // retried. When a new transaction is added to the queue, we first upgrade the @@ -25,9 +22,18 @@ type commitQueue struct { readerMap map[string]int writerMap map[string]int - commitMutex sync.RWMutex - queue chan (func()) - wg sync.WaitGroup + queue *list.List + queueMx sync.Mutex + queueCond *sync.Cond + + shutdown chan struct{} +} + +type commitQueueTxn struct { + commitLoop func() + blocked bool + rset []string + wset []string } // NewCommitQueue creates a new commit queue, with the passed abort context. @@ -36,32 +42,37 @@ func NewCommitQueue(ctx context.Context) *commitQueue { ctx: ctx, readerMap: make(map[string]int), writerMap: make(map[string]int), - queue: make(chan func(), commitQueueSize), + queue: list.New(), + shutdown: make(chan struct{}), } + q.queueCond = sync.NewCond(&q.queueMx) // Start the queue consumer loop. - q.wg.Add(1) go q.mainLoop() return q } -// Wait waits for the queue to stop (after the queue context has been canceled). -func (c *commitQueue) Wait() { - c.wg.Wait() +// Stop signals the queue to stop after the queue context has been canceled and +// waits until the has stopped. +func (c *commitQueue) Stop() { + // Signal the queue's condition variable to ensure the mainLoop reliably + // unblocks to check for the exit condition. + c.queueCond.Signal() + <-c.shutdown } // Add increases lock counts and queues up tx commit closure for execution. // Transactions that don't have any conflicts are executed immediately by // "downgrading" the count mutex to allow concurrency. -func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { +func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) { c.mx.Lock() blocked := false // Mark as blocked if there's any writer changing any of the keys in // the read set. Do not increment the reader counts yet as we'll need to // use the original reader counts when scanning through the write set. - for key := range rset { + for _, key := range rset { if c.writerMap[key] > 0 { blocked = true break @@ -70,7 +81,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { // Mark as blocked if there's any writer or reader for any of the keys // in the write set. - for key := range wset { + for _, key := range wset { blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0 // Increment the writer count. @@ -78,48 +89,37 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { } // Finally we can increment the reader counts for keys in the read set. - for key := range rset { + for _, key := range rset { c.readerMap[key] += 1 } - if blocked { - // Add the transaction to the queue if conflicts with an already - // queued one. - c.mx.Unlock() + c.queueCond.L.Lock() + c.queue.PushBack(&commitQueueTxn{ + commitLoop: commitLoop, + blocked: blocked, + rset: rset, + wset: wset, + }) + c.queueCond.L.Unlock() - select { - case c.queue <- commitLoop: - case <-c.ctx.Done(): - } - } else { - // To make sure we don't add a new tx to the queue that depends - // on this "unblocked" tx, grab the commitMutex before lifting - // the mutex guarding the lock maps. - c.commitMutex.RLock() - c.mx.Unlock() - - // At this point we're safe to execute the "unblocked" tx, as - // we cannot execute blocked tx that may have been read from the - // queue until the commitMutex is held. - commitLoop() - - c.commitMutex.RUnlock() - } + c.mx.Unlock() + + c.queueCond.Signal() } -// Done decreases lock counts of the keys in the read/write sets. -func (c *commitQueue) Done(rset readSet, wset writeSet) { +// done decreases lock counts of the keys in the read/write sets. +func (c *commitQueue) done(rset []string, wset []string) { c.mx.Lock() defer c.mx.Unlock() - for key := range rset { + for _, key := range rset { c.readerMap[key] -= 1 if c.readerMap[key] == 0 { delete(c.readerMap, key) } } - for key := range wset { + for _, key := range wset { c.writerMap[key] -= 1 if c.writerMap[key] == 0 { delete(c.writerMap, key) @@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset readSet, wset writeSet) { // dependencies. The queue ensures that the top element doesn't conflict with // any other transactions and therefore can be executed freely. func (c *commitQueue) mainLoop() { - defer c.wg.Done() + defer close(c.shutdown) for { + // Wait until there are no unblocked transactions being + // executed, and for there to be at least one blocked + // transaction in our queue. + c.queueCond.L.Lock() + for c.queue.Front() == nil { + c.queueCond.Wait() + + // Check the exit condition before looping again. + select { + case <-c.ctx.Done(): + c.queueCond.L.Unlock() + return + default: + } + } + + // Now collect all txns until we find the next blocking one. + // These shouldn't conflict (if the precollected read/write + // keys sets don't grow), meaning we can safely commit them + // in parallel. + work := make([]*commitQueueTxn, 1) + e := c.queue.Front() + work[0] = c.queue.Remove(e).(*commitQueueTxn) + + for { + e := c.queue.Front() + if e == nil { + break + } + + next := e.Value.(*commitQueueTxn) + if !next.blocked { + work = append(work, next) + c.queue.Remove(e) + } else { + // We found the next blocking txn which means + // the block of work needs to be cut here. + break + } + } + + c.queueCond.L.Unlock() + + // Check if we need to exit before continuing. select { - case top := <-c.queue: - // Execute the next blocked transaction. As it is - // the top element in the queue it means that it doesn't - // depend on any other transactions anymore. - c.commitMutex.Lock() - top() - c.commitMutex.Unlock() + case <-c.ctx.Done(): + return + default: + } + + var wg sync.WaitGroup + wg.Add(len(work)) + // Fire up N goroutines where each will run its commit loop + // and then clean up the reader/writer maps. + for _, txn := range work { + go func(txn *commitQueueTxn) { + defer wg.Done() + txn.commitLoop() + + // We can safely cleanup here as done only + // holds the main mutex. + c.done(txn.rset, txn.wset) + }(txn) + } + + wg.Wait() + + // Check if we need to exit before continuing. + select { case <-c.ctx.Done(): return + default: } } } diff --git a/kvdb/etcd/commit_queue_test.go b/kvdb/etcd/commit_queue_test.go index 16ff71006d..a7ebcca2b3 100644 --- a/kvdb/etcd/commit_queue_test.go +++ b/kvdb/etcd/commit_queue_test.go @@ -17,7 +17,7 @@ import ( func TestCommitQueue(t *testing.T) { // The duration of each commit. const commitDuration = time.Millisecond * 500 - const numCommits = 4 + const numCommits = 5 var wg sync.WaitGroup commits := make([]string, numCommits) @@ -30,69 +30,53 @@ func TestCommitQueue(t *testing.T) { // Update our log of commit order. Avoid blocking // by preallocating the commit log and increasing // the log index atomically. - i := atomic.AddInt32(&idx, 1) - commits[i] = tag - if sleep { time.Sleep(commitDuration) } - } - } - // Helper function to create a read set from the passed keys. - makeReadSet := func(keys []string) readSet { - rs := make(map[string]stmGet) - - for _, key := range keys { - rs[key] = stmGet{} - } - - return rs - } - - // Helper function to create a write set from the passed keys. - makeWriteSet := func(keys []string) writeSet { - ws := make(map[string]stmPut) - - for _, key := range keys { - ws[key] = stmPut{} + i := atomic.AddInt32(&idx, 1) + commits[i] = tag } - - return ws } ctx := context.Background() ctx, cancel := context.WithCancel(ctx) q := NewCommitQueue(ctx) - defer q.Wait() + defer q.Stop() defer cancel() wg.Add(numCommits) t1 := time.Now() - // Tx1: reads: key1, key2, writes: key3, conflict: none + // Tx1 (long): reads: key1, key2, writes: key3, conflict: none q.Add( commit("free", true), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key3"}), + []string{"key1", "key2"}, + []string{"key3"}, ) // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 q.Add( commit("blocked1", false), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key3"}), + []string{"key1", "key2"}, + []string{"key3"}, + ) + // Tx3 (long): reads: key1, writes: key4, conflict: none + q.Add( + commit("free", true), + []string{"key1", "key2"}, + []string{"key4"}, ) - // Tx3: reads: key1, writes: key4, conflict: none + // Tx4 (long): reads: key1, writes: none, conflict: none q.Add( commit("free", true), - makeReadSet([]string{"key1", "key2"}), - makeWriteSet([]string{"key4"}), + []string{"key1", "key2"}, + []string{}, ) // Tx4: reads: key2, writes: key4 conflict: Tx3 q.Add( commit("blocked2", false), - makeReadSet([]string{"key2"}), - makeWriteSet([]string{"key4"}), + []string{"key2"}, + []string{"key4"}, ) // Wait for all commits. @@ -109,7 +93,7 @@ func TestCommitQueue(t *testing.T) { // before the blocking ones, and the blocking ones are executed in // the order of addition. require.Equal(t, - []string{"free", "free", "blocked1", "blocked2"}, + []string{"free", "blocked1", "free", "free", "blocked2"}, commits, ) } diff --git a/kvdb/etcd/db.go b/kvdb/etcd/db.go index e3bad3b250..12b8986884 100644 --- a/kvdb/etcd/db.go +++ b/kvdb/etcd/db.go @@ -122,6 +122,7 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) { type db struct { cfg Config ctx context.Context + cancel func() cli *clientv3.Client commitStatsCollector *commitStatsCollector txQueue *commitQueue @@ -135,7 +136,6 @@ var _ walletdb.DB = (*db)(nil) // config. If etcd connection cannot be established, then returns error. func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg := clientv3.Config{ - Context: ctx, Endpoints: []string{cfg.Host}, DialTimeout: etcdConnectionTimeout, Username: cfg.User, @@ -158,8 +158,11 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { clientCfg.TLS = tlsConfig } + ctx, cancel := context.WithCancel(ctx) + clientCfg.Context = ctx cli, err := clientv3.New(clientCfg) if err != nil { + cancel() return nil, err } @@ -171,6 +174,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) { backend := &db{ cfg: cfg, ctx: ctx, + cancel: cancel, cli: cli, txQueue: NewCommitQueue(ctx), } @@ -296,5 +300,8 @@ func (db *db) Copy(w io.Writer) error { // Close cleanly shuts down the database and syncs all data. // This function is part of the walletdb.Db interface implementation. func (db *db) Close() error { - return db.cli.Close() + err := db.cli.Close() + db.cancel() + db.txQueue.Stop() + return err } diff --git a/kvdb/etcd/stm.go b/kvdb/etcd/stm.go index 0d47fe9379..9d1e50e97a 100644 --- a/kvdb/etcd/stm.go +++ b/kvdb/etcd/stm.go @@ -280,17 +280,38 @@ func runSTM(s *stm, apply func(STM) error) error { return preApplyErr } + // Make a copy of the read/write set keys here. The reason why we need + // to do this is because subsequent applies may change (shrink) these + // sets and so when we decrease reference counts in the commit queue in + // done(...) we'd potentially miss removing references which would + // result in queueing up transactions and contending DB access. + // Copying these strings is cheap due to Go's immutable string which is + // always a reference. + rkeys := make([]string, len(s.rset)) + wkeys := make([]string, len(s.wset)) + + i := 0 + for key := range s.rset { + rkeys[i] = key + i++ + } + + i = 0 + for key := range s.wset { + wkeys[i] = key + i++ + } + // Queue up the transaction for execution. - s.txQueue.Add(execute, s.rset, s.wset) + s.txQueue.Add(execute, rkeys, wkeys) // Wait for the transaction to execute, or break if aborted. select { case <-done: case <-s.options.ctx.Done(): + return context.Canceled } - s.txQueue.Done(s.rset, s.wset) - if s.options.commitStatsCallback != nil { stats.Retries = retries s.options.commitStatsCallback(executeErr == nil, stats) diff --git a/kvdb/etcd/stm_test.go b/kvdb/etcd/stm_test.go index a780f86269..e4a3810cd9 100644 --- a/kvdb/etcd/stm_test.go +++ b/kvdb/etcd/stm_test.go @@ -28,7 +28,7 @@ func TestPutToEmpty(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -55,7 +55,7 @@ func TestGetPutDel(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -141,7 +141,7 @@ func TestFirstLastNextPrev(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() testKeyValues := []KV{ @@ -299,7 +299,7 @@ func TestCommitError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig()) @@ -347,7 +347,7 @@ func TestManualTxError(t *testing.T) { defer func() { cancel() f.Cleanup() - txQueue.Wait() + txQueue.Stop() }() db, err := newEtcdBackend(ctx, f.BackendConfig())