From ecf1031dfc99a09adae5f34da3b1642e24cb3bcc Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 7 Mar 2023 00:41:41 +0300 Subject: [PATCH] Implement mounting via stage directory Previously, multiple containers with the same mounted volume resulted in multiple FUSE processes. This behaviour was breaking parallel modifications from different containers, consumed extra resources, and after mounting via systemd was introduced, led to the total inability to mount the same volume into multiple containers on the same host. Now only one FUSE process is started per volume, per host. --- deploy/kubernetes/csi-s3.yaml | 7 +++++ pkg/driver/driver.go | 2 +- pkg/driver/nodeserver.go | 38 ++++++++++++++++---------- pkg/mounter/geesefs.go | 50 ++++++++++++++++++++++------------- pkg/mounter/mounter.go | 40 +++++++++++++++++++++++----- pkg/mounter/rclone.go | 10 +------ pkg/mounter/s3fs.go | 10 +------ 7 files changed, 100 insertions(+), 57 deletions(-) diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index 41c5879..65456eb 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -103,6 +103,9 @@ spec: volumeMounts: - name: plugin-dir mountPath: /csi + - name: stage-dir + mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi + mountPropagation: "Bidirectional" - name: pods-mount-dir mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" @@ -119,6 +122,10 @@ spec: hostPath: path: /var/lib/kubelet/plugins/ru.yandex.s3.csi type: DirectoryOrCreate + - name: stage-dir + hostPath: + path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi + type: DirectoryOrCreate - name: pods-mount-dir hostPath: path: /var/lib/kubelet/pods diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 3e24bf3..a342a7a 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -33,7 +33,7 @@ type driver struct { } var ( - vendorVersion = "v1.2.0" + vendorVersion = "v1.34.6" driverName = "ru.yandex.s3.csi" ) diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 5acecdd..7b5587b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -19,6 +19,7 @@ package driver import ( "fmt" "os" + "os/exec" "regexp" "strconv" @@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() stagingTargetPath := req.GetStagingTargetPath() - bucketName, prefix := volumeIDToBucketPrefix(volumeID) // Check arguments if req.GetVolumeCapability() == nil { @@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, readOnly, volumeID, attrib, mountFlags) - s3, err := s3.NewClientFromSecret(req.GetSecrets()) - if err != nil { - return nil, fmt.Errorf("failed to initialize S3 client: %s", err) - } - - meta := getMeta(bucketName, prefix, req.VolumeContext) - mounter, err := mounter.New(meta, s3.Config) + cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath) + cmd.Stderr = os.Stderr + glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath) + out, err := cmd.Output() if err != nil { - return nil, err - } - if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil { - return nil, err + return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out) } glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath) @@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if err := mounter.FuseUnmount(targetPath); err != nil { + if err := mounter.Unmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID) @@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, err } - if err := mounter.Stage(stagingTargetPath); err != nil { + if err := mounter.Mount(stagingTargetPath, volumeID); err != nil { return nil, err } @@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + proc, err := mounter.FindFuseMountProcess(stagingTargetPath) + if err != nil { + return nil, err + } + exists := false + if proc == nil { + exists, err = mounter.SystemdUnmount(volumeID) + if exists && err != nil { + return nil, err + } + } + if !exists { + err = mounter.FuseUnmount(stagingTargetPath) + } + glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath) + return &csi.NodeUnstageVolumeResponse{}, nil } diff --git a/pkg/mounter/geesefs.go b/pkg/mounter/geesefs.go index 7f8f025..cd28d82 100644 --- a/pkg/mounter/geesefs.go +++ b/pkg/mounter/geesefs.go @@ -3,11 +3,11 @@ package mounter import ( "fmt" "os" - "strings" "time" systemd "github.com/coreos/go-systemd/v22/dbus" dbus "github.com/godbus/dbus/v5" + "github.com/golang/glog" "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) @@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (geesefs *geesefsMounter) Stage(stageTarget string) error { - return nil -} - -func (geesefs *geesefsMounter) Unstage(stageTarget string) error { - return nil -} - func (geesefs *geesefsMounter) CopyBinary(from, to string) error { st, err := os.Stat(from) if err != nil { @@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error { return fuseMount(target, geesefsCmd, args) } -func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { +func (geesefs *geesefsMounter) Mount(target, volumeID string) error { fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix) var args []string if geesefs.region != "" { @@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { } conn, err := systemd.New() if err != nil { - fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err) + glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err) return geesefs.MountDirect(target, args) } defer conn.Close() @@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { } args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...) unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" - props := []systemd.Property{ + newProps := []systemd.Property{ systemd.Property{ Name: "Description", Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID), @@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { Value: dbus.MakeVariant("inactive-or-failed"), }, } - _, err = conn.StartTransientUnit(unitName, "replace", props, nil) - if err != nil && strings.Contains(err.Error(), "already exists") { - // Stop and garbage collect the unit if automatic collection didn't work for some reason - conn.StopUnit(unitName, "replace", nil) - conn.ResetFailedUnit(unitName) - _, err = conn.StartTransientUnit(unitName, "replace", props, nil) + unitProps, err := conn.GetAllProperties(unitName) + if err == nil { + // Unit already exists + if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") { + // Unit is already active + curPath := "" + prevExec, ok := unitProps["ExecStart"].([][]interface{}) + if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 { + execArgs, ok := prevExec[0][1].([]string) + if ok && len(execArgs) >= 2 { + curPath = execArgs[len(execArgs)-1] + } + } + if curPath != target { + return fmt.Errorf( + "GeeseFS for volume %v is already mounted on host, but"+ + " in a different directory. We want %v, but it's in %v", + volumeID, target, curPath, + ) + } + // Already mounted at right location + return nil + } else { + // Stop and garbage collect the unit if automatic collection didn't work for some reason + conn.StopUnit(unitName, "replace", nil) + conn.ResetFailedUnit(unitName) + } } + _, err = conn.StartTransientUnit(unitName, "replace", newProps, nil) if err != nil { return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err) } diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 1f71afa..7a83b7e 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -11,18 +11,18 @@ import ( "syscall" "time" - "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" + systemd "github.com/coreos/go-systemd/v22/dbus" "github.com/golang/glog" "github.com/mitchellh/go-ps" "k8s.io/kubernetes/pkg/util/mount" + + "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) // Mounter interface which can be implemented // by the different mounter types type Mounter interface { - Stage(stagePath string) error - Unstage(stagePath string) error - Mount(source, target, volumeID string) error + Mount(target, volumeID string) error } const ( @@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error { return waitForMount(path, 10*time.Second) } +func Unmount(path string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + return nil +} + +func SystemdUnmount(volumeID string) (bool, error) { + conn, err := systemd.New() + if err != nil { + glog.Errorf("Failed to connect to systemd dbus service: %v", err) + return false, err + } + defer conn.Close() + unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" + units, err := conn.ListUnitsByNames([]string{ unitName }) + glog.Errorf("Got %v", units) + if err != nil { + glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err) + return false, err + } + if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" { + return true, nil + } + _, err = conn.StopUnit(unitName, "replace", nil) + return true, err +} + func FuseUnmount(path string) error { if err := mount.New("").Unmount(path); err != nil { return err } // as fuse quits immediately, we will try to wait until the process is done - process, err := findFuseMountProcess(path) + process, err := FindFuseMountProcess(path) if err != nil { glog.Errorf("Error getting PID of fuse mount: %s", err) return nil @@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error { } } -func findFuseMountProcess(path string) (*os.Process, error) { +func FindFuseMountProcess(path string) (*os.Process, error) { processes, err := ps.Processes() if err != nil { return nil, err diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go index d918702..7b912c2 100644 --- a/pkg/mounter/rclone.go +++ b/pkg/mounter/rclone.go @@ -31,15 +31,7 @@ func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (rclone *rcloneMounter) Stage(stageTarget string) error { - return nil -} - -func (rclone *rcloneMounter) Unstage(stageTarget string) error { - return nil -} - -func (rclone *rcloneMounter) Mount(source, target, volumeID string) error { +func (rclone *rcloneMounter) Mount(target, volumeID string) error { args := []string{ "mount", fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)), diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go index c4eba11..533522f 100644 --- a/pkg/mounter/s3fs.go +++ b/pkg/mounter/s3fs.go @@ -28,15 +28,7 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (s3fs *s3fsMounter) Stage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Unstage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error { +func (s3fs *s3fsMounter) Mount(target, volumeID string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err }