feat(restore): adds --dc-mapping flag to restore command #4213

Open · wants to merge 6 commits into base: master
Changes from 1 commit
15 changes: 9 additions & 6 deletions pkg/service/restore/batch.go
@@ -8,6 +8,7 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

@@ -57,7 +58,7 @@ type batchDispatcher struct {
hostShardCnt map[string]uint
}

func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string, hostDCs map[string][]string) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
@@ -70,7 +71,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
workloadProgress: newWorkloadProgress(workload, locationHosts),
workloadProgress: newWorkloadProgress(workload, locationHosts, hostDCs),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
@@ -106,7 +107,7 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string, hostDCs map[string][]string) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
@@ -121,7 +122,9 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string)
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
dcsInLoc := locationDC[loc.StringWithoutDC()]
hostAllDCs := hostDCs[h]
hostDCAccess[h] = append(hostDCAccess[h], strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()...)
Review comment on lines +125 to +127 (Collaborator):
I don't think that we should just make an intersection here.
If someone specified that a given DC should be restored by a given DC, then all hosts from that DC should have access to the appropriate location. This check should be done inside the initial GetTarget validation.
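For illustration, a minimal sketch of the validation suggested above, assuming it would run as part of GetTarget; the helper and parameter names are hypothetical and not part of this PR, and it reuses the errors and strset imports already shown in this file:

```go
// Hypothetical validation sketch: every host of a target DC must have access to
// the location(s) that hold the backups of the source DC(s) mapped onto it.
func validateDCMappingAccess(
	targetDCToSourceDCs map[string][]string, // target DC -> source DCs mapped onto it
	sourceDCLocations map[string][]Location, // source DC -> locations holding its backup
	dcHosts map[string][]string,             // target DC -> hosts in that DC
	locationHosts map[Location][]string,     // location -> hosts with access to it
) error {
	for targetDC, sourceDCs := range targetDCToSourceDCs {
		for _, srcDC := range sourceDCs {
			for _, loc := range sourceDCLocations[srcDC] {
				accessible := strset.New(locationHosts[loc]...)
				for _, h := range dcHosts[targetDC] {
					if !accessible.Has(h) {
						return errors.Errorf("host %s (DC %s) has no access to location %s needed to restore DC %s",
							h, targetDC, loc, srcDC)
					}
				}
			}
		}
	}
	return nil
}
```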

}
}
return workloadProgress{
@@ -201,8 +204,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
169 changes: 168 additions & 1 deletion pkg/service/restore/batch_test.go
@@ -5,6 +5,8 @@ package restore
import (
"testing"

"github.com/google/go-cmp/cmp"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

@@ -114,7 +116,13 @@ func TestBatchDispatcher(t *testing.T) {
"h3": 3,
}

bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts)
hostDCs := map[string][]string{
"h1": {"dc1", "dc2"},
"h2": {"dc1", "dc2"},
"h3": {"dc3"},
}

bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts, hostDCs)

scenario := []struct {
host string
@@ -166,3 +174,162 @@ func TestBatchDispatcher(t *testing.T) {
t.Fatalf("Expected sstables to be batched: %s", err)
}
}

func TestNewWorkloadProgress(t *testing.T) {
testCases := []struct {
name string

workload Workload
locationHosts map[backupspec.Location][]string
hostDCs map[string][]string

expected map[string][]string
}{
{
name: "one location with one DC",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
},
},
{
name: "one location with two DC's",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "one location with two DC's, more nodes",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2", "host3", "host4"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},
},
{
name: "two locations with one DC each",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host2"},
{Path: "location2"}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "two locations with one DC each, but some hosts doesn't have access",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host3", "host4"},
{Path: "location2"}: {"host1", "host2", "host4"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": nil,
"host3": nil,
"host4": {"dc2"},
},
},
{
name: "two locations with one DC each, but hosts maps to all dcs",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host2"},
{Path: "location2"}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1", "dc2"},
"host2": {"dc1", "dc2"},
},

expected: map[string][]string{
"host1": {"dc1", "dc2"},
"host2": {"dc1", "dc2"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
wp := newWorkloadProgress(tc.workload, tc.locationHosts, tc.hostDCs)
if diff := cmp.Diff(wp.hostDCAccess, tc.expected); diff != "" {
t.Fatalf("Actual != Expected: %s", diff)
}
})
}
}

func generateWorkload(t *testing.T, locationPaths []string, dcsInLocation map[string][]string) Workload {
t.Helper()

var remoteDirs []RemoteDirWorkload
for _, path := range locationPaths {
dcs, ok := dcsInLocation[path]
if !ok {
t.Fatalf("each location should have corresponding entry in dcsInLocation map")
}
for _, dc := range dcs {
remoteDirs = append(remoteDirs, RemoteDirWorkload{
ManifestInfo: &backupspec.ManifestInfo{
DC: dc,
Location: backupspec.Location{Path: path},
},
})
}
}
return Workload{RemoteDir: remoteDirs}
}
4 changes: 4 additions & 0 deletions pkg/service/restore/index.go
@@ -76,6 +76,10 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) {
var rawWorkload []RemoteDirWorkload
err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error {
if slices.Contains(w.target.ignoredSourceDC, m.DC) {
w.logger.Info(ctx, "Ignoring DC", "dc", m.DC, "location", location)
return nil
}
return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
55 changes: 55 additions & 0 deletions pkg/service/restore/model.go
@@ -10,6 +10,7 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
@@ -33,9 +34,14 @@ type Target struct {
RestoreSchema bool `json:"restore_schema,omitempty"`
RestoreTables bool `json:"restore_tables,omitempty"`
Continue bool `json:"continue"`
DCMappings DCMappings `json:"dc-mapping"`

// Cache for host with access to remote location
locationHosts map[Location][]string `json:"-"`
// Cache for hosts and their DCs after applying DCMappings
hostDCs map[string][]string
// Cache for DCs that shouldn't be restored from the location
ignoredSourceDC []string
Review comment on lines +37 to +44 (Collaborator):
Wouldn't it be way simpler if we just had a single mapping map[string][]string of dst DC -> src DC list?
It could replace all of locationHosts, hostDCs, and ignoredSourceDC.
During batch dispatch we could/should have access to a given host's DC, so that would give us enough information for correctness.
Ignored DCs could be replaced with restored DCs, which could easily be obtained from such a mapping when needed for filtering.

The assumption is that we don't need to care about cases like: what if only a few hosts from this DC have access to this location? Even when making a backup, the whole DC needs to be backed up to a single location, and all hosts from a given DC need to have access to it.
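For illustration, a minimal sketch of the single mapping proposed here; the type and method names are hypothetical and not part of this PR:

```go
// dcMapping maps a destination (restore cluster) DC to the list of source
// (backup cluster) DCs it should restore.
type dcMapping map[string][]string

// restoredSourceDCs returns the set of source DCs covered by the mapping;
// anything outside this set would be skipped, replacing the ignoredSourceDC cache.
func (m dcMapping) restoredSourceDCs() []string {
	s := strset.New()
	for _, srcDCs := range m {
		s.Add(srcDCs...)
	}
	return s.List()
}
```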

}

const (
@@ -82,6 +88,16 @@ func (t Target) validateProperties(dcMap map[string][]string) error {
if t.RestoreSchema && t.Keyspace != nil {
return errors.New("restore schema always restores 'system_schema.*' tables only, no need to specify '--keyspace' flag")
}
// Check for duplicates in Location
allLocations := strset.New()
for _, l := range t.Location {
p := l.RemotePath("")
if allLocations.Has(p) {
return errors.Errorf("location %s is specified multiple times", l)
}
allLocations.Add(p)
}

return nil
}

@@ -289,3 +305,42 @@ type HostInfo struct {
Transfers int
RateLimit int
}

// DCMappings represents how DCs from the backup cluster are mapped to DCs in the restore cluster.
// For details about how DCs can be mapped, refer to the --dc-mapping documentation.
type DCMappings []DCMapping

// DCMapping represents a single datacenter mapping. See DCMappings for details.
type DCMapping struct {
Source []string `json:"source"`
IgnoreSource []string `json:"ignore_source"`
Target []string `json:"target"`
IgnoreTarget []string `json:"ignore_target"`
}

func (mappings DCMappings) calculateMappings() (targetMap map[string][]string, ignoreSource, ignoreTarget []string) {
targetMap = map[string][]string{}
for _, mapping := range mappings {
ignoreSource = append(ignoreSource, mapping.IgnoreSource...)
ignoreTarget = append(ignoreTarget, mapping.IgnoreTarget...)

if len(mapping.Source) == 0 || len(mapping.Target) == 0 {
continue
}
tIdx, sIdx := 0, 0
for {
target, source := mapping.Target[tIdx], mapping.Source[sIdx]
targetMap[target] = append(targetMap[target], source)
if tIdx == len(mapping.Target)-1 && sIdx == len(mapping.Source)-1 {
break
}
if tIdx < len(mapping.Target)-1 {
tIdx++
}
if sIdx < len(mapping.Source)-1 {
sIdx++
}
}
}
return targetMap, ignoreSource, ignoreTarget
}
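As a reference for the two-pointer walk in calculateMappings: once one slice is exhausted, its last element keeps being paired with the remaining elements of the other, so {Source: [dc1, dc2], Target: [dc3]} expands to dc3 -> [dc1, dc2]. A small illustrative test of that behaviour, not part of this PR, assuming it sits in the restore package and reuses the cmp import already used in batch_test.go:

```go
func TestCalculateMappingsManyToOne(t *testing.T) {
	mappings := DCMappings{
		{Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}},
	}

	targetMap, ignoreSource, ignoreTarget := mappings.calculateMappings()

	// Both source DCs collapse onto the single target DC.
	expected := map[string][]string{"dc3": {"dc1", "dc2"}}
	if diff := cmp.Diff(expected, targetMap); diff != "" {
		t.Fatalf("unexpected target map: %s", diff)
	}
	if len(ignoreSource) != 0 || len(ignoreTarget) != 0 {
		t.Fatalf("expected no ignored DCs, got %v and %v", ignoreSource, ignoreTarget)
	}
}
```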