Skip to content

Commit

Permalink
Implement mounting via stage directory
Browse files Browse the repository at this point in the history
Previously, multiple containers with the same mounted volume resulted in multiple
FUSE processes. This behaviour was breaking parallel modifications from different
containers, consumed extra resources, and after mounting via systemd was introduced,
led to the total inability to mount the same volume into multiple containers on
the same host.

Now only one FUSE process is started per volume, per host.
  • Loading branch information
vitalif committed Mar 6, 2023
1 parent 1305b20 commit ecf1031
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 57 deletions.
7 changes: 7 additions & 0 deletions deploy/kubernetes/csi-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ spec:
volumeMounts:
- name: plugin-dir
mountPath: /csi
- name: stage-dir
mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
mountPropagation: "Bidirectional"
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
Expand All @@ -119,6 +122,10 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins/ru.yandex.s3.csi
type: DirectoryOrCreate
- name: stage-dir
hostPath:
path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
type: DirectoryOrCreate
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods
Expand Down
2 changes: 1 addition & 1 deletion pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type driver struct {
}

var (
vendorVersion = "v1.2.0"
vendorVersion = "v1.34.6"
driverName = "ru.yandex.s3.csi"
)

Expand Down
38 changes: 24 additions & 14 deletions pkg/driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driver
import (
"fmt"
"os"
"os/exec"
"regexp"
"strconv"

Expand Down Expand Up @@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)

// Check arguments
if req.GetVolumeCapability() == nil {
Expand Down Expand Up @@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, readOnly, volumeID, attrib, mountFlags)

s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}

meta := getMeta(bucketName, prefix, req.VolumeContext)
mounter, err := mounter.New(meta, s3.Config)
cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath)
cmd.Stderr = os.Stderr
glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath)
out, err := cmd.Output()
if err != nil {
return nil, err
}
if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil {
return nil, err
return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out)
}

glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath)
Expand All @@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

if err := mounter.FuseUnmount(targetPath); err != nil {
if err := mounter.Unmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
Expand Down Expand Up @@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if err != nil {
return nil, err
}
if err := mounter.Stage(stagingTargetPath); err != nil {
if err := mounter.Mount(stagingTargetPath, volumeID); err != nil {
return nil, err
}

Expand All @@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

proc, err := mounter.FindFuseMountProcess(stagingTargetPath)
if err != nil {
return nil, err
}
exists := false
if proc == nil {
exists, err = mounter.SystemdUnmount(volumeID)
if exists && err != nil {
return nil, err
}
}
if !exists {
err = mounter.FuseUnmount(stagingTargetPath)
}
glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath)

return &csi.NodeUnstageVolumeResponse{}, nil
}

Expand Down
50 changes: 32 additions & 18 deletions pkg/mounter/geesefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package mounter
import (
"fmt"
"os"
"strings"
"time"

systemd "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
"github.com/golang/glog"

"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
)
Expand Down Expand Up @@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
}, nil
}

func (geesefs *geesefsMounter) Stage(stageTarget string) error {
return nil
}

func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
return nil
}

func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
st, err := os.Stat(from)
if err != nil {
Expand Down Expand Up @@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
return fuseMount(target, geesefsCmd, args)
}

func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
func (geesefs *geesefsMounter) Mount(target, volumeID string) error {
fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
var args []string
if geesefs.region != "" {
Expand All @@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
}
conn, err := systemd.New()
if err != nil {
fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err)
return geesefs.MountDirect(target, args)
}
defer conn.Close()
Expand All @@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
}
args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
props := []systemd.Property{
newProps := []systemd.Property{
systemd.Property{
Name: "Description",
Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
Expand All @@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
Value: dbus.MakeVariant("inactive-or-failed"),
},
}
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
if err != nil && strings.Contains(err.Error(), "already exists") {
// Stop and garbage collect the unit if automatic collection didn't work for some reason
conn.StopUnit(unitName, "replace", nil)
conn.ResetFailedUnit(unitName)
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
unitProps, err := conn.GetAllProperties(unitName)
if err == nil {
// Unit already exists
if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") {
// Unit is already active
curPath := ""
prevExec, ok := unitProps["ExecStart"].([][]interface{})
if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 {
execArgs, ok := prevExec[0][1].([]string)
if ok && len(execArgs) >= 2 {
curPath = execArgs[len(execArgs)-1]
}
}
if curPath != target {
return fmt.Errorf(
"GeeseFS for volume %v is already mounted on host, but"+
" in a different directory. We want %v, but it's in %v",
volumeID, target, curPath,
)
}
// Already mounted at right location
return nil
} else {
// Stop and garbage collect the unit if automatic collection didn't work for some reason
conn.StopUnit(unitName, "replace", nil)
conn.ResetFailedUnit(unitName)
}
}
_, err = conn.StartTransientUnit(unitName, "replace", newProps, nil)
if err != nil {
return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
}
Expand Down
40 changes: 34 additions & 6 deletions pkg/mounter/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ import (
"syscall"
"time"

"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
systemd "github.com/coreos/go-systemd/v22/dbus"
"github.com/golang/glog"
"github.com/mitchellh/go-ps"
"k8s.io/kubernetes/pkg/util/mount"

"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
)

// Mounter interface which can be implemented
// by the different mounter types
type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source, target, volumeID string) error
Mount(target, volumeID string) error
}

const (
Expand Down Expand Up @@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error {
return waitForMount(path, 10*time.Second)
}

func Unmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
return nil
}

func SystemdUnmount(volumeID string) (bool, error) {
conn, err := systemd.New()
if err != nil {
glog.Errorf("Failed to connect to systemd dbus service: %v", err)
return false, err
}
defer conn.Close()
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
units, err := conn.ListUnitsByNames([]string{ unitName })
glog.Errorf("Got %v", units)
if err != nil {
glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err)
return false, err
}
if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" {
return true, nil
}
_, err = conn.StopUnit(unitName, "replace", nil)
return true, err
}

func FuseUnmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path)
process, err := FindFuseMountProcess(path)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil
Expand Down Expand Up @@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error {
}
}

func findFuseMountProcess(path string) (*os.Process, error) {
func FindFuseMountProcess(path string) (*os.Process, error) {
processes, err := ps.Processes()
if err != nil {
return nil, err
Expand Down
10 changes: 1 addition & 9 deletions pkg/mounter/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
}, nil
}

func (rclone *rcloneMounter) Stage(stageTarget string) error {
return nil
}

func (rclone *rcloneMounter) Unstage(stageTarget string) error {
return nil
}

func (rclone *rcloneMounter) Mount(source, target, volumeID string) error {
func (rclone *rcloneMounter) Mount(target, volumeID string) error {
args := []string{
"mount",
fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)),
Expand Down
10 changes: 1 addition & 9 deletions pkg/mounter/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,7 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
}, nil
}

func (s3fs *s3fsMounter) Stage(stageTarget string) error {
return nil
}

func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
return nil
}

func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error {
func (s3fs *s3fsMounter) Mount(target, volumeID string) error {
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
return err
}
Expand Down

0 comments on commit ecf1031

Please sign in to comment.