diff --git a/Makefile b/Makefile index 569ca52b3..e95384784 100644 --- a/Makefile +++ b/Makefile @@ -115,6 +115,7 @@ INTEGRATION_TEST_ARGS := -cluster $(PUBLIC_NET)100 \ -managed-cluster $(PUBLIC_NET)11,$(PUBLIC_NET)12,$(PUBLIC_NET)13,$(PUBLIC_NET)21,$(PUBLIC_NET)22,$(PUBLIC_NET)23 \ -test-network $(PUBLIC_NET) \ -managed-second-cluster $(PUBLIC_NET)31,$(PUBLIC_NET)32 \ +-managed-third-cluster $(PUBLIC_NET)41,$(PUBLIC_NET)42 \ -user cassandra -password cassandra \ -agent-auth-token token \ -s3-data-dir ./testing/minio/data -s3-provider Minio -s3-endpoint $(MINIO_ENDPOINT) -s3-access-key-id $(MINIO_USER_ACCESS_KEY) -s3-secret-access-key $(MINIO_USER_SECRET_KEY) diff --git a/pkg/command/restore/cmd.go b/pkg/command/restore/cmd.go index 834f132a8..58db403ce 100644 --- a/pkg/command/restore/cmd.go +++ b/pkg/command/restore/cmd.go @@ -37,6 +37,7 @@ type command struct { restoreTables bool dryRun bool showTables bool + dcMapping dcMappings } func NewCommand(client *managerclient.Client) *cobra.Command { @@ -90,6 +91,7 @@ func (cmd *command) init() { w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "") w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "") w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "") + w.Unwrap().Var(&cmd.dcMapping, "dc-mapping", "") } func (cmd *command) run(args []string) error { @@ -182,6 +184,13 @@ func (cmd *command) run(args []string) error { props["restore_tables"] = cmd.restoreTables ok = true } + if cmd.Flag("dc-mapping").Changed { + if cmd.Update() { + return wrapper("dc-mapping") + } + props["dc-mapping"] = cmd.dcMapping + ok = true + } if cmd.dryRun { res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task) diff --git a/pkg/command/restore/dcmappings.go b/pkg/command/restore/dcmappings.go new file mode 100644 index 000000000..2b43bc58a --- /dev/null +++ b/pkg/command/restore/dcmappings.go @@ -0,0 +1,89 @@ +// Copyright (C) 2025 ScyllaDB + +package restore + +import ( + "slices" + "strings" + + "github.com/pkg/errors" +) + +type dcMappings []dcMapping + +type dcMapping struct { + Source []string `json:"source"` + IgnoreSource []string `json:"ignore_source"` + Target []string `json:"target"` + IgnoreTarget []string `json:"ignore_target"` +} + +const ignoreDCPrefix = "!" + +// Set parses the --dc-mapping flag, which uses the following syntax: +// ; - used to split different mappings +// => - used to split source => target DCs +// , - used to separate DCs +// ! - used to ignore a DC.
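As a quick illustration of the syntax described above, here is a minimal sketch of the expected parsing behavior; it mirrors the unit tests added later in this patch and is not part of the diff itself (it would live inside the same restore package):

```go
// Sketch only: exercises dcMappings.Set and String from this patch.
var m dcMappings
if err := m.Set("dc1,!dc2=>dc3,!dc4;dc5=>dc6"); err != nil {
	panic(err) // malformed input such as "dc1=>" or ";" returns an error instead
}
// m[0]: Source=[dc1] IgnoreSource=[dc2] Target=[dc3] IgnoreTarget=[dc4]
// m[1]: Source=[dc5] Target=[dc6]
// m.String() reproduces the original flag value: "dc1,!dc2=>dc3,!dc4;dc5=>dc6"
```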
+func (dcm *dcMappings) Set(v string) error { + mappingParts := strings.Split(v, ";") + for _, dcMapPart := range mappingParts { + sourceTargetParts := strings.Split(dcMapPart, "=>") + if len(sourceTargetParts) != 2 { + return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) + } + if sourceTargetParts[0] == "" || sourceTargetParts[1] == "" { + return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) + } + + var mapping dcMapping + mapping.Source, mapping.IgnoreSource = parseDCList(strings.Split(sourceTargetParts[0], ",")) + mapping.Target, mapping.IgnoreTarget = parseDCList(strings.Split(sourceTargetParts[1], ",")) + + *dcm = append(*dcm, mapping) + } + return nil +} + +func parseDCList(list []string) (dcs, ignore []string) { + for _, dc := range list { + if strings.HasPrefix(dc, ignoreDCPrefix) { + ignore = append(ignore, strings.TrimPrefix(dc, ignoreDCPrefix)) + continue + } + dcs = append(dcs, dc) + } + return dcs, ignore +} + +// String builds --dc-mapping flag back from struct. +func (dcm *dcMappings) String() string { + if dcm == nil { + return "" + } + var res strings.Builder + for i, mapping := range *dcm { + source := slices.Concat(mapping.Source, addIgnorePrefix(mapping.IgnoreSource)) + target := slices.Concat(mapping.Target, addIgnorePrefix(mapping.IgnoreTarget)) + res.WriteString( + strings.Join(source, ",") + "=>" + strings.Join(target, ","), + ) + if i != len(*dcm)-1 { + res.WriteString(";") + } + } + return res.String() +} + +func addIgnorePrefix(ignore []string) []string { + var result []string + for _, v := range ignore { + result = append(result, ignoreDCPrefix+v) + } + return result +} + +// Type implements pflag.Value interface. 
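For context, Set, String and Type together satisfy the spf13/pflag Value interface, which is what allows cmd.go above to register the type directly as a flag. A minimal standalone sketch of that wiring (the flag usage string here is illustrative; the actual registration goes through the command's flag wrapper):

```go
// Sketch: registering a pflag.Value implementation on a cobra command.
cmd := &cobra.Command{Use: "restore"}
var m dcMappings
cmd.Flags().Var(&m, "dc-mapping", "map source DCs to target DCs, e.g. dc1=>dc2")
```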
+func (dcm *dcMappings) Type() string { + return "dc-mapping" +} diff --git a/pkg/command/restore/dcmappings_test.go b/pkg/command/restore/dcmappings_test.go new file mode 100644 index 000000000..8697d5aa6 --- /dev/null +++ b/pkg/command/restore/dcmappings_test.go @@ -0,0 +1,185 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import ( + "fmt" + "slices" + "testing" +) + +func TestSetDCMapping(t *testing.T) { + testCases := []struct { + input string + expectedErr bool + expectedMappings dcMappings + }{ + { + input: "dc1=>dc2", + expectedMappings: dcMappings{ + {Source: []string{"dc1"}, Target: []string{"dc2"}}, + }, + }, + { + input: "dc1, dc2=>dc1, dc2", + expectedMappings: dcMappings{ + {Source: []string{"dc1", "dc2"}, Target: []string{"dc1", "dc2"}}, + }, + }, + { + input: "dc1=>dc3;dc2=>dc4", + expectedMappings: dcMappings{ + {Source: []string{"dc1"}, Target: []string{"dc3"}}, + {Source: []string{"dc2"}, Target: []string{"dc4"}}, + }, + }, + { + input: "dc1,dc2=>dc3", + expectedMappings: dcMappings{ + {Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}}, + }, + }, + { + input: "dc1,!dc2=>dc3", + expectedMappings: dcMappings{ + { + Source: []string{"dc1"}, + Target: []string{"dc3"}, + IgnoreSource: []string{"dc2"}, + }, + }, + }, + { + input: "dc1,!dc2=>dc3,!dc4", + expectedMappings: dcMappings{ + { + Source: []string{"dc1"}, + Target: []string{"dc3"}, + IgnoreSource: []string{"dc2"}, + IgnoreTarget: []string{"dc4"}, + }, + }, + }, + { + input: "dc1,!dc2=>dc3,!dc4", + expectedMappings: dcMappings{ + { + Source: []string{"dc1"}, + Target: []string{"dc3"}, + + IgnoreSource: []string{"dc2"}, + IgnoreTarget: []string{"dc4"}, + }, + }, + }, + { + input: "!dc1,dc2=>dc3,!dc4", + expectedMappings: dcMappings{ + { + Source: []string{"dc2"}, + Target: []string{"dc3"}, + + IgnoreSource: []string{"dc1"}, + IgnoreTarget: []string{"dc4"}, + }, + }, + }, + { + input: "dc1=>dc3=>dc2=>dc4", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: ";", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "=>", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "dc1=>", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "dc1=>;", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + var mappings dcMappings + + err := mappings.Set(tc.input) + if tc.expectedErr && err == nil { + t.Fatal("Expected err, but got nil") + } + if !tc.expectedErr && err != nil { + t.Fatalf("Unexpected err: %v", err) + } + slices.EqualFunc(tc.expectedMappings, mappings, func(a, b dcMapping) bool { + return slices.Equal(a.Source, b.Source) && + slices.Equal(a.Target, b.Target) && + slices.Equal(a.IgnoreSource, b.IgnoreSource) && + slices.Equal(a.IgnoreTarget, b.IgnoreTarget) + }) + }) + } + +} + +func TestDCMappingString(t *testing.T) { + testCases := []struct { + mappings dcMappings + expected string + }{ + { + mappings: dcMappings{ + {Source: []string{"dc1"}, Target: []string{"dc2"}}, + }, + expected: "dc1=>dc2", + }, + { + mappings: dcMappings{ + {Source: []string{"dc1"}, Target: []string{"dc2"}}, + {Source: []string{"dc3"}, Target: []string{"dc4"}}, + }, + expected: "dc1=>dc2;dc3=>dc4", + }, + { + mappings: dcMappings{ + { + Source: []string{"dc1"}, + Target: []string{"dc2"}, + IgnoreSource: []string{"dc2"}, + }, + }, + expected: "dc1,!dc2=>dc2", + }, + { + mappings: dcMappings{ + { + Source: []string{"dc1"}, + Target: []string{"dc2"}, + IgnoreSource: 
[]string{"dc2"}, + IgnoreTarget: []string{"dc3"}, + }, + }, + expected: "dc1,!dc2=>dc2,!dc3", + }, + {}, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + actual := tc.mappings.String() + if actual != tc.expected { + t.Fatalf("Expected %q, but got %q", tc.expected, actual) + } + }) + } +} diff --git a/pkg/command/restore/res.yaml b/pkg/command/restore/res.yaml index eaeb4bd56..bfe58fa30 100644 --- a/pkg/command/restore/res.yaml +++ b/pkg/command/restore/res.yaml @@ -72,3 +72,16 @@ dry-run: | show-tables: | Prints table names together with keyspace, used in combination with --dry-run. + +dc-mapping: | + Specifies mapping between DCs from the backup and DCs in the restored(target) cluster. + All DCs from source cluster should be explicitly mapped to all DCs in the target cluster. The only exception is when + source and target cluster has exact match: source dcs == target dcs. + Only works with tables restoration (--restore-tables=true). + Syntax: + "source_dc1,source_dc2=>target_dc1,target_dc2" + Multiple mappings are separated by semicolons (;). Exclamation mark (!) before a DC indicates that it should be ignored during restore. + Examples: + "dc1,dc2=>dc3" - data from dc1 and dc2 DCs should be restored to dc3 DC. + "dc1,dc2=>dc3,!dc4" - data from dc1 and dc2 DCs should be restored to dc3 DC. Ignoring dc4 DC from target cluster. + "dc1,!dc2=>dc2" - data from dc1 should be restored to dc2 DC. Ignoring dc2 from source cluster. diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index 50646e846..85bd5bb29 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" ) @@ -57,7 +58,7 @@ type batchDispatcher struct { hostShardCnt map[string]uint } -func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { +func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string, hostDCs map[string][]string) *batchDispatcher { sortWorkload(workload) var shards uint for _, sh := range hostShardCnt { @@ -70,7 +71,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin mu: sync.Mutex{}, wait: make(chan struct{}), workload: workload, - workloadProgress: newWorkloadProgress(workload, locationHosts), + workloadProgress: newWorkloadProgress(workload, locationHosts, hostDCs), batchSize: batchSize, expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, @@ -106,7 +107,7 @@ type remoteSSTableDirProgress struct { RemainingSSTables []RemoteSSTable } -func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress { +func newWorkloadProgress(workload Workload, locationHosts map[Location][]string, hostDCs map[string][]string) workloadProgress { dcBytes := make(map[string]int64) locationDC := make(map[string][]string) p := make([]remoteSSTableDirProgress, len(workload.RemoteDir)) @@ -121,7 +122,9 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) hostDCAccess := make(map[string][]string) for loc, hosts := range locationHosts { for _, h := range hosts { - hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...) 
+ dcsInLoc := locationDC[loc.StringWithoutDC()] + hostAllDCs := hostDCs[h] + hostDCAccess[h] = append(hostDCAccess[h], strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()...) } } return workloadProgress{ @@ -201,8 +204,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { for i, rdp := range bd.workloadProgress.remoteDir { if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { rdw := bd.workload.RemoteDir[i] - return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info", - rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size) + return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info", + rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size) } } for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored { diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index 9f206716e..a79c58b4c 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -5,6 +5,8 @@ package restore import ( "testing" + "github.com/google/go-cmp/cmp" + "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" ) @@ -114,7 +116,13 @@ func TestBatchDispatcher(t *testing.T) { "h3": 3, } - bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts) + hostDCs := map[string][]string{ + "h1": {"dc1", "dc2"}, + "h2": {"dc1", "dc2"}, + "h3": {"dc3"}, + } + + bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts, hostDCs) scenario := []struct { host string @@ -166,3 +174,162 @@ func TestBatchDispatcher(t *testing.T) { t.Fatalf("Expected sstables to be batched: %s", err) } } + +func TestNewWorkloadProgress(t *testing.T) { + testCases := []struct { + name string + + workload Workload + locationHosts map[backupspec.Location][]string + hostDCs map[string][]string + + expected map[string][]string + }{ + { + name: "one location with one DC", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1"}}), + locationHosts: map[backupspec.Location][]string{ + {}: {"host1", "host2"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + }, + + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + }, + }, + { + name: "one location with two DC's", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}), + locationHosts: map[backupspec.Location][]string{ + {}: {"host1", "host2"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc2"}, + }, + + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc2"}, + }, + }, + { + name: "one location with two DC's, more nodes", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}), + locationHosts: map[backupspec.Location][]string{ + {}: {"host1", "host2", "host3", "host4"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + "host3": {"dc2"}, + "host4": {"dc2"}, + }, + + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + "host3": {"dc2"}, + "host4": {"dc2"}, + }, + }, + { + name: "two locations with one DC each", + workload: generateWorkload(t, + []string{"location1", "location2"}, + map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}}, + ), + locationHosts: map[backupspec.Location][]string{ + {Path: "location1"}: {"host1", "host2"}, + {Path: "location2"}: {"host1", "host2"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1"}, + "host2": 
{"dc2"}, + }, + + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc2"}, + }, + }, + { + name: "two locations with one DC each, but some hosts doesn't have access", + workload: generateWorkload(t, + []string{"location1", "location2"}, + map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}}, + ), + locationHosts: map[backupspec.Location][]string{ + {Path: "location1"}: {"host1", "host3", "host4"}, + {Path: "location2"}: {"host1", "host2", "host4"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + "host3": {"dc2"}, + "host4": {"dc2"}, + }, + + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": nil, + "host3": nil, + "host4": {"dc2"}, + }, + }, + { + name: "two locations with one DC each, but hosts maps to all dcs", + workload: generateWorkload(t, + []string{"location1", "location2"}, + map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}}, + ), + locationHosts: map[backupspec.Location][]string{ + {Path: "location1"}: {"host1", "host2"}, + {Path: "location2"}: {"host1", "host2"}, + }, + hostDCs: map[string][]string{ + "host1": {"dc1", "dc2"}, + "host2": {"dc1", "dc2"}, + }, + + expected: map[string][]string{ + "host1": {"dc1", "dc2"}, + "host2": {"dc1", "dc2"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + wp := newWorkloadProgress(tc.workload, tc.locationHosts, tc.hostDCs) + if diff := cmp.Diff(wp.hostDCAccess, tc.expected); diff != "" { + t.Fatalf("Actual != Expected: %s", diff) + } + }) + } +} + +func generateWorkload(t *testing.T, locationPaths []string, dcsInLocation map[string][]string) Workload { + t.Helper() + + var remoteDirs []RemoteDirWorkload + for _, path := range locationPaths { + dcs, ok := dcsInLocation[path] + if !ok { + t.Fatalf("each location should have corresponding entry in dcsInLocation map") + } + for _, dc := range dcs { + remoteDirs = append(remoteDirs, RemoteDirWorkload{ + ManifestInfo: &backupspec.ManifestInfo{ + DC: dc, + Location: backupspec.Location{Path: path}, + }, + }) + } + } + return Workload{RemoteDir: remoteDirs} +} diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 32f91dee9..b80417d9a 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -76,6 +76,10 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { var rawWorkload []RemoteDirWorkload err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + if slices.Contains(w.target.ignoredSourceDC, m.DC) { + w.logger.Info(ctx, "Ignoring DC", "dc", m.DC, "location", location) + return nil + } return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { return nil diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 2dc654399..84f776f03 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -10,6 +10,7 @@ import ( "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" @@ -33,9 +34,14 @@ type Target struct { RestoreSchema bool `json:"restore_schema,omitempty"` RestoreTables bool `json:"restore_tables,omitempty"` Continue bool `json:"continue"` + DCMappings DCMappings 
`json:"dc-mapping"` // Cache for host with access to remote location locationHosts map[Location][]string `json:"-"` + // Cache for host and their DC after applying DCMappings + hostDCs map[string][]string + // Cache for dcs that shouldn't be restored from location + ignoredSourceDC []string } const ( @@ -82,6 +88,16 @@ func (t Target) validateProperties(dcMap map[string][]string) error { if t.RestoreSchema && t.Keyspace != nil { return errors.New("restore schema always restores 'system_schema.*' tables only, no need to specify '--keyspace' flag") } + // Check for duplicates in Location + allLocations := strset.New() + for _, l := range t.Location { + p := l.RemotePath("") + if allLocations.Has(p) { + return errors.Errorf("location %s is specified multiple times", l) + } + allLocations.Add(p) + } + return nil } @@ -289,3 +305,42 @@ type HostInfo struct { Transfers int RateLimit int } + +// DCMappings represents how DCs from the backup cluster are mapped to DCs in the restore cluster. +// For details about how DCs can be mapped refer to --dc-mapping documentation. +type DCMappings []DCMapping + +// DCMapping represent single instance of datacenter mappings. See DCMappings for details. +type DCMapping struct { + Source []string `json:"source"` + IgnoreSource []string `json:"ignore_source"` + Target []string `json:"target"` + IgnoreTarget []string `json:"ignore_target"` +} + +func (mappings DCMappings) calculateMappings() (targetMap map[string][]string, ignoreSource, ignoreTarget []string) { + targetMap = map[string][]string{} + for _, mapping := range mappings { + ignoreSource = append(ignoreSource, mapping.IgnoreSource...) + ignoreTarget = append(ignoreTarget, mapping.IgnoreTarget...) + + if len(mapping.Source) == 0 || len(mapping.Target) == 0 { + continue + } + tIdx, sIdx := 0, 0 + for { + target, source := mapping.Target[tIdx], mapping.Source[sIdx] + targetMap[target] = append(targetMap[target], source) + if tIdx == len(mapping.Target)-1 && sIdx == len(mapping.Source)-1 { + break + } + if tIdx < len(mapping.Target)-1 { + tIdx++ + } + if sIdx < len(mapping.Source)-1 { + sIdx++ + } + } + } + return targetMap, ignoreSource, ignoreTarget +} diff --git a/pkg/service/restore/model_test.go b/pkg/service/restore/model_test.go new file mode 100644 index 000000000..7d304b0cd --- /dev/null +++ b/pkg/service/restore/model_test.go @@ -0,0 +1,167 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import ( + "maps" + "slices" + "testing" +) + +func TestCalculateMappings(t *testing.T) { + testCases := []struct { + name string + + mappings DCMappings + expectedTargetMap map[string][]string + expectedIgnoreSource []string + expectedIgnoreTarget []string + }{ + { + name: "dc1=>dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{"dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc2": {"dc1"}, + }, + }, + { + name: "dc1=>dc1,dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{"dc1", "dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc1": {"dc1"}, + "dc2": {"dc1"}, + }, + }, + { + name: "dc1,dc2=>dc3", + mappings: []DCMapping{ + { + Source: []string{"dc1", "dc2"}, + Target: []string{"dc3"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc3": {"dc1", "dc2"}, + }, + }, + { + name: "dc1,dc2=>dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1", "dc2"}, + Target: []string{"dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc2": {"dc1", "dc2"}, + }, + }, + { + name: "empty Source", + mappings: 
[]DCMapping{ + { + Source: []string{}, + Target: []string{"dc2"}, + }, + }, + expectedTargetMap: map[string][]string{}, + }, + { + name: "empty Target", + mappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{}, + }, + }, + expectedTargetMap: map[string][]string{}, + }, + { + name: "dc1,dc2,dc3=>dc1,dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1", "dc2", "dc3"}, + Target: []string{"dc1", "dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc1": {"dc1"}, + "dc2": {"dc2", "dc3"}, + }, + }, + { + name: "dc1,dc2=>dc1,dc2;dc2=>dc3", + mappings: []DCMapping{ + { + Source: []string{"dc1", "dc2"}, + Target: []string{"dc1", "dc2"}, + }, + { + Source: []string{"dc2"}, + Target: []string{"dc3"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc1": {"dc1"}, + "dc2": {"dc2"}, + "dc3": {"dc2"}, + }, + }, + { + name: "dc1,!dc2=>dc1,dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{"dc1", "dc2"}, + IgnoreSource: []string{"dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc1": {"dc1"}, + "dc2": {"dc1"}, + }, + expectedIgnoreSource: []string{"dc2"}, + }, + { + name: "dc1,dc2=>dc1,!dc2", + mappings: []DCMapping{ + { + Source: []string{"dc1", "dc2"}, + Target: []string{"dc1"}, + IgnoreTarget: []string{"dc2"}, + }, + }, + expectedTargetMap: map[string][]string{ + "dc1": {"dc1", "dc2"}, + }, + expectedIgnoreTarget: []string{"dc2"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + targetMap, ignoreSource, ignoreTarget := tc.mappings.calculateMappings() + + if !maps.EqualFunc(targetMap, tc.expectedTargetMap, slices.Equal) { + t.Fatalf("Expected %v, but got %v", tc.expectedTargetMap, targetMap) + } + + if !slices.Equal(ignoreSource, tc.expectedIgnoreSource) { + t.Fatalf("Expected %v, but got %v\n", tc.expectedIgnoreSource, ignoreSource) + } + + if !slices.Equal(ignoreTarget, tc.expectedIgnoreTarget) { + t.Fatalf("Expected %v, but got %v\n", tc.expectedIgnoreTarget, ignoreTarget) + } + }) + } +} diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 90e584c4b..3ebcb4605 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -59,6 +59,10 @@ func TestRestoreTablesUserIntegration(t *testing.T) { "location": loc, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, }) Print("Log in via restored user and check permissions") @@ -106,6 +110,10 @@ func TestRestoreTablesNoReplicationIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, }) h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}}) @@ -296,6 +304,10 @@ func TestRestoreSchemaDropAddColumnIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, }) h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}}) @@ -354,6 +366,10 @@ func TestRestoreTablesVnodeToTabletsIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, }) 
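Note that the integration tests pass the mapping as task properties rather than as a flag string. Given the json tags on DCMapping, my reading of the patch is that the properties form used above is equivalent to --dc-mapping "dc1=>dc1,dc2" on the sctool side; a small sketch of that equivalence (illustrative, not part of the diff):

```go
// Sketch: the properties form used in these tests marshals to the same JSON
// shape that the CLI produces for --dc-mapping "dc1=>dc1,dc2".
b, _ := json.Marshal(DCMappings{{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}})
fmt.Println(string(b))
// [{"source":["dc1"],"ignore_source":null,"target":["dc1","dc2"],"ignore_target":null}]
```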
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, c1, c2) @@ -455,6 +471,10 @@ func TestRestoreTablesPausedIntegration(t *testing.T) { "keyspace": []string{ks1, ks2}, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, } err = runPausedRestore(t, func(ctx context.Context) error { h.dstCluster.RunID = uuid.NewTime() @@ -625,6 +645,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "rate_limit": []string{"0"}, "unpin_agent_cpu": true, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1", "dc2"}, "target": {"dc1"}}, + }, }) if err != nil { finishedRestore <- err @@ -772,6 +796,10 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc1", "dc2"}}, + }, } t.Run("batch retry finished with success", func(t *testing.T) { @@ -971,6 +999,10 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) { "parallel": 1, "snapshot_tag": tag, "restore_tables": true, + // DC Mapping is required, because dcs are reversed :D + "dc-mapping": []map[string][]string{ + {"source": {"dc1", "dc2"}, "target": {"dc2", "dc1"}}, + }, }) close(res) }() @@ -989,3 +1021,161 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) { t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM) } } + +func TestRestoreTablesIntoClusterWithAnotherDCNameIntegration(t *testing.T) { + h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedThirdClusterHosts()) + + Print("Keyspace setup") + // Source cluster + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}" + ks := randomizedName("multi_location_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks)) + + // Target cluster + ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}" + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ks)) + + Print("Table setup") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + tab := randomizedName("tab_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + + Print("Fill setup") + fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + + Print("Save filled table into map") + srcM := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ks, tab, "id", "data") + + Print("Run backup") + loc := []Location{ + testLocation("one-location-1", ""), + } + S3InitBucket(t, loc[0].Path) + ksFilter := []string{ks} + tag := h.runBackup(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "batch_size": 100, + }) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + res := make(chan struct{}) + go func() { + h.runRestore(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + // Test if batching does not hang with + // limited parallel and location access. 
+ "parallel": 1, + "snapshot_tag": tag, + "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1"}, "target": {"dc3"}}, + }, + }) + close(res) + }() + + select { + case <-res: + case <-time.NewTimer(2 * time.Minute).C: + t.Fatal("Restore hanged") + } + + Print("Save restored table into map") + dstM := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ks, tab, "id", "data") + + Print("Validate success") + if !maps.Equal(srcM, dstM) { + t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM) + } +} + +func TestRestoreOnlyOneDCFromLocation(t *testing.T) { + h := newTestHelper(t, ManagedClusterHosts(), ManagedThirdClusterHosts()) + + Print("Keyspace setup") + // Source cluster + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}" + ksTwoDC := randomizedName("two_dc_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ksTwoDC)) + + // Keyspace thats only available in dc2 + ksStmtOneDC := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1':0, 'dc2': 1}" + ksOneDC := randomizedName("one_dc_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmtOneDC, ksOneDC)) + + // Target cluster + ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}" + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksTwoDC)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksOneDC)) + + Print("Table setup") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + tab := randomizedName("tab_") + for _, ks := range []string{ksTwoDC, ksOneDC} { + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + } + + Print("Fill setup") + for _, ks := range []string{ksTwoDC, ksOneDC} { + fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + } + + Print("Save filled table into map") + srcMTwoDC := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ksTwoDC, tab, "id", "data") + + Print("Run backup") + loc := []Location{ + testLocation("one-location-1", ""), + } + S3InitBucket(t, loc[0].Path) + ksFilter := []string{ksTwoDC, ksOneDC} + tag := h.runBackup(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "batch_size": 100, + }) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + res := make(chan struct{}) + go func() { + h.runRestore(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + // Test if batching does not hang with + // limited parallel and location access. 
+ "parallel": 1, + "snapshot_tag": tag, + "restore_tables": true, + // DC Mapping is required + "dc-mapping": []map[string][]string{ + {"source": {"dc1", "!dc2"}, "target": {"dc3"}}, + }, + }) + close(res) + }() + + select { + case <-res: + case <-time.NewTimer(2 * time.Minute).C: + t.Fatal("Restore hanged") + } + + Print("Save restored table into map") + dstMTwoDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksTwoDC, tab, "id", "data") + dstMOneDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksOneDC, tab, "id", "data") + + Print("Validate success") + if !maps.Equal(srcMTwoDC, dstMTwoDC) { + t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcMTwoDC, dstMTwoDC) + } + if len(dstMOneDC) != 0 { + t.Fatalf("dc2 shouldn't be restored") + } +} diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 9fea258a5..22ac72bfe 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -553,6 +553,9 @@ func TestRestoreTablesSmokeIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } smokeRestore(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser, "{'class': 'NetworkTopologyStrategy', 'dc1': 2}") @@ -618,7 +621,7 @@ func smokeRestore(t *testing.T, target Target, keyspace string, loadCnt, loadSiz Print("When: restore backup on different cluster = (dc1: 3 nodes, dc2: 3 nodes)") if err := dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil { - t.Fatal(err) + t.Fatal("Restore:", err) } dstH.validateRestoreSuccess(dstSession, srcSession, target, []table{{ks: keyspace, tab: BigTableName}}) @@ -645,6 +648,9 @@ func TestRestoreTablesRestartAgentsIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreWithAgentRestart(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ -722,6 +728,9 @@ func TestRestoreTablesResumeIntegration(t *testing.T) { Parallel: testParallel, RestoreTables: true, Continue: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ -748,6 +757,9 @@ func TestRestoreTablesResumeContinueFalseIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ -911,6 +923,9 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreWithVersions(t, target, testKeyspace, testLoadCnt, testLoadSize, corruptCnt, testUser) @@ -1185,6 +1200,9 @@ func TestRestoreTablesViewCQLSchemaIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreViewCQLSchema(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ 
-1269,6 +1287,9 @@ func TestRestoreFullViewSSTableSchemaIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreViewSSTableSchema(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ -1356,6 +1377,9 @@ func TestRestoreFullIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreAllTables(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser) @@ -1462,6 +1486,9 @@ func TestRestoreFullAlternatorIntegration(t *testing.T) { BatchSize: testBatchSize, Parallel: testParallel, RestoreTables: true, + DCMappings: DCMappings{ + {Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}}, + }, } restoreAlternator(t, schemaTarget, tablesTarget, testKeyspace, testTable, testUser, testAlternatorPort) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 91c87c73f..b3af87eb3 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -213,7 +213,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { } } - bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationHosts) + bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationHosts, w.target.hostDCs) f := func(n int) error { host := w.hosts[n] diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index c6a42d1b3..d2b50bb7e 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "maps" "math/rand" "path" "regexp" @@ -96,20 +97,19 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err return errors.Wrap(err, "get status") } + targetMap, ignoreSource, _ := t.DCMappings.calculateMappings() + t.hostDCs = w.applyDCMapping(status, targetMap) + w.logger.Info(ctx, "Applied dc mappings", "mappings", targetMap, "host_dcs", t.hostDCs) + + t.ignoredSourceDC = ignoreSource + // All nodes should be up during restore if err := w.client.VerifyNodesAvailability(ctx); err != nil { return errors.Wrap(err, "verify all nodes availability") } - allLocations := strset.New() locationHosts := make(map[Location][]string) for _, l := range t.Location { - p := l.RemotePath("") - if allLocations.Has(p) { - return errors.Errorf("location %s is specified multiple times", l) - } - allLocations.Add(p) - var ( remotePath = l.RemotePath("") locationStatus = status @@ -161,12 +161,96 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err } else { w.logger.Info(ctx, "Found schema file") } + return nil + } + + // The only way to have a list of dcs from the source cluster is to + // get them from backup locations. 
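To make the mapping resolution above concrete, here is a small sketch of what calculateMappings produces; the values are taken from the model_test.go cases in this patch rather than anything new:

```go
// Sketch: source DCs are distributed over target DCs pairwise, with the last
// DC on the shorter side absorbing the remainder.
m := DCMappings{{Source: []string{"dc1", "dc2", "dc3"}, Target: []string{"dc1", "dc2"}}}
targetMap, ignoreSource, ignoreTarget := m.calculateMappings()
// targetMap    == map[string][]string{"dc1": {"dc1"}, "dc2": {"dc2", "dc3"}}
// ignoreSource == nil, ignoreTarget == nil
_, _, _ = targetMap, ignoreSource, ignoreTarget
```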
+ sourceDC, err := w.collectDCFromLocation(ctx, t.locationHosts) + if err != nil { + return err + } + targetDC := slices.Collect(maps.Keys(dcMap)) + if err := w.validateDCMappings(t.DCMappings, sourceDC, targetDC); err != nil { + w.logger.Debug(ctx, + "Validate dc mapping", + "source_dc", sourceDC, + "target_dc", targetDC, + "mappings", t.DCMappings, + ) + return err } w.logger.Info(ctx, "Initialized target", "target", t) return nil } +// applyDCMapping applies given mappings to each node, mapping each host's datacenter in the target cluster to its corresponding +// datacenter(s) in the source cluster. +func (w *worker) applyDCMapping(status scyllaclient.NodeStatusInfoSlice, targetMap map[string][]string) map[string][]string { + hostDCs := map[string][]string{} + for _, n := range status { + hostDCs[n.Addr] = append(hostDCs[n.Addr], targetMap[n.Datacenter]...) + } + return hostDCs +} + +func (w *worker) collectDCFromLocation(ctx context.Context, locationHosts map[Location][]string) ([]string, error) { + sourceDCs := strset.New() + for loc, hosts := range locationHosts { + manifests, err := w.getManifestInfo(ctx, hosts[0], loc) + if err != nil { + return nil, errors.Wrap(err, "getManifestInfo") + } + if len(manifests) == 0 { + return nil, errors.Errorf("no snapshot with tag %s", w.run.SnapshotTag) + } + for _, m := range manifests { + sourceDCs.Add(m.DC) + } + } + return sourceDCs.List(), nil +} + +// validateDCMappings validates that every DC in the source cluster has a corresponding DC in the target cluster, +// taking into account the DC mappings. +func (w *worker) validateDCMappings(mappings DCMappings, sourceDC, targetDC []string) error { + slices.Sort(sourceDC) + slices.Sort(targetDC) + + if len(mappings) == 0 { + if slices.Equal(sourceDC, targetDC) { + return nil + } + return errors.Errorf("Source DC(%s) != Target DC(%s)", sourceDC, targetDC) + } + targetMap, ignoreSource, ignoreTarget := mappings.calculateMappings() + mappedTargetDCs := strset.New() + // Make sure that each dc from target cluster has corresponding dc (or mapping) in source cluster + for _, dc := range targetDC { + if slices.Contains(ignoreTarget, dc) { + continue + } + if dcs, ok := targetMap[dc]; ok { + mappedTargetDCs.Add(dcs...) + continue + } + return errors.Errorf("Target DC(%s) doesn't have a match in the source cluster: %v", dc, sourceDC) + } + + sourceDCSet := strset.New(sourceDC...) + ignoreSourceSet := strset.New(ignoreSource...)
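The rules implemented here boil down to: with no mapping the source and target DC sets must match exactly, and with a mapping every non-ignored DC on both sides must be covered. A sketch of the expected outcomes, mirroring the worker_test.go cases added below (not part of the diff itself):

```go
// Sketch only: expected results of validateDCMappings.
w := &worker{}
// No mapping: source and target DC sets must be identical.
_ = w.validateDCMappings(nil, []string{"dc1"}, []string{"dc2"}) // error
// Full mapping: both source DCs map onto the single target DC.
_ = w.validateDCMappings(
	DCMappings{{Source: []string{"dc1", "dc2"}, Target: []string{"dc2"}}},
	[]string{"dc1", "dc2"}, []string{"dc2"}) // nil
// Partial mapping: source dc2 is neither mapped nor ignored.
_ = w.validateDCMappings(
	DCMappings{{Source: []string{"dc1"}, Target: []string{"dc2"}}},
	[]string{"dc1", "dc2"}, []string{"dc2"}) // error
```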
+ sourceDCSet = strset.Difference(sourceDCSet, ignoreSourceSet) + + // Check that every dc from source has been mapped to the target dc + if !sourceDCSet.IsEqual(mappedTargetDCs) { + return errors.Errorf( + "Source DCs(%v) doesn't have a match in the target cluster: %v", + strset.Difference(sourceDCSet, mappedTargetDCs), targetDC) + } + return nil +} + func skipRestorePatterns(ctx context.Context, client *scyllaclient.Client, session gocqlx.Session) ([]string, error) { keyspaces, err := client.KeyspacesByType(ctx) if err != nil { @@ -344,6 +428,10 @@ func (w *worker) initUnits(ctx context.Context) error { var foundManifest bool for _, l := range w.target.Location { manifestHandler := func(miwc ManifestInfoWithContent) error { + // For now dc mapping is applied only to restore tables + if w.target.RestoreTables && slices.Contains(w.target.ignoredSourceDC, miwc.DC) { + return nil + } foundManifest = true filesHandler := func(fm FilesMeta) { diff --git a/pkg/service/restore/worker_test.go b/pkg/service/restore/worker_test.go new file mode 100644 index 000000000..4b5bfd1af --- /dev/null +++ b/pkg/service/restore/worker_test.go @@ -0,0 +1,109 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import "testing" + +func TestValidateDCMappings(t *testing.T) { + testCases := []struct { + name string + sourceDC []string + targetDC []string + dcMappings DCMappings + + expectedErr bool + }{ + { + name: "sourceDC != targetDC, but with full mapping", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc2"}, + dcMappings: []DCMapping{ + {Source: []string{"dc1"}, Target: []string{"dc2"}}, + }, + expectedErr: false, + }, + { + name: "source != target, but will full mapping, two dcs per cluster", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc3", "dc4"}, + dcMappings: []DCMapping{ + {Source: []string{"dc1", "dc2"}, Target: []string{"dc3", "dc4"}}, + }, + expectedErr: false, + }, + { + name: "sourceDCs == targetDCs, no mapping", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc1"}, + expectedErr: false, + }, + { + name: "sourceDCs == targetDCs, no mapping, two dcs per cluster", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc1", "dc2"}, + expectedErr: false, + }, + { + name: "sourceDCs != targetDCs, no mapping", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc2"}, + expectedErr: true, + }, + { + name: "sourceDCs != targetDCs, but with full mapping", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc2"}, + dcMappings: []DCMapping{ + {Source: []string{"dc1", "dc2"}, Target: []string{"dc2"}}, + }, + expectedErr: false, + }, + { + name: "sourceDCs != targetDCs, but with partial mapping", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc2"}, + dcMappings: []DCMapping{ + {Source: []string{"dc1"}, Target: []string{"dc2"}}, + }, + expectedErr: true, + }, + { + name: "sourceDCs != targetDCs, with deletion in mapping", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc2"}, + dcMappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{"dc2"}, + IgnoreSource: []string{"dc2"}, + }, + }, + expectedErr: false, + }, + { + name: "sourceDCs != targetDCs, with deletion in mapping", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc1", "dc2"}, + dcMappings: []DCMapping{ + { + Source: []string{"dc1"}, + Target: []string{"dc1"}, + IgnoreTarget: []string{"dc2"}, + }, + }, + expectedErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := &worker{} + err := w.validateDCMappings(tc.dcMappings, 
tc.sourceDC, tc.targetDC) + if tc.expectedErr && err == nil { + t.Fatalf("Expected err, but got nil") + } + if !tc.expectedErr && err != nil { + t.Fatalf("Unexpected err: %v", err) + } + }) + } +} diff --git a/pkg/testutils/testconfig/testconfig.go b/pkg/testutils/testconfig/testconfig.go index 81542779e..671963c22 100644 --- a/pkg/testutils/testconfig/testconfig.go +++ b/pkg/testutils/testconfig/testconfig.go @@ -30,6 +30,7 @@ var ( flagManagedCluster = flag.String("managed-cluster", "127.0.0.1", "a comma-separated list of host:port tuples of data cluster hosts") flagManagedSecondCluster = flag.String("managed-second-cluster", "127.0.0.1", "a comma-separated list of host:port tuples of data second cluster hosts") + flagManagedThirdCluster = flag.String("managed-third-cluster", "127.0.0.1", "a comma-separated list of host:port tuples of data third cluster hosts") flagTestNet = flag.String("test-network", "192.168.200.", "a network where test nodes are residing") ) @@ -69,6 +70,14 @@ func ManagedSecondClusterHosts() []string { return strings.Split(*flagManagedSecondCluster, ",") } +// ManagedThirdClusterHosts specifies addresses of nodes in a test third cluster. +func ManagedThirdClusterHosts() []string { + if !flag.Parsed() { + flag.Parse() + } + return strings.Split(*flagManagedThirdCluster, ",") +} + // ManagedClusterCredentials returns CQL username and password. func ManagedClusterCredentials() (user, password string) { if !flag.Parsed() { diff --git a/testing/Makefile b/testing/Makefile index fadbf8558..f33034f82 100644 --- a/testing/Makefile +++ b/testing/Makefile @@ -5,6 +5,7 @@ CQLSH := $(COMPOSE) exec scylla-manager-db cqlsh CQLSH_NODE := $(COMPOSE) exec -T dc1_node_1 cqlsh NODETOOL := $(COMPOSE) exec -T dc1_node_1 nodetool SECOND_NODETOOL := $(COMPOSE) exec -T second_cluster_dc1_node_1 nodetool +THIRD_NODETOOL := $(COMPOSE) exec -T third_cluster_dc3_node_1 nodetool SM_NODETOOL := $(COMPOSE) exec -T scylla-manager-db nodetool YQ := ../bin/yq CURRENT_UID := $(shell id -u) @@ -33,9 +34,11 @@ endif SCYLLA_ARGS := --smp 2 --memory 2G --seeds $(SECOND_NET)11 SCYLLA_SECOND_CLUSTER_ARGS := --smp 2 --memory 2G --seeds $(SECOND_NET)31 +SCYLLA_THIRD_CLUSTER_ARGS := --smp 1 --memory 2G --seeds $(SECOND_NET)41 export SCYLLA_ARGS export SCYLLA_SECOND_CLUSTER_ARGS +export SCYLLA_THIRD_CLUSTER_ARGS export MINIO_ENDPOINT export MINIO_CERT_DIR @@ -66,7 +69,7 @@ up: ifeq ($(SSL_ENABLED),true) # disable non-ssl port - @$(YQ) delete -i scylla/scylla.yaml 'native_transport_port' + # @$(YQ) delete -i scylla/scylla.yaml 'native_transport_port' # merge into scylla.yaml values from config/scylla-ssl.yaml with overwrite option (-x) @$(YQ) merge -i -x scylla/scylla.yaml scylla/config/scylla-ssl.yaml @cp scylla/config/cqlshrc-ssl scylla/cqlshrc @@ -89,10 +92,13 @@ endif @cp scylla/scylla.yaml scylla/scylla-second-cluster.yaml @$(YQ) write -i scylla/scylla-second-cluster.yaml 'cluster_name' 'Managed Other Cluster' + @cp scylla/scylla.yaml scylla/scylla-third-cluster.yaml + @$(YQ) write -i scylla/scylla-third-cluster.yaml 'cluster_name' 'Managed Third Cluster' + @echo "==> Starting containers" mkdir -p $(MINIO_DATA_DIR) - @. ./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d dc1_node_1 second_cluster_dc1_node_1 + @.
./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d dc1_node_1 second_cluster_dc1_node_1 third_cluster_dc3_node_1 $(COMPOSE) exec -T --privileged dc1_node_1 sudo bash -c 'echo "fs.aio-max-nr = 1048579" > /etc/sysctl.d/50-scylla.conf' $(COMPOSE) exec -T --privileged dc1_node_1 sudo sysctl -p /etc/sysctl.d/50-scylla.conf @echo "==> Waiting for dc1 node1" @@ -104,13 +110,16 @@ endif @until [ 1 -le $$($(SECOND_NODETOOL) status | grep -c "UN") ]; do echo -n "."; sleep 2; done ; echo "" @. ./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d second_cluster_dc1_node_2 + @until [ 1 -le $$($(THIRD_NODETOOL) status | grep -c "UN") ]; do echo -n "."; sleep 2; done ; echo "" + @. ./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d third_cluster_dc3_node_2 + @cnt=1; for node in dc1_node_2 dc1_node_3 dc2_node_1 dc2_node_2 dc2_node_3; do \ . ./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d $$node; \ cnt=$$(expr $$cnt + 1); \ echo "==> Waiting node $$node number $$cnt"; \ until [ $$cnt -le $$($(NODETOOL) status | grep -c "UN") ]; do echo -n "."; sleep 2; done ; echo ""; done - @for node in dc1_node_1 dc1_node_2 dc1_node_3 dc2_node_1 dc2_node_2 dc2_node_3 second_cluster_dc1_node_1 second_cluster_dc1_node_2; do \ + @for node in dc1_node_1 dc1_node_2 dc1_node_3 dc2_node_1 dc2_node_2 dc2_node_3 second_cluster_dc1_node_1 second_cluster_dc1_node_2 third_cluster_dc3_node_1 third_cluster_dc3_node_2; do \ $(COMPOSE) exec -T --privileged $$node sudo bash -c 'service ssh start'; done @. ./.env && CURRENT_UID=$(CURRENT_UID) CURRENT_GID=$(CURRENT_GID) $(COMPOSE) -f docker-compose.yaml -f $(COMPOSE_FILE) up -d diff --git a/testing/docker-compose-ipv4.yaml b/testing/docker-compose-ipv4.yaml index 5b294d700..d7f4c587e 100644 --- a/testing/docker-compose-ipv4.yaml +++ b/testing/docker-compose-ipv4.yaml @@ -63,6 +63,22 @@ services: second: ipv4_address: 192.168.200.32 + third_cluster_dc3_node_1: + command: ${SCYLLA_THIRD_CLUSTER_ARGS} --rpc-address 192.168.100.41 --alternator-address 192.168.200.41 --listen-address 192.168.200.41 + networks: + public: + ipv4_address: 192.168.100.41 + second: + ipv4_address: 192.168.200.41 + + third_cluster_dc3_node_2: + command: ${SCYLLA_THIRD_CLUSTER_ARGS} --rpc-address 192.168.100.42 --alternator-address 192.168.200.42 --listen-address 192.168.200.42 + networks: + public: + ipv4_address: 192.168.100.42 + second: + ipv4_address: 192.168.200.42 + scylla-manager-db: command: --smp 1 --memory 500M --api-address 0.0.0.0 --seeds 192.168.200.100 --rpc-address 192.168.200.100 --alternator-address 192.168.200.100 --listen-address 192.168.200.100 networks: diff --git a/testing/docker-compose-ipv6.yaml b/testing/docker-compose-ipv6.yaml index 863b1b957..77fc1d81f 100644 --- a/testing/docker-compose-ipv6.yaml +++ b/testing/docker-compose-ipv6.yaml @@ -63,6 +63,22 @@ services: second: ipv6_address: 2001:0DB9:200::32 + third_cluster_dc3_node_1: + command: ${SCYLLA_THIRD_CLUSTER_ARGS} --rpc-address 2001:0DB9:100::41 --alternator-address 2001:0DB9:200::41 --listen-address 2001:0DB9:200::41 + networks: + public: + ipv6_address: 2001:0DB9:100::41 + second: + ipv6_address: 2001:0DB9:200::41 + + third_cluster_dc3_node_2: + command: ${SCYLLA_THIRD_CLUSTER_ARGS} --rpc-address 2001:0DB9:100::42 --alternator-address 
2001:0DB9:200::42 --listen-address 2001:0DB9:200::42 + networks: + public: + ipv6_address: 2001:0DB9:100::42 + second: + ipv6_address: 2001:0DB9:200::42 + scylla-manager-db: command: --smp 1 --memory 500M --api-address 0.0.0.0 --seeds 2001:0DB9:200::100 --rpc-address 2001:0DB9:200::100 --alternator-address 2001:0DB9:200::100 --listen-address 2001:0DB9:200::100 networks: diff --git a/testing/docker-compose.yaml b/testing/docker-compose.yaml index d4c9530c0..45d96ebeb 100644 --- a/testing/docker-compose.yaml +++ b/testing/docker-compose.yaml @@ -135,6 +135,40 @@ services: public: second: + third_cluster_dc3_node_1: + image: scylladb/agent-${SCYLLA_VERSION} + privileged: true + volumes: + - type: bind + source: ./scylla/cassandra-rackdc.3.properties + target: /etc/scylla/cassandra-rackdc.properties + - type: bind + source: ./scylla/scylla-third-cluster.yaml + target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/certs/ + target: /etc/scylla/certs + networks: + public: + third: + + third_cluster_dc3_node_2: + image: scylladb/agent-${SCYLLA_VERSION} + privileged: true + volumes: + - type: bind + source: ./scylla/cassandra-rackdc.3.properties + target: /etc/scylla/cassandra-rackdc.properties + - type: bind + source: ./scylla/scylla-third-cluster.yaml + target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/certs/ + target: /etc/scylla/certs + networks: + public: + third: + scylla-manager-db: image: scylladb/${SCYLLA_VERSION} ports: @@ -188,3 +222,7 @@ networks: driver: bridge ipam: driver: default + third: + driver: bridge + ipam: + driver: default diff --git a/testing/nodes_cp b/testing/nodes_cp index 5cf2f0c6c..b55c0ee7f 100755 --- a/testing/nodes_cp +++ b/testing/nodes_cp @@ -8,6 +8,6 @@ set -eu -o pipefail declare -r SRC_PATH=$1 declare -r DEST_PATH=$2 -for name in $(docker ps -f name=dc1_node -f name=dc2_node --format {{.Names}}); do +for name in $(docker ps -f name=dc1_node -f name=dc2_node -f name=dc3_node --format {{.Names}}); do docker cp ${SRC_PATH} ${name}:${DEST_PATH} done diff --git a/testing/nodes_exec b/testing/nodes_exec index 86afaaa22..b387b3aa5 100755 --- a/testing/nodes_exec +++ b/testing/nodes_exec @@ -14,11 +14,11 @@ case "$1" in ;; esac -for name in $(docker ps -f name=dc1_node -f name=dc2_node --format {{.Names}}); do +for name in $(docker ps -f name=dc1_node -f name=dc2_node -f name=dc3_node --format {{.Names}}); do if [[ ${SILENT} == 1 ]]; then docker exec ${name} bash -c "$*" > /dev/null else echo "> ${name}" docker exec ${name} bash -c "$*" fi -done \ No newline at end of file +done diff --git a/testing/scylla/cassandra-rackdc.3.properties b/testing/scylla/cassandra-rackdc.3.properties new file mode 100644 index 000000000..674d47621 --- /dev/null +++ b/testing/scylla/cassandra-rackdc.3.properties @@ -0,0 +1,3 @@ +dc=dc3 +rack=rack1 +prefer_local=true diff --git a/testing/scylla/config/scylla.yaml b/testing/scylla/config/scylla.yaml index cec07429a..6bc120a95 100644 --- a/testing/scylla/config/scylla.yaml +++ b/testing/scylla/config/scylla.yaml @@ -647,4 +647,4 @@ alternator_write_isolation: only_rmw_uses_lwt alternator_enforce_authorization: true enable_ipv6_dns_lookup: true -uuid_sstable_identifiers_enabled: false \ No newline at end of file +uuid_sstable_identifiers_enabled: false diff --git a/testing/scylla/scylla-third-cluster.yaml b/testing/scylla/scylla-third-cluster.yaml new file mode 100644 index 000000000..0fcf43ae9 --- /dev/null +++ b/testing/scylla/scylla-third-cluster.yaml @@ -0,0 +1,619 @@ +# Scylla storage config YAML + 
+####################################### +# This file is split to two sections: +# 1. Supported parameters +# 2. Unsupported parameters: reserved for future use or backwards +# compatibility. +# Scylla will only read and use the first segment +####################################### + +### Supported Parameters + +# The name of the cluster. This is mainly used to prevent machines in +# one logical cluster from joining another. +# It is recommended to change the default value when creating a new cluster. +# You can NOT modify this value for an existing cluster +cluster_name: Managed Third Cluster +# This defines the number of tokens randomly assigned to this node on the ring +# The more tokens, relative to other nodes, the larger the proportion of data +# that this node will store. You probably want all nodes to have the same number +# of tokens assuming they have equal hardware capability. +num_tokens: 256 +# Directory where Scylla should store all its files, which are commitlog, +# data, hints, view_hints and saved_caches subdirectories. All of these +# subs can be overridden by the respective options below. +# If unset, the value defaults to /var/lib/scylla +# workdir: /var/lib/scylla + +# Directory where Scylla should store data on disk. +data_file_directories: + - /var/lib/scylla/data +# commit log. when running on magnetic HDD, this should be a +# separate spindle than the data directories. +commitlog_directory: /var/lib/scylla/commitlog +# schema commit log. A special commitlog instance +# used for schema and system tables. +# When running on magnetic HDD, this should be a +# separate spindle than the data directories. +# schema_commitlog_directory: /var/lib/scylla/commitlog/schema + +# commitlog_sync may be either "periodic" or "batch." +# +# When in batch mode, Scylla won't ack writes until the commit log +# has been fsynced to disk. It will wait +# commitlog_sync_batch_window_in_ms milliseconds between fsyncs. +# This window should be kept short because the writer threads will +# be unable to do extra work while waiting. (You may need to increase +# concurrent_writes for the same reason.) +# +# commitlog_sync: batch +# commitlog_sync_batch_window_in_ms: 2 +# +# the other option is "periodic" where writes may be acked immediately +# and the CommitLog is simply synced every commitlog_sync_period_in_ms +# milliseconds. +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 +# The size of the individual commitlog file segments. A commitlog +# segment may be archived, deleted, or recycled once all the data +# in it (potentially from each columnfamily in the system) has been +# flushed to sstables. +# +# The default size is 32, which is almost always fine, but if you are +# archiving commitlog segments (see commitlog_archiving.properties), +# then you probably want a finer granularity of archiving; 8 or 16 MB +# is reasonable. +commitlog_segment_size_in_mb: 32 +# The size of the individual schema commitlog file segments. +# +# The default size is 128, which is 4 times larger than the default +# size of the data commitlog. It's because the segment size puts +# a limit on the mutation size that can be written at once, and some +# schema mutation writes are much larger than average. +schema_commitlog_segment_size_in_mb: 128 +# seed_provider class_name is saved for future use. +# A seed address is mandatory. +seed_provider: + # The addresses of hosts that will serve as contact points for the joining node. 
+ # It allows the node to discover the cluster ring topology on startup (when + # joining the cluster). + # Once the node has joined the cluster, the seed list has no function. + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + # In a new cluster, provide the address of the first node. + # In an existing cluster, specify the address of at least one existing node. + # If you specify addresses of more than one node, use a comma to separate them. + # For example: ",," + - seeds: "127.0.0.1" +# Address to bind to and tell other Scylla nodes to connect to. +# You _must_ change this if you want multiple nodes to be able to communicate! +# +# If you leave broadcast_address (below) empty, then setting listen_address +# to 0.0.0.0 is wrong as other nodes will not know how to reach this node. +# If you set broadcast_address, then you can set listen_address to 0.0.0.0. +listen_address: localhost +# Address to broadcast to other Scylla nodes +# Leaving this blank will set it to the same value as listen_address +# broadcast_address: 1.2.3.4 + +# When using multiple physical network interfaces, set this to true to listen on broadcast_address +# in addition to the listen_address, allowing nodes to communicate in both interfaces. +# Ignore this property if the network configuration automatically routes between the public and private networks such as EC2. +# +# listen_on_broadcast_address: false + +# port for the CQL native transport to listen for clients on +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# To disable the CQL native transport, remove this option and configure native_transport_port_ssl. +native_transport_port: 9042 +# Like native_transport_port, but clients are forwarded to specific shards, based on the +# client-side port numbers. +native_shard_aware_transport_port: 19042 +# Enabling native transport encryption in client_encryption_options allows you to either use +# encryption for the standard port or to use a dedicated, additional port along with the unencrypted +# standard native_transport_port. +# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption +# for native_transport_port. Setting native_transport_port_ssl to a different value +# from native_transport_port will use encryption for native_transport_port_ssl while +# keeping native_transport_port unencrypted. +#native_transport_port_ssl: 9142 + +# Like native_transport_port_ssl, but clients are forwarded to specific shards, based on the +# client-side port numbers. +#native_shard_aware_transport_port_ssl: 19142 + +# How long the coordinator should wait for read operations to complete +read_request_timeout_in_ms: 5000 +# How long the coordinator should wait for writes to complete +write_request_timeout_in_ms: 2000 +# how long a coordinator should continue to retry a CAS operation +# that contends with other proposals for the same row +cas_contention_timeout_in_ms: 1000 +# phi value that must be reached for a host to be marked down. +# most users should never need to adjust this. +# phi_convict_threshold: 8 + +# IEndpointSnitch. The snitch has two functions: +# - it teaches Scylla enough about your network topology to route +# requests efficiently +# - it allows Scylla to spread replicas around your cluster to avoid +# correlated failures. It does this by grouping machines into +# "datacenters" and "racks." 
Scylla will do its best not to have +# more than one replica on the same "rack" (which may not actually +# be a physical location) +# +# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER, +# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS +# ARE PLACED. +# +# Out of the box, Scylla provides +# - SimpleSnitch: +# Treats Strategy order as proximity. This can improve cache +# locality when disabling read repair. Only appropriate for +# single-datacenter deployments. +# - GossipingPropertyFileSnitch +# This should be your go-to snitch for production use. The rack +# and datacenter for the local node are defined in +# cassandra-rackdc.properties and propagated to other nodes via +# gossip. If cassandra-topology.properties exists, it is used as a +# fallback, allowing migration from the PropertyFileSnitch. +# - PropertyFileSnitch: +# Proximity is determined by rack and data center, which are +# explicitly configured in cassandra-topology.properties. +# - Ec2Snitch: +# Appropriate for EC2 deployments in a single Region. Loads Region +# and Availability Zone information from the EC2 API. The Region is +# treated as the datacenter, and the Availability Zone as the rack. +# Only private IPs are used, so this will not work across multiple +# Regions. +# - Ec2MultiRegionSnitch: +# Uses public IPs as broadcast_address to allow cross-region +# connectivity. (Thus, you should set seed addresses to the public +# IP as well.) You will need to open the storage_port or +# ssl_storage_port on the public IP firewall. (For intra-Region +# traffic, Scylla will switch to the private IP after +# establishing a connection.) +# - RackInferringSnitch: +# Proximity is determined by rack and data center, which are +# assumed to correspond to the 3rd and 2nd octet of each node's IP +# address, respectively. Unless this happens to match your +# deployment conventions, this is best used as an example of +# writing a custom Snitch class and is provided in that spirit. +# +# You can use a custom Snitch by setting this to the full class name +# of the snitch, which will be assumed to be on your classpath. +endpoint_snitch: GossipingPropertyFileSnitch +# The address or interface to bind the native transport server to. +# +# Set rpc_address OR rpc_interface, not both. Interfaces must correspond +# to a single address, IP aliasing is not supported. +# +# Leaving rpc_address blank has the same effect as on listen_address +# (i.e. it will be based on the configured hostname of the node). +# +# Note that unlike listen_address, you can specify 0.0.0.0, but you must also +# set broadcast_rpc_address to a value other than 0.0.0.0. +# +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# +# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address +# you can specify which should be chosen using rpc_interface_prefer_ipv6. If false the first ipv4 +# address will be used. If true the first ipv6 address will be used. Defaults to false preferring +# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6. +rpc_address: localhost +# rpc_interface: eth1 +# rpc_interface_prefer_ipv6: false + +# port for Thrift to listen for clients on +rpc_port: 9160 +# port for REST API server +api_port: 10000 +# IP for the REST API server +api_address: 127.0.0.1 +# Log WARN on any batch size exceeding this value. 128 kiB per batch by default. 
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability. +batch_size_warn_threshold_in_kb: 128 +# Fail any multiple-partition batch exceeding this value. 1 MiB (8x warn threshold) by default. +batch_size_fail_threshold_in_kb: 1024 +# Authentication backend, identifying users +# Out of the box, Scylla provides org.apache.cassandra.auth.{AllowAllAuthenticator, +# PasswordAuthenticator}. +# +# - AllowAllAuthenticator performs no checks - set it to disable authentication. +# - PasswordAuthenticator relies on username/password pairs to authenticate +# users. It keeps usernames and hashed passwords in system_auth.credentials table. +# Please increase system_auth keyspace replication factor if you use this authenticator. +# - com.scylladb.auth.TransitionalAuthenticator requires username/password pair +# to authenticate in the same manner as PasswordAuthenticator, but improper credentials +# result in being logged in as an anonymous user. Use for upgrading clusters' auth. +authenticator: PasswordAuthenticator +# Authorization backend, implementing IAuthorizer; used to limit access/provide permissions +# Out of the box, Scylla provides org.apache.cassandra.auth.{AllowAllAuthorizer, +# CassandraAuthorizer}. +# +# - AllowAllAuthorizer allows any action to any user - set it to disable authorization. +# - CassandraAuthorizer stores permissions in system_auth.permissions table. Please +# increase system_auth keyspace replication factor if you use this authorizer. +# - com.scylladb.auth.TransitionalAuthorizer wraps around the CassandraAuthorizer, using it for +# authorizing permission management. Otherwise, it allows all. Use for upgrading +# clusters' auth. +authorizer: CassandraAuthorizer +# initial_token allows you to specify tokens manually. While you can use # it with +# vnodes (num_tokens > 1, above) -- in which case you should provide a +# comma-separated list -- it's primarily used when adding nodes # to legacy clusters +# that do not have vnodes enabled. +# initial_token: + +# RPC address to broadcast to drivers and other Scylla nodes. This cannot +# be set to 0.0.0.0. If left blank, this will be set to the value of +# rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must +# be set. +# broadcast_rpc_address: 1.2.3.4 + +# Uncomment to enable experimental features +# experimental_features: +# - udf +# - alternator-streams +# - broadcast-tables +# - keyspace-storage-options + +# The directory where hints files are stored if hinted handoff is enabled. +# hints_directory: /var/lib/scylla/hints + +# The directory where hints files are stored for materialized-view updates +# view_hints_directory: /var/lib/scylla/view_hints + +# See https://docs.scylladb.com/architecture/anti-entropy/hinted-handoff +# May either be "true" or "false" to enable globally, or contain a list +# of data centers to enable per-datacenter. +# hinted_handoff_enabled: DC1,DC2 +# hinted_handoff_enabled: true + +# this defines the maximum amount of time a dead host will have hints +# generated. After it has been dead this long, new hints for it will not be +# created until it has been seen alive and gone down again. +# max_hint_window_in_ms: 10800000 # 3 hours + +# Validity period for permissions cache (fetching permissions can be an +# expensive operation depending on the authorizer, CassandraAuthorizer is +# one example). Defaults to 10000, set to 0 to disable. +# Will be disabled automatically for AllowAllAuthorizer. 
+# permissions_validity_in_ms: 10000 + +# Refresh interval for permissions cache (if enabled). +# After this interval, cache entries become eligible for refresh. Upon next +# access, an async reload is scheduled and the old value returned until it +# completes. If permissions_validity_in_ms is non-zero, then this also must have +# a non-zero value. Defaults to 2000. It's recommended to set this value to +# be at least 3 times smaller than the permissions_validity_in_ms. +# permissions_update_interval_in_ms: 2000 + +# The partitioner is responsible for distributing groups of rows (by +# partition key) across nodes in the cluster. You should leave this +# alone for new clusters. The partitioner can NOT be changed without +# reloading all data, so when upgrading you should set this to the +# same partitioner you were already using. +# +# Murmur3Partitioner is currently the only supported partitioner, +# +partitioner: org.apache.cassandra.dht.Murmur3Partitioner +# Total space to use for commitlogs. +# +# If space gets above this value (it will round up to the next nearest +# segment multiple), Scylla will flush every dirty CF in the oldest +# segment and remove it. So a small total commitlog space will tend +# to cause more flush activity on less-active columnfamilies. +# +# A value of -1 (default) will automatically equate it to the total amount of memory +# available for Scylla. +commitlog_total_space_in_mb: -1 +# TCP port, for commands and data +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# storage_port: 7000 + +# SSL port, for encrypted communication. Unused unless enabled in +# encryption_options +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# ssl_storage_port: 7001 + +# listen_interface: eth0 +# listen_interface_prefer_ipv6: false + +# Whether to start the native transport server. +# Please note that the address on which the native transport is bound is the +# same as the rpc_address. The port however is different and specified below. +# start_native_transport: true + +# The maximum size of allowed frame. Frame (requests) larger than this will +# be rejected as invalid. The default is 256MB. +# native_transport_max_frame_size_in_mb: 256 + +# Whether to start the thrift rpc server. +# start_rpc: true + +# enable or disable keepalive on rpc/native connections +# rpc_keepalive: true + +# Set to true to have Scylla create a hard link to each sstable +# flushed or streamed locally in a backups/ subdirectory of the +# keyspace data. Removing these links is the operator's +# responsibility. +# incremental_backups: false + +# Whether or not to take a snapshot before each compaction. Be +# careful using this option, since Scylla won't clean up the +# snapshots for you. Mostly useful if you're paranoid when there +# is a data format change. +# snapshot_before_compaction: false + +# Whether or not a snapshot is taken of the data before keyspace truncation +# or dropping of column families. The STRONGLY advised default of true +# should be used to provide data safety. If you set this flag to false, you will +# lose data on truncation or drop. +auto_snapshot: false +# When executing a scan, within or across a partition, we need to keep the +# tombstones seen in memory so we can return them to the coordinator, which +# will use them to make sure other replicas also know about the deleted rows. 
+# With workloads that generate a lot of tombstones, this can cause performance +# problems and even exhaust the server heap. +# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets) +# Adjust the thresholds here if you understand the dangers and want to +# scan more tombstones anyway. These thresholds may also be adjusted at runtime +# using the StorageService mbean. +# tombstone_warn_threshold: 1000 +# tombstone_failure_threshold: 100000 + +# Granularity of the collation index of rows within a partition. +# Increase if your rows are large, or if you have a very large +# number of rows per partition. The competing goals are these: +# 1) a smaller granularity means more index entries are generated +# and looking up rows within the partition by collation column +# is faster +# 2) but, Scylla will keep the collation index in memory for hot +# rows (as part of the key cache), so a larger granularity means +# you can cache more hot rows +# column_index_size_in_kb: 64 + +# Auto-scaling of the promoted index prevents running out of memory +# when the promoted index grows too large (due to partitions with many rows +# vs. too small column_index_size_in_kb). When the serialized representation +# of the promoted index grows by this threshold, the desired block size +# for this partition (initialized to column_index_size_in_kb) +# is doubled, to decrease the sampling resolution by half. +# +# To disable promoted index auto-scaling, set the threshold to 0. +# column_index_auto_scale_threshold_in_kb: 10240 + +# Log a warning when writing partitions larger than this value +# compaction_large_partition_warning_threshold_mb: 1000 + +# Log a warning when writing rows larger than this value +# compaction_large_row_warning_threshold_mb: 10 + +# Log a warning when writing cells larger than this value +# compaction_large_cell_warning_threshold_mb: 1 + +# Log a warning when row number is larger than this value +# compaction_rows_count_warning_threshold: 100000 + +# Log a warning when writing a collection containing more elements than this value +# compaction_collection_elements_count_warning_threshold: 10000 + +# How long the coordinator should wait for seq or index scans to complete +# range_request_timeout_in_ms: 10000 +# How long the coordinator should wait for writes to complete +# counter_write_request_timeout_in_ms: 5000 +# How long a coordinator should continue to retry a CAS operation +# that contends with other proposals for the same row +# cas_contention_timeout_in_ms: 1000 +# How long the coordinator should wait for truncates to complete +# (This can be much longer, because unless auto_snapshot is disabled +# we need to flush first so we can snapshot before removing the data.) +# truncate_request_timeout_in_ms: 60000 +# The default timeout for other, miscellaneous operations +# request_timeout_in_ms: 10000 + +# Enable or disable inter-node encryption. +# You must also generate keys and provide the appropriate key and trust store locations and passwords. +# +# The available internode options are : all, none, dc, rack +# If set to dc scylla will encrypt the traffic between the DCs +# If set to rack scylla will encrypt the traffic between the racks +# +# SSL/TLS algorithm and ciphers used can be controlled by +# the priority_string parameter. 
Info on priority string +# syntax and values is available at: +# https://gnutls.org/manual/html_node/Priority-Strings.html +# +# The require_client_auth parameter allows you to +# restrict access to service based on certificate +# validation. Client must provide a certificate +# accepted by the used trust store to connect. +# +# server_encryption_options: +# internode_encryption: none +# certificate: conf/scylla.crt +# keyfile: conf/scylla.key +# truststore: +# certficate_revocation_list: +# require_client_auth: False +# priority_string: + +# enable or disable client/server encryption. +client_encryption_options: + enabled: true + certificate: /etc/scylla/db.crt + keyfile: /etc/scylla/db.key + truststore: /etc/scylla/ca.crt + require_client_auth: true +# truststore: +# require_client_auth: False +# priority_string: + +# internode_compression controls whether traffic between nodes is +# compressed. +# can be: all - all traffic is compressed +# dc - traffic between different datacenters is compressed +# none - nothing is compressed. +# internode_compression: none + +# Enable or disable tcp_nodelay for inter-dc communication. +# Disabling it will result in larger (but fewer) network packets being sent, +# reducing overhead from the TCP protocol itself, at the cost of increasing +# latency if you block for cross-datacenter responses. +# inter_dc_tcp_nodelay: false + +# Relaxation of environment checks. +# +# Scylla places certain requirements on its environment. If these requirements are +# not met, performance and reliability can be degraded. +# +# These requirements include: +# - A filesystem with good support for asynchronous I/O (AIO). Currently, +# this means XFS. +# +# false: strict environment checks are in place; do not start if they are not met. +# true: relaxed environment checks; performance and reliability may degraade. +# +# developer_mode: false + +# Idle-time background processing +# +# Scylla can perform certain jobs in the background while the system is otherwise idle, +# freeing processor resources when there is other work to be done. +# +# defragment_memory_on_idle: true +# +# prometheus port +# By default, Scylla opens prometheus API port on port 9180 +# setting the port to 0 will disable the prometheus API. +# prometheus_port: 9180 +# +# prometheus address +# Leaving this blank will set it to the same value as listen_address. +# This means that by default, Scylla listens to the prometheus API on the same +# listening address (and therefore network interface) used to listen for +# internal communication. If the monitoring node is not in this internal +# network, you can override prometheus_address explicitly - e.g., setting +# it to 0.0.0.0 to listen on all interfaces. +# prometheus_address: 1.2.3.4 + +# Distribution of data among cores (shards) within a node +# +# Scylla distributes data within a node among shards, using a round-robin +# strategy: +# [shard0] [shard1] ... [shardN-1] [shard0] [shard1] ... [shardN-1] ... +# +# Scylla versions 1.6 and below used just one repetition of the pattern; +# this interfered with data placement among nodes (vnodes). +# +# Scylla versions 1.7 and above use 4096 repetitions of the pattern; this +# provides for better data distribution. +# +# the value below is log (base 2) of the number of repetitions. +# +# Set to 0 to avoid rewriting all data when upgrading from Scylla 1.6 and +# below. +# +# Keep at 12 for new clusters. 
+murmur3_partitioner_ignore_msb_bits: 12
+# Bypass in-memory data cache (the row cache) when performing reversed queries.
+# reversed_reads_auto_bypass_cache: false
+
+# Use a new optimized algorithm for performing reversed reads.
+# Set to `false` to fall-back to the old algorithm.
+# enable_optimized_reversed_reads: true
+
+# Use a new, parallel algorithm for performing aggregate queries.
+# Set to `false` to fall-back to the old algorithm.
+# enable_parallelized_aggregation: true
+
+# When enabled, the node will start using separate commit log for schema changes
+# right from the boot. Without this, it only happens following a restart after
+# all nodes in the cluster were upgraded.
+#
+# Having this option ensures that new installations don't need a rolling restart
+# to use the feature, but upgrades do.
+#
+# WARNING: It's unsafe to set this to false if the node previously booted
+# with the schema commit log enabled. In such case, some schema changes
+# may be lost if the node was not cleanly stopped.
+force_schema_commit_log: true
+# Time for which task manager task is kept in memory after it completes.
+task_ttl_in_seconds: 10
+# Use Raft to consistently manage schema information in the cluster.
+# Refer to https://docs.scylladb.com/master/architecture/raft.html for more details.
+# The 'Handling Failures' section is especially important.
+#
+# Once enabled in a cluster, this cannot be turned off.
+# If you want to bootstrap a new cluster without Raft, make sure to set this to `false`
+# before starting your nodes for the first time.
+#
+# A cluster not using Raft can be 'upgraded' to use Raft. Refer to the aforementioned
+# documentation, section 'Enabling Raft in ScyllaDB 5.2 and further', for the procedure.
+consistent_cluster_management: true
+# In materialized views, restrictions are allowed only on the view's primary key columns.
+# In old versions Scylla mistakenly allowed IS NOT NULL restrictions on columns which were not part
+# of the view's primary key. These invalid restrictions were ignored.
+# This option controls the behavior when someone tries to create a view with such invalid IS NOT NULL restrictions.
+#
+# Can be true, false, or warn.
+# * `true`: IS NOT NULL is allowed only on the view's primary key columns,
+#   trying to use it on other columns will cause an error, as it should.
+# * `false`: Scylla accepts IS NOT NULL restrictions on regular columns, but they're silently ignored.
+#   It's useful for backwards compatibility.
+# * `warn`: The same as false, but there's a warning about invalid view restrictions.
+#
+# To preserve backwards compatibility on old clusters, Scylla's default setting is `warn`.
+# New clusters have this option set to `true` by scylla.yaml (which overrides the default `warn`)
+# to make sure that trying to create an invalid view causes an error.
+strict_is_not_null_in_views: true
+# The Unix Domain Socket the node uses for maintenance socket.
+# The possible options are:
+# * ignore: the node will not open the maintenance socket,
+# * workdir: the node will open the maintenance socket on the path <workdir>/cql.m,
+#   where <workdir> is a path defined by the workdir configuration option,
+# * <path>: the node will open the maintenance socket on the path <path>.
+maintenance_socket: ignore
+# If set to true, configuration parameters defined with LiveUpdate option can be updated in runtime with CQL
+# by updating system.config virtual table. If we don't want any configuration parameter to be changed in runtime
+# via CQL, this option should be set to false. This parameter doesn't impose any limits on other mechanisms updating
+# configuration parameters in runtime, e.g. sending SIGHUP or using API. This option should be set to false
+# e.g. for cloud users, for whom scylla's configuration should be changed only by support engineers.
+# live_updatable_config_params_changeable_via_cql: true
+
+# ****************
+# * GUARDRAILS *
+# ****************
+
+# Guardrails to warn or fail when Replication Factor is smaller/greater than the threshold.
+# Please note that the value of 0 is always allowed,
+# which means that having no replication at all, i.e. RF = 0, is always valid.
+# A guardrail value smaller than 0, e.g. -1, means that the guardrail is disabled.
+# Commenting out a guardrail also means it is disabled.
+# minimum_replication_factor_fail_threshold: -1
+# minimum_replication_factor_warn_threshold: 3
+# maximum_replication_factor_warn_threshold: -1
+# maximum_replication_factor_fail_threshold: -1
+
+# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
+# Each of these 2 settings is a list storing replication strategies considered harmful.
+# The replication strategies to choose from are:
+# 1) SimpleStrategy,
+# 2) NetworkTopologyStrategy,
+# 3) LocalStrategy,
+# 4) EverywhereStrategy
+#
+# replication_strategy_warn_list:
+#   - SimpleStrategy
+# replication_strategy_fail_list:
+api_ui_dir: /usr/lib/scylla/swagger-ui/dist/
+api_doc_dir: /usr/lib/scylla/api/api-doc/
+alternator_port: 8000
+alternator_write_isolation: only_rmw_uses_lwt
+alternator_enforce_authorization: true
+enable_ipv6_dns_lookup: true
+uuid_sstable_identifiers_enabled: false
+native_transport_port_ssl: 9142
+enable_tablets: true
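The guardrail comments above describe the replication-factor thresholds only in prose. As a rough illustration of the stated semantics (RF = 0 is always allowed; a threshold below 0, or a commented-out one, disables the check), here is a minimal Go sketch; it is not Scylla's implementation, just a reading of the comments.

// Illustrative only: minimum-RF guardrail semantics as described above.
package main

import "fmt"

// checkMinRF reports whether a keyspace replication factor would trip the
// minimum_replication_factor_warn_threshold / _fail_threshold guardrails.
func checkMinRF(rf, warnThreshold, failThreshold int) (warn, fail bool) {
	if rf == 0 {
		// "the value of 0 is always allowed" - no replication at all is valid.
		return false, false
	}
	// A threshold smaller than 0 (e.g. -1) means the guardrail is disabled.
	warn = warnThreshold >= 0 && rf < warnThreshold
	fail = failThreshold >= 0 && rf < failThreshold
	return warn, fail
}

func main() {
	// With warn threshold 3 and fail threshold disabled (-1), RF=1 only warns.
	warn, fail := checkMinRF(1, 3, -1)
	fmt.Printf("rf=1: warn=%v fail=%v\n", warn, fail)
}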