Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: etcd fixes and performance improvements #5392

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 78 additions & 12 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ type DB struct {
graph *ChannelGraph
clock clock.Clock
dryRun bool

chanCache *kvdb.Cache
}

// Open opens or creates channeldb. Any necessary schemas migrations due
Expand Down Expand Up @@ -260,6 +262,16 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
return db, err
}

func resetChanStateCache(cache *kvdb.Cache) error {
// We don't use cache for Bolt backend.
if cache == nil {
return nil
}

cache.Wipe()
return cache.Init()
}

// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
// Any necessary schemas migrations due to updates will take place as necessary.
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, error) {
Expand All @@ -274,18 +286,64 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,

chanDB := &DB{
Backend: backend,
channelStateDB: &ChannelStateDB{
linkNodeDB: &LinkNodeDB{
backend: backend,
},
backend: backend,
},
clock: opts.clock,
dryRun: opts.dryRun,
clock: opts.clock,
dryRun: opts.dryRun,
}

chanStateBackend := backend

// Override the chan state backend if we require to cache chan state.
if opts.ChanStateCache {
skipped := [][]byte{
// Skip the graph buckets.
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,

// Skip some non performance critical large buckets.
closedChannelBucket,
closeSummaryBucket,
fwdPackagesKey,
revocationLogBucket,
}

topLevel := [][]byte{
// Read through the graph buckets.
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,

// Cache important channel state.
openChannelBucket,
outpointBucket,
nodeInfoBucket,

// Channel state buckets to read through.
closedChannelBucket,
closeSummaryBucket,
fwdPackagesKey,
}

cache := kvdb.NewCache(backend, topLevel, skipped)
if err := cache.Init(); err != nil {
return nil, err
}

chanStateBackend = cache
chanDB.chanCache = cache
}

// Set the parent pointer (only used in tests).
chanDB.channelStateDB.parent = chanDB
chanDB.channelStateDB = &ChannelStateDB{
linkNodeDB: &LinkNodeDB{
backend: chanStateBackend,
},
backend: chanStateBackend,

// Set the parent pointer (only used in tests).
parent: chanDB,
}

var err error
chanDB.graph, err = NewChannelGraph(
Expand Down Expand Up @@ -343,7 +401,11 @@ func (d *DB) Wipe() error {
return err
}

return initChannelDB(d.Backend)
if err := initChannelDB(d.Backend); err != nil {
return err
}

return resetChanStateCache(d.chanCache)
}

// initChannelDB creates and initializes a fresh version of channeldb. In the
Expand Down Expand Up @@ -518,7 +580,6 @@ func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = c

channels = append(channels, oChannel)

return nil
Expand Down Expand Up @@ -1440,6 +1501,11 @@ func MakeTestDB(modifiers ...OptionModifier) (*DB, func(), error) {
return nil, nil, err
}

// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

cdb, err := CreateWithBackend(backend, modifiers...)
if err != nil {
backendCleanup()
Expand Down
17 changes: 15 additions & 2 deletions channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ func TestOpenWithCreate(t *testing.T) {
}
defer cleanup()

cdb, err := CreateWithBackend(backend)
var modifiers []OptionModifier
// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

cdb, err := CreateWithBackend(backend, modifiers...)

if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down Expand Up @@ -87,7 +94,13 @@ func TestWipe(t *testing.T) {
}
defer cleanup()

fullDB, err := CreateWithBackend(backend)
var modifiers []OptionModifier
// Use a channel state cache when testing with remote backends.
if kvdb.TestBackend != kvdb.BoltBackendName {
modifiers = append(modifiers, OptionWithChannelStateCache(true))
}

fullDB, err := CreateWithBackend(backend, modifiers...)
if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down
12 changes: 12 additions & 0 deletions channeldb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
type Options struct {
kvdb.BoltBackendConfig

// ChanStateCache when true turns of in-memory caching of important
// channel state buckets.
ChanStateCache bool

// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
Expand Down Expand Up @@ -72,6 +76,14 @@ func DefaultOptions() Options {
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)

// OptionWithChannelStateCache turns on in-memory caching of important channel
// state buckets.
func OptionWithChannelStateCache(cache bool) OptionModifier {
return func(o *Options) {
o.ChanStateCache = cache
}
}

// OptionSetRejectCacheSize sets the RejectCacheSize to n.
func OptionSetRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
Expand Down
70 changes: 44 additions & 26 deletions channeldb/payment_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,11 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
return err
}

// Retrieve attempt info for the notification.
payment, err = fetchPayment(bucket)
p.HTLCs = append(p.HTLCs, HTLCAttempt{
HTLCAttemptInfo: *attempt,
})

payment = p
return err
})
if err != nil {
Expand All @@ -405,7 +408,9 @@ func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
}
settleBytes := b.Bytes()

return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
return p.updateHtlcKey(
hash, attemptID, htlcSettleInfoKey, settleBytes, settleInfo, nil,
)
}

// FailAttempt marks the given payment attempt failed.
Expand All @@ -418,12 +423,15 @@ func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
}
failBytes := b.Bytes()

return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
return p.updateHtlcKey(
hash, attemptID, htlcFailInfoKey, failBytes, nil, failInfo,
)
}

// updateHtlcKey updates a database key for the specified htlc.
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
attemptID uint64, key, value []byte) (*MPPayment, error) {
attemptID uint64, key, value []byte, settleInfo *HTLCSettleInfo,
failInfo *HTLCFailInfo) (*MPPayment, error) {

aid := make([]byte, 8)
binary.BigEndian.PutUint64(aid, attemptID)
Expand All @@ -450,33 +458,43 @@ func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
return err
}

htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
if htlcsBucket == nil {
return fmt.Errorf("htlcs bucket not found")
}
for i := range p.HTLCs {
if p.HTLCs[i].AttemptID != attemptID {
continue
}

if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
return fmt.Errorf("HTLC with ID %v not registered",
attemptID)
}
if p.HTLCs[i].Failure != nil {
return ErrAttemptAlreadyFailed
}

// Make sure the shard is not already failed or settled.
if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
return ErrAttemptAlreadyFailed
}
if p.HTLCs[i].Settle != nil {
return ErrAttemptAlreadySettled
}

if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
return ErrAttemptAlreadySettled
}
// Udate the DB.
htlcsBucket := bucket.NestedReadWriteBucket(
paymentHtlcsBucket,
)
if htlcsBucket == nil {
return fmt.Errorf("htlcs bucket not found")
}

// Add or update the key for this htlc.
err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
if err != nil {
return err
// Add or update the key for this htlc.
err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
if err != nil {
return err
}

// Update the fetched payment.
if settleInfo != nil {
p.HTLCs[i].Settle = settleInfo
} else if failInfo != nil {
p.HTLCs[i].Failure = failInfo
}
}

// Retrieve attempt info for the notification.
payment, err = fetchPayment(bucket)
updatePaymentStatus(p)
payment = p
return err
})
if err != nil {
Expand Down
Loading