Skip to content

Commit

Permalink
fix(restore_test): versioned restore with mixed SSTable ID types
Browse files Browse the repository at this point in the history
This way we are able to cover mixed scenarios (integer + UUID SSTables),
which were missing from our test coverage. It also allows us to move
to testing the default UUID SSTables, which should be our priority.

Fixes #4182
  • Loading branch information
Michal-Leszczynski committed Jan 16, 2025
1 parent 31f90d4 commit ad75906
Showing 1 changed file with 128 additions and 9 deletions.
137 changes: 128 additions & 9 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,34 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
. "github.com/scylladb/scylla-manager/v3/pkg/service/restore"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper"
"github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -184,6 +190,100 @@ func testLocation(bucket, dc string) Location {
}
}

// newRenameSnapshotSSTablesRespInterceptor returns an HTTP response interceptor that renames
// SSTables in the snapshot directory right after the snapshot has been taken.
// It uses the name mapping provided by the idGen function.
//
// The interceptor acts only on responses to POST requests whose path starts with
// "/storage_service/snapshots" and which came back without a transport error
// (the HTTP status code is not inspected). For any other response it returns nil, nil
// (presumably meaning "leave the response untouched" - confirm against the interceptor contract).
//
// Keyspace, tables and snapshot tag are read from the "kn", "cf" and "tag" query
// parameters of the intercepted request. When "cf" is empty, all tables of the keyspace
// returned by client.Tables are processed.
func newRenameSnapshotSSTablesRespInterceptor(client *scyllaclient.Client, s gocqlx.Session, idGen func(id string) string) func(*http.Response, error) (*http.Response, error) {
	// Shared prefix so that every error coming from this interceptor is reported consistently.
	const errPrefix = "snapshot response interceptor error: "

	return func(r *http.Response, err error) (*http.Response, error) {
		// Look for successful response to snapshot call.
		// The err check must come first, as r might not be usable when err is set.
		if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost {
			return nil, nil
		}
		host, _, err := net.SplitHostPort(r.Request.Host)
		if err != nil {
			return nil, errors.Wrap(err, errPrefix+"get response host")
		}

		ctx := context.Background()
		q := r.Request.URL.Query()
		ks := q.Get("kn")
		rawTabs := q.Get("cf")
		tag := q.Get("tag")
		tabs := strings.Split(rawTabs, ",")
		// strings.Split returns [""] for an empty input, which means that
		// no tables were specified and the whole keyspace was snapshotted.
		if len(tabs) == 0 || slices.Equal(tabs, []string{""}) {
			tabs, err = client.Tables(ctx, ks)
			if err != nil {
				return nil, errors.Wrap(err, errPrefix+"get keyspace tables")
			}
		}

		for _, tab := range tabs {
			version, err := query.GetTableVersion(s, ks, tab)
			if err != nil {
				return nil, errors.Wrap(err, errPrefix+"get table version")
			}
			snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag)

			// Get snapshot files
			files := make([]string, 0)
			err = client.RcloneListDirIter(ctx, host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) {
				// Watch out for the non-sstable files (e.g. manifest.json)
				if _, err := sstable.ExtractID(item.Name); err != nil {
					return
				}
				files = append(files, item.Name)
			})
			if err != nil {
				return nil, errors.Wrap(err, errPrefix+"list snapshot files")
			}

			// Rename snapshot files
			mapping := sstable.RenameSStables(files, idGen)
			for initial, renamed := range mapping {
				// Files whose name is left unchanged by idGen don't need to be moved.
				if initial == renamed {
					continue
				}
				src := path.Join(snapshotDir, initial)
				dst := path.Join(snapshotDir, renamed)
				if err := client.RcloneMoveFile(ctx, host, dst, src); err != nil {
					return nil, errors.Wrap(err, errPrefix+"rename SSTable ID")
				}
			}
		}
		return nil, nil
	}
}

// halfUUIDToIntIDGen builds an idGen suitable for newRenameSnapshotSSTablesRespInterceptor.
// The returned function keeps integer IDs untouched and alternates on every newly seen
// non-integer (UUID) ID: the first one is mapped to an integer ID, the second one is kept,
// the third one is mapped again, and so on. Repeated calls with the same ID always
// return the same result. The returned function is guarded by a mutex.
// Generated integer IDs start right above 10000000, so it only works
// if the snapshot dir has less than 10000000 SSTables.
func halfUUIDToIntIDGen() func(string) string {
	var (
		lock     sync.Mutex
		assigned = make(map[string]string)
		toInt    = true
		nextInt  = 10000000
	)
	return func(id string) string {
		lock.Lock()
		defer lock.Unlock()

		// Integer SSTable IDs are returned as they are.
		// The counter starts at a huge number in the hope that generated
		// IDs won't collide with the already existing integer SSTables.
		if _, convErr := strconv.Atoi(id); convErr == nil {
			return id
		}
		// Already seen UUID - stick to the previous decision.
		if known, seen := assigned[id]; seen {
			return known
		}

		// Newly seen UUID - the counter advances for each of them,
		// but only every other one actually gets the integer ID.
		nextInt++
		result := id
		if toInt {
			result = fmt.Sprint(nextInt)
		}
		toInt = !toInt
		assigned[id] = result
		return result
	}
}

func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -893,10 +993,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
func TestRestoreTablesVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand All @@ -920,10 +1020,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
func TestRestoreSchemaVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand Down Expand Up @@ -973,10 +1073,15 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
corruptedTable = "keyspaces"
}

// Force creation of integer SSTables in the snapshot dir,
// as only integer SSTables can be versioned.
// This also allows us to test the scenario with SSTables of mixed ID types.
srcH.Hrt.SetRespInterceptor(newRenameSnapshotSSTablesRespInterceptor(srcH.Client, srcSession, halfUUIDToIntIDGen()))

// Restore should be performed on user with limited permissions
dropNonSuperUsers(t, dstSession)
createUser(t, dstSession, user, "pass")
dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")
//dropNonSuperUsers(t, dstSession)
//createUser(t, dstSession, user, "pass")
//dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")

if target.RestoreTables {
Print("Recreate schema on destination cluster")
Expand Down Expand Up @@ -1012,6 +1117,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if _, err = VersionedFileCreationTime(item.Name); err == nil {
t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path))
}

// Corrupt only integer SSTables
id, err := sstable.ExtractID(item.Name)
if err != nil {
t.Fatal(err)
}
if _, err := strconv.Atoi(id); err != nil {
return
}

if strings.HasSuffix(item.Name, ".db") {
switch {
case len(firstCorrupt) < corruptCnt:
Expand All @@ -1026,6 +1141,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if err != nil {
t.Fatal(err)
}
if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 {
t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d",
len(firstCorrupt), len(bothCorrupt), len(secondCorrupt))
}

crc32FileNameFromGivenSSTableFile := func(sstable string) string {
// Split the filename by dashes
Expand Down Expand Up @@ -1146,9 +1265,9 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
target.SnapshotTag = tag3

if target.RestoreTables {
grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
// grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
} else {
grantRestoreSchemaPermissions(t, dstSession, user)
// grantRestoreSchemaPermissions(t, dstSession, user)
}

if err = dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil {
Expand Down

0 comments on commit ad75906

Please sign in to comment.