diff --git a/cmd/directpv/controller.go b/cmd/directpv/controller.go
index e0ae6dafc..98c81da56 100644
--- a/cmd/directpv/controller.go
+++ b/cmd/directpv/controller.go
@@ -18,12 +18,19 @@ package main
import (
"context"
+ "os"
+ "time"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/csi/controller"
pkgidentity "github.com/minio/directpv/pkg/csi/identity"
+ "github.com/minio/directpv/pkg/jobs"
+ "github.com/minio/directpv/pkg/k8s"
"github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/tools/leaderelection"
+ "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
)
@@ -75,5 +82,44 @@ func startController(ctx context.Context) error {
}
}()
+ go func() {
+ runJobsController(ctx)
+ }()
+
return <-errCh
}
+
+func runJobsController(ctx context.Context) {
+ podName := os.Getenv("HOSTNAME")
+ if podName == "" {
+ klog.V(5).Info("unable to get the pod name from env; defaulting to pod name: directpv-controller")
+ podName = "directpv-controller"
+ }
+ lock := &resourcelock.LeaseLock{
+ LeaseMeta: metav1.ObjectMeta{
+ Name: consts.AppName + "-jobs-controller",
+ Namespace: consts.AppNamespace,
+ },
+ Client: k8s.KubeClient().CoordinationV1(),
+ LockConfig: resourcelock.ResourceLockConfig{
+ Identity: podName,
+ },
+ }
+ // start the leader election code loop
+ leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+ Lock: lock,
+ ReleaseOnCancel: true,
+ LeaseDuration: 60 * time.Second,
+ RenewDeadline: 15 * time.Second,
+ RetryPeriod: 5 * time.Second,
+ Callbacks: leaderelection.LeaderCallbacks{
+ OnStartedLeading: func(ctx context.Context) {
+ klog.Info("started leading")
+ jobs.StartController(ctx)
+ },
+ OnStoppedLeading: func() {
+ klog.Infof("leader lost")
+ },
+ },
+ })
+}
diff --git a/cmd/directpv/copy.go b/cmd/directpv/copy.go
new file mode 100644
index 000000000..5b9aa0e0d
--- /dev/null
+++ b/cmd/directpv/copy.go
@@ -0,0 +1,237 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/sys"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/xfs"
+ xfilepath "github.com/minio/filepath"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+)
+
+var (
+ volumeID string
+ dryRunFlag bool
+)
+
+var copyCmd = &cobra.Command{
+ Use: "copy SRC-DRIVE DEST-DRIVE --volume-id VOLUME-ID",
+ Short: "copy the volume data from source drive to destination drive",
+ Aliases: []string{"cp"},
+ SilenceUsage: true,
+ SilenceErrors: true,
+ RunE: func(c *cobra.Command, args []string) error {
+ switch len(args) {
+ case 0:
+ return errors.New("source and destination DRIVE-IDs should be provided")
+ case 1:
+ return errors.New("both the source and destination DRIVE-IDs should be provided")
+ case 2:
+ default:
+ return errors.New("invalid syntax")
+ }
+ if volumeID == "" {
+ return errors.New("'--volume-id' should be provided")
+ }
+ if args[0] == args[1] {
+ return errors.New("both the source and destination DRIVE-IDs are same")
+ }
+
+ ctx := c.Context()
+ srcDrive, err := client.DriveClient().Get(ctx, args[0], metav1.GetOptions{
+ TypeMeta: types.NewDriveTypeMeta(),
+ })
+ if err != nil {
+ return err
+ }
+ destDrive, err := client.DriveClient().Get(ctx, args[1], metav1.GetOptions{
+ TypeMeta: types.NewDriveTypeMeta(),
+ })
+ if err != nil {
+ return err
+ }
+ volume, err := client.VolumeClient().Get(ctx, volumeID, metav1.GetOptions{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ })
+ if err != nil {
+ return err
+ }
+ if !destDrive.VolumeExist(volumeID) {
+ return errors.New("volume finalizer not found on the destination drive")
+ }
+ if volume.GetNodeID() != nodeID {
+ return errors.New("the nodeID in the volume doesn't match")
+ }
+ if err := checkDrive(srcDrive); err != nil {
+ klog.ErrorS(err, "unable to check the source drive", "driveID", srcDrive.Name)
+ return err
+ }
+ if err := checkDrive(destDrive); err != nil {
+ klog.ErrorS(err, "unable to check the destination drive", "driveID", destDrive.Name)
+ return err
+ }
+ err = startCopy(ctx, srcDrive, destDrive, volume)
+ if err != nil {
+ klog.ErrorS(err, "unable to copy", "source", srcDrive.Name, "destination", destDrive.Name)
+ }
+ return err
+ },
+}
+
+func init() {
+ copyCmd.PersistentFlags().StringVar(&volumeID, "volume-id", volumeID, "Set the volumeID of the volume to be copied")
+ copyCmd.PersistentFlags().BoolVar(&dryRunFlag, "dry-run", dryRunFlag, "Enable dry-run mode")
+}
+
+func checkDrive(drive *types.Drive) error {
+ if drive.GetNodeID() != nodeID {
+ return errors.New("the nodeID in the drive doesn't match")
+ }
+ if _, err := os.Lstat(types.GetVolumeRootDir(drive.Status.FSUUID)); err != nil {
+ return fmt.Errorf("unable to stat the volume root directory; %v", err)
+ }
+ if _, err := sys.GetDeviceByFSUUID(drive.Status.FSUUID); err != nil {
+ return fmt.Errorf("unable to find device by its FSUUID; %v", err)
+ }
+ return nil
+}
+
+func startCopy(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) error {
+ if dryRunFlag {
+ return nil
+ }
+
+ sourcePath := types.GetVolumeDir(srcDrive.Status.FSUUID, volume.Name)
+ destPath := types.GetVolumeDir(destDrive.Status.FSUUID, volume.Name)
+
+ if _, err := os.Lstat(sourcePath); err != nil {
+ return fmt.Errorf("unable to stat the sourcePath %v; %v", sourcePath, err)
+ }
+ if err := sys.Mkdir(destPath, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
+ return fmt.Errorf("unable to create the targetPath %v; %v", destPath, err)
+ }
+
+ quota := xfs.Quota{
+ HardLimit: uint64(volume.Status.TotalCapacity),
+ SoftLimit: uint64(volume.Status.TotalCapacity),
+ }
+ if err := xfs.SetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), destPath, volume.Name, quota, false); err != nil {
+ return fmt.Errorf("unable to set quota on volume data path; %w", err)
+ }
+
+ ctxWitCancel, cancel := context.WithCancel(ctx)
+ defer func() {
+ cancel()
+ printProgress(ctx, srcDrive, destDrive, volume)
+ }()
+ go func() {
+ logProgress(ctxWitCancel, srcDrive, destDrive, volume)
+ }()
+
+ return copyData(sourcePath, destPath)
+}
+
+func printProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) error {
+ sourceQ, err := xfs.GetQuota(ctx, "/dev/"+string(srcDrive.GetDriveName()), volume.Name)
+ if err != nil {
+ klog.ErrorS(err, "unable to get quota of the source drive", "source drive", srcDrive.GetDriveName(), "volume", volume.Name)
+ return err
+ }
+ destQ, err := xfs.GetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), volume.Name)
+ if err != nil {
+ klog.ErrorS(err, "unable to get quota of the destination drive", "destination drive", destDrive.GetDriveName(), "volume", volume.Name)
+ return err
+ }
+ fmt.Printf("\nCopied %v/%v", humanize.IBytes(destQ.CurrentSpace), humanize.IBytes(sourceQ.CurrentSpace))
+ return nil
+}
+
+func logProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) {
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ if err := printProgress(ctx, srcDrive, destDrive, volume); err != nil {
+ return
+ }
+ }
+ }
+}
+
+func copyData(source, destination string) error {
+ visitFn := func(f string, fi os.FileInfo, _ error) error {
+ targetPath := filepath.Join(destination, strings.TrimPrefix(f, source))
+ switch {
+ case fi.Mode()&os.ModeDir != 0:
+ return os.MkdirAll(targetPath, fi.Mode().Perm())
+ case fi.Mode()&os.ModeType == 0:
+ if targetFi, err := os.Lstat(targetPath); err == nil {
+ if targetFi.ModTime().Equal(fi.ModTime()) && targetFi.Size() == fi.Size() {
+ return nil
+ }
+ }
+ reader, err := os.Open(f)
+ if err != nil {
+ return err
+ }
+ writer, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0o755)
+ if err != nil {
+ return err
+ }
+ if _, err := io.CopyN(writer, reader, fi.Size()); err != nil {
+ return err
+ }
+ stat, ok := fi.Sys().(*syscall.Stat_t)
+ if !ok {
+ return fmt.Errorf("unable to get the stat information for %v", f)
+ }
+ if err := os.Chown(targetPath, int(stat.Uid), int(stat.Gid)); err != nil {
+ return fmt.Errorf("unable to set UID and GID to path %v; %v", targetPath, err)
+ }
+ if err := os.Chmod(targetPath, fi.Mode().Perm()); err != nil {
+ return fmt.Errorf("unable to chmod on path %v; %v", targetPath, err)
+ }
+ return os.Chtimes(targetPath, fi.ModTime(), fi.ModTime())
+ case fi.Mode()&os.ModeSymlink != 0:
+ // ToDo: Handle symlink
+ return nil
+ default:
+ // unsupported modes
+ return nil
+ }
+ }
+ return xfilepath.Walk(source, visitFn)
+}
diff --git a/cmd/directpv/main.go b/cmd/directpv/main.go
index 60902ad25..022f62df6 100644
--- a/cmd/directpv/main.go
+++ b/cmd/directpv/main.go
@@ -128,6 +128,7 @@ func init() {
mainCmd.AddCommand(legacyControllerCmd)
mainCmd.AddCommand(legacyNodeServerCmd)
mainCmd.AddCommand(nodeControllerCmd)
+ mainCmd.AddCommand(copyCmd)
}
func main() {
diff --git a/cmd/kubectl-directpv/clean.go b/cmd/kubectl-directpv/clean.go
index 4afd69d37..87ca64b83 100644
--- a/cmd/kubectl-directpv/clean.go
+++ b/cmd/kubectl-directpv/clean.go
@@ -23,6 +23,7 @@ import (
"os"
"strings"
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/k8s"
@@ -152,6 +153,9 @@ func cleanMain(ctx context.Context) {
List(ctx)
matchFunc := func(volume *types.Volume) bool {
+ if volume.Status.Status == directpvtypes.VolumeStatusCopying {
+ return false
+ }
pv, err := k8s.KubeClient().CoreV1().PersistentVolumes().Get(ctx, volume.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
diff --git a/cmd/kubectl-directpv/flags.go b/cmd/kubectl-directpv/flags.go
index ffede0a2d..7c8bd3e97 100644
--- a/cmd/kubectl-directpv/flags.go
+++ b/cmd/kubectl-directpv/flags.go
@@ -24,6 +24,7 @@ import (
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/ellipsis"
+ "github.com/minio/directpv/pkg/jobs"
"github.com/minio/directpv/pkg/utils"
"github.com/spf13/cobra"
)
@@ -43,6 +44,16 @@ var volumeStatusValues = []string{
strings.ToLower(string(directpvtypes.VolumeStatusReady)),
}
+var jobStatusValues = []string{
+ string(jobs.JobStatusActive),
+ string(jobs.JobStatusFailed),
+ string(jobs.JobStatusSucceeded),
+}
+
+var jobTypeValues = []string{
+ string(jobs.JobTypeCopy),
+}
+
var (
kubeconfig string // --kubeconfig flag
quietFlag bool // --quiet flag
@@ -55,7 +66,9 @@ var (
driveIDArgs []string // --drive-id flag
podNameArgs []string // --pod-name flag
podNSArgs []string // --pod-namespace flag
- volumeStatusArgs []string // --status flag of volumes
+ volumeStatusArgs []string // --status flag for volumes
+ jobStatusArgs []string // --status flag for jobs
+ jobTypeArgs []string // --type flag for jobs
pvcFlag bool // --pvc flag
dryRunFlag bool // --dry-run flag
idArgs []string // --id flag
@@ -108,6 +121,14 @@ func addVolumeStatusFlag(cmd *cobra.Command, usage string) {
cmd.PersistentFlags().StringSliceVar(&volumeStatusArgs, "status", volumeStatusArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(volumeStatusValues, "|")))
}
+func addJobsStatusFlag(cmd *cobra.Command, usage string) {
+ cmd.PersistentFlags().StringSliceVar(&jobStatusArgs, "status", jobStatusArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(jobStatusValues, "|")))
+}
+
+func addJobsTypeFlag(cmd *cobra.Command, usage string) {
+ cmd.PersistentFlags().StringSliceVar(&jobTypeArgs, "type", jobTypeArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(jobTypeValues, "|")))
+}
+
func addIDFlag(cmd *cobra.Command, usage string) {
cmd.PersistentFlags().StringSliceVar(&idArgs, "ids", idArgs, usage)
}
@@ -136,6 +157,8 @@ var (
driveIDSelectors []directpvtypes.DriveID
volumeStatusSelectors []directpvtypes.VolumeStatus
labelSelectors map[directpvtypes.LabelKey]directpvtypes.LabelValue
+ jobStatusSelectors []jobs.JobStatus
+ jobTypeSelectors []jobs.JobType
dryRunPrinter func(interface{})
)
@@ -243,15 +266,23 @@ func validatePodNSArgs() error {
}
func validateVolumeNameArgs() error {
- for i := range volumeNameArgs {
- volumeNameArgs[i] = strings.TrimSpace(volumeNameArgs[i])
- if volumeNameArgs[i] == "" {
- return fmt.Errorf("empty volume name")
+ return validateNameArgs(volumeNameArgs)
+}
+
+func validateNameArgs(args []string) error {
+ for i := range args {
+ args[i] = strings.TrimSpace(args[i])
+ if args[i] == "" {
+ return fmt.Errorf("empty name")
}
}
return nil
}
+func validateJobNameArgs() error {
+ return validateNameArgs(jobNameArgs)
+}
+
func validateVolumeStatusArgs() error {
for i := range volumeStatusArgs {
volumeStatusArgs[i] = strings.TrimSpace(volumeStatusArgs[i])
@@ -264,6 +295,30 @@ func validateVolumeStatusArgs() error {
return nil
}
+func validateJobStatusArgs() error {
+ for i := range jobStatusArgs {
+ jobStatusArgs[i] = strings.TrimSpace(jobStatusArgs[i])
+ status, err := jobs.ToStatus(jobStatusArgs[i])
+ if err != nil {
+ return err
+ }
+ jobStatusSelectors = append(jobStatusSelectors, status)
+ }
+ return nil
+}
+
+func validateJobTypeArgs() error {
+ for i := range jobTypeArgs {
+ jobTypeArgs[i] = strings.ToLower(strings.TrimSpace(jobTypeArgs[i]))
+ jobType, err := jobs.ToType(jobTypeArgs[i])
+ if err != nil {
+ return err
+ }
+ jobTypeSelectors = append(jobTypeSelectors, jobType)
+ }
+ return nil
+}
+
func validateLabelArgs() error {
if labelSelectors == nil {
labelSelectors = make(map[directpvtypes.LabelKey]directpvtypes.LabelValue)
diff --git a/cmd/kubectl-directpv/list.go b/cmd/kubectl-directpv/list.go
index fe0756e29..74dacb87e 100644
--- a/cmd/kubectl-directpv/list.go
+++ b/cmd/kubectl-directpv/list.go
@@ -27,7 +27,7 @@ import (
var listCmd = &cobra.Command{
Use: "list",
- Short: "List drives and volumes",
+ Short: fmt.Sprintf("List %s resources", consts.AppPrettyName),
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if parent := cmd.Parent(); parent != nil {
parent.PersistentPreRunE(parent, args)
@@ -40,12 +40,12 @@ func init() {
setFlagOpts(listCmd)
addNodesFlag(listCmd, "Filter output by nodes")
- addDrivesFlag(listCmd, "Filter output by drive names")
addOutputFormatFlag(listCmd, "Output format. One of: json|yaml|wide")
listCmd.PersistentFlags().BoolVar(&noHeaders, "no-headers", noHeaders, "When using the default or custom-column output format, don't print headers (default print headers)")
listCmd.AddCommand(listDrivesCmd)
listCmd.AddCommand(listVolumesCmd)
+ listCmd.AddCommand(listJobsCmd)
}
func validateListCmd() error {
@@ -55,9 +55,6 @@ func validateListCmd() error {
if err := validateNodeArgs(); err != nil {
return err
}
- if err := validateDriveNameArgs(); err != nil {
- return err
- }
return validateLabelArgs()
}
diff --git a/cmd/kubectl-directpv/list_drives.go b/cmd/kubectl-directpv/list_drives.go
index 5f6dcd9a5..6cea755d2 100644
--- a/cmd/kubectl-directpv/list_drives.go
+++ b/cmd/kubectl-directpv/list_drives.go
@@ -79,6 +79,7 @@ var listDrivesCmd = &cobra.Command{
func init() {
setFlagOpts(listDrivesCmd)
+ addDrivesFlag(listDrivesCmd, "Filter output by drive names")
addDriveStatusFlag(listDrivesCmd, "Filter output by drive status")
addShowLabelsFlag(listDrivesCmd)
addLabelsFlag(listDrivesCmd, "Filter output by drive labels")
@@ -86,6 +87,10 @@ func init() {
}
func validateListDrivesArgs() error {
+ if err := validateDriveNameArgs(); err != nil {
+ return err
+ }
+
if err := validateDriveStatusArgs(); err != nil {
return err
}
diff --git a/cmd/kubectl-directpv/list_jobs.go b/cmd/kubectl-directpv/list_jobs.go
new file mode 100644
index 000000000..41afe800f
--- /dev/null
+++ b/cmd/kubectl-directpv/list_jobs.go
@@ -0,0 +1,168 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "os"
+ "strings"
+
+ "github.com/jedib0t/go-pretty/v6/table"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/jobs"
+ "github.com/minio/directpv/pkg/utils"
+ "github.com/spf13/cobra"
+ batchv1 "k8s.io/api/batch/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var jobNameArgs []string
+
+var listJobsCmd = &cobra.Command{
+ Use: "jobs [JOB ...]",
+ Short: "List jobs",
+ SilenceUsage: true,
+ SilenceErrors: true,
+ Example: strings.ReplaceAll(
+ `1. List all jobs
+ $ kubectl {PLUGIN_NAME} list jobs
+
+2. List jobs by a node
+ $ kubectl {PLUGIN_NAME} list jobs --nodes=node1
+
+3. List jobs by type
+ $ kubectl {PLUGIN_NAME} list jobs --type=copy
+
+3. List jobs filtered by labels
+ $ kubectl {PLUGIN_NAME} list jobs --labels type=copy`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ Run: func(c *cobra.Command, args []string) {
+ jobNameArgs = args
+ if err := validateListJobsArgs(); err != nil {
+ utils.Eprintf(quietFlag, true, "%v\n", err)
+ os.Exit(-1)
+ }
+
+ listJobsMain(c.Context())
+ },
+}
+
+func init() {
+ setFlagOpts(listJobsCmd)
+
+ addJobsTypeFlag(listJobsCmd, "Filter output by job type")
+ addJobsStatusFlag(listJobsCmd, "Filter output by job status")
+ addShowLabelsFlag(listJobsCmd)
+ addLabelsFlag(listJobsCmd, "Filter output by job labels")
+}
+
+func validateListJobsArgs() error {
+ if err := validateJobNameArgs(); err != nil {
+ return err
+ }
+
+ if err := validateJobStatusArgs(); err != nil {
+ return err
+ }
+
+ return validateJobTypeArgs()
+}
+
+func listJobsMain(ctx context.Context) {
+ jobObjects, err := jobs.NewLister().
+ JobNameSelector(jobNameArgs).
+ NodeSelector(toLabelValues(nodesArgs)).
+ StatusSelector(jobStatusSelectors).
+ TypeSelector(jobTypeSelectors).
+ LabelSelector(labelSelectors).
+ Get(ctx)
+ if err != nil {
+ utils.Eprintf(quietFlag, true, "%v\n", err)
+ os.Exit(1)
+ }
+
+ if dryRunPrinter != nil {
+ jobList := batchv1.JobList{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "List",
+ APIVersion: "v1",
+ },
+ Items: jobObjects,
+ }
+ dryRunPrinter(jobList)
+ return
+ }
+
+ headers := table.Row{
+ "JOB",
+ "TYPE",
+ "NODE",
+ "STATUS",
+ }
+ if showLabels {
+ headers = append(headers, "LABELS")
+ }
+ writer := newTableWriter(
+ headers,
+ []table.SortBy{
+ {
+ Name: "JOB",
+ Mode: table.Asc,
+ },
+ {
+ Name: "NODE",
+ Mode: table.Asc,
+ },
+ {
+ Name: "STATUS",
+ Mode: table.Asc,
+ },
+ {
+ Name: "TYPE",
+ Mode: table.Asc,
+ },
+ },
+ noHeaders)
+
+ for _, job := range jobObjects {
+ row := []interface{}{
+ job.Name,
+ jobs.GetType(job),
+ printableString(jobs.GetNode(job)),
+ jobs.GetStatus(job),
+ }
+ if showLabels {
+ row = append(row, labelsToString(job.GetLabels()))
+ }
+ writer.AppendRow(row)
+ }
+
+ if writer.Length() > 0 {
+ writer.Render()
+ return
+ }
+
+ if allFlag {
+ utils.Eprintf(quietFlag, false, "No resources found\n")
+ } else {
+ utils.Eprintf(quietFlag, false, "No matching resources found\n")
+ }
+
+ os.Exit(1)
+}
diff --git a/cmd/kubectl-directpv/list_volumes.go b/cmd/kubectl-directpv/list_volumes.go
index b86bd694b..b9478a097 100644
--- a/cmd/kubectl-directpv/list_volumes.go
+++ b/cmd/kubectl-directpv/list_volumes.go
@@ -87,6 +87,7 @@ var listVolumesCmd = &cobra.Command{
func init() {
setFlagOpts(listVolumesCmd)
+ addDrivesFlag(listVolumesCmd, "Filter output by drive names")
addDriveIDFlag(listVolumesCmd, "Filter output by drive IDs")
addPodNameFlag(listVolumesCmd, "Filter output by pod names")
addPodNSFlag(listVolumesCmd, "Filter output by pod namespaces")
@@ -98,6 +99,10 @@ func init() {
}
func validateListVolumesArgs() error {
+ if err := validateDriveNameArgs(); err != nil {
+ return err
+ }
+
if err := validateDriveIDArgs(); err != nil {
return err
}
diff --git a/cmd/kubectl-directpv/main.go b/cmd/kubectl-directpv/main.go
index 26a470251..af3c57fe9 100644
--- a/cmd/kubectl-directpv/main.go
+++ b/cmd/kubectl-directpv/main.go
@@ -154,6 +154,7 @@ Use "{{.CommandPath}} [command] --help" for more information about this command.
mainCmd.AddCommand(suspendCmd)
mainCmd.AddCommand(resumeCmd)
mainCmd.AddCommand(removeCmd)
+ mainCmd.AddCommand(purgeCmd)
mainCmd.AddCommand(uninstallCmd)
mainCmd.SetHelpCommand(&cobra.Command{
Hidden: true,
diff --git a/cmd/kubectl-directpv/move.go b/cmd/kubectl-directpv/move.go
index a1f7eea75..f1bbb3ff5 100644
--- a/cmd/kubectl-directpv/move.go
+++ b/cmd/kubectl-directpv/move.go
@@ -22,48 +22,67 @@ import (
"os"
"strings"
+ "github.com/fatih/color"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/jobs"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/util/retry"
)
-var moveCmd = &cobra.Command{
- Use: "move SRC-DRIVE DEST-DRIVE",
- Aliases: []string{"mv"},
- SilenceUsage: true,
- SilenceErrors: true,
- Short: "Move volumes excluding data from source drive to destination drive on a same node",
- Example: strings.ReplaceAll(
- `1. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5
- $ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5`,
- `{PLUGIN_NAME}`,
- consts.AppName,
- ),
- Run: func(c *cobra.Command, args []string) {
- if len(args) != 2 {
- utils.Eprintf(quietFlag, true, "only one source and one destination drive must be provided\n")
- os.Exit(-1)
- }
+var (
+ withData, overwrite bool
+ moveCmd = &cobra.Command{
+ Use: "move SRC-DRIVE DEST-DRIVE",
+ Aliases: []string{"mv"},
+ SilenceUsage: true,
+ SilenceErrors: true,
+ Short: "Move volumes excluding data from source drive to destination drive on a same node",
+ Example: strings.ReplaceAll(
+ `1. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5
+ $ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5
- src := strings.TrimSpace(args[0])
- if src == "" {
- utils.Eprintf(quietFlag, true, "empty source drive\n")
- os.Exit(-1)
- }
+2. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5 with data
+$ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5 --with-data`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ Run: func(c *cobra.Command, args []string) {
+ if len(args) != 2 {
+ utils.Eprintf(quietFlag, true, "only one source and one destination drive must be provided\n")
+ os.Exit(-1)
+ }
- dest := strings.TrimSpace(args[1])
- if dest == "" {
- utils.Eprintf(quietFlag, true, "empty destination drive\n")
- os.Exit(-1)
- }
+ src := strings.TrimSpace(args[0])
+ if src == "" {
+ utils.Eprintf(quietFlag, true, "empty source drive\n")
+ os.Exit(-1)
+ }
+
+ dest := strings.TrimSpace(args[1])
+ if dest == "" {
+ utils.Eprintf(quietFlag, true, "empty destination drive\n")
+ os.Exit(-1)
+ }
+
+ if overwrite && !withData {
+ utils.Eprintf(quietFlag, true, "'--overwrite' flag must be set only when '--with-data' flag is set")
+ }
+
+ moveMain(c.Context(), src, dest)
+ },
+ }
+)
- moveMain(c.Context(), src, dest)
- },
+func init() {
+ moveCmd.PersistentFlags().BoolVar(&withData, "with-data", withData, "move the volume with data")
+ moveCmd.PersistentFlags().BoolVar(&overwrite, "overwrite", overwrite, "overwrite any duplicate volume copy job if present")
}
func moveMain(ctx context.Context, src, dest string) {
@@ -133,7 +152,10 @@ func moveMain(ctx context.Context, src, dest string) {
os.Exit(1)
}
- if destDrive.Status.Status != directpvtypes.DriveStatusReady {
+ switch {
+ case destDrive.Status.Status == directpvtypes.DriveStatusReady:
+ case destDrive.Status.Status == directpvtypes.DriveStatusMoving && withData:
+ default:
utils.Eprintf(quietFlag, true, "destination drive is not in ready state\n")
os.Exit(1)
}
@@ -160,12 +182,51 @@ func moveMain(ctx context.Context, src, dest string) {
os.Exit(1)
}
+ var jobParams jobs.ContainerParams
+ if withData {
+ jobParams.Image, jobParams.ImagePullSecrets, jobParams.Tolerations, err = getContainerParams(ctx)
+ if err != nil {
+ utils.Eprintf(quietFlag, true, "unable to get container params; %v", err)
+ os.Exit(1)
+ }
+ srcDrive.AddCopyProtectionFinalizer()
+ }
+
+ var processed bool
for _, volume := range volumes {
+ if withData {
+ if err := jobs.CreateCopyJob(ctx, jobs.CopyOpts{
+ SourceDriveID: srcDrive.GetDriveID(),
+ DestinationDriveID: destDrive.GetDriveID(),
+ VolumeID: volume.Name,
+ NodeID: srcDrive.GetNodeID(),
+ }, jobParams, overwrite); err != nil {
+ if apierrors.IsAlreadyExists(err) && !overwrite {
+ utils.Eprintf(quietFlag, false, "duplicate job found for %v; Please use `--overwrite` for this volume to be moved", volume.Name)
+ }
+ utils.Eprintf(quietFlag, true, "unable to create copy job for volume %v; %v", volume.Name, err)
+ continue
+ }
+ if err := setVolumeStatusCopying(ctx, volume.Name); err != nil {
+ utils.Eprintf(quietFlag, true, "unable to set volume status moving; %v", err)
+ os.Exit(1)
+ }
+ }
+ processed = true
+ if !quietFlag {
+ fmt.Println("Moving volume ", volume.Name)
+ }
if destDrive.AddVolumeFinalizer(volume.Name) {
destDrive.Status.FreeCapacity -= volume.Status.TotalCapacity
destDrive.Status.AllocatedCapacity += volume.Status.TotalCapacity
}
+ srcDrive.RemoveVolumeFinalizer(volume.Name)
}
+
+ if !processed {
+ os.Exit(1)
+ }
+
destDrive.Status.Status = directpvtypes.DriveStatusMoving
_, err = client.DriveClient().Update(
ctx, destDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()},
@@ -175,13 +236,6 @@ func moveMain(ctx context.Context, src, dest string) {
os.Exit(1)
}
- for _, volume := range volumes {
- if !quietFlag {
- fmt.Println("Moving volume", volume.Name)
- }
- }
-
- srcDrive.ResetFinalizers()
_, err = client.DriveClient().Update(
ctx, srcDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()},
)
@@ -189,4 +243,25 @@ func moveMain(ctx context.Context, src, dest string) {
utils.Eprintf(quietFlag, true, "unable to remove volume references in source drive; %v\n", err)
os.Exit(1)
}
+
+ if withData && !quietFlag {
+ color.HiGreen("Jobs created successfully to copy the volume data. Please uncordon the node to get the jobs scheduled")
+ }
+}
+
+func setVolumeStatusCopying(ctx context.Context, volumeName string) error {
+ updateFunc := func() error {
+ volume, err := client.VolumeClient().Get(ctx, volumeName, metav1.GetOptions{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ })
+ if err != nil {
+ return err
+ }
+ volume.Status.Status = directpvtypes.VolumeStatusCopying
+ _, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ })
+ return err
+ }
+ return retry.RetryOnConflict(retry.DefaultRetry, updateFunc)
}
diff --git a/cmd/kubectl-directpv/purge.go b/cmd/kubectl-directpv/purge.go
new file mode 100644
index 000000000..ad369378f
--- /dev/null
+++ b/cmd/kubectl-directpv/purge.go
@@ -0,0 +1,49 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "fmt"
+
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/spf13/cobra"
+)
+
+var purgeCmd = &cobra.Command{
+ Use: "purge",
+ Short: fmt.Sprintf("purge %v resources", consts.AppPrettyName),
+ PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
+ if parent := cmd.Parent(); parent != nil {
+ parent.PersistentPreRunE(parent, args)
+ }
+ return validatePurgeCmd()
+ },
+}
+
+func init() {
+ setFlagOpts(purgeCmd)
+
+ addNodesFlag(purgeCmd, "If present, filter objects from given nodes")
+ addAllFlag(purgeCmd, "If present, select all objects")
+ addDryRunFlag(purgeCmd, "Run in dry run mode")
+
+ purgeCmd.AddCommand(purgeJobsCmd)
+}
+
+func validatePurgeCmd() error {
+ return validateNodeArgs()
+}
diff --git a/cmd/kubectl-directpv/purge_jobs.go b/cmd/kubectl-directpv/purge_jobs.go
new file mode 100644
index 000000000..470990aed
--- /dev/null
+++ b/cmd/kubectl-directpv/purge_jobs.go
@@ -0,0 +1,131 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/jobs"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/utils"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var purgeJobsCmd = &cobra.Command{
+ Use: "jobs [JOB ...]",
+ Short: "Purge jobs",
+ SilenceUsage: true,
+ SilenceErrors: true,
+ Example: strings.ReplaceAll(
+ `1. Purge all jobs
+ $ kubectl {PLUGIN_NAME} purge jobs --all
+
+2. Purge jobs from a node
+ $ kubectl {PLUGIN_NAME} purge jobs --nodes=node1
+
+3. Purge jobs by type
+ $ kubectl {PLUGIN_NAME} purge jobs --type=copy
+
+3. Purge jobs filtered by labels
+ $ kubectl {PLUGIN_NAME} purge jobs --labels type=copy`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ Run: func(c *cobra.Command, args []string) {
+ jobNameArgs = args
+ if err := validatePurgeJobsArgs(); err != nil {
+ utils.Eprintf(quietFlag, true, "%v\n", err)
+ os.Exit(-1)
+ }
+
+ purgeJobsMain(c.Context())
+ },
+}
+
+func init() {
+ setFlagOpts(purgeJobsCmd)
+
+ addJobsTypeFlag(purgeJobsCmd, "Filter output by job type")
+ addJobsStatusFlag(purgeJobsCmd, "Filter output by job status")
+ addLabelsFlag(purgeJobsCmd, "Filter output by job labels")
+ addDangerousFlag(purgeJobsCmd, "Set dangerous flag to forcefully purge active jobs")
+}
+
+func validatePurgeJobsArgs() error {
+ if err := validateListJobsArgs(); err != nil {
+ return err
+ }
+ if err := validateLabelArgs(); err != nil {
+ return err
+ }
+ switch {
+ case allFlag:
+ case len(nodesArgs) != 0:
+ case len(jobNameArgs) != 0:
+ case len(jobStatusArgs) != 0:
+ case len(jobTypeArgs) != 0:
+ case len(labelArgs) != 0:
+ default:
+ return errors.New("no jobs selected to purge")
+ }
+ if allFlag {
+ nodesArgs = nil
+ jobNameArgs = nil
+ jobStatusSelectors = nil
+ jobTypeSelectors = nil
+ labelSelectors = nil
+ }
+ return nil
+}
+
+func purgeJobsMain(ctx context.Context) {
+ resultCh := jobs.NewLister().
+ JobNameSelector(jobNameArgs).
+ NodeSelector(toLabelValues(nodesArgs)).
+ StatusSelector(jobStatusSelectors).
+ TypeSelector(jobTypeSelectors).
+ LabelSelector(labelSelectors).
+ List(ctx)
+ for result := range resultCh {
+ if result.Err != nil {
+ utils.Eprintf(quietFlag, true, "%v\n", result.Err)
+ os.Exit(1)
+ }
+ switch jobs.GetStatus(result.Job) {
+ case jobs.JobStatusActive:
+ if !dangerousFlag {
+ utils.Eprintf(quietFlag, true, "Purging the active job may lead to partial data; Please use `--dangerous` to purge the job %v", result.Job.Name)
+ continue
+ }
+ case jobs.JobStatusSucceeded, jobs.JobStatusFailed:
+ }
+ if !dryRunFlag {
+ if err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Delete(ctx, result.Job.Name, metav1.DeleteOptions{}); err != nil {
+ utils.Eprintf(quietFlag, true, "unable to delete job %v: %v\n", result.Job.Name, err)
+ }
+ }
+ if !quietFlag {
+ fmt.Printf("Job '%s' purged successfully \n", result.Job.Name)
+ }
+ }
+}
diff --git a/cmd/kubectl-directpv/remove.go b/cmd/kubectl-directpv/remove.go
index 39c9d69e5..0c54139db 100644
--- a/cmd/kubectl-directpv/remove.go
+++ b/cmd/kubectl-directpv/remove.go
@@ -134,6 +134,10 @@ func removeMain(ctx context.Context) {
os.Exit(1)
}
+ if result.Drive.IsCopyProtected() {
+ continue
+ }
+
processed = true
switch result.Drive.Status.Status {
case directpvtypes.DriveStatusRemoved:
diff --git a/cmd/kubectl-directpv/utils.go b/cmd/kubectl-directpv/utils.go
index d5363dbd5..5cfa57694 100644
--- a/cmd/kubectl-directpv/utils.go
+++ b/cmd/kubectl-directpv/utils.go
@@ -31,8 +31,10 @@ import (
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/utils"
"github.com/mitchellh/go-homedir"
+ corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
@@ -186,3 +188,27 @@ func getCSINodes(ctx context.Context) (nodes []string, err error) {
return nodes, err
}
+
+func getContainerParams(ctx context.Context) (string, []corev1.LocalObjectReference, []corev1.Toleration, error) {
+ daemonSet, err := k8s.KubeClient().AppsV1().DaemonSets(consts.AppName).Get(
+ ctx, consts.NodeServerName, metav1.GetOptions{},
+ )
+
+ if err != nil && !apierrors.IsNotFound(err) {
+ return "", nil, nil, err
+ }
+
+ if daemonSet == nil || daemonSet.UID == "" {
+ return "", nil, nil, fmt.Errorf("invalid daemonset found")
+ }
+
+ var containerImage string
+ for _, container := range daemonSet.Spec.Template.Spec.Containers {
+ if container.Name == consts.NodeServerName {
+ containerImage = container.Image
+ break
+ }
+ }
+
+ return containerImage, daemonSet.Spec.Template.Spec.ImagePullSecrets, daemonSet.Spec.Template.Spec.Tolerations, nil
+}
diff --git a/docs/tools/replace.sh b/docs/tools/replace.sh
index 8fe117b4f..2527e0e79 100755
--- a/docs/tools/replace.sh
+++ b/docs/tools/replace.sh
@@ -126,6 +126,8 @@ function main() {
dest_drive="${2#/dev/}"
node="${3}"
+ shift 3
+
if [ "${src_drive}" == "${dest_drive}" ]; then
echo "the source and destination drives are same"
exit 255
@@ -201,7 +203,7 @@ function main() {
fi
# Run move command
- kubectl directpv move "${src_drive_id}" "${dest_drive_id}"
+ kubectl directpv move "${src_drive_id}" "${dest_drive_id}" "$@"
# Uncordon destination drive
kubectl directpv uncordon "${dest_drive_id}"
diff --git a/go.mod b/go.mod
index 133accff8..0de12a414 100644
--- a/go.mod
+++ b/go.mod
@@ -20,7 +20,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.16.0
golang.org/x/time v0.3.0
- google.golang.org/grpc v1.58.2
+ google.golang.org/grpc v1.59.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.2
k8s.io/apiextensions-apiserver v0.28.2
@@ -66,6 +66,7 @@ require (
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
+ github.com/minio/filepath v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -85,13 +86,13 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
golang.org/x/net v0.17.0 // indirect
- golang.org/x/oauth2 v0.10.0 // indirect
+ golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
- google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
diff --git a/go.sum b/go.sum
index a7b1cfbf1..483eb7629 100644
--- a/go.sum
+++ b/go.sum
@@ -231,6 +231,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
+github.com/minio/filepath v1.0.0 h1:fvkJu1+6X+ECRA6G3+JJETj4QeAYO9sV43I79H8ubDY=
+github.com/minio/filepath v1.0.0/go.mod h1:/nRZA2ldl5z6jT9/KQuvZcQlxZIMQoFFQPvEXx9T/Bw=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
@@ -409,6 +411,8 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
+golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
+golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -608,6 +612,8 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -626,6 +632,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I=
google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
+google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
+google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/pkg/apis/directpv.min.io/types/label.go b/pkg/apis/directpv.min.io/types/label.go
index c4388b5ff..14538523e 100644
--- a/pkg/apis/directpv.min.io/types/label.go
+++ b/pkg/apis/directpv.min.io/types/label.go
@@ -88,6 +88,18 @@ const (
// ClaimIDLabelKey label key to denote the claim id of the volumes
ClaimIDLabelKey LabelKey = consts.GroupName + "/claim-id"
+
+ // JobTypeLabelKey denotes the type of the job
+ JobTypeLabelKey LabelKey = consts.GroupName + "/job-type"
+
+ // SourceDriveLabelKey denotes the source drive id
+ SourceDriveLabelKey LabelKey = consts.GroupName + "/source-drive"
+
+ // DestinationDriveLabelKey denotes the destination drive id
+ DestinationDriveLabelKey LabelKey = consts.GroupName + "/destination-drive"
+
+ // VolumeLabelKey denotes the volume name
+ VolumeLabelKey LabelKey = consts.GroupName + "/volume"
)
// LabelValue is a type definition for label value
diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go
index 5c1d06f12..751df1275 100644
--- a/pkg/apis/directpv.min.io/types/types.go
+++ b/pkg/apis/directpv.min.io/types/types.go
@@ -69,6 +69,7 @@ type VolumeStatus string
const (
VolumeStatusPending VolumeStatus = "Pending"
VolumeStatusReady VolumeStatus = "Ready"
+ VolumeStatusCopying VolumeStatus = "Copying"
)
// ToVolumeStatus converts string value to VolumeStatus.
diff --git a/pkg/apis/directpv.min.io/v1beta1/drive.go b/pkg/apis/directpv.min.io/v1beta1/drive.go
index ab0cc753f..bd32dc9d5 100644
--- a/pkg/apis/directpv.min.io/v1beta1/drive.go
+++ b/pkg/apis/directpv.min.io/v1beta1/drive.go
@@ -122,8 +122,13 @@ func (drive DirectPVDrive) GetDriveID() types.DriveID {
}
// GetVolumeCount returns number of volumes on this drive.
-func (drive DirectPVDrive) GetVolumeCount() int {
- return len(drive.Finalizers) - 1
+func (drive DirectPVDrive) GetVolumeCount() (count int) {
+ for _, finalizer := range drive.Finalizers {
+ if strings.HasPrefix(finalizer, driveFinalizerVolumePrefix) {
+ count++
+ }
+ }
+ return
}
// VolumeExist returns whether given volume is on this drive or not.
@@ -157,20 +162,49 @@ func (drive *DirectPVDrive) RemoveFinalizers() bool {
// AddVolumeFinalizer adds volume to this drive's finalizer.
func (drive *DirectPVDrive) AddVolumeFinalizer(volume string) (added bool) {
- value := driveFinalizerVolumePrefix + volume
+ return drive.addFinalizer(driveFinalizerVolumePrefix + volume)
+}
+
+func (drive *DirectPVDrive) addFinalizer(value string) (added bool) {
for _, finalizer := range drive.Finalizers {
if finalizer == value {
return false
}
}
-
drive.Finalizers = append(drive.Finalizers, value)
return true
}
+// AddCopyProtectionFinalizer adds a finalizer to protect drive deletions
+// while the drive is being copied.
+func (drive *DirectPVDrive) AddCopyProtectionFinalizer() (added bool) {
+ return drive.addFinalizer(consts.CopyProtectionFinalizer)
+}
+
+// IsCopyProtected checks if copy protection finalizer is
+// present on the drive.
+func (drive *DirectPVDrive) IsCopyProtected() (added bool) {
+ value := consts.CopyProtectionFinalizer
+ for _, finalizer := range drive.Finalizers {
+ if finalizer == value {
+ return true
+ }
+ }
+ return false
+}
+
+// RemoveCopyProtectionFinalizer removes the copy protection finalizer
+// present on the drive
+func (drive *DirectPVDrive) RemoveCopyProtectionFinalizer() (found bool) {
+ return drive.removeFinalizer(consts.CopyProtectionFinalizer)
+}
+
// RemoveVolumeFinalizer remove volume from this drive's finalizer.
func (drive *DirectPVDrive) RemoveVolumeFinalizer(volume string) (found bool) {
- value := driveFinalizerVolumePrefix + volume
+ return drive.removeFinalizer(driveFinalizerVolumePrefix + volume)
+}
+
+func (drive *DirectPVDrive) removeFinalizer(value string) (found bool) {
finalizers := []string{}
for _, finalizer := range drive.Finalizers {
if finalizer == value {
@@ -179,11 +213,9 @@ func (drive *DirectPVDrive) RemoveVolumeFinalizer(volume string) (found bool) {
finalizers = append(finalizers, finalizer)
}
}
-
if found {
drive.Finalizers = finalizers
}
-
return
}
diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index 9bee57e0f..6f2f50138 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -22,6 +22,9 @@ const (
// AppName denotes application/library/plugin/tool name
AppName = "directpv"
+ // AppNamespace denotes the namespace
+ AppNamespace = "directpv"
+
// AppPrettyName denotes application/library/plugin/tool pretty name
AppPrettyName = "DirectPV"
@@ -97,4 +100,26 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // LegacyAppRootDir is legacy application root directory.
+ LegacyAppRootDir = "/var/lib/direct-csi"
+
+ AppRootDirVolumeName = AppName + "-common-root"
+ AppRootDirVolumePath = AppRootDir + "/"
+
+ LegacyAppRootDirVolumeName = "direct-csi-common-root"
+ LegacyAppRootDirVolumePath = LegacyAppRootDir + "/"
+
+ SysDirVolumeName = "sysfs"
+ SysDirVolumePath = "/sys"
+
+ DevDirVolumeName = "devfs"
+ DevDirVolumePath = "/dev"
+
+ RunUdevDataVolumeName = "run-udev-data-dir"
+ RunUdevDataVolumePath = UdevDataDir
+
+ KubeNodeNameEnvVarName = "KUBE_NODE_NAME"
+
+ CopyProtectionFinalizer = GroupName + "/copy-protection"
)
diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in
index bf50b159c..f70952638 100644
--- a/pkg/consts/consts.go.in
+++ b/pkg/consts/consts.go.in
@@ -20,6 +20,9 @@ const (
// AppName denotes application/library/plugin/tool name
AppName = "directpv"
+ // AppNamespace denotes the namespace
+ AppNamespace = "directpv"
+
// AppPrettyName denotes application/library/plugin/tool pretty name
AppPrettyName = "DirectPV"
@@ -95,4 +98,26 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // LegacyAppRootDir is legacy application root directory.
+ LegacyAppRootDir = "/var/lib/direct-csi"
+
+ AppRootDirVolumeName = AppName + "-common-root"
+ AppRootDirVolumePath = AppRootDir + "/"
+
+ LegacyAppRootDirVolumeName = "direct-csi-common-root"
+ LegacyAppRootDirVolumePath = LegacyAppRootDir + "/"
+
+ SysDirVolumeName = "sysfs"
+ SysDirVolumePath = "/sys"
+
+ DevDirVolumeName = "devfs"
+ DevDirVolumePath = "/dev"
+
+ RunUdevDataVolumeName = "run-udev-data-dir"
+ RunUdevDataVolumePath = UdevDataDir
+
+ KubeNodeNameEnvVarName = "KUBE_NODE_NAME"
+
+ CopyProtectionFinalizer = GroupName + "/copy-protection"
)
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index f15722fb6..55298b25e 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -79,7 +79,15 @@ func New(name string, handler EventHandler, workers int, resyncPeriod time.Durat
handler.ListerWatcher(),
handler.ObjectType(),
resyncPeriod,
- cache.Indexers{},
+ cache.Indexers{
+ "objectname": func(obj interface{}) ([]string, error) {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ return nil, err
+ }
+ return []string{key}, nil
+ },
+ },
)
queue := workqueue.NewRateLimitingQueue(
diff --git a/pkg/csi/controller/server.go b/pkg/csi/controller/server.go
index 12bfd7bb0..f710e0130 100644
--- a/pkg/csi/controller/server.go
+++ b/pkg/csi/controller/server.go
@@ -270,6 +270,10 @@ func (c *Server) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
return nil, status.Errorf(codes.Internal, "unable to get volume %v; %v", volumeID, err)
}
+ if volume.Status.Status == directpvtypes.VolumeStatusCopying {
+ return nil, status.Errorf(codes.FailedPrecondition, "volume %s is busy copying the data", volumeID)
+ }
+
if volume.IsStaged() || volume.IsPublished() {
return nil, status.Errorf(codes.FailedPrecondition, "volume %v is not yet unstaged for deletion", volumeID)
}
diff --git a/pkg/csi/node/publish_unpublish.go b/pkg/csi/node/publish_unpublish.go
index 46b55deac..d89a687f1 100644
--- a/pkg/csi/node/publish_unpublish.go
+++ b/pkg/csi/node/publish_unpublish.go
@@ -120,6 +120,10 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Errorf(codes.FailedPrecondition, "volume %v is not yet staged, but requested with %v", volume.Name, req.GetStagingTargetPath())
}
+ if volume.Status.Status == directpvtypes.VolumeStatusCopying {
+ return nil, status.Error(codes.FailedPrecondition, "volume is busy; copying the data from the source")
+ }
+
if err := server.publishVolume(req, isSuspended); err != nil {
klog.Errorf("unable to publish volume %s; %v", volume.Name, err)
return nil, status.Errorf(codes.Internal, "unable to publish volume; %v", err)
diff --git a/pkg/csi/node/stage_unstage.go b/pkg/csi/node/stage_unstage.go
index 059945ac5..1da576867 100644
--- a/pkg/csi/node/stage_unstage.go
+++ b/pkg/csi/node/stage_unstage.go
@@ -20,6 +20,7 @@ import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/types"
@@ -59,6 +60,10 @@ func (server *Server) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
+ if volume.Status.Status == directpvtypes.VolumeStatusCopying {
+ return nil, status.Error(codes.FailedPrecondition, "volume is busy; copying the data from the source")
+ }
+
code, err := drive.StageVolume(
ctx,
volume,
diff --git a/pkg/drive/event.go b/pkg/drive/event.go
index 6c106346b..d903a6229 100644
--- a/pkg/drive/event.go
+++ b/pkg/drive/event.go
@@ -140,7 +140,9 @@ func StageVolume(
volume.Status.DataPath = volumeDir
volume.Status.StagingTargetPath = stagingTargetPath
- volume.Status.Status = directpvtypes.VolumeStatusReady
+ if volume.Status.Status != directpvtypes.VolumeStatusCopying {
+ volume.Status.Status = directpvtypes.VolumeStatusReady
+ }
if _, err := client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
}); err != nil {
@@ -316,7 +318,7 @@ func (handler *driveEventHandler) move(ctx context.Context, drive *types.Drive)
"stagingTargetPath", volume.Status.StagingTargetPath,
)
}
- } else {
+ } else if volume.Status.Status != directpvtypes.VolumeStatusCopying {
volume.Status.Status = directpvtypes.VolumeStatusPending
}
diff --git a/pkg/installer/consts.go b/pkg/installer/consts.go
index 31a183696..12a285afd 100644
--- a/pkg/installer/consts.go
+++ b/pkg/installer/consts.go
@@ -25,11 +25,9 @@ const (
namespace = consts.AppName
healthZContainerPortName = "healthz"
healthZContainerPort = 9898
- volumePathSysDir = "/sys"
- volumeNameSocketDir = "socket-dir"
- socketDir = "/csi"
+ csiDirVolumeName = "socket-dir"
+ csiDirVolumePath = "/csi"
selectorKey = "selector." + consts.GroupName
- kubeNodeNameEnvVarName = "KUBE_NODE_NAME"
csiEndpointEnvVarName = "CSI_ENDPOINT"
pluginName = "kubectl-" + consts.AppName
selectorValueEnabled = "enabled"
diff --git a/pkg/installer/daemonset.go b/pkg/installer/daemonset.go
index 8e0677114..1deba0d31 100644
--- a/pkg/installer/daemonset.go
+++ b/pkg/installer/daemonset.go
@@ -31,20 +31,11 @@ import (
)
const (
- volumeNameMountpointDir = "mountpoint-dir"
- volumeNameRegistrationDir = "registration-dir"
- volumeNamePluginDir = "plugins-dir"
- volumeNameAppRootDir = consts.AppName + "-common-root"
- volumeNameLegacyAppRootDir = "direct-csi-common-root"
- appRootDir = consts.AppRootDir + "/"
- legacyAppRootDir = "/var/lib/direct-csi/"
- volumeNameSysDir = "sysfs"
- volumeNameDevDir = "devfs"
- volumePathDevDir = "/dev"
- volumeNameRunUdevData = "run-udev-data-dir"
- volumePathRunUdevData = consts.UdevDataDir
- socketFile = "/csi.sock"
- totalDaemonsetSteps = 2
+ kubeletPodsDirVolumeName = "mountpoint-dir"
+ registrationDirVolumeName = "registration-dir"
+ kubeletPluginsDirVolumeName = "plugins-dir"
+ socketFile = "/csi.sock"
+ totalDaemonsetSteps = 2
)
type daemonsetTask struct{}
@@ -94,25 +85,25 @@ func newSecurityContext(seccompProfile string) *corev1.SecurityContext {
func getVolumesAndMounts(pluginSocketDir string) (volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) {
volumes = []corev1.Volume{
- newHostPathVolume(volumeNameSocketDir, pluginSocketDir),
- newHostPathVolume(volumeNameMountpointDir, kubeletDirPath+"/pods"),
- newHostPathVolume(volumeNameRegistrationDir, kubeletDirPath+"/plugins_registry"),
- newHostPathVolume(volumeNamePluginDir, kubeletDirPath+"/plugins"),
- newHostPathVolume(volumeNameAppRootDir, appRootDir),
- newHostPathVolume(volumeNameSysDir, volumePathSysDir),
- newHostPathVolume(volumeNameDevDir, volumePathDevDir),
- newHostPathVolume(volumeNameRunUdevData, volumePathRunUdevData),
- newHostPathVolume(volumeNameLegacyAppRootDir, legacyAppRootDir),
+ k8s.NewHostPathVolume(csiDirVolumeName, pluginSocketDir),
+ k8s.NewHostPathVolume(kubeletPodsDirVolumeName, kubeletDirPath+"/pods"),
+ k8s.NewHostPathVolume(registrationDirVolumeName, kubeletDirPath+"/plugins_registry"),
+ k8s.NewHostPathVolume(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins"),
+ k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
+ k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
+ k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
}
volumeMounts = []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
- newVolumeMount(volumeNameMountpointDir, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNamePluginDir, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameAppRootDir, appRootDir, corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameSysDir, volumePathSysDir, corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameDevDir, volumePathDevDir, corev1.MountPropagationHostToContainer, true),
- newVolumeMount(volumeNameRunUdevData, volumePathRunUdevData, corev1.MountPropagationBidirectional, true),
- newVolumeMount(volumeNameLegacyAppRootDir, legacyAppRootDir, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(kubeletPodsDirVolumeName, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
+ k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
}
return
@@ -129,8 +120,8 @@ func nodeDriverRegistrarContainer(image, pluginSocketDir string) corev1.Containe
},
Env: []corev1.EnvVar{kubeNodeNameEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
- newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(registrationDirVolumeName, "/registration", corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/driver-registrar-termination-log",
@@ -192,13 +183,13 @@ func livenessProbeContainer(image string) corev1.Container {
Name: "liveness-probe",
Image: image,
Args: []string{
- fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile),
+ fmt.Sprintf("--csi-address=%v%v", csiDirVolumePath, socketFile),
fmt.Sprintf("--health-port=%v", healthZContainerPort),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/driver-liveness-termination-log",
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
}
}
@@ -254,14 +245,15 @@ func doCreateDaemonset(ctx context.Context, args *Args) (err error) {
fmt.Sprintf("-v=%d", logLevel),
fmt.Sprintf("--identity=%s", consts.Identity),
fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName),
- fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName),
+ fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName),
fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort),
fmt.Sprintf("--metrics-port=%d", consts.MetricsPort),
}
+ nodeNameEnvVarName := "KUBE_NODE_NAME"
nodeControllerArgs := []string{
consts.NodeControllerName,
fmt.Sprintf("-v=%d", logLevel),
- fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName),
+ fmt.Sprintf("--kube-node-name=$(%s)", nodeNameEnvVarName),
}
podSpec := corev1.PodSpec{
@@ -324,7 +316,7 @@ func doCreateLegacyDaemonset(ctx context.Context, args *Args) (err error) {
consts.LegacyNodeServerName,
fmt.Sprintf("-v=%d", logLevel),
fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName),
- fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName),
+ fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName),
fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort),
}
diff --git a/pkg/installer/deployment.go b/pkg/installer/deployment.go
index af1b8aea7..a57ede52b 100644
--- a/pkg/installer/deployment.go
+++ b/pkg/installer/deployment.go
@@ -84,7 +84,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
[]string{
fmt.Sprintf("-v=%d", logLevel),
fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName),
- fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName),
+ fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName),
fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort),
}...,
)
@@ -93,8 +93,8 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
podSpec := corev1.PodSpec{
ServiceAccountName: consts.Identity,
Volumes: []corev1.Volume{
- newHostPathVolume(
- volumeNameSocketDir,
+ k8s.NewHostPathVolume(
+ csiDirVolumeName,
newPluginsSocketDir(kubeletDirPath, fmt.Sprintf("%s-controller", consts.ControllerServerName)),
),
},
@@ -113,7 +113,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
},
Env: []corev1.EnvVar{csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/controller-provisioner-termination-log",
@@ -145,7 +145,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
},
Env: []corev1.EnvVar{csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/controller-csi-resizer-termination-log",
@@ -170,7 +170,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
},
Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
},
},
diff --git a/pkg/installer/namespace.go b/pkg/installer/namespace.go
index e0e81a051..1ea8bc207 100644
--- a/pkg/installer/namespace.go
+++ b/pkg/installer/namespace.go
@@ -19,6 +19,7 @@ package installer
import (
"context"
+ "github.com/minio/directpv/pkg/jobs"
"github.com/minio/directpv/pkg/k8s"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -106,8 +107,17 @@ func createNamespace(ctx context.Context, args *Args) (err error) {
}
func deleteNamespace(ctx context.Context) error {
+ jobObjects, err := jobs.NewLister().IgnoreNotFound(true).Get(ctx)
+ if err != nil {
+ return err
+ }
+ if len(jobObjects) > 0 {
+ // Do not delete the namespace if there
+ // are jobs in directpv namespace
+ return nil
+ }
propagationPolicy := metav1.DeletePropagationForeground
- err := k8s.KubeClient().CoreV1().Namespaces().Delete(
+ err = k8s.KubeClient().CoreV1().Namespaces().Delete(
ctx, namespace, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy},
)
if err != nil {
diff --git a/pkg/installer/psp.go b/pkg/installer/psp.go
index 430009819..016ec948c 100644
--- a/pkg/installer/psp.go
+++ b/pkg/installer/psp.go
@@ -146,10 +146,10 @@ func createPodSecurityPolicy(ctx context.Context, args *Args) (err error) {
Volumes: []policy.FSType{policy.HostPath},
AllowedHostPaths: []policy.AllowedHostPath{
{PathPrefix: "/proc", ReadOnly: true},
- {PathPrefix: volumePathSysDir},
+ {PathPrefix: consts.SysDirVolumePath},
{PathPrefix: consts.UdevDataDir, ReadOnly: true},
{PathPrefix: consts.AppRootDir},
- {PathPrefix: socketDir},
+ {PathPrefix: csiDirVolumePath},
{PathPrefix: kubeletDirPath},
},
SELinux: policy.SELinuxStrategyOptions{
diff --git a/pkg/installer/rbac.go b/pkg/installer/rbac.go
index ea34ff3e9..0b98848a2 100644
--- a/pkg/installer/rbac.go
+++ b/pkg/installer/rbac.go
@@ -166,6 +166,7 @@ func createClusterRole(ctx context.Context, args *Args) (err error) {
),
newPolicyRule([]string{"pods", "pod"}, nil, getVerb, listVerb, watchVerb),
newPolicyRule([]string{"secrets", "secret"}, nil, getVerb, listVerb, watchVerb),
+ newPolicyRule([]string{"jobs"}, []string{"batch"}, createVerb, deleteVerb, getVerb, listVerb, updateVerb, watchVerb),
},
AggregationRule: nil,
}
diff --git a/pkg/installer/utils.go b/pkg/installer/utils.go
index c37ecaab3..8bc82d7c7 100644
--- a/pkg/installer/utils.go
+++ b/pkg/installer/utils.go
@@ -24,38 +24,13 @@ import (
"strings"
"github.com/minio/directpv/pkg/k8s"
- corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
-func newHostPathVolume(name, path string) corev1.Volume {
- hostPathType := corev1.HostPathDirectoryOrCreate
- volumeSource := corev1.VolumeSource{
- HostPath: &corev1.HostPathVolumeSource{
- Path: path,
- Type: &hostPathType,
- },
- }
-
- return corev1.Volume{
- Name: name,
- VolumeSource: volumeSource,
- }
-}
-
func newPluginsSocketDir(kubeletDir, name string) string {
return path.Join(kubeletDir, "plugins", k8s.SanitizeResourceName(name))
}
-func newVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount {
- return corev1.VolumeMount{
- Name: name,
- ReadOnly: readOnly,
- MountPath: path,
- MountPropagation: &mountPropogation,
- }
-}
-
func getRandSuffix() string {
b := make([]byte, 5)
if _, err := rand.Read(b); err != nil {
diff --git a/pkg/installer/vars.go b/pkg/installer/vars.go
index 4abf5c2ba..6fc3b074d 100644
--- a/pkg/installer/vars.go
+++ b/pkg/installer/vars.go
@@ -38,7 +38,7 @@ var (
}
kubeNodeNameEnvVar = corev1.EnvVar{
- Name: kubeNodeNameEnvVarName,
+ Name: consts.KubeNodeNameEnvVarName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
diff --git a/pkg/jobs/copy.go b/pkg/jobs/copy.go
new file mode 100644
index 000000000..0c63c84e3
--- /dev/null
+++ b/pkg/jobs/copy.go
@@ -0,0 +1,147 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package jobs
+
+import (
+ "context"
+ "fmt"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// CopyOpts defines the options for copying
+type CopyOpts struct {
+ SourceDriveID directpvtypes.DriveID
+ DestinationDriveID directpvtypes.DriveID
+ VolumeID string
+ NodeID directpvtypes.NodeID
+}
+
+// ContainerParams represents the container parameters
+type ContainerParams struct {
+ Image string
+ ImagePullSecrets []corev1.LocalObjectReference
+ Tolerations []corev1.Toleration
+}
+
+// CreateCopyJob creates a new job instance for copying the volume.
+func CreateCopyJob(ctx context.Context, opts CopyOpts, params ContainerParams, overwrite bool) error {
+ labels := map[string]string{
+ string(directpvtypes.JobTypeLabelKey): string(JobTypeCopy),
+ string(directpvtypes.SourceDriveLabelKey): string(opts.SourceDriveID),
+ string(directpvtypes.DestinationDriveLabelKey): string(opts.DestinationDriveID),
+ string(directpvtypes.NodeLabelKey): string(opts.NodeID),
+ string(directpvtypes.VolumeLabelKey): opts.VolumeID,
+ }
+ for k, v := range defaultLabels {
+ labels[k] = v
+ }
+ objectMeta := metav1.ObjectMeta{
+ Name: "copy-" + opts.VolumeID,
+ Namespace: consts.AppNamespace,
+ Labels: labels,
+ Finalizers: []string{
+ consts.CopyProtectionFinalizer,
+ },
+ }
+ privileged := true
+ var backoffLimit int32 = 3
+ job := &batchv1.Job{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Job",
+ APIVersion: "batch/v1",
+ },
+ ObjectMeta: objectMeta,
+ Spec: batchv1.JobSpec{
+ BackoffLimit: &backoffLimit,
+ Template: corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ NodeSelector: map[string]string{string(directpvtypes.NodeLabelKey): string(opts.NodeID)},
+ ServiceAccountName: consts.Identity,
+ Tolerations: params.Tolerations,
+ ImagePullSecrets: params.ImagePullSecrets,
+ Volumes: []corev1.Volume{
+ k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
+ k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
+ k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
+ },
+ Containers: []corev1.Container{
+ {
+ Name: "copy-job",
+ Image: params.Image,
+ Args: []string{
+ "copy",
+ string(opts.SourceDriveID),
+ string(opts.DestinationDriveID),
+ "--volume-id=" + opts.VolumeID,
+ fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName),
+ },
+ Env: []corev1.EnvVar{
+ {
+ Name: consts.KubeNodeNameEnvVarName,
+ ValueFrom: &corev1.EnvVarSource{
+ FieldRef: &corev1.ObjectFieldSelector{
+ APIVersion: "v1",
+ FieldPath: "spec.nodeName",
+ },
+ },
+ },
+ },
+ TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
+ TerminationMessagePath: "/var/log/copy-termination-log",
+ VolumeMounts: []corev1.VolumeMount{
+ k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
+ k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
+ },
+ SecurityContext: &corev1.SecurityContext{
+ Privileged: &privileged,
+ },
+ },
+ },
+ RestartPolicy: corev1.RestartPolicyNever,
+ },
+ },
+ },
+ }
+
+ if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
+ if apierrors.IsAlreadyExists(err) && overwrite {
+ return deleteAndCreate(ctx, job)
+ }
+ return err
+ }
+ return nil
+}
+
+func deleteAndCreate(ctx context.Context, job *batchv1.Job) error {
+ if err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Delete(ctx, job.Name, metav1.DeleteOptions{}); err != nil {
+ return err
+ }
+ _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Create(ctx, job, metav1.CreateOptions{})
+ return err
+}
diff --git a/pkg/jobs/event.go b/pkg/jobs/event.go
new file mode 100644
index 000000000..9fb881607
--- /dev/null
+++ b/pkg/jobs/event.go
@@ -0,0 +1,205 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package jobs
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/controller"
+ "github.com/minio/directpv/pkg/k8s"
+ v1 "k8s.io/api/batch/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/tools/cache"
+)
+
+// JobStatus represents the status of the Job
+type JobStatus string
+
+// JobType represents the type of the Job
+type JobType string
+
+const (
+ // JobTypeCopy represents the mirror job
+ JobTypeCopy JobType = "copy"
+ // JobTypeUnknown represents unknown job type
+ JobTypeUnknown JobType = "unknown"
+ // JobStatusActive represents the active job status
+ JobStatusActive JobStatus = "active"
+ // JobStatusFailed represents the failed job status
+ JobStatusFailed JobStatus = "failed"
+ // JobStatusSucceeded represents the succeeded job status
+ JobStatusSucceeded JobStatus = "succeeded"
+)
+
+var defaultLabels = map[string]string{
+ "application-name": consts.GroupName,
+ "application-type": "CSIDriver",
+ string(directpvtypes.CreatedByLabelKey): "controller",
+ string(directpvtypes.VersionLabelKey): consts.LatestAPIVersion,
+}
+
+const (
+ workerThreads = 10
+ resyncPeriod = 10 * time.Minute
+)
+
+type jobsEventHandler struct{}
+
+func newJobsEventHandler() *jobsEventHandler {
+ return &jobsEventHandler{}
+}
+
+func (handler *jobsEventHandler) ListerWatcher() cache.ListerWatcher {
+ return &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ return k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).List(context.TODO(), options)
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ return k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Watch(context.TODO(), options)
+ },
+ }
+}
+
+func (handler *jobsEventHandler) ObjectType() runtime.Object {
+ return &v1.Job{}
+}
+
+func (handler *jobsEventHandler) Handle(ctx context.Context, eventType controller.EventType, object runtime.Object) error {
+ job := object.(*v1.Job)
+ if !job.GetDeletionTimestamp().IsZero() {
+ return handleDelete(ctx, job)
+ }
+ if eventType == controller.UpdateEvent {
+ return handleUpdate(ctx, object.(*v1.Job))
+ }
+ return nil
+}
+
+func handleDelete(ctx context.Context, job *v1.Job) error {
+ jobType, err := getJobType(job)
+ if err != nil {
+ return err
+ }
+ switch jobType {
+ case JobTypeCopy:
+ return handleCopyJobDeletion(ctx, job)
+ default:
+ return fmt.Errorf("Invalid jobType: %v", jobType)
+ }
+}
+
+func handleCopyJobDeletion(ctx context.Context, job *v1.Job) error {
+ if err := updateOnCopyJobCompletion(ctx, job); err != nil {
+ return err
+ }
+ finalizers := []string{}
+ for _, finalizer := range job.ObjectMeta.GetFinalizers() {
+ if finalizer == consts.CopyProtectionFinalizer {
+ continue
+ }
+ finalizers = append(finalizers, finalizer)
+ }
+ job.ObjectMeta.SetFinalizers(finalizers)
+ _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Update(ctx, job, metav1.UpdateOptions{})
+ return err
+}
+
+func handleUpdate(ctx context.Context, job *v1.Job) error {
+ if job.Status.CompletionTime == nil || job.Status.Succeeded == 0 {
+ return nil
+ }
+ jobType, err := getJobType(job)
+ if err != nil {
+ return err
+ }
+ switch jobType {
+ case JobTypeCopy:
+ return updateOnCopyJobCompletion(ctx, job)
+ default:
+ return fmt.Errorf("Invalid jobType: %v", jobType)
+ }
+}
+
+func getJobType(job *v1.Job) (JobType, error) {
+ labels := job.ObjectMeta.GetLabels()
+ if labels == nil {
+ return JobTypeUnknown, fmt.Errorf("No labels present in the job: %v", job.Name)
+ }
+ value, ok := labels[string(directpvtypes.JobTypeLabelKey)]
+ if !ok {
+ return JobTypeUnknown, fmt.Errorf("Unable to identify the job: %v; Missing JobType", job.Name)
+ }
+ jobType, err := ToType(value)
+ if err != nil {
+ return JobTypeUnknown, err
+ }
+ return jobType, nil
+}
+
+func updateOnCopyJobCompletion(ctx context.Context, job *v1.Job) error {
+ labels := job.ObjectMeta.GetLabels()
+
+ // Update volume
+ volumeName := labels[string(directpvtypes.VolumeLabelKey)]
+ if volumeName == "" {
+ return fmt.Errorf("No volumeID present in the copy job: %v", job.Name)
+ }
+ volume, err := client.VolumeClient().Get(ctx, volumeName, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+ volume.Status.Status = directpvtypes.VolumeStatusReady
+ if !volume.IsStaged() {
+ volume.Status.Status = directpvtypes.VolumeStatusPending
+ }
+ if _, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{}); err != nil {
+ return err
+ }
+
+ // Update source drive
+ sourceDriveID := labels[string(directpvtypes.SourceDriveLabelKey)]
+ if sourceDriveID == "" {
+ return fmt.Errorf("No source drive ID present in the copy job: %v", job.Name)
+ }
+ sourceDrive, err := client.DriveClient().Get(ctx, sourceDriveID, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+ sourceDrive.RemoveCopyProtectionFinalizer()
+ _, err = client.DriveClient().Update(ctx, sourceDrive, metav1.UpdateOptions{})
+ return err
+}
+
+// StartController starts volume controller.
+func StartController(ctx context.Context) {
+ ctrl := controller.New("jobs", newJobsEventHandler(), workerThreads, resyncPeriod)
+ ctrl.Run(ctx)
+}
diff --git a/pkg/jobs/lister.go b/pkg/jobs/lister.go
new file mode 100644
index 000000000..e08b6a2d0
--- /dev/null
+++ b/pkg/jobs/lister.go
@@ -0,0 +1,264 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package jobs
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/utils"
+ batchv1 "k8s.io/api/batch/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// ListJobResult denotes list of job result.
+type ListJobResult struct {
+ Job batchv1.Job
+ Err error
+}
+
+// Lister is job lister.
+type Lister struct {
+ nodes []directpvtypes.LabelValue
+ statusList []JobStatus
+ typeList []JobType
+ jobNames []string
+ labels map[directpvtypes.LabelKey]directpvtypes.LabelValue
+ maxObjects int64
+ ignoreNotFound bool
+}
+
+// NewLister creates new job lister.
+func NewLister() *Lister {
+ return &Lister{
+ maxObjects: k8s.MaxThreadCount,
+ }
+}
+
+// ToStatus converts a value to job status.
+func ToStatus(value string) (JobStatus, error) {
+ status := JobStatus(strings.ToLower(value))
+ switch status {
+ case JobStatusActive, JobStatusFailed, JobStatusSucceeded:
+ default:
+ return status, fmt.Errorf("invalid job status: %v", value)
+ }
+ return status, nil
+}
+
+// ToType converts a value to job type.
+func ToType(value string) (JobType, error) {
+ status := JobType(strings.ToLower(value))
+ switch status {
+ case JobTypeCopy:
+ default:
+ return status, fmt.Errorf("invalid job type: %v", value)
+ }
+ return status, nil
+}
+
+// NodeSelector adds filter listing by nodes.
+func (lister *Lister) NodeSelector(nodes []directpvtypes.LabelValue) *Lister {
+ lister.nodes = nodes
+ return lister
+}
+
+// StatusSelector adds filter listing by job status.
+func (lister *Lister) StatusSelector(statusList []JobStatus) *Lister {
+ lister.statusList = statusList
+ return lister
+}
+
+// TypeSelector adds filter listing by job status.
+func (lister *Lister) TypeSelector(typeList []JobType) *Lister {
+ lister.typeList = typeList
+ return lister
+}
+
+// JobNameSelector adds filter listing by job names.
+func (lister *Lister) JobNameSelector(jobNames []string) *Lister {
+ lister.jobNames = jobNames
+ return lister
+}
+
+// LabelSelector adds filter listing by labels.
+func (lister *Lister) LabelSelector(labels map[directpvtypes.LabelKey]directpvtypes.LabelValue) *Lister {
+ lister.labels = labels
+ return lister
+}
+
+// MaxObjects controls number of items to be fetched in every iteration.
+func (lister *Lister) MaxObjects(n int64) *Lister {
+ lister.maxObjects = n
+ return lister
+}
+
+// IgnoreNotFound controls listing to ignore job not found error.
+func (lister *Lister) IgnoreNotFound(b bool) *Lister {
+ lister.ignoreNotFound = b
+ return lister
+}
+
+// GetStatus gets the job status.
+func GetStatus(job batchv1.Job) JobStatus {
+ if job.Status.Active > 0 {
+ return JobStatusActive
+ }
+ if job.Status.CompletionTime != nil && job.Status.Succeeded > 0 {
+ return JobStatusSucceeded
+ }
+ return JobStatusFailed
+}
+
+// GetType gets the job type
+func GetType(job batchv1.Job) JobType {
+ labels := job.GetLabels()
+ if v, ok := labels[string(directpvtypes.JobTypeLabelKey)]; ok {
+ jobType, err := ToType(v)
+ if err == nil {
+ return jobType
+ }
+ }
+ return JobTypeUnknown
+}
+
+// GetNode returns the node name of the job
+func GetNode(job batchv1.Job) string {
+ labels := job.GetLabels()
+ if v, ok := labels[string(directpvtypes.NodeLabelKey)]; ok {
+ return v
+ }
+ return ""
+}
+
+// List returns channel to loop through job items.
+func (lister *Lister) List(ctx context.Context) <-chan ListJobResult {
+ getOnly := len(lister.nodes) == 0 &&
+ len(lister.statusList) == 0 &&
+ len(lister.labels) == 0 &&
+ len(lister.typeList) == 0 &&
+ len(lister.jobNames) != 0
+
+ labelMap := map[directpvtypes.LabelKey][]directpvtypes.LabelValue{
+ directpvtypes.NodeLabelKey: lister.nodes,
+ }
+ for k, v := range lister.labels {
+ labelMap[k] = []directpvtypes.LabelValue{v}
+ }
+ labelSelector := directpvtypes.ToLabelSelector(labelMap)
+
+ resultCh := make(chan ListJobResult)
+ go func() {
+ defer close(resultCh)
+
+ send := func(result ListJobResult) bool {
+ select {
+ case <-ctx.Done():
+ return false
+ case resultCh <- result:
+ return true
+ }
+ }
+
+ if !getOnly {
+ options := metav1.ListOptions{
+ Limit: lister.maxObjects,
+ LabelSelector: labelSelector,
+ }
+
+ for {
+ result, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).List(ctx, options)
+ if err != nil {
+ if apierrors.IsNotFound(err) && lister.ignoreNotFound {
+ break
+ }
+
+ send(ListJobResult{Err: err})
+ return
+ }
+
+ for _, item := range result.Items {
+ var found bool
+ var values []string
+ for i := range lister.jobNames {
+ if lister.jobNames[i] == item.Name {
+ found = true
+ } else {
+ values = append(values, lister.jobNames[i])
+ }
+ }
+ lister.jobNames = values
+
+ switch {
+ case found || (len(lister.statusList) == 0 && len(lister.typeList) == 0):
+ case len(lister.statusList) > 0 && utils.Contains(lister.statusList, GetStatus(item)):
+ case len(lister.typeList) > 0 && utils.Contains(lister.typeList, GetType(item)):
+ default:
+ continue
+ }
+
+ if !send(ListJobResult{Job: item}) {
+ return
+ }
+ }
+
+ if result.Continue == "" {
+ break
+ }
+
+ options.Continue = result.Continue
+ }
+ }
+
+ for _, jobName := range lister.jobNames {
+ job, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Get(ctx, jobName, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) && lister.ignoreNotFound {
+ continue
+ }
+ send(ListJobResult{Err: err})
+ return
+ }
+ if !send(ListJobResult{Job: *job}) {
+ return
+ }
+ }
+ }()
+
+ return resultCh
+}
+
+// Get returns list of jobs.
+func (lister *Lister) Get(ctx context.Context) ([]batchv1.Job, error) {
+ ctx, cancelFunc := context.WithCancel(ctx)
+ defer cancelFunc()
+
+ jobList := []batchv1.Job{}
+ for result := range lister.List(ctx) {
+ if result.Err != nil {
+ return jobList, result.Err
+ }
+ jobList = append(jobList, result.Job)
+ }
+
+ return jobList, nil
+}
diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go
index b33d360b2..0f2e02d16 100644
--- a/pkg/k8s/k8s.go
+++ b/pkg/k8s/k8s.go
@@ -23,6 +23,7 @@ import (
"github.com/minio/directpv/pkg/utils"
"github.com/spf13/viper"
+ corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
@@ -190,3 +191,29 @@ func SanitizeResourceName(name string) string {
return string(result)
}
+
+// NewHostPathVolume - creates volume for given name and host path.
+func NewHostPathVolume(name, path string) corev1.Volume {
+ hostPathType := corev1.HostPathDirectoryOrCreate
+ volumeSource := corev1.VolumeSource{
+ HostPath: &corev1.HostPathVolumeSource{
+ Path: path,
+ Type: &hostPathType,
+ },
+ }
+
+ return corev1.Volume{
+ Name: name,
+ VolumeSource: volumeSource,
+ }
+}
+
+// NewVolumeMount - creates volume mount for given name, path, mount propagation and read only flag.
+func NewVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount {
+ return corev1.VolumeMount{
+ Name: name,
+ ReadOnly: readOnly,
+ MountPath: path,
+ MountPropagation: &mountPropogation,
+ }
+}
diff --git a/pkg/volume/event.go b/pkg/volume/event.go
index 7881d7490..e7ab406f6 100644
--- a/pkg/volume/event.go
+++ b/pkg/volume/event.go
@@ -113,6 +113,9 @@ func (handler *volumeEventHandler) delete(ctx context.Context, volume *types.Vol
if !volume.IsReleased() {
return fmt.Errorf("volume %v must be released before cleaning up", volume.Name)
}
+ if volume.Status.Status == directpvtypes.VolumeStatusCopying {
+ return fmt.Errorf("volume %v is busy copying data", volume.Name)
+ }
if volume.Status.TargetPath != "" {
if err := handler.unmount(volume.Status.TargetPath); err != nil {