Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd: make CommitQueue.Add non-blocking #5153

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions channeldb/kvdb/etcd/commit_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
package etcd

import (
"container/list"
"context"
"sync"
"time"
)

// commitQueueSize is the maximum number of commits we let to queue up. All
// remaining commits will block on commitQueue.Add().
const commitQueueSize = 100

// commitQueue is a simple execution queue to manage conflicts for transactions
// and thereby reduce the number of times conflicting transactions need to be
// retried. When a new transaction is added to the queue, we first upgrade the
Expand All @@ -25,9 +23,12 @@ type commitQueue struct {
readerMap map[string]int
writerMap map[string]int

commitMutex sync.RWMutex
queue chan (func())
wg sync.WaitGroup
queueMu sync.RWMutex
queueCond *sync.Cond
queue *list.List
freeCount uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should document these a bit?


shutdown chan struct{}
}

// NewCommitQueue creates a new commit queue, with the passed abort context.
Expand All @@ -36,19 +37,20 @@ func NewCommitQueue(ctx context.Context) *commitQueue {
ctx: ctx,
readerMap: make(map[string]int),
writerMap: make(map[string]int),
queue: make(chan func(), commitQueueSize),
queue: list.New(),
shutdown: make(chan struct{}),
}
q.queueCond = sync.NewCond(&q.queueMu)

// Start the queue consumer loop.
q.wg.Add(1)
go q.mainLoop()

return q
}

// Wait waits for the queue to stop (after the queue context has been canceled).
func (c *commitQueue) Wait() {
c.wg.Wait()
c.signalUntilShutdown()
}

// Add increases lock counts and queues up tx commit closure for execution.
Expand Down Expand Up @@ -83,27 +85,38 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
}

if blocked {
// Add the transaction to the queue if conflicts with an already
// queued one.
// Add the transaction to the queue if it conflicts with an
// already queued one. It is safe to do so outside the lock,
// since this we know it will be executed serially.
c.mx.Unlock()

select {
case c.queue <- commitLoop:
case <-c.ctx.Done():
}
c.queueCond.L.Lock()
c.queue.PushBack(commitLoop)
c.queueCond.L.Unlock()
} else {
// To make sure we don't add a new tx to the queue that depends
// on this "unblocked" tx, grab the commitMutex before lifting
// the mutex guarding the lock maps.
c.commitMutex.RLock()
// on this "unblocked" tx. Increment our free counter before
// unlocking so that the mainLoop stops pulling off blocked
// transactions from the queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line

c.queueCond.L.Lock()
c.freeCount++
c.queueCond.L.Unlock()

c.mx.Unlock()

// At this point we're safe to execute the "unblocked" tx, as
// we cannot execute blocked tx that may have been read from the
// queue until the commitMutex is held.
commitLoop()
// At this point it is safe to execute the "unblocked" tx, as no
// blocked tx will be read from the queue until the freeCount is
// decremented back to 0.
go func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this goroutine normally? It's handy for the commit queue unit test though. Did a few benchmarks with/without and performance difference is negligible.

commitLoop()

c.queueCond.L.Lock()
c.freeCount--
c.queueCond.L.Unlock()
c.queueCond.Signal()
}()

c.commitMutex.RUnlock()
}
}

Expand Down Expand Up @@ -131,20 +144,59 @@ func (c *commitQueue) Done(rset readSet, wset writeSet) {
// dependencies. The queue ensures that the top element doesn't conflict with
// any other transactions and therefore can be executed freely.
func (c *commitQueue) mainLoop() {
defer c.wg.Done()
defer close(c.shutdown)

for {
// Wait until there are no unblocked transactions being
// executed, and for there to be at least one blocked
// transaction in our queue.
c.queueCond.L.Lock()
for c.freeCount > 0 || c.queue.Front() == nil {
c.queueCond.Wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a few runs of make itest etcd=1 icase=async_payments_benchmark to see if there's any performance difference between this and the RWMutex based queue implementation and this seems to deadlock. I've uploaded the profile: https://paste.ubuntu.com/p/J3DZCDwCCQ/

To me it seems like that the problem might be that even though the context is canceled, we never signal the queueCond. We only signal when waiting for the queue to "stop".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch @bhandras! I was able to figure it out, see the second commit :)


// Check the exit condition before looping again.
select {
case <-c.ctx.Done():
c.queueCond.L.Unlock()
return
default:
}
}

// Remove the top element from the queue, now that we know there
// are no possible conflicts.
e := c.queue.Front()
top := c.queue.Remove(e).(func())
c.queueCond.L.Unlock()

// Check if we need to exit before continuing.
select {
case top := <-c.queue:
// Execute the next blocked transaction. As it is
// the top element in the queue it means that it doesn't
// depend on any other transactions anymore.
c.commitMutex.Lock()
top()
c.commitMutex.Unlock()
case <-c.ctx.Done():
return
default:
}

// Execute the next blocked transaction.
top()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we don't hold the mutex here now, how can we be sure that no other commit comes in while this is executing? I guess as long as it doesn't conflict that's okay, and any conflict would go back on the queue=

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only want to make sure there are no competing commits for the same keys, and since this (top) element already increased ref counts any subsequent txns that touch those keys are queued. Transactions that do not have conflicting keys are executed without queuing up (etcd does allow multi writers)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also one thing that may not be immediately clear is that we are OK with conflicting transactions, since the STMs will retry them and eventually one of them will succeed first followed by the other. The commit queue makes these conflicts less likely (reducing the number of retries to at most 2).


// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
}
}

// signalUntilShutdown strobes the queue's condition variable to ensure the
// mainLoop reliably unblocks to check for the exit condition.
func (c *commitQueue) signalUntilShutdown() {
for {
select {
case <-time.After(time.Millisecond):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why so rapidly? 😮

Shouldn't a single signal be enough to unlock the loop?

c.queueCond.Signal()
case <-c.shutdown:
return
}
}
}
55 changes: 32 additions & 23 deletions channeldb/kvdb/etcd/commit_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestCommitQueue tests that non-conflicting transactions commit concurrently,
// while conflicting transactions are queued up.
func TestCommitQueue(t *testing.T) {
// The duration of each commit.
const commitDuration = time.Millisecond * 500
const numCommits = 4

var wg sync.WaitGroup
commits := make([]string, numCommits)
idx := int32(-1)

commit := func(tag string, sleep bool) func() {
commit := func(tag string, commit chan struct{},
ready <-chan struct{}) func() {

return func() {
if commit != nil {
close(commit)
}
defer wg.Done()

// Update our log of commit order. Avoid blocking
Expand All @@ -33,8 +34,8 @@ func TestCommitQueue(t *testing.T) {
i := atomic.AddInt32(&idx, 1)
commits[i] = tag

if sleep {
time.Sleep(commitDuration)
if ready != nil {
<-ready
}
}
}
Expand Down Expand Up @@ -68,45 +69,53 @@ func TestCommitQueue(t *testing.T) {
defer cancel()

wg.Add(numCommits)
t1 := time.Now()

ready := make(chan struct{})

// Tx1: reads: key1, key2, writes: key3, conflict: none
// Since we simulate that the txn takes a long time, we'll add in a
// new goroutine and wait for the txn commit to start execution.
q.Add(
commit("free", true),
commit("free", nil, ready),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
)
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1

// Tx2: reads: key1, key5, writes: key3, conflict: Tx1 (key3)
// We don't expect queue add to block as this txn will queue up after
// tx1.
q.Add(
commit("blocked1", false),
commit("blocked1", nil, nil),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
)
// Tx3: reads: key1, writes: key4, conflict: none

// Tx3: reads: key1, key2, writes: key4, conflict: none
// We expect this transaction to be reordered before blocked1, even
// though it was added after since it it doesn't have any conflicts.
q.Add(
commit("free", true),
commit("free", nil, ready),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key4"}),
)
// Tx4: reads: key2, writes: key4 conflict: Tx3

// Tx4: reads: key2, writes: key4 conflicts: Tx3 (key4)
// We don't expect queue add to block as this txn will queue up after
// tx2.
q.Add(
commit("blocked2", false),
commit("blocked2", nil, nil),
makeReadSet([]string{"key2"}),
makeWriteSet([]string{"key4"}),
)

// Allow Tx1 to continue with the commit.
close(ready)

// Wait for all commits.
wg.Wait()
t2 := time.Now()

// Expected total execution time: delta.
// 2 * commitDuration <= delta < 3 * commitDuration
delta := t2.Sub(t1)
require.LessOrEqual(t, int64(commitDuration*2), int64(delta))
require.Greater(t, int64(commitDuration*3), int64(delta))

// Expect that the non-conflicting "free" transactions are executed
// before the blocking ones, and the blocking ones are executed in
// before the conflicting ones, and the conflicting ones are executed in
// the order of addition.
require.Equal(t,
[]string{"free", "free", "blocked1", "blocked2"},
Expand Down
19 changes: 14 additions & 5 deletions channeldb/kvdb/etcd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) {

// db holds a reference to the etcd client connection.
type db struct {
ctx context.Context
cancel func()
config BackendConfig
cli *clientv3.Client
commitStatsCollector *commitStatsCollector
Expand Down Expand Up @@ -168,6 +170,8 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
config.Ctx = context.Background()
}

ctx, cancel := context.WithCancel(config.Ctx)

tlsInfo := transport.TLSInfo{
CertFile: config.CertFile,
KeyFile: config.KeyFile,
Expand All @@ -180,7 +184,7 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
}

cli, err := clientv3.New(clientv3.Config{
Context: config.Ctx,
Context: ctx,
Endpoints: []string{config.Host},
DialTimeout: etcdConnectionTimeout,
Username: config.User,
Expand All @@ -198,9 +202,11 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
cli.Lease = namespace.NewLease(cli.Lease, config.Namespace)

backend := &db{
ctx: ctx,
cancel: cancel,
cli: cli,
config: config,
txQueue: NewCommitQueue(config.Ctx),
txQueue: NewCommitQueue(ctx),
}

if config.CollectCommitStats {
Expand All @@ -213,7 +219,7 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
// getSTMOptions creats all STM options based on the backend config.
func (db *db) getSTMOptions() []STMOptionFunc {
opts := []STMOptionFunc{
WithAbortContext(db.config.Ctx),
WithAbortContext(db.ctx),
}

if db.config.CollectCommitStats {
Expand Down Expand Up @@ -286,7 +292,7 @@ func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
// start a read-only transaction to perform all operations.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Copy(w io.Writer) error {
ctx, cancel := context.WithTimeout(db.config.Ctx, etcdLongTimeout)
ctx, cancel := context.WithTimeout(db.ctx, etcdLongTimeout)
defer cancel()

readCloser, err := db.cli.Snapshot(ctx)
Expand All @@ -302,5 +308,8 @@ func (db *db) Copy(w io.Writer) error {
// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
return db.cli.Close()
err := db.cli.Close()
db.cancel()
db.txQueue.Wait()
return err
}
9 changes: 9 additions & 0 deletions channeldb/kvdb/etcd/stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,17 @@ func runSTM(s *stm, apply func(STM) error) error {
select {
case <-done:
case <-s.options.ctx.Done():
return context.Canceled
}

// If the transaction executed, we can decrement the read and write lock
// sets and apply an commit stat callbacks.
//
// NOTE: It is not safe to do this in the case where the context is
// canceled, as it might inadvertently unblock other transactions that
// _should_ depend on this one. Furthermore, the executeErr is mutable
// so long as the done channel hasn't returned, so we can't read or
// return it.
s.txQueue.Done(s.rset, s.wset)

if s.options.commitStatsCallback != nil {
Expand Down