Skip to content

Commit

Permalink
fix(restore_test): versioned restore with mixed SSTable ID types
Browse files Browse the repository at this point in the history
This way we are able to cover mixed scenarios (integer + UUID SSTables),
which was missing from our test coverage. It also allows us to move
to testing the default UUID SSTables, which should be our priority.

Fixes #4182
  • Loading branch information
Michal-Leszczynski committed Jan 15, 2025
1 parent 31f90d4 commit 31cf41c
Showing 1 changed file with 106 additions and 4 deletions.
110 changes: 106 additions & 4 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"regexp"
"slices"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -24,15 +26,18 @@ import (
"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
. "github.com/scylladb/scylla-manager/v3/pkg/service/restore"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper"
"github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -184,6 +189,84 @@ func testLocation(bucket, dc string) Location {
}
}

// newIntegerSSTablesSnapshotResponseNotifier makes sure that half of the snapshot-ed SSTables
// will be renamed to integer based IDs right after the snapshot has been taken.
func newIntegerSSTablesSnapshotResponseNotifier(client *scyllaclient.Client, s gocqlx.Session) func(*http.Response, error) (*http.Response, error) {
cnt := 0
// getNewID returns the ID to which SSTable should be renamed.
// In order to simulate mixed ID scenario, it renames half of
// the SSTables to integer ID, and leaves the others with UUID.
mapping := make(map[string]string)
getNewID := func(id string) string {
if newID, ok := mapping[id]; ok {
return newID
}
cnt++
var newID string
if cnt%2 == 1 {
newID = fmt.Sprint(cnt)
} else {
newID = id
}
mapping[id] = newID
return newID
}

return func(r *http.Response, err error) (*http.Response, error) {
// Look for successful response to snapshot call
if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost {
return nil, nil
}
host, _, err := net.SplitHostPort(r.Request.Host)
if err != nil {
return nil, errors.New("snapshot response notifier error: get response host: " + err.Error())
}
q := r.Request.URL.Query()
ks := q.Get("kn")
rawTabs := q.Get("cf")
tag := q.Get("tag")
tabs := strings.Split(rawTabs, ",")
if len(tabs) == 0 || slices.Equal(tabs, []string{""}) {
tabs, err = client.Tables(context.Background(), ks)
if err != nil {
return nil, errors.New("snapshot response notifier error: get keyspace tables: " + err.Error())
}
}

for _, tab := range tabs {
version, err := query.GetTableVersion(s, ks, tab)
if err != nil {
return nil, errors.New("snapshot response interceptor error: get table version: " + err.Error())
}
snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag)
// Get snapshot files
files := make([]string, 0)
err = client.RcloneListDirIter(context.Background(), host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) {
// Watch out for the non-sstable files (e.g. manifest.json)
if _, err := sstable.ExtractID(item.Path); err != nil {
return
}
files = append(files, item.Path)
})
if err != nil {
return nil, errors.New("snapshot response interceptor error: list snapshot files: " + err.Error())
}
// Rename snapshot files
mapping := sstable.RenameSStables(files, getNewID)
for initial, renamed := range mapping {
if initial != renamed {
src := path.Join(snapshotDir, initial)
dst := path.Join(snapshotDir, renamed)
if err := client.RcloneMoveFile(context.Background(), host, dst, src); err != nil {
return nil, errors.New("snapshot response interceptor error: rename SSTable ID: " + err.Error())
}
}
}
}
return nil, nil
}
}

func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -893,10 +976,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
func TestRestoreTablesVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand All @@ -920,10 +1003,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
func TestRestoreSchemaVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand Down Expand Up @@ -973,6 +1056,11 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
corruptedTable = "keyspaces"
}

// Force creation of integer SSTables in the snapshot dir,
// as only integer SSTables can be versioned.
// This also allows us to test scenario with mixed ID type SSTables.
srcH.Hrt.SetRespInterceptor(newIntegerSSTablesSnapshotResponseNotifier(srcH.Client, srcSession))

// Restore should be performed on user with limited permissions
dropNonSuperUsers(t, dstSession)
createUser(t, dstSession, user, "pass")
Expand Down Expand Up @@ -1012,6 +1100,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if _, err = VersionedFileCreationTime(item.Name); err == nil {
t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path))
}

// Corrupt only integer SSTables
id, err := sstable.ExtractID(item.Name)
if err != nil {
t.Fatal(err)
}
if _, err := strconv.Atoi(id); err != nil {
return
}

if strings.HasSuffix(item.Name, ".db") {
switch {
case len(firstCorrupt) < corruptCnt:
Expand All @@ -1026,6 +1124,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if err != nil {
t.Fatal(err)
}
if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 {
t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d",
len(firstCorrupt), len(bothCorrupt), len(secondCorrupt))
}

crc32FileNameFromGivenSSTableFile := func(sstable string) string {
// Split the filename by dashes
Expand Down

0 comments on commit 31cf41c

Please sign in to comment.