Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channeldb: flatten the htlc attempts bucket #5635

Merged
merged 3 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration20"
"github.com/lightningnetwork/lnd/channeldb/migration21"
"github.com/lightningnetwork/lnd/channeldb/migration23"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
Expand Down Expand Up @@ -193,6 +194,10 @@ var (
number: 22,
migration: mig.CreateTLB(setIDIndexBucket),
},
{
number: 23,
migration: migration23.MigrateHtlcAttempts,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down
163 changes: 163 additions & 0 deletions channeldb/migration23/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package migration23

import (
"fmt"

"github.com/lightningnetwork/lnd/kvdb"
)

var (
// paymentsRootBucket is the name of the top-level bucket within the
// database that stores all data related to payments.
paymentsRootBucket = []byte("payments-root-bucket")

// paymentHtlcsBucket is a bucket where we'll store the information
// about the HTLCs that were attempted for a payment.
paymentHtlcsBucket = []byte("payment-htlcs-bucket")

// oldAttemptInfoKey is a key used in a HTLC's sub-bucket to store the
// info about the attempt that was done for the HTLC in question.
oldAttemptInfoKey = []byte("htlc-attempt-info")

// oldSettleInfoKey is a key used in a HTLC's sub-bucket to store the
// settle info, if any.
oldSettleInfoKey = []byte("htlc-settle-info")

// oldFailInfoKey is a key used in a HTLC's sub-bucket to store
// failure information, if any.
oldFailInfoKey = []byte("htlc-fail-info")

// htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
// to store the info about the attempt that was done for the HTLC in
// question. The HTLC attempt ID is concatenated at the end.
htlcAttemptInfoKey = []byte("ai")

// htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
// settle info, if any. The HTLC attempt ID is concatenated at the end.
htlcSettleInfoKey = []byte("si")

// htlcFailInfoKey is the key used as the prefix of an HTLC attempt
// failure information, if any.The HTLC attempt ID is concatenated at
// the end.
htlcFailInfoKey = []byte("fi")
)

// htlcBucketKey creates a composite key from prefix and id where the result is
// simply the two concatenated. This is the exact copy from payments.go.
func htlcBucketKey(prefix, id []byte) []byte {
key := make([]byte, len(prefix)+len(id))
copy(key, prefix)
copy(key[len(prefix):], id)
return key
}

// MigrateHtlcAttempts will gather all htlc-attempt-info's, htlcs-settle-info's
// and htlcs-fail-info's from the attempt ID buckes and re-store them using the
// flattened keys to each payment's payment-htlcs-bucket.
func MigrateHtlcAttempts(tx kvdb.RwTx) error {
payments := tx.ReadWriteBucket(paymentsRootBucket)
if payments == nil {
return nil
}

// Collect all payment hashes so we can migrate payments one-by-one to
// avoid any bugs bbolt might have when invalidating cursors.
// For 100 million payments, this would need about 3 GiB memory so we
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many attempts are there for those 100 million payments?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand there's no hard upper bound, but certainly limited by the graph itself so we can say it's constant per payment? Also since we migrate per payment we won't pre-allocate for each attempt.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gocha. I asked because I was curious about the 3 GiB memory usage.

// should hopefully be fine for very large nodes too.
var paymentHashes []string
if err := payments.ForEach(func(hash, v []byte) error {
// Get the bucket which contains the payment, fail if the key
// does not have a bucket.
bucket := payments.NestedReadBucket(hash)
if bucket == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the paymentsBucket be empty?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could but then ForEach will not iterate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I wanted to ask was there the above bucket could be nil or not...If it could be it seems we shouldn't return an error below.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry misunderstood what you meant. The semantics of NestedRead(Write)Bucket are a bit tricky since it never returns an error. When it returns nil it means that the bucket doesn't exists. In this case the payments-root-bucket can either be empty (no payments yet) or only contains buckets which we ensure with this check.

return fmt.Errorf("key must be a bucket: '%v'",
string(paymentsRootBucket))
}

paymentHashes = append(paymentHashes, string(hash))
return nil
}); err != nil {
return err
}

for _, paymentHash := range paymentHashes {
payment := payments.NestedReadWriteBucket([]byte(paymentHash))
if payment.Get(paymentHtlcsBucket) != nil {
return fmt.Errorf("key must be a bucket: '%v'",
string(paymentHtlcsBucket))
}

htlcs := payment.NestedReadWriteBucket(paymentHtlcsBucket)
if htlcs == nil {
// Nothing to migrate for this payment.
continue
}

if err := migrateHtlcsBucket(htlcs); err != nil {
return err
}
}

return nil
}

// migrateHtlcsBucket is a helper to gather, transform and re-store htlc attempt
// key/values.
func migrateHtlcsBucket(htlcs kvdb.RwBucket) error {
// Collect attempt ids so that we can migrate attempts one-by-one
// to avoid any bugs bbolt might have when invalidating cursors.
var aids []string

// First we collect all htlc attempt ids.
if err := htlcs.ForEach(func(aid, v []byte) error {
aids = append(aids, string(aid))
return nil
}); err != nil {
return err
}

// Next we go over these attempts, fetch all data and migrate.
for _, aid := range aids {
aidKey := []byte(aid)
attempt := htlcs.NestedReadWriteBucket(aidKey)
if attempt == nil {
return fmt.Errorf("non bucket element '%v' in '%v' "+
"bucket", aidKey, string(paymentHtlcsBucket))
}

// Collect attempt/settle/fail infos.
attemptInfo := attempt.Get(oldAttemptInfoKey)
if len(attemptInfo) > 0 {
newKey := htlcBucketKey(htlcAttemptInfoKey, aidKey)
if err := htlcs.Put(newKey, attemptInfo); err != nil {
return err
}
}

settleInfo := attempt.Get(oldSettleInfoKey)
if len(settleInfo) > 0 {
newKey := htlcBucketKey(htlcSettleInfoKey, aidKey)
if err := htlcs.Put(newKey, settleInfo); err != nil {
return err
}

}

failInfo := attempt.Get(oldFailInfoKey)
if len(failInfo) > 0 {
newKey := htlcBucketKey(htlcFailInfoKey, aidKey)
if err := htlcs.Put(newKey, failInfo); err != nil {
return err
}
}
}

// Finally we delete old attempt buckets.
for _, aid := range aids {
if err := htlcs.DeleteNestedBucket([]byte(aid)); err != nil {
return err
}
}

return nil
}
176 changes: 176 additions & 0 deletions channeldb/migration23/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package migration23

import (
"testing"

"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)

var (
hexStr = migtest.Hex

hash1Str = "02acee76ebd53d00824410cf6adecad4f50334dac702bd5a2d3ba01b91709f0e"
hash1 = hexStr(hash1Str)
paymentID1 = hexStr("0000000000000001")
attemptID1 = hexStr("0000000000000001")
attemptID2 = hexStr("0000000000000002")

hash2Str = "62eb3f0a48f954e495d0c14ac63df04a67cefa59dafdbcd3d5046d1f5647840c"
hash2 = hexStr(hash2Str)
paymentID2 = hexStr("0000000000000002")
attemptID3 = hexStr("0000000000000003")

hash3Str = "99eb3f0a48f954e495d0c14ac63df04af8cefa59dafdbcd3d5046d1f564784d1"
hash3 = hexStr(hash3Str)

// failing1 will fail because all payment hashes should point to sub
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice docs!

// buckets containing payment data.
failing1 = map[string]interface{}{
hash1: "bogus",
}

// failing2 will fail because the "payment-htlcs-bucket" key must point
// to an actual bucket or be non-existent, but never point to a value.
failing2 = map[string]interface{}{
hash1: map[string]interface{}{
"payment-htlcs-bucket": "bogus",
},
}

// failing3 will fail because each attempt ID inside the
// "payment-htlcs-bucket" must point to a sub-bucket.
failing3 = map[string]interface{}{
hash1: map[string]interface{}{
"payment-creation-info": "aaaa",
"payment-fail-info": "bbbb",
"payment-htlcs-bucket": map[string]interface{}{
attemptID1: map[string]interface{}{
"htlc-attempt-info": "cccc",
"htlc-fail-info": "dddd",
},
attemptID2: "bogus",
},
"payment-sequence-key": paymentID1,
},
}

// pre is a sample snapshot (with fake values) before migration.
pre = map[string]interface{}{
hash1: map[string]interface{}{
"payment-creation-info": "aaaa",
"payment-fail-info": "bbbb",
"payment-htlcs-bucket": map[string]interface{}{
attemptID1: map[string]interface{}{
"htlc-attempt-info": "cccc",
"htlc-fail-info": "dddd",
},
},
"payment-sequence-key": paymentID1,
},
hash2: map[string]interface{}{
"payment-creation-info": "eeee",
"payment-htlcs-bucket": map[string]interface{}{
attemptID2: map[string]interface{}{
"htlc-attempt-info": "ffff",
"htlc-fail-info": "gggg",
},
attemptID3: map[string]interface{}{
"htlc-attempt-info": "hhhh",
"htlc-settle-info": "iiii",
},
},
"payment-sequence-key": paymentID2,
},
hash3: map[string]interface{}{
"payment-creation-info": "aaaa",
"payment-fail-info": "bbbb",
"payment-sequence-key": paymentID1,
},
}

// post is the expected data after migration.
post = map[string]interface{}{
hash1: map[string]interface{}{
"payment-creation-info": "aaaa",
"payment-fail-info": "bbbb",
"payment-htlcs-bucket": map[string]interface{}{
"ai" + attemptID1: "cccc",
"fi" + attemptID1: "dddd",
},
"payment-sequence-key": paymentID1,
},
hash2: map[string]interface{}{
"payment-creation-info": "eeee",
"payment-htlcs-bucket": map[string]interface{}{
"ai" + attemptID2: "ffff",
"fi" + attemptID2: "gggg",
"ai" + attemptID3: "hhhh",
"si" + attemptID3: "iiii",
},
"payment-sequence-key": paymentID2,
},
hash3: map[string]interface{}{
"payment-creation-info": "aaaa",
"payment-fail-info": "bbbb",
"payment-sequence-key": paymentID1,
},
}
)

// TestMigrateHtlcAttempts tests that migration htlc attempts to the flattened
// structure succeeds.
func TestMigrateHtlcAttempts(t *testing.T) {
bhandras marked this conversation as resolved.
Show resolved Hide resolved
var paymentsRootBucket = []byte("payments-root-bucket")
tests := []struct {
name string
shouldFail bool
pre map[string]interface{}
post map[string]interface{}
}{
{
name: "migration ok",
shouldFail: false,
pre: pre,
post: post,
},
{
name: "non-bucket payments-root-bucket",
shouldFail: true,
pre: failing1,
post: failing1,
},
{
name: "non-bucket payment-htlcs-bucket",
shouldFail: true,
pre: failing2,
post: failing2,
},
{
name: "non-bucket htlc attempt",
shouldFail: true,
pre: failing3,
post: failing3,
},
}

for _, test := range tests {
test := test

migtest.ApplyMigration(
t,
func(tx kvdb.RwTx) error {
return migtest.RestoreDB(
tx, paymentsRootBucket, test.pre,
)
},
func(tx kvdb.RwTx) error {
return migtest.VerifyDB(
tx, paymentsRootBucket, test.post,
)
},
MigrateHtlcAttempts,
test.shouldFail,
)
}
}
Loading