-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(restore): adds --dc-mapping flag to restore command #4213
base: master
Are you sure you want to change the base?
Changes from all commits
e3d9b34
532ad04
4622a83
f0229b4
e03889c
e30e28a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ type command struct { | |
restoreTables bool | ||
dryRun bool | ||
showTables bool | ||
dcMapping dcMappings | ||
} | ||
|
||
func NewCommand(client *managerclient.Client) *cobra.Command { | ||
|
@@ -90,6 +91,7 @@ func (cmd *command) init() { | |
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "") | ||
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "") | ||
w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "") | ||
w.Unwrap().Var(&cmd.dcMapping, "dc-mapping", "") | ||
} | ||
|
||
func (cmd *command) run(args []string) error { | ||
|
@@ -182,6 +184,13 @@ func (cmd *command) run(args []string) error { | |
props["restore_tables"] = cmd.restoreTables | ||
ok = true | ||
} | ||
if cmd.Flag("dc-mapping").Changed { | ||
if cmd.Update() { | ||
return wrapper("dc-mapping") | ||
} | ||
props["dc-mapping"] = cmd.dcMapping | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are using the convention of naming |
||
ok = true | ||
} | ||
|
||
if cmd.dryRun { | ||
res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
// Copyright (C) 2025 ScyllaDB | ||
|
||
package restore | ||
|
||
import ( | ||
"slices" | ||
"strings" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
type dcMappings []dcMapping | ||
|
||
type dcMapping struct { | ||
Source []string `json:"source"` | ||
IgnoreSource []string `json:"ignore_source"` | ||
Target []string `json:"target"` | ||
IgnoreTarget []string `json:"ignore_target"` | ||
} | ||
|
||
const ignoreDCPrefix = "!" | ||
|
||
// Set parses --dc-mapping flag, where the syntax is following: | ||
// ; - used to split different mappings | ||
// => - used to split source => target DCs | ||
// , - used to seprate DCs | ||
// ! - used to ignore DC. | ||
func (dcm *dcMappings) Set(v string) error { | ||
mappingParts := strings.Split(v, ";") | ||
for _, dcMapPart := range mappingParts { | ||
sourceTargetParts := strings.Split(dcMapPart, "=>") | ||
if len(sourceTargetParts) != 2 { | ||
return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) | ||
} | ||
if sourceTargetParts[0] == "" || sourceTargetParts[1] == "" { | ||
return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) | ||
} | ||
|
||
var mapping dcMapping | ||
mapping.Source, mapping.IgnoreSource = parseDCList(strings.Split(sourceTargetParts[0], ",")) | ||
mapping.Target, mapping.IgnoreTarget = parseDCList(strings.Split(sourceTargetParts[1], ",")) | ||
|
||
*dcm = append(*dcm, mapping) | ||
} | ||
return nil | ||
} | ||
|
||
func parseDCList(list []string) (dcs, ignore []string) { | ||
for _, dc := range list { | ||
if strings.HasPrefix(dc, ignoreDCPrefix) { | ||
ignore = append(ignore, strings.TrimPrefix(dc, ignoreDCPrefix)) | ||
continue | ||
} | ||
dcs = append(dcs, dc) | ||
} | ||
return dcs, ignore | ||
} | ||
|
||
// String builds --dc-mapping flag back from struct. | ||
func (dcm *dcMappings) String() string { | ||
if dcm == nil { | ||
return "" | ||
} | ||
var res strings.Builder | ||
for i, mapping := range *dcm { | ||
source := slices.Concat(mapping.Source, addIgnorePrefix(mapping.IgnoreSource)) | ||
target := slices.Concat(mapping.Target, addIgnorePrefix(mapping.IgnoreTarget)) | ||
res.WriteString( | ||
strings.Join(source, ",") + "=>" + strings.Join(target, ","), | ||
) | ||
if i != len(*dcm)-1 { | ||
res.WriteString(";") | ||
} | ||
} | ||
return res.String() | ||
} | ||
|
||
func addIgnorePrefix(ignore []string) []string { | ||
var result []string | ||
for _, v := range ignore { | ||
result = append(result, ignoreDCPrefix+v) | ||
} | ||
return result | ||
} | ||
|
||
// Type implements pflag.Value interface. | ||
func (dcm *dcMappings) Type() string { | ||
return "dc-mapping" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
// Copyright (C) 2025 ScyllaDB | ||
package restore | ||
|
||
import ( | ||
"fmt" | ||
"slices" | ||
"testing" | ||
) | ||
|
||
func TestSetDCMapping(t *testing.T) { | ||
testCases := []struct { | ||
input string | ||
expectedErr bool | ||
expectedMappings dcMappings | ||
}{ | ||
{ | ||
input: "dc1=>dc2", | ||
expectedMappings: dcMappings{ | ||
{Source: []string{"dc1"}, Target: []string{"dc2"}}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1, dc2=>dc1, dc2", | ||
expectedMappings: dcMappings{ | ||
{Source: []string{"dc1", "dc2"}, Target: []string{"dc1", "dc2"}}, | ||
}, | ||
Comment on lines
+23
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test case does not pass when the result is validated. |
||
}, | ||
{ | ||
input: "dc1=>dc3;dc2=>dc4", | ||
expectedMappings: dcMappings{ | ||
{Source: []string{"dc1"}, Target: []string{"dc3"}}, | ||
{Source: []string{"dc2"}, Target: []string{"dc4"}}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1,dc2=>dc3", | ||
expectedMappings: dcMappings{ | ||
{Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1,!dc2=>dc3", | ||
expectedMappings: dcMappings{ | ||
{ | ||
Source: []string{"dc1"}, | ||
Target: []string{"dc3"}, | ||
IgnoreSource: []string{"dc2"}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1,!dc2=>dc3,!dc4", | ||
expectedMappings: dcMappings{ | ||
{ | ||
Source: []string{"dc1"}, | ||
Target: []string{"dc3"}, | ||
IgnoreSource: []string{"dc2"}, | ||
IgnoreTarget: []string{"dc4"}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1,!dc2=>dc3,!dc4", | ||
expectedMappings: dcMappings{ | ||
{ | ||
Source: []string{"dc1"}, | ||
Target: []string{"dc3"}, | ||
|
||
IgnoreSource: []string{"dc2"}, | ||
IgnoreTarget: []string{"dc4"}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
input: "!dc1,dc2=>dc3,!dc4", | ||
expectedMappings: dcMappings{ | ||
{ | ||
Source: []string{"dc2"}, | ||
Target: []string{"dc3"}, | ||
|
||
IgnoreSource: []string{"dc1"}, | ||
IgnoreTarget: []string{"dc4"}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
input: "dc1=>dc3=>dc2=>dc4", | ||
expectedMappings: dcMappings{}, | ||
expectedErr: true, | ||
}, | ||
{ | ||
input: ";", | ||
expectedMappings: dcMappings{}, | ||
expectedErr: true, | ||
}, | ||
{ | ||
input: "=>", | ||
expectedMappings: dcMappings{}, | ||
expectedErr: true, | ||
}, | ||
{ | ||
input: "dc1=>", | ||
expectedMappings: dcMappings{}, | ||
expectedErr: true, | ||
}, | ||
{ | ||
input: "dc1=>;", | ||
expectedMappings: dcMappings{}, | ||
expectedErr: true, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.input, func(t *testing.T) { | ||
var mappings dcMappings | ||
|
||
err := mappings.Set(tc.input) | ||
if tc.expectedErr && err == nil { | ||
t.Fatal("Expected err, but got nil") | ||
} | ||
if !tc.expectedErr && err != nil { | ||
t.Fatalf("Unexpected err: %v", err) | ||
} | ||
slices.EqualFunc(tc.expectedMappings, mappings, func(a, b dcMapping) bool { | ||
return slices.Equal(a.Source, b.Source) && | ||
slices.Equal(a.Target, b.Target) && | ||
slices.Equal(a.IgnoreSource, b.IgnoreSource) && | ||
slices.Equal(a.IgnoreTarget, b.IgnoreTarget) | ||
}) | ||
Comment on lines
+124
to
+129
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that you forgot to check the result of the |
||
}) | ||
} | ||
|
||
} | ||
|
||
func TestDCMappingString(t *testing.T) { | ||
testCases := []struct { | ||
mappings dcMappings | ||
expected string | ||
}{ | ||
{ | ||
mappings: dcMappings{ | ||
{Source: []string{"dc1"}, Target: []string{"dc2"}}, | ||
}, | ||
expected: "dc1=>dc2", | ||
}, | ||
{ | ||
mappings: dcMappings{ | ||
{Source: []string{"dc1"}, Target: []string{"dc2"}}, | ||
{Source: []string{"dc3"}, Target: []string{"dc4"}}, | ||
}, | ||
expected: "dc1=>dc2;dc3=>dc4", | ||
}, | ||
{ | ||
mappings: dcMappings{ | ||
{ | ||
Source: []string{"dc1"}, | ||
Target: []string{"dc2"}, | ||
IgnoreSource: []string{"dc2"}, | ||
}, | ||
}, | ||
expected: "dc1,!dc2=>dc2", | ||
}, | ||
{ | ||
mappings: dcMappings{ | ||
{ | ||
Source: []string{"dc1"}, | ||
Target: []string{"dc2"}, | ||
IgnoreSource: []string{"dc2"}, | ||
IgnoreTarget: []string{"dc3"}, | ||
}, | ||
}, | ||
expected: "dc1,!dc2=>dc2,!dc3", | ||
}, | ||
{}, | ||
} | ||
|
||
for i, tc := range testCases { | ||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { | ||
actual := tc.mappings.String() | ||
if actual != tc.expected { | ||
t.Fatalf("Expected %q, but got %q", tc.expected, actual) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,3 +72,16 @@ dry-run: | | |
|
||
show-tables: | | ||
Prints table names together with keyspace, used in combination with --dry-run. | ||
|
||
dc-mapping: | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The sctool command docs are actually also a part of the regular SM docs. |
||
Specifies mapping between DCs from the backup and DCs in the restored(target) cluster. | ||
All DCs from source cluster should be explicitly mapped to all DCs in the target cluster. The only exception is when | ||
source and target cluster has exact match: source dcs == target dcs. | ||
Only works with tables restoration (--restore-tables=true). | ||
Syntax: | ||
"source_dc1,source_dc2=>target_dc1,target_dc2" | ||
Multiple mappings are separated by semicolons (;). Exclamation mark (!) before a DC indicates that it should be ignored during restore. | ||
Examples: | ||
"dc1,dc2=>dc3" - data from dc1 and dc2 DCs should be restored to dc3 DC. | ||
"dc1,dc2=>dc3,!dc4" - data from dc1 and dc2 DCs should be restored to dc3 DC. Ignoring dc4 DC from target cluster. | ||
"dc1,!dc2=>dc2" - data from dc1 should be restored to dc2 DC. Ignoring dc2 from source cluster. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import ( | |
"sync" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scylladb/go-set/strset" | ||
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" | ||
) | ||
|
||
|
@@ -57,7 +58,7 @@ type batchDispatcher struct { | |
hostShardCnt map[string]uint | ||
} | ||
|
||
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { | ||
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string, hostDCs map[string][]string) *batchDispatcher { | ||
sortWorkload(workload) | ||
var shards uint | ||
for _, sh := range hostShardCnt { | ||
|
@@ -70,7 +71,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin | |
mu: sync.Mutex{}, | ||
wait: make(chan struct{}), | ||
workload: workload, | ||
workloadProgress: newWorkloadProgress(workload, locationHosts), | ||
workloadProgress: newWorkloadProgress(workload, locationHosts, hostDCs), | ||
batchSize: batchSize, | ||
expectedShardWorkload: workload.TotalSize / int64(shards), | ||
hostShardCnt: hostShardCnt, | ||
|
@@ -106,7 +107,7 @@ type remoteSSTableDirProgress struct { | |
RemainingSSTables []RemoteSSTable | ||
} | ||
|
||
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress { | ||
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string, hostDCs map[string][]string) workloadProgress { | ||
dcBytes := make(map[string]int64) | ||
locationDC := make(map[string][]string) | ||
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir)) | ||
|
@@ -121,7 +122,9 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) | |
hostDCAccess := make(map[string][]string) | ||
for loc, hosts := range locationHosts { | ||
for _, h := range hosts { | ||
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...) | ||
dcsInLoc := locationDC[loc.StringWithoutDC()] | ||
hostAllDCs := hostDCs[h] | ||
hostDCAccess[h] = append(hostDCAccess[h], strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()...) | ||
Comment on lines
+125
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that we should just make an intersection here. |
||
} | ||
} | ||
return workloadProgress{ | ||
|
@@ -201,8 +204,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { | |
for i, rdp := range bd.workloadProgress.remoteDir { | ||
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { | ||
rdw := bd.workload.RemoteDir[i] | ||
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info", | ||
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size) | ||
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info", | ||
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size) | ||
} | ||
} | ||
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having 3 clusters for the docker setup seems excessive.
You have written nice unit tests, so we don't need to cover all scenarios with integration tests.
Having said that, could we simply rename the DC in the second cluster (used just for the restore)?
We could even divide this cluster into 2 single nodes DCs if that could help.