feat(restore): adds --dc-mapping flag to restore command #4213

Open · wants to merge 6 commits into base: master
1 change: 1 addition & 0 deletions Makefile
@@ -115,6 +115,7 @@ INTEGRATION_TEST_ARGS := -cluster $(PUBLIC_NET)100 \
-managed-cluster $(PUBLIC_NET)11,$(PUBLIC_NET)12,$(PUBLIC_NET)13,$(PUBLIC_NET)21,$(PUBLIC_NET)22,$(PUBLIC_NET)23 \
-test-network $(PUBLIC_NET) \
-managed-second-cluster $(PUBLIC_NET)31,$(PUBLIC_NET)32 \
-managed-third-cluster $(PUBLIC_NET)41,$(PUBLIC_NET)42 \
Collaborator:
Having 3 clusters for the docker setup seems excessive.
You have written nice unit tests, so we don't need to cover all scenarios with integration tests.
Having said that, could we simply rename the DC in the second cluster (used just for the restore)?
We could even divide that cluster into two single-node DCs if that would help.

-user cassandra -password cassandra \
-agent-auth-token token \
-s3-data-dir ./testing/minio/data -s3-provider Minio -s3-endpoint $(MINIO_ENDPOINT) -s3-access-key-id $(MINIO_USER_ACCESS_KEY) -s3-secret-access-key $(MINIO_USER_SECRET_KEY)
9 changes: 9 additions & 0 deletions pkg/command/restore/cmd.go
@@ -37,6 +37,7 @@ type command struct {
restoreTables bool
dryRun bool
showTables bool
dcMapping dcMappings
}

func NewCommand(client *managerclient.Client) *cobra.Command {
@@ -90,6 +91,7 @@ func (cmd *command) init() {
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "")
w.Unwrap().Var(&cmd.dcMapping, "dc-mapping", "")
}

func (cmd *command) run(args []string) error {
@@ -182,6 +184,13 @@ func (cmd *command) run(args []string) error {
props["restore_tables"] = cmd.restoreTables
ok = true
}
if cmd.Flag("dc-mapping").Changed {
if cmd.Update() {
return wrapper("dc-mapping")
}
props["dc-mapping"] = cmd.dcMapping
Collaborator:
We are using the convention of naming JSON fields with _ instead of -, but we use - in the flag names :)
It can cause confusion and name mismatches, but I think we should follow it since it's present in all of the flag names.
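For illustration, following that convention would look roughly like this (a sketch only; the underscore key is the hypothetical variant suggested above, not what this change currently sends):

```go
// Sketch: flag names keep dashes, while task property keys use underscores,
// so the property would be "dc_mapping" rather than "dc-mapping".
props["dc_mapping"] = cmd.dcMapping
```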

ok = true
}

if cmd.dryRun {
res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task)
89 changes: 89 additions & 0 deletions pkg/command/restore/dcmappings.go
@@ -0,0 +1,89 @@
// Copyright (C) 2025 ScyllaDB

package restore

import (
    "slices"
    "strings"

    "github.com/pkg/errors"
)

type dcMappings []dcMapping

type dcMapping struct {
    Source       []string `json:"source"`
    IgnoreSource []string `json:"ignore_source"`
    Target       []string `json:"target"`
    IgnoreTarget []string `json:"ignore_target"`
}

const ignoreDCPrefix = "!"

// Set parses the --dc-mapping flag, with the following syntax:
// ; - used to split different mappings
// => - used to split source => target DCs
// , - used to separate DCs
// ! - used to ignore a DC.
func (dcm *dcMappings) Set(v string) error {
    mappingParts := strings.Split(v, ";")
    for _, dcMapPart := range mappingParts {
        sourceTargetParts := strings.Split(dcMapPart, "=>")
        if len(sourceTargetParts) != 2 {
            return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart)
        }
        if sourceTargetParts[0] == "" || sourceTargetParts[1] == "" {
            return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart)
        }

        var mapping dcMapping
        mapping.Source, mapping.IgnoreSource = parseDCList(strings.Split(sourceTargetParts[0], ","))
        mapping.Target, mapping.IgnoreTarget = parseDCList(strings.Split(sourceTargetParts[1], ","))

        *dcm = append(*dcm, mapping)
    }
    return nil
}

func parseDCList(list []string) (dcs, ignore []string) {
    for _, dc := range list {
        if strings.HasPrefix(dc, ignoreDCPrefix) {
            ignore = append(ignore, strings.TrimPrefix(dc, ignoreDCPrefix))
            continue
        }
        dcs = append(dcs, dc)
    }
    return dcs, ignore
}

// String builds the --dc-mapping flag value back from the struct.
func (dcm *dcMappings) String() string {
    if dcm == nil {
        return ""
    }
    var res strings.Builder
    for i, mapping := range *dcm {
        source := slices.Concat(mapping.Source, addIgnorePrefix(mapping.IgnoreSource))
        target := slices.Concat(mapping.Target, addIgnorePrefix(mapping.IgnoreTarget))
        res.WriteString(
            strings.Join(source, ",") + "=>" + strings.Join(target, ","),
        )
        if i != len(*dcm)-1 {
            res.WriteString(";")
        }
    }
    return res.String()
}

func addIgnorePrefix(ignore []string) []string {
    var result []string
    for _, v := range ignore {
        result = append(result, ignoreDCPrefix+v)
    }
    return result
}

// Type implements the pflag.Value interface.
func (dcm *dcMappings) Type() string {
    return "dc-mapping"
}
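As a quick illustration of the parser above, here is a minimal round-trip check (a sketch that would live in this package's _test.go; the mapping string follows the syntax documented for the flag):

```go
package restore

import "testing"

// Illustrative sketch: Set and String should round-trip a mapping that ignores
// a DC on both the source and the target side.
func TestDCMappingRoundTripSketch(t *testing.T) {
    var m dcMappings
    if err := m.Set("dc1,!dc2=>dc3,!dc4"); err != nil {
        t.Fatal(err)
    }
    if got, want := m.String(), "dc1,!dc2=>dc3,!dc4"; got != want {
        t.Fatalf("got %q, want %q", got, want)
    }
}
```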
185 changes: 185 additions & 0 deletions pkg/command/restore/dcmappings_test.go
@@ -0,0 +1,185 @@
// Copyright (C) 2025 ScyllaDB
package restore

import (
"fmt"
"slices"
"testing"
)

func TestSetDCMapping(t *testing.T) {
testCases := []struct {
input string
expectedErr bool
expectedMappings dcMappings
}{
{
input: "dc1=>dc2",
expectedMappings: dcMappings{
{Source: []string{"dc1"}, Target: []string{"dc2"}},
},
},
{
input: "dc1, dc2=>dc1, dc2",
expectedMappings: dcMappings{
{Source: []string{"dc1", "dc2"}, Target: []string{"dc1", "dc2"}},
},
Collaborator, commenting on lines +23 to +26:
This test case does not pass when the result is actually validated.
I don't see any code handling whitespace during parsing, but it would be nice to add it.
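One possible way to add that, as a sketch (not part of this PR): trim whitespace around each DC name while parsing, for example in parseDCList:

```go
// Sketch: make parsing tolerant of spaces around DC names ("dc1, dc2" etc.).
func parseDCList(list []string) (dcs, ignore []string) {
    for _, dc := range list {
        dc = strings.TrimSpace(dc)
        if strings.HasPrefix(dc, ignoreDCPrefix) {
            ignore = append(ignore, strings.TrimPrefix(dc, ignoreDCPrefix))
            continue
        }
        dcs = append(dcs, dc)
    }
    return dcs, ignore
}
```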

},
{
input: "dc1=>dc3;dc2=>dc4",
expectedMappings: dcMappings{
{Source: []string{"dc1"}, Target: []string{"dc3"}},
{Source: []string{"dc2"}, Target: []string{"dc4"}},
},
},
{
input: "dc1,dc2=>dc3",
expectedMappings: dcMappings{
{Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}},
},
},
{
input: "dc1,!dc2=>dc3",
expectedMappings: dcMappings{
{
Source: []string{"dc1"},
Target: []string{"dc3"},
IgnoreSource: []string{"dc2"},
},
},
},
{
input: "dc1,!dc2=>dc3,!dc4",
expectedMappings: dcMappings{
{
Source: []string{"dc1"},
Target: []string{"dc3"},
IgnoreSource: []string{"dc2"},
IgnoreTarget: []string{"dc4"},
},
},
},
{
input: "dc1,!dc2=>dc3,!dc4",
expectedMappings: dcMappings{
{
Source: []string{"dc1"},
Target: []string{"dc3"},

IgnoreSource: []string{"dc2"},
IgnoreTarget: []string{"dc4"},
},
},
},
{
input: "!dc1,dc2=>dc3,!dc4",
expectedMappings: dcMappings{
{
Source: []string{"dc2"},
Target: []string{"dc3"},

IgnoreSource: []string{"dc1"},
IgnoreTarget: []string{"dc4"},
},
},
},
{
input: "dc1=>dc3=>dc2=>dc4",
expectedMappings: dcMappings{},
expectedErr: true,
},
{
input: ";",
expectedMappings: dcMappings{},
expectedErr: true,
},
{
input: "=>",
expectedMappings: dcMappings{},
expectedErr: true,
},
{
input: "dc1=>",
expectedMappings: dcMappings{},
expectedErr: true,
},
{
input: "dc1=>;",
expectedMappings: dcMappings{},
expectedErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.input, func(t *testing.T) {
var mappings dcMappings

err := mappings.Set(tc.input)
if tc.expectedErr && err == nil {
t.Fatal("Expected err, but got nil")
}
if !tc.expectedErr && err != nil {
t.Fatalf("Unexpected err: %v", err)
}
slices.EqualFunc(tc.expectedMappings, mappings, func(a, b dcMapping) bool {
return slices.Equal(a.Source, b.Source) &&
slices.Equal(a.Target, b.Target) &&
slices.Equal(a.IgnoreSource, b.IgnoreSource) &&
slices.Equal(a.IgnoreTarget, b.IgnoreTarget)
})
Collaborator, commenting on lines +124 to +129:
I think you forgot to check the result of slices.EqualFunc :)
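For illustration, the missing assertion could look roughly like this (a sketch of the test fragment, reusing the tc and mappings variables from the loop above):

```go
// Sketch: assert the boolean returned by slices.EqualFunc instead of discarding it.
equal := slices.EqualFunc(tc.expectedMappings, mappings, func(a, b dcMapping) bool {
    return slices.Equal(a.Source, b.Source) &&
        slices.Equal(a.Target, b.Target) &&
        slices.Equal(a.IgnoreSource, b.IgnoreSource) &&
        slices.Equal(a.IgnoreTarget, b.IgnoreTarget)
})
if !equal {
    t.Fatalf("Expected %v, but got %v", tc.expectedMappings, mappings)
}
```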

})
}

}

func TestDCMappingString(t *testing.T) {
testCases := []struct {
mappings dcMappings
expected string
}{
{
mappings: dcMappings{
{Source: []string{"dc1"}, Target: []string{"dc2"}},
},
expected: "dc1=>dc2",
},
{
mappings: dcMappings{
{Source: []string{"dc1"}, Target: []string{"dc2"}},
{Source: []string{"dc3"}, Target: []string{"dc4"}},
},
expected: "dc1=>dc2;dc3=>dc4",
},
{
mappings: dcMappings{
{
Source: []string{"dc1"},
Target: []string{"dc2"},
IgnoreSource: []string{"dc2"},
},
},
expected: "dc1,!dc2=>dc2",
},
{
mappings: dcMappings{
{
Source: []string{"dc1"},
Target: []string{"dc2"},
IgnoreSource: []string{"dc2"},
IgnoreTarget: []string{"dc3"},
},
},
expected: "dc1,!dc2=>dc2,!dc3",
},
{},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
actual := tc.mappings.String()
if actual != tc.expected {
t.Fatalf("Expected %q, but got %q", tc.expected, actual)
}
})
}
}
13 changes: 13 additions & 0 deletions pkg/command/restore/res.yaml
@@ -72,3 +72,16 @@ dry-run: |

show-tables: |
Prints table names together with keyspace, used in combination with --dry-run.

dc-mapping: |
Collaborator:
The sctool command docs are actually also a part of the regular SM docs.
In order to include those changes in the docs, you should run `make docs` from the repo root dir. In order to see how the changes were rendered, you can run `make -C docs preview` (or just `make preview` from the `docs` dir).

Specifies the mapping between DCs from the backup and DCs in the restored (target) cluster.
All DCs from the source cluster should be explicitly mapped to all DCs in the target cluster. The only exception is when
the source and target clusters match exactly: source DCs == target DCs.
Only works with table restoration (--restore-tables=true).
Syntax:
"source_dc1,source_dc2=>target_dc1,target_dc2"
Multiple mappings are separated by semicolons (;). An exclamation mark (!) before a DC indicates that it should be ignored during restore.
Examples:
"dc1,dc2=>dc3" - data from the dc1 and dc2 DCs should be restored to the dc3 DC.
"dc1,dc2=>dc3,!dc4" - data from the dc1 and dc2 DCs should be restored to the dc3 DC, ignoring the dc4 DC in the target cluster.
"dc1,!dc2=>dc2" - data from dc1 should be restored to the dc2 DC, ignoring dc2 in the source cluster.
15 changes: 9 additions & 6 deletions pkg/service/restore/batch.go
@@ -8,6 +8,7 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

@@ -57,7 +58,7 @@ type batchDispatcher struct {
hostShardCnt map[string]uint
}

func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string, hostDCs map[string][]string) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
@@ -70,7 +71,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
workloadProgress: newWorkloadProgress(workload, locationHosts),
workloadProgress: newWorkloadProgress(workload, locationHosts, hostDCs),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
@@ -106,7 +107,7 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string, hostDCs map[string][]string) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
@@ -121,7 +122,9 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string)
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
dcsInLoc := locationDC[loc.StringWithoutDC()]
hostAllDCs := hostDCs[h]
hostDCAccess[h] = append(hostDCAccess[h], strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()...)
Collaborator, commenting on lines +125 to +127:
I don't think we should just take an intersection here.
If someone specified that a given DC should be restored by a given DC, then all hosts from that DC should have access to the appropriate location. This should be validated inside the GetTarget initial validation.
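For illustration, a rough sketch of the kind of up-front check this comment asks for, assuming a hypothetical helper invoked during GetTarget validation (the helper name, signature, and input maps are assumptions, not part of this PR; it relies on the "slices" and "github.com/pkg/errors" packages and the Location type already used in this file):

```go
// Hypothetical sketch: every target host that is mapped to a source DC must be
// able to reach the backup location storing that DC's data, otherwise fail early.
func validateDCMappingAccess(
    hostSourceDCs map[string][]string,       // target host -> source DCs it is mapped to
    locationDCs map[Location][]string,       // backup location -> source DCs stored in it
    hostAccess map[string]map[Location]bool, // target host -> locations it can access
) error {
    for host, dcs := range hostSourceDCs {
        for _, dc := range dcs {
            for loc, stored := range locationDCs {
                if !slices.Contains(stored, dc) {
                    continue
                }
                if !hostAccess[host][loc] {
                    return errors.Errorf("host %s is mapped to DC %s but cannot access location %s", host, dc, loc)
                }
            }
        }
    }
    return nil
}
```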

}
}
return workloadProgress{
@@ -201,8 +204,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {