update TestExactAccountChunk to cause SP votersTracker to extend history before making catchpoint
cce committed Jan 8, 2025
1 parent 11a5f46 commit 69be46b
Showing 2 changed files with 43 additions and 23 deletions.
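
For context while reading the hunks below: the horizon arithmetic in this change leans on basics.Round's SubSaturate, which is saturating subtraction (it floors at 0 rather than wrapping the unsigned round number). A minimal standalone sketch of that behavior, using a stand-in Round type instead of the real basics package:

```go
package main

import "fmt"

// Round stands in for basics.Round in this sketch.
type Round uint64

// SubSaturate subtracts x from r, flooring at 0 instead of wrapping.
func (r Round) SubSaturate(x Round) Round {
	if x > r {
		return 0
	}
	return r - x
}

func main() {
	const maxBalLookback = 320
	// The "normal horizon": the oldest round of online history a
	// catchpoint should carry, given the DB round.
	dbRound := Round(540)
	fmt.Println((dbRound + 1).SubSaturate(maxBalLookback)) // 221
	// Early in a ledger's life the lookback exceeds the round number,
	// so the horizon saturates at round 0.
	fmt.Println(Round(100).SubSaturate(maxBalLookback)) // 0
}
```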
58 changes: 39 additions & 19 deletions ledger/catchpointfilewriter_test.go
@@ -297,7 +297,7 @@ func TestBasicCatchpointWriter(t *testing.T) {
require.Equal(t, uint64(len(accts)), uint64(len(chunk.Balances)))
}

-func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader {
+func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int, onlineExcludeBefore basics.Round) CatchpointFileHeader {
var totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks uint64
var biggestChunkLen uint64
var accountsRnd basics.Round
@@ -307,7 +307,7 @@ func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb tracke
}

err := rdb.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
-writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk, 0)
+writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk, onlineExcludeBefore)
if err != nil {
return err
}
@@ -604,7 +604,7 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) {
catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data")
catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint")
const maxResourcesPerChunk = 5
-testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk)
+testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk, 0)

l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath)
defer l.Close()
@@ -802,7 +802,7 @@ func TestFullCatchpointWriter(t *testing.T) {

catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data")
catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint")
-testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)

l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath)
defer l.Close()
@@ -834,16 +834,19 @@ func TestExactAccountChunk(t *testing.T) {
partitiontest.PartitionTest(t)
// t.Parallel() // probably not good to parallelize catchpoint file save/load

t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture) })
t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40) })
t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 100) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40) })
t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 100) })
}

-func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) {
+func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int) {
genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(c *ledgertesting.GenesisCfg) {
c.OnlineCount = 1 // addrs[0] is online
}, ledgertesting.TurnOffRewards)
cfg := config.GetDefaultLocal()
+params := config.Consensus[proto]

dl := NewDoubleLedger(t, genBalances, proto, cfg)
defer dl.Close()
@@ -863,8 +866,8 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) {
dl.fullBlock(&newacctpay)
}

-// At least 32 more blocks so that we catchpoint after the accounts exist
-for i := 0; i < 40; i++ {
+// Add more blocks so that we catchpoint after the accounts exist
+for i := 0; i < extraBlocks; i++ {
selfpay := pay
selfpay.Receiver = payFrom
selfpay.Note = ledgertesting.RandomNote()
@@ -874,7 +877,7 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) {
genR := testCatchpointFlushRound(dl.generator)
valR := testCatchpointFlushRound(dl.validator)
require.Equal(t, genR, valR)
-require.EqualValues(t, BalancesPerCatchpointFileChunk-12+40, genR) // 540 (512-12+40)
+require.EqualValues(t, BalancesPerCatchpointFileChunk-12+extraBlocks, genR)

require.Eventually(t, func() bool {
dl.generator.accts.accountsMu.RLock()
@@ -892,7 +895,24 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) {
catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data")
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

-cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+genDBRound := dl.generator.trackers.acctsOnline.cachedDBRoundOnline
+valDBRound := dl.validator.trackers.acctsOnline.cachedDBRoundOnline
+genLowestRound := dl.generator.trackers.acctsOnline.voters.lowestRound(genDBRound)
+valLowestRound := dl.validator.trackers.acctsOnline.voters.lowestRound(valDBRound)
+require.Equal(t, genLowestRound, valLowestRound)
+require.Equal(t, genDBRound, valDBRound)
+
+var onlineExcludeBefore basics.Round
+// we added so many blocks that lowestRound is stuck at first state proof, round 240?
+if normalHorizon := (genDBRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalHorizon == genLowestRound {
+	t.Logf("subtest is exercising case where 320 rounds of history are already in DB")
+	require.EqualValues(t, genLowestRound, params.StateProofInterval-params.StateProofVotersLookback)
+} else if normalHorizon > genLowestRound {
+	t.Logf("subtest is exercising case where votersTracker causes onlineaccounts & onlineroundparams to extend history to round %d (DBRound %d)", genLowestRound, genDBRound)
+	onlineExcludeBefore = normalHorizon // fails without this adjustment
+}
+
+cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, onlineExcludeBefore)

decodedData := readCatchpointFile(t, catchpointFilePath)

@@ -920,8 +940,8 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) {

if config.Consensus[proto].EnableCatchpointsWithOnlineAccounts {
// second and third chunks are onlineaccounts and onlineroundparams
-require.Len(t, chunks[1].OnlineAccounts, 1)
-require.Len(t, chunks[2].OnlineRoundParams, 320)
+require.Len(t, chunks[1].OnlineAccounts, 1) // only 1 online account
+require.Len(t, chunks[2].OnlineRoundParams, int(params.MaxBalLookback)) // 320
}

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
@@ -974,7 +994,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data")
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

-cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
@@ -990,7 +1010,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
dl.fullBlock(&newacctpay)

// Write and read back in, and ensure even the last effect exists.
-cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)
require.EqualValues(t, cph.TotalChunks, 3) // Still only 3 chunks, as last was in a recent block

// Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint.
@@ -1006,7 +1026,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
dl.fullBlock(pay.Noted(strconv.Itoa(i)))
}

-cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)
require.EqualValues(t, cph.TotalChunks, 4)

l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
@@ -1161,7 +1181,7 @@ assert
catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data")
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

-cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)
require.EqualValues(t, 7, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
@@ -1261,7 +1281,7 @@ func TestCatchpointAfterBoxTxns(t *testing.T) {
catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data")
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

-cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
+cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
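A quick sanity check on the updated assertion require.EqualValues(t, BalancesPerCatchpointFileChunk-12+extraBlocks, genR). This sketch assumes BalancesPerCatchpointFileChunk = 512 and a 12-round setup offset, both implied by the old inline comment "540 (512-12+40)" that the commit removes:

```go
package main

import "fmt"

func main() {
	// Values implied by the removed comment "540 (512-12+40)": one full
	// 512-account chunk, minus a 12-round setup offset, plus the padding
	// blocks each subtest adds.
	const balancesPerCatchpointFileChunk = 512
	const setupOffset = 12
	for _, extraBlocks := range []int{40, 100} {
		genR := balancesPerCatchpointFileChunk - setupOffset + extraBlocks
		fmt.Printf("extraBlocks=%d -> expected flush round %d\n", extraBlocks, genR)
	}
	// extraBlocks=40  -> expected flush round 540
	// extraBlocks=100 -> expected flush round 600
}
```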
8 changes: 4 additions & 4 deletions ledger/catchpointtracker.go
@@ -225,17 +225,17 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
var onlineAccountsHash, onlineRoundParamsHash crypto.Digest
params := config.Consensus[blockProto]

-// Usually onlineAccountsForgetBefore is dbRound - params.MaxBalLookback (320 roudns of history),
+// Usually onlineAccountsForgetBefore is dbRound - params.MaxBalLookback (320 rounds of history),
// but if votersTracker needs more state, it can set lowestRound to be earlier than that.
// We want to only write MaxBalLookback rounds of history to the catchpoint file.
var onlineExcludeBefore basics.Round
-if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) == onlineAccountsForgetBefore {
+if normalOnlineHorizon := (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalOnlineHorizon == onlineAccountsForgetBefore {
// this is the common case, so we pass 0 so the DB dumps the full table, as is
onlineExcludeBefore = 0
-} else if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) > onlineAccountsForgetBefore {
+} else if normalOnlineHorizon > onlineAccountsForgetBefore {
// the previous flush left more online-related rows than we want in the DB. we need to tell
// the catchpoint writer to exclude the rows that are older than the ones we want to keep.
-onlineExcludeBefore = (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback))
+onlineExcludeBefore = normalOnlineHorizon
} else {
// The previous flush left less online-related rows than we want in the DB. This should not happen; return error
ct.log.Errorf("catchpointTracker.finishFirstStage: dbRound %d and onlineAccountsForgetBefore %d has less history than MaxBalLookback %d",
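A worked example of the three-way branch above, as a sketch with illustrative round numbers (it reuses the stand-in Round type from the first sketch; excludeBefore is a hypothetical helper for illustration, not a function in the tracker):

```go
package main

import "fmt"

// Round stands in for basics.Round in this sketch.
type Round uint64

func (r Round) SubSaturate(x Round) Round {
	if x > r {
		return 0
	}
	return r - x
}

// excludeBefore mirrors the branch in finishFirstStage: given the DB round
// and the horizon the previous flush actually kept (onlineAccountsForgetBefore),
// decide which online rows the catchpoint writer should skip.
func excludeBefore(dbRound, forgetBefore Round) (Round, error) {
	const maxBalLookback = 320
	normal := (dbRound + 1).SubSaturate(maxBalLookback)
	switch {
	case forgetBefore == normal:
		return 0, nil // common case: DB holds exactly 320 rounds; dump as is
	case normal > forgetBefore:
		return normal, nil // votersTracker kept extra history; trim on write
	default:
		return 0, fmt.Errorf("less than %d rounds of history in DB", maxBalLookback)
	}
}

func main() {
	fmt.Println(excludeBefore(1000, 681)) // 0 <nil>: exactly MaxBalLookback rounds kept
	fmt.Println(excludeBefore(1000, 240)) // 681 <nil>: the state proof stall case
	fmt.Println(excludeBefore(1000, 700)) // error: should not happen
}
```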
