diff --git a/docs/multi-attach.md b/docs/multi-attach.md new file mode 100644 index 000000000..f0325ba61 --- /dev/null +++ b/docs/multi-attach.md @@ -0,0 +1,81 @@ +# Multi-Attach + +The multi-attach capability allows you to attach a single EBS volume to multiple EC2 instances located within the same Availability Zone (AZ). This shared volume can be utilized by several pods running on distinct nodes. + +Multi-attach is enabled by specifying `ReadWriteMany` for the `PersistentVolumeClaim.spec.accessMode`. + +## Important + +- Application-level coordination (e.g., via I/O fencing) is required to use multi-attach safely. Failure to do so can result in data loss and silent data corruption. Refer to the AWS documentation on Multi-Attach for more information. +- Currently, the EBS CSI driver only supports multi-attach for `IO2` volumes in `Block` mode. + +Refer to the official AWS documentation on [Multi-Attach](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-volumes-multi.html) for more information, best practices, and limitations of this capability. + +## Example + +1. Create a `StorageClass` referencing an `IO2` volume type: +``` +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: ebs-sc +provisioner: ebs.csi.aws.com +volumeBindingMode: WaitForFirstConsumer +parameters: + type: io2 + iops: "1000" +``` + +2. Create a `PersistentVolumeClaim` referencing the `ReadWriteMany` access and `Block` device modes: +``` +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: block-claim +spec: + accessModes: + - ReadWriteMany + volumeMode: Block + storageClassName: ebs-sc + resources: + requests: + storage: 4Gi +``` + +3. Create a `DaemonSet` referencing the `PersistentVolumeClaim` created in the previous step: +``` +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: app-daemon +spec: + selector: + matchLabels: + name: app + template: + metadata: + labels: + name: app + spec: + containers: + - name: app + image: busybox + command: ["/bin/sh", "-c"] + args: ["tail -f /dev/null"] + volumeDevices: + - name: data + devicePath: /dev/xvda + volumes: + - name: data + persistentVolumeClaim: + claimName: block-claim +``` + +4. Verify the `DaemonSet` is running: +``` +$ kubectl get pods -A + +NAMESPACE NAME READY STATUS RESTARTS AGE +default app-daemon-9hdgw 1/1 Running 0 18s +default app-daemon-xm8zr 1/1 Running 0 18s +``` diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 9dbd0ef9a..72252648c 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -195,6 +195,7 @@ type DiskOptions struct { OutpostArn string Encrypted bool BlockExpress bool + MultiAttachEnabled bool // KmsKeyID represents a fully qualified resource name to the key to use for encryption. // example: arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef KmsKeyID string @@ -345,6 +346,10 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return nil, fmt.Errorf("invalid AWS VolumeType %q", diskOptions.VolumeType) } + if diskOptions.MultiAttachEnabled && createType != VolumeTypeIO2 { + return nil, fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes") + } + if maxIops > 0 { if diskOptions.IOPS > 0 { requestedIops = int64(diskOptions.IOPS) @@ -381,11 +386,12 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * clientToken := sha256.Sum256([]byte(volumeName)) requestInput := &ec2.CreateVolumeInput{ - AvailabilityZone: aws.String(zone), - ClientToken: aws.String(hex.EncodeToString(clientToken[:])), - Size: aws.Int64(capacityGiB), - VolumeType: aws.String(createType), - Encrypted: aws.Bool(diskOptions.Encrypted), + AvailabilityZone: aws.String(zone), + ClientToken: aws.String(hex.EncodeToString(clientToken[:])), + Size: aws.Int64(capacityGiB), + VolumeType: aws.String(createType), + Encrypted: aws.Bool(diskOptions.Encrypted), + MultiAttachEnabled: aws.Bool(diskOptions.MultiAttachEnabled), } if !util.IsSBE(zone) { @@ -549,18 +555,12 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string resp, attachErr := c.ec2.AttachVolumeWithContext(ctx, request) if attachErr != nil { - var awsErr awserr.Error - if errors.As(attachErr, &awsErr) { - if awsErr.Code() == "VolumeInUse" { - return "", ErrVolumeInUse - } - } return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr) } klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp) } - attachment, err := c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned) + _, err = c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned) // This is the only situation where we taint the device if err != nil { @@ -568,21 +568,6 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string return "", err } - // Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint - // It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call, - // which could theoretically be against a different device (or even instance). - if attachment == nil { - // Impossible? - return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", volumeID, nodeID) - } - if device.Path != aws.StringValue(attachment.Device) { - // Already checked in waitForAttachmentState(), but just to be sure... - return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", volumeID, nodeID, device.Path, aws.StringValue(attachment.Device)) - } - if *instance.InstanceId != aws.StringValue(attachment.InstanceId) { - return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", volumeID, nodeID, *instance.InstanceId, aws.StringValue(attachment.InstanceId)) - } - // TODO: Check volume capability matches for ALREADY_EXISTS // This could happen when request volume already attached to request node, // but is incompatible with the specified volume_capability or readonly flag @@ -674,40 +659,30 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt return false, nil } - if len(volume.Attachments) > 1 { - // Shouldn't happen; log so we know if it is + if volume.MultiAttachEnabled != nil && !*volume.MultiAttachEnabled && len(volume.Attachments) > 1 { klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume) + return false, fmt.Errorf("volume %q has multiple attachments", volumeID) } + attachmentState := "" + for _, a := range volume.Attachments { - if attachmentState != "" { - // Shouldn't happen; log so we know if it is - klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume) - } - if a.State != nil { - attachment = a - attachmentState = *a.State - } else { - // Shouldn't happen; log so we know if it is - klog.InfoS("Ignoring nil attachment state for volume", "volumeID", volumeID, "attachment", a) + if a.State != nil && a.InstanceId != nil { + if aws.StringValue(a.InstanceId) == expectedInstance { + attachmentState = aws.StringValue(a.State) + attachment = a + } } } + if attachmentState == "" { attachmentState = volumeDetachedState } - if attachment != nil { - // AWS eventual consistency can go back in time. - // For example, we're waiting for a volume to be attached as /dev/xvdba, but AWS can tell us it's - // attached as /dev/xvdbb, where it was attached before and it was already detached. - // Retry couple of times, hoping AWS starts reporting the right status. + + if attachment != nil && attachment.Device != nil && (expectedState == volumeAttachedState) { device := aws.StringValue(attachment.Device) - if expectedDevice != "" && device != "" && device != expectedDevice { - klog.InfoS("Expected device for volume not found", "expectedDevice", expectedDevice, "expectedState", expectedState, "volumeID", volumeID, "device", device, "attachmentState", attachmentState) - return false, nil - } - instanceID := aws.StringValue(attachment.InstanceId) - if expectedInstance != "" && instanceID != "" && instanceID != expectedInstance { - klog.InfoS("Expected instance for volume not found", "expectedInstance", expectedInstance, "expectedState", expectedState, "volumeID", volumeID, "instanceID", instanceID, "attachmentState", attachmentState) + if device != expectedDevice { + klog.InfoS("WaitForAttachmentState: device mismatch", "device", device, "expectedDevice", expectedDevice, "attachment", attachment) return false, nil } } diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 8e4b89f7b..3a2c7ad6e 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -575,6 +575,43 @@ func TestCreateDisk(t *testing.T) { }, expErr: nil, }, + { + name: "success: multi-attach with IO2", + volumeName: "vol-test-name", + diskOptions: &DiskOptions{ + CapacityBytes: util.GiBToBytes(4), + Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"}, + VolumeType: VolumeTypeIO2, + MultiAttachEnabled: true, + IOPSPerGB: 10000, + }, + expDisk: &Disk{ + VolumeID: "vol-test", + CapacityGiB: 4, + AvailabilityZone: defaultZone, + }, + expCreateVolumeInput: &ec2.CreateVolumeInput{ + Iops: aws.Int64(2000), + }, + expErr: nil, + }, + { + name: "failure: multi-attach with GP3", + volumeName: "vol-test-name", + diskOptions: &DiskOptions{ + CapacityBytes: util.GiBToBytes(4), + Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"}, + VolumeType: VolumeTypeGP3, + MultiAttachEnabled: true, + IOPSPerGB: 10000, + }, + expDisk: &Disk{ + VolumeID: "vol-test", + CapacityGiB: 4, + AvailabilityZone: defaultZone, + }, + expErr: fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes"), + }, } for _, tc := range testCases { @@ -715,9 +752,10 @@ func TestAttachDisk(t *testing.T) { name string volumeID string nodeID string + nodeID2 string path string expErr error - mockFunc func(*MockEC2API, context.Context, string, string, string, dm.DeviceManager) + mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager) }{ { name: "success: AttachVolume normal", @@ -725,14 +763,14 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: nil, - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { volumeRequest := createVolumeRequest(volumeID) instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) gomock.InOrder( mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path, "attached"), nil), + mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path), nil), mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(createDescribeVolumesOutput(volumeID, nodeID, path, "attached"), nil), ) }, @@ -743,7 +781,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: nil, - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { volumeRequest := createVolumeRequest(volumeID) instanceRequest := createInstanceRequest(nodeID) @@ -762,7 +800,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, errors.New("AttachVolume error")), - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) @@ -778,7 +816,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, ErrVolumeInUse), - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) @@ -788,6 +826,52 @@ func TestAttachDisk(t *testing.T) { ) }, }, + { + name: "success: AttachVolume multi-attach", + volumeID: defaultVolumeID, + nodeID: defaultNodeID, + nodeID2: "node-1239", + path: defaultPath, + expErr: nil, + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { + volumeRequest := createVolumeRequest(volumeID) + instanceRequest := createInstanceRequest(nodeID) + attachRequest := createAttachRequest(volumeID, nodeID, path) + + createInstanceRequest2 := createInstanceRequest(nodeID2) + attachRequest2 := createAttachRequest(volumeID, nodeID2, path) + + dvOutput := &ec2.DescribeVolumesOutput{ + Volumes: []*ec2.Volume{ + { + VolumeId: aws.String(volumeID), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String(path), + InstanceId: aws.String(nodeID), + State: aws.String("attached"), + }, + { + Device: aws.String(path), + InstanceId: aws.String(nodeID2), + State: aws.String("attached"), + }, + }, + }, + }, + } + + gomock.InOrder( + mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), + mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path), nil), + mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(createDescribeVolumesOutput(volumeID, nodeID, path, "attached"), nil), + + mockEC2.EXPECT().DescribeInstancesWithContext(ctx, createInstanceRequest2).Return(newDescribeInstancesOutput(nodeID2), nil), + mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest2).Return(createAttachVolumeOutput(volumeID, nodeID2, path), nil), + mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(dvOutput, nil), + ) + }, + }, } for _, tc := range testCases { @@ -799,7 +883,7 @@ func TestAttachDisk(t *testing.T) { ctx := context.Background() dm := c.(*cloud).dm - tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.path, dm) + tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.nodeID2, tc.path, dm) devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID) @@ -811,6 +895,12 @@ func TestAttachDisk(t *testing.T) { assert.Equal(t, tc.path, devicePath) } + if tc.nodeID2 != "" { + devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID2) + assert.NoError(t, err) + assert.Equal(t, tc.path, devicePath) + } + mockCtrl.Finish() }) } @@ -1972,6 +2062,15 @@ func TestWaitForAttachmentState(t *testing.T) { alreadyAssigned: false, expectError: false, }, + { + name: "success: multiple attachments with Multi-Attach enabled", + volumeID: "vol-test-1234", + expectedState: volumeAttachedState, + expectedInstance: "1234", + expectedDevice: defaultPath, + alreadyAssigned: false, + expectError: false, + }, { name: "failure: disk not found, expected attached", volumeID: "vol-test-1234", @@ -2009,25 +2108,7 @@ func TestWaitForAttachmentState(t *testing.T) { expectError: true, }, { - name: "success: multiple attachments", - volumeID: "vol-test-1234", - expectedState: volumeAttachedState, - expectedInstance: "1234", - expectedDevice: defaultPath, - alreadyAssigned: false, - expectError: false, - }, - { - name: "failure: disk still attaching", - volumeID: "vol-test-1234", - expectedState: volumeAttachedState, - expectedInstance: "1234", - expectedDevice: defaultPath, - alreadyAssigned: false, - expectError: true, - }, - { - name: "failure: context cancelled", + name: "failure: multiple attachments with Multi-Attach disabled", volumeID: "vol-test-1234", expectedState: volumeAttachedState, expectedInstance: "1234", @@ -2061,8 +2142,9 @@ func TestWaitForAttachmentState(t *testing.T) { } multipleAttachmentsVol := &ec2.Volume{ - VolumeId: aws.String(tc.volumeID), - Attachments: []*ec2.VolumeAttachment{{Device: aws.String(defaultPath), InstanceId: aws.String("1235"), State: aws.String("attached")}, {Device: aws.String(defaultPath), InstanceId: aws.String("1234"), State: aws.String("attached")}}, + VolumeId: aws.String(tc.volumeID), + Attachments: []*ec2.VolumeAttachment{{Device: aws.String(defaultPath), InstanceId: aws.String("1235"), State: aws.String("attached")}, {Device: aws.String(defaultPath), InstanceId: aws.String("1234"), State: aws.String("attached")}}, + MultiAttachEnabled: aws.Bool(false), } ctx, cancel := context.WithCancel(context.Background()) @@ -2073,7 +2155,10 @@ func TestWaitForAttachmentState(t *testing.T) { mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{detachedVol}}, nil).AnyTimes() case "success: disk not found, assumed detached", "failure: disk not found, expected attached": mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(nil, awserr.New("InvalidVolume.NotFound", "foo", fmt.Errorf(""))).AnyTimes() - case "success: multiple attachments": + case "success: multiple attachments with Multi-Attach enabled": + multipleAttachmentsVol.MultiAttachEnabled = aws.Bool(true) + mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{multipleAttachmentsVol}}, nil).AnyTimes() + case "failure: multiple attachments with Multi-Attach disabled": mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{multipleAttachmentsVol}}, nil).AnyTimes() case "failure: disk still attaching": mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{attachingVol}}, nil).AnyTimes() @@ -2204,12 +2289,12 @@ func createDescribeVolumesOutput(volumeID, nodeID, path, state string) *ec2.Desc } } -func createAttachVolumeOutput(volumeID, nodeID, path, state string) *ec2.VolumeAttachment { +func createAttachVolumeOutput(volumeID, nodeID, path string) *ec2.VolumeAttachment { return &ec2.VolumeAttachment{ VolumeId: aws.String(volumeID), Device: aws.String(path), InstanceId: aws.String(nodeID), - State: aws.String(state), + State: aws.String("attached"), } } diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index cff2eb35a..8441b4456 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -37,16 +37,13 @@ import ( "k8s.io/klog/v2" ) -var ( - // volumeCaps represents how the volume could be accessed. - // It is SINGLE_NODE_WRITER since EBS volume could only be - // attached to a single node at any given time. - volumeCaps = []csi.VolumeCapability_AccessMode{ - { - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - } +// Supported access modes +const ( + SingleNodeWriter = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER + MultiNodeMultiWriter = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER +) +var ( // controllerCaps represents the capability of controller service controllerCaps = []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, @@ -115,13 +112,22 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, err } volName := req.GetName() + volCap := req.GetVolumeCapabilities() + + multiAttach := false + for _, c := range volCap { + if c.GetAccessMode().GetMode() == MultiNodeMultiWriter && isBlock(c) { + klog.V(4).InfoS("CreateVolume: multi-attach is enabled", "volumeID", volName) + multiAttach = true + } + } // check if a request is already in-flight - if ok := d.inFlight.Insert(volName); !ok { + if ok := d.inFlight.Insert(volName, ""); !ok { msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) return nil, status.Error(codes.Aborted, msg) } - defer d.inFlight.Delete(volName) + defer d.inFlight.Delete(volName, "") var ( volumeType string @@ -231,37 +237,37 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol if len(blockSize) > 0 { responseCtx[BlockSizeKey] = blockSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), BlockSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, BlockSizeKey, FileSystemConfigs); err != nil { return nil, err } } if len(inodeSize) > 0 { responseCtx[InodeSizeKey] = inodeSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), InodeSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, InodeSizeKey, FileSystemConfigs); err != nil { return nil, err } } if len(bytesPerInode) > 0 { responseCtx[BytesPerInodeKey] = bytesPerInode - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), BytesPerInodeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, BytesPerInodeKey, FileSystemConfigs); err != nil { return nil, err } } if len(numberOfInodes) > 0 { responseCtx[NumberOfInodesKey] = numberOfInodes - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), NumberOfInodesKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, NumberOfInodesKey, FileSystemConfigs); err != nil { return nil, err } } if ext4BigAlloc { responseCtx[Ext4BigAllocKey] = "true" - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), Ext4BigAllocKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, Ext4BigAllocKey, FileSystemConfigs); err != nil { return nil, err } } if len(ext4ClusterSize) > 0 { responseCtx[Ext4ClusterSizeKey] = ext4ClusterSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), Ext4ClusterSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, Ext4ClusterSizeKey, FileSystemConfigs); err != nil { return nil, err } } @@ -329,6 +335,7 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol BlockExpress: blockExpress, KmsKeyID: kmsKeyID, SnapshotID: snapshotID, + MultiAttachEnabled: multiAttach, } disk, err := d.cloud.CreateDisk(ctx, volName, opts) @@ -357,10 +364,7 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { } if !isValidVolumeCapabilities(volCaps) { - modes := util.GetAccessModes(volCaps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return status.Error(codes.InvalidArgument, errString) + return status.Error(codes.InvalidArgument, "Volume capabilities not supported") } return nil } @@ -372,13 +376,12 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol } volumeID := req.GetVolumeId() - // check if a request is already in-flight - if ok := d.inFlight.Insert(volumeID); !ok { + if ok := d.inFlight.Insert(volumeID, ""); !ok { msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID) return nil, status.Error(codes.Aborted, msg) } - defer d.inFlight.Delete(volumeID) + defer d.inFlight.Delete(volumeID, "") if _, err := d.cloud.DeleteDisk(ctx, volumeID); err != nil { if errors.Is(err, cloud.ErrNotFound) { @@ -407,17 +410,14 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if !d.inFlight.Insert(volumeID) { + if !d.inFlight.Insert(volumeID, nodeID) { return nil, status.Error(codes.Aborted, fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID)) } - defer d.inFlight.Delete(volumeID) + defer d.inFlight.Delete(volumeID, nodeID) klog.V(2).InfoS("ControllerPublishVolume: attaching", "volumeID", volumeID, "nodeID", nodeID) devicePath, err := d.cloud.AttachDisk(ctx, volumeID, nodeID) if err != nil { - if errors.Is(err, cloud.ErrVolumeInUse) { - return nil, status.Errorf(codes.FailedPrecondition, "Volume %q is already attached to a different node, expected node: %q", volumeID, nodeID) - } return nil, status.Errorf(codes.Internal, "Could not attach volume %q to node %q: %v", volumeID, nodeID, err) } klog.V(2).InfoS("ControllerPublishVolume: attached", "volumeID", volumeID, "nodeID", nodeID, "devicePath", devicePath) @@ -440,12 +440,8 @@ func validateControllerPublishVolumeRequest(req *csi.ControllerPublishVolumeRequ return status.Error(codes.InvalidArgument, "Volume capability not provided") } - caps := []*csi.VolumeCapability{volCap} - if !isValidVolumeCapabilities(caps) { - modes := util.GetAccessModes(caps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return status.Error(codes.InvalidArgument, errString) + if !isValidCapability(volCap) { + return status.Error(codes.InvalidArgument, "Volume capability not supported") } return nil } @@ -459,10 +455,10 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req * volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if !d.inFlight.Insert(volumeID) { + if !d.inFlight.Insert(volumeID, nodeID) { return nil, status.Error(codes.Aborted, fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID)) } - defer d.inFlight.Delete(volumeID) + defer d.inFlight.Delete(volumeID, nodeID) klog.V(2).InfoS("ControllerUnpublishVolume: detaching", "volumeID", volumeID, "nodeID", nodeID) if err := d.cloud.DetachDisk(ctx, volumeID, nodeID); err != nil { @@ -601,23 +597,40 @@ func (d *controllerService) ControllerGetVolume(ctx context.Context, req *csi.Co return nil, status.Error(codes.Unimplemented, "") } -func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool { - hasSupport := func(cap *csi.VolumeCapability) bool { - for _, c := range volumeCaps { - if c.GetMode() == cap.AccessMode.GetMode() { - return true - } +func isValidVolumeCapabilities(v []*csi.VolumeCapability) bool { + for _, c := range v { + if !isValidCapability(c) { + return false } - return false } + return true +} - foundAll := true - for _, c := range volCaps { - if !hasSupport(c) { - foundAll = false +func isValidCapability(c *csi.VolumeCapability) bool { + accessMode := c.GetAccessMode().GetMode() + + //nolint:exhaustive + switch accessMode { + case SingleNodeWriter: + return true + + case MultiNodeMultiWriter: + if isBlock(c) { + return true + } else { + klog.InfoS("isValidCapability: access mode is only supported for block devices", "accessMode", accessMode) + return false } + + default: + klog.InfoS("isValidCapability: access mode is not supported", "accessMode", accessMode) + return false } - return foundAll +} + +func isBlock(cap *csi.VolumeCapability) bool { + _, isBlock := cap.GetAccessType().(*csi.VolumeCapability_Block) + return isBlock } func isValidVolumeContext(volContext map[string]string) bool { @@ -647,11 +660,11 @@ func (d *controllerService) CreateSnapshot(ctx context.Context, req *csi.CreateS volumeID := req.GetSourceVolumeId() // check if a request is already in-flight - if ok := d.inFlight.Insert(snapshotName); !ok { + if ok := d.inFlight.Insert(snapshotName, ""); !ok { msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, snapshotName) return nil, status.Error(codes.Aborted, msg) } - defer d.inFlight.Delete(snapshotName) + defer d.inFlight.Delete(snapshotName, "") snapshot, err := d.cloud.GetSnapshotByName(ctx, snapshotName) if err != nil && !errors.Is(err, cloud.ErrNotFound) { @@ -772,11 +785,11 @@ func (d *controllerService) DeleteSnapshot(ctx context.Context, req *csi.DeleteS snapshotID := req.GetSnapshotId() // check if a request is already in-flight - if ok := d.inFlight.Insert(snapshotID); !ok { + if ok := d.inFlight.Insert(snapshotID, ""); !ok { msg := fmt.Sprintf("DeleteSnapshot for Snapshot %s is already in progress", snapshotID) return nil, status.Error(codes.Aborted, msg) } - defer d.inFlight.Delete(snapshotID) + defer d.inFlight.Delete(snapshotID, "") if _, err := d.cloud.DeleteSnapshot(ctx, snapshotID); err != nil { if errors.Is(err, cloud.ErrNotFound) { @@ -1006,7 +1019,7 @@ func BuildOutpostArn(segments map[string]string) string { ) } -func validateVolumeCapabilities(volumeCapabilities []*csi.VolumeCapability, paramName string, fsConfigs map[string]fileSystemConfig) error { +func validateFormattingOption(volumeCapabilities []*csi.VolumeCapability, paramName string, fsConfigs map[string]fileSystemConfig) error { for _, volCap := range volumeCapabilities { switch volCap.GetAccessType().(type) { case *csi.VolumeCapability_Block: diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index dbf113e4a..dae5760d4 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -165,6 +165,26 @@ func TestCreateVolume(t *testing.T) { }, }, } + multiAttachVolCap := []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Block{ + Block: &csi.VolumeCapability_BlockVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + }, + } + invalidMultiAttachVolCap := []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + }, + } stdVolSize := int64(5 * 1024 * 1024 * 1024) stdCapRange := &csi.CapacityRange{RequiredBytes: stdVolSize} stdParams := map[string]string{} @@ -1593,8 +1613,8 @@ func TestCreateVolume(t *testing.T) { mockCloud := cloud.NewMockCloud(mockCtl) inFlight := internal.NewInFlight() - inFlight.Insert(req.GetName()) - defer inFlight.Delete(req.GetName()) + inFlight.Insert(req.GetName(), "") + defer inFlight.Delete(req.GetName(), "") awsDriver := controllerService{ cloud: mockCloud, @@ -1634,8 +1654,81 @@ func TestCreateVolume(t *testing.T) { checkExpectedErrorCode(t, err, codes.AlreadyExists) }, }, - } + { + name: "success multi-attach", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: multiAttachVolCap, + Parameters: nil, + } + + ctx := context.Background() + + mockDisk := &cloud.Disk{ + VolumeID: req.Name, + AvailabilityZone: expZone, + CapacityGiB: util.BytesToGiB(stdVolSize), + } + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := cloud.NewMockCloud(mockCtl) + mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Any()).Return(mockDisk, nil) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } + + if _, err := awsDriver.CreateVolume(ctx, req); err != nil { + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + t.Fatalf("Unexpected error: %v", srvErr.Code()) + } + }, + }, + { + name: "fail multi-attach - invalid mount capability", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: invalidMultiAttachVolCap, + } + + ctx := context.Background() + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := cloud.NewMockCloud(mockCtl) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } + _, err := awsDriver.CreateVolume(ctx, req) + if err == nil { + t.Fatalf("Expected CreateVolume to fail but got no error") + } + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + if srvErr.Code() != codes.InvalidArgument { + t.Fatalf("Expect InvalidArgument but got: %s", srvErr.Code()) + } + }, + }, + } for _, tc := range testCases { t.Run(tc.name, tc.testFunc) } @@ -1924,8 +2017,8 @@ func TestDeleteVolume(t *testing.T) { mockCloud := cloud.NewMockCloud(mockCtl) inFlight := internal.NewInFlight() - inFlight.Insert(req.GetVolumeId()) - defer inFlight.Delete(req.GetVolumeId()) + inFlight.Insert(req.GetVolumeId(), "") + defer inFlight.Delete(req.GetVolumeId(), "") awsDriver := controllerService{ cloud: mockCloud, inFlight: inFlight, @@ -2496,8 +2589,8 @@ func TestCreateSnapshot(t *testing.T) { mockCloud := cloud.NewMockCloud(mockCtl) inFlight := internal.NewInFlight() - inFlight.Insert(req.GetName()) - defer inFlight.Delete(req.GetName()) + inFlight.Insert(req.GetName(), "") + defer inFlight.Delete(req.GetName(), "") awsDriver := controllerService{ cloud: mockCloud, @@ -2983,8 +3076,8 @@ func TestDeleteSnapshot(t *testing.T) { SnapshotId: "test-snapshotID", } inFlight := internal.NewInFlight() - inFlight.Insert(req.GetSnapshotId()) - defer inFlight.Delete(req.GetSnapshotId()) + inFlight.Insert(req.GetSnapshotId(), "") + defer inFlight.Delete(req.GetSnapshotId(), "") awsDriver := controllerService{ cloud: mockCloud, @@ -3327,16 +3420,6 @@ func TestControllerPublishVolume(t *testing.T) { }, errorCode: codes.Internal, }, - { - name: "Fail when volume is already attached to another node", - volumeId: "vol-test", - nodeId: expInstanceID, - volumeCapability: stdVolCap, - mockAttach: func(mockCloud *cloud.MockCloud, ctx context.Context, volumeId string, nodeId string) { - mockCloud.EXPECT().AttachDisk(gomock.Eq(ctx), gomock.Eq(volumeId), gomock.Eq(expInstanceID)).Return("", (cloud.ErrVolumeInUse)) - }, - errorCode: codes.FailedPrecondition, - }, { name: "Aborted error when AttachDisk operation already in-flight", volumeId: "vol-test", @@ -3346,7 +3429,7 @@ func TestControllerPublishVolume(t *testing.T) { }, errorCode: codes.Aborted, setupFunc: func(controllerService *controllerService) { - controllerService.inFlight.Insert("vol-test") + controllerService.inFlight.Insert("vol-test", expInstanceID) }, }, } @@ -3442,7 +3525,7 @@ func TestControllerUnpublishVolume(t *testing.T) { nodeId: expInstanceID, errorCode: codes.Aborted, setupFunc: func(driver *controllerService) { - driver.inFlight.Insert("vol-test") + driver.inFlight.Insert("vol-test", expInstanceID) }, }, } diff --git a/pkg/driver/internal/inflight.go b/pkg/driver/internal/inflight.go index 31b76cd58..c44c32d40 100644 --- a/pkg/driver/internal/inflight.go +++ b/pkg/driver/internal/inflight.go @@ -35,7 +35,7 @@ const ( VolumeOperationAlreadyExistsErrorMsg = "An operation with the given Volume %s already exists" ) -// InFlight is a struct used to manage in flight requests per volumeId. +// InFlight is a struct used to manage in flight requests for a unique identifier. type InFlight struct { mux *sync.Mutex inFlight map[string]bool @@ -49,27 +49,28 @@ func NewInFlight() *InFlight { } } -// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller . +// Insert inserts the entry to the current list of inflight, request key is a unique identifier. +// extra is an optional parameter used to ensure uniqueness of the key. // Returns false when the key already exists. -func (db *InFlight) Insert(key string) bool { +func (db *InFlight) Insert(key, extra string) bool { db.mux.Lock() defer db.mux.Unlock() - _, ok := db.inFlight[key] + _, ok := db.inFlight[key+extra] if ok { return false } - db.inFlight[key] = true + db.inFlight[key+extra] = true return true } // Delete removes the entry from the inFlight entries map. // It doesn't return anything, and will do nothing if the specified key doesn't exist. -func (db *InFlight) Delete(key string) { +func (db *InFlight) Delete(key, extra string) { db.mux.Lock() defer db.mux.Unlock() - delete(db.inFlight, key) - klog.V(4).InfoS("Node Service: volume operation finished", "key", key) + delete(db.inFlight, key+extra) + klog.V(4).InfoS("Node Service: volume operation finished", "key", key+extra) } diff --git a/pkg/driver/internal/inflight_test.go b/pkg/driver/internal/inflight_test.go index faaeb74c1..c88eb9e04 100644 --- a/pkg/driver/internal/inflight_test.go +++ b/pkg/driver/internal/inflight_test.go @@ -22,6 +22,7 @@ import ( type testRequest struct { volumeId string + extra string expResp bool delete bool } @@ -73,15 +74,18 @@ func TestInFlight(t *testing.T) { requests: []testRequest{ { volumeId: "random-vol-name", + extra: "random-node-id", expResp: true, }, { volumeId: "random-vol-name", + extra: "random-node-id", expResp: false, delete: true, }, { volumeId: "random-vol-name", + extra: "random-node-id", expResp: true, }, }, @@ -94,9 +98,9 @@ func TestInFlight(t *testing.T) { for _, r := range tc.requests { var resp bool if r.delete { - db.Delete(r.volumeId) + db.Delete(r.volumeId, r.extra) } else { - resp = db.Insert(r.volumeId) + resp = db.Insert(r.volumeId, r.extra) } if r.expResp != resp { t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp) diff --git a/pkg/driver/node.go b/pkg/driver/node.go index ba5c326a8..1a6ae6ab6 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -184,12 +184,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol mountOptions := collectMountOptions(fsType, mountVolume.MountFlags) - if ok = d.inFlight.Insert(volumeID); !ok { + if ok = d.inFlight.Insert(volumeID, target); !ok { return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { klog.V(4).InfoS("NodeStageVolume: volume operation finished", "volumeID", volumeID) - d.inFlight.Delete(volumeID) + d.inFlight.Delete(volumeID, target) }() devicePath, ok := req.PublishContext[DevicePathKey] @@ -310,12 +310,12 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Staging target not provided") } - if ok := d.inFlight.Insert(volumeID); !ok { + if ok := d.inFlight.Insert(volumeID, target); !ok { return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { klog.V(4).InfoS("NodeUnStageVolume: volume operation finished", "volumeID", volumeID) - d.inFlight.Delete(volumeID) + d.inFlight.Delete(volumeID, target) }() // Check if target directory is a mount point. GetDeviceNameFromMount @@ -443,12 +443,12 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } - if ok := d.inFlight.Insert(volumeID); !ok { + if ok := d.inFlight.Insert(volumeID, target); !ok { return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { klog.V(4).InfoS("NodePublishVolume: volume operation finished", "volumeId", volumeID) - d.inFlight.Delete(volumeID) + d.inFlight.Delete(volumeID, target) }() mountOptions := []string{"bind"} @@ -481,12 +481,12 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if len(target) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path not provided") } - if ok := d.inFlight.Insert(volumeID); !ok { + if ok := d.inFlight.Insert(volumeID, target); !ok { return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { klog.V(4).InfoS("NodeUnPublishVolume: volume operation finished", "volumeId", volumeID) - d.inFlight.Delete(volumeID) + d.inFlight.Delete(volumeID, target) }() klog.V(4).InfoS("NodeUnpublishVolume: unmounting", "target", target) diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index a8be6083c..679c6d923 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -453,7 +453,7 @@ func TestNodeStageVolume(t *testing.T) { VolumeId: volumeID, }, inFlightFunc: func(inFlight *internal.InFlight) *internal.InFlight { - inFlight.Insert(volumeID) + inFlight.Insert(volumeID, targetPath) return inFlight }, expectedCode: codes.Aborted, @@ -693,7 +693,7 @@ func TestNodeUnstageVolume(t *testing.T) { VolumeId: volumeID, } - awsDriver.inFlight.Insert(volumeID) + awsDriver.inFlight.Insert(volumeID, targetPath) _, err := awsDriver.NodeUnstageVolume(context.TODO(), req) expectErr(t, err, codes.Aborted) }, @@ -1580,7 +1580,7 @@ func TestNodePublishVolume(t *testing.T) { }, }, } - awsDriver.inFlight.Insert(volumeID) + awsDriver.inFlight.Insert(volumeID, "/test/target/path") _, err := awsDriver.NodePublishVolume(context.TODO(), req) expectErr(t, err, codes.Aborted) @@ -1817,7 +1817,7 @@ func TestNodeUnpublishVolume(t *testing.T) { VolumeId: volumeID, } - awsDriver.inFlight.Insert(volumeID) + awsDriver.inFlight.Insert(volumeID, targetPath) _, err := awsDriver.NodeUnpublishVolume(context.TODO(), req) expectErr(t, err, codes.Aborted) }, diff --git a/tests/e2e/driver/driver.go b/tests/e2e/driver/driver.go index 7f1cbc076..fab162448 100644 --- a/tests/e2e/driver/driver.go +++ b/tests/e2e/driver/driver.go @@ -41,7 +41,7 @@ type DynamicPVTestDriver interface { // PreProvisionedVolumeTestDriver represents an interface for a CSI driver that supports pre-provisioned volume type PreProvisionedVolumeTestDriver interface { // GetPersistentVolume returns a PersistentVolume with pre-provisioned volumeHandle - GetPersistentVolume(volumeID string, fsType string, size string, reclaimPolicy *v1.PersistentVolumeReclaimPolicy, namespace string) *v1.PersistentVolume + GetPersistentVolume(volumeID string, fsType string, size string, reclaimPolicy *v1.PersistentVolumeReclaimPolicy, namespace string, accessMode v1.PersistentVolumeAccessMode, volumeMode v1.PersistentVolumeMode) *v1.PersistentVolume } type VolumeSnapshotTestDriver interface { diff --git a/tests/e2e/driver/ebs_csi_driver.go b/tests/e2e/driver/ebs_csi_driver.go index 9bdca76bc..ac34277f7 100644 --- a/tests/e2e/driver/ebs_csi_driver.go +++ b/tests/e2e/driver/ebs_csi_driver.go @@ -68,7 +68,7 @@ func (d *ebsCSIDriver) GetVolumeSnapshotClass(namespace string) *volumesnapshotv return getVolumeSnapshotClass(generateName, provisioner) } -func (d *ebsCSIDriver) GetPersistentVolume(volumeID string, fsType string, size string, reclaimPolicy *v1.PersistentVolumeReclaimPolicy, namespace string) *v1.PersistentVolume { +func (d *ebsCSIDriver) GetPersistentVolume(volumeID string, fsType string, size string, reclaimPolicy *v1.PersistentVolumeReclaimPolicy, namespace string, accessMode v1.PersistentVolumeAccessMode, volumeMode v1.PersistentVolumeMode) *v1.PersistentVolume { provisioner := d.driverName generateName := fmt.Sprintf("%s-%s-preprovsioned-pv-", namespace, provisioner) // Default to Retain ReclaimPolicy for pre-provisioned volumes @@ -76,6 +76,11 @@ func (d *ebsCSIDriver) GetPersistentVolume(volumeID string, fsType string, size if reclaimPolicy != nil { pvReclaimPolicy = *reclaimPolicy } + + if accessMode == "" { + accessMode = v1.ReadWriteOnce + } + return &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ GenerateName: generateName, @@ -86,7 +91,7 @@ func (d *ebsCSIDriver) GetPersistentVolume(volumeID string, fsType string, size }, }, Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + AccessModes: []v1.PersistentVolumeAccessMode{accessMode}, Capacity: v1.ResourceList{ v1.ResourceName(v1.ResourceStorage): resource.MustParse(size), }, @@ -98,6 +103,7 @@ func (d *ebsCSIDriver) GetPersistentVolume(volumeID string, fsType string, size FSType: fsType, }, }, + VolumeMode: &volumeMode, }, } } diff --git a/tests/e2e/dynamic_provisioning.go b/tests/e2e/dynamic_provisioning.go index 9342e4e52..abe13d6d1 100644 --- a/tests/e2e/dynamic_provisioning.go +++ b/tests/e2e/dynamic_provisioning.go @@ -238,6 +238,187 @@ var _ = Describe("[ebs-csi-e2e] [single-az] Dynamic Provisioning", func() { test.Run(cs, ns) }) + It("should succeed multi-attach with dynamically provisioned IO2 block device", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + } + test := testsuites.DynamicallyProvisionedMultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.Block, + VolumeType: awscloud.VolumeTypeIO2, + AccessMode: v1.ReadWriteMany, + RunningPod: true, + } + test.Run(cs, ns) + }) + + It("should fail to multi-attach dynamically provisioned IO2 block device - not enabled", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteOnce, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteOnce, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + } + test := testsuites.DynamicallyProvisionedMultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.Block, + AccessMode: v1.ReadWriteOnce, + VolumeType: awscloud.VolumeTypeIO2, + } + test.Run(cs, ns) + }) + + It("should fail to multi-attach when VolumeMode is not Block", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + FSType: ebscsidriver.FSTypeExt4, + VolumeMode: testsuites.FileSystem, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + AccessMode: v1.ReadWriteMany, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + FSType: ebscsidriver.FSTypeExt4, + VolumeMode: testsuites.FileSystem, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + AccessMode: v1.ReadWriteMany, + VolumeBindingMode: &volumeBindingMode, + }, + }, + }, + } + test := testsuites.DynamicallyProvisionedMultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.FileSystem, + AccessMode: v1.ReadWriteMany, + VolumeType: awscloud.VolumeTypeIO2, + PendingPVC: true, + } + test.Run(cs, ns) + }) + + It("should fail to multi-attach non io2 VolumeType", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeGP3, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeGP3), + VolumeBindingMode: &volumeBindingMode, + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeGP3, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeGP3), + VolumeBindingMode: &volumeBindingMode, + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + } + test := testsuites.DynamicallyProvisionedMultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.FileSystem, + AccessMode: v1.ReadWriteMany, + VolumeType: awscloud.VolumeTypeIO2, + PendingPVC: true, + } + test.Run(cs, ns) + }) + It("should create a raw block volume and a filesystem volume on demand and bind to the same pod", func() { volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer pods := []testsuites.PodDetails{ diff --git a/tests/e2e/pre_provsioning.go b/tests/e2e/pre_provsioning.go index 2924f42b1..0de2b65ce 100644 --- a/tests/e2e/pre_provsioning.go +++ b/tests/e2e/pre_provsioning.go @@ -118,14 +118,7 @@ var _ = Describe("[ebs-csi-e2e] [single-az] Pre-Provisioned", func() { AfterEach(func() { if !skipManuallyDeletingVolume { - _, err := cloud.WaitForAttachmentState(context.Background(), volumeID, "detached", "", "", false) - if err != nil { - Fail(fmt.Sprintf("could not detach volume %q: %v", volumeID, err)) - } - ok, err := cloud.DeleteDisk(context.Background(), volumeID) - if err != nil || !ok { - Fail(fmt.Sprintf("could not delete volume %q: %v", volumeID, err)) - } + deleteDiskWithRetry(cloud, volumeID) } }) @@ -233,3 +226,116 @@ var _ = Describe("[ebs-csi-e2e] [single-az] Pre-Provisioned", func() { test.Run(cs, ns) }) }) + +var _ = Describe("[ebs-csi-e2e] [single-az] Pre-Provisioned with Multi-Attach", func() { + f := framework.NewDefaultFramework("ebs") + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + var ( + cs clientset.Interface + ns *v1.Namespace + ebsDriver driver.PreProvisionedVolumeTestDriver + cloud awscloud.Cloud + volumeID string + skipManuallyDeletingVolume bool + ) + + BeforeEach(func() { + cs = f.ClientSet + ns = f.Namespace + ebsDriver = driver.InitEbsCSIDriver() + + if os.Getenv(awsAvailabilityZonesEnv) == "" { + Skip(fmt.Sprintf("env %q not set", awsAvailabilityZonesEnv)) + } + availabilityZones := strings.Split(os.Getenv(awsAvailabilityZonesEnv), ",") + availabilityZone := availabilityZones[rand.Intn(len(availabilityZones))] + region := availabilityZone[0 : len(availabilityZone)-1] + + diskOptions := &awscloud.DiskOptions{ + CapacityBytes: defaultDiskSizeBytes, + VolumeType: awscloud.VolumeTypeIO2, + MultiAttachEnabled: true, + AvailabilityZone: availabilityZone, + IOPS: 1000, + Tags: map[string]string{awscloud.VolumeNameTagKey: dummyVolumeName, awscloud.AwsEbsDriverTagKey: "true"}, + } + var err error + cloud, err = awscloud.NewCloud(region, false, "") + if err != nil { + Fail(fmt.Sprintf("could not get NewCloud: %v", err)) + } + r1 := rand.New(rand.NewSource(time.Now().UnixNano())) + disk, err := cloud.CreateDisk(context.Background(), fmt.Sprintf("pvc-%d", r1.Uint64()), diskOptions) + if err != nil { + Fail(fmt.Sprintf("could not provision a volume: %v", err)) + } + volumeID = disk.VolumeID + By(fmt.Sprintf("Successfully provisioned EBS volume: %q\n", volumeID)) + }) + + AfterEach(func() { + if !skipManuallyDeletingVolume { + deleteDiskWithRetry(cloud, volumeID) + } + }) + + It("should succeed multi-attach pre-provisioned IO2 block device", func() { + reclaimPolicy := v1.PersistentVolumeReclaimDelete + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + VolumeID: volumeID, + ReclaimPolicy: &reclaimPolicy, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + VolumeID: volumeID, + ReclaimPolicy: &reclaimPolicy, + }, + }, + }, + } + test := testsuites.StaticallyProvisionedMultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: v1.PersistentVolumeBlock, + VolumeType: awscloud.VolumeTypeIO2, + VolumeID: volumeID, + AccessMode: v1.ReadWriteMany, + } + test.Run(cs, ns) + }) +}) + +func deleteDiskWithRetry(cloud awscloud.Cloud, volumeID string) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + ok, err := cloud.DeleteDisk(context.Background(), volumeID) + if err == nil && ok { + return + } + <-ticker.C + } +} diff --git a/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go b/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go new file mode 100644 index 000000000..9decddece --- /dev/null +++ b/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go @@ -0,0 +1,93 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testsuites + +import ( + "context" + "fmt" + "time" + + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/driver" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2epv "k8s.io/kubernetes/test/e2e/framework/pv" +) + +type DynamicallyProvisionedMultiAttachTest struct { + CSIDriver driver.DynamicPVTestDriver + Pods []PodDetails + VolumeMode VolumeMode + AccessMode v1.PersistentVolumeAccessMode + VolumeType string + RunningPod bool + PendingPVC bool +} + +func (t *DynamicallyProvisionedMultiAttachTest) Run(client clientset.Interface, namespace *v1.Namespace) { + // Setup StorageClass and PVC + tpvc, _ := t.Pods[0].Volumes[0].SetupDynamicPersistentVolumeClaim(client, namespace, t.CSIDriver) + defer tpvc.Cleanup() + + for n, podDetail := range t.Pods { + tpod := NewTestPod(client, namespace, "tail -f /dev/null") + + if podDetail.Volumes[0].VolumeMode == Block { + name := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeDevice.NameGenerate, n+1) + devicePath := podDetail.Volumes[0].VolumeDevice.DevicePath + tpod.SetupRawBlockVolume(tpvc.persistentVolumeClaim, name, devicePath) + } else { + name := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeMount.NameGenerate, n+1) + mountPath := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeMount.MountPathGenerate, n+1) + readOnly := podDetail.Volumes[0].VolumeMount.ReadOnly + tpod.SetupVolume(tpvc.persistentVolumeClaim, name, mountPath, readOnly) + } + + tpod.pod.ObjectMeta.Labels = map[string]string{"app": "my-service"} + tpod.pod.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "my-service"}, + }, + }, + } + + By("deploying the pod") + tpod.Create() + defer tpod.Cleanup() + + if t.PendingPVC { + By("checking that the PVC is not bound") + pvcList := []*v1.PersistentVolumeClaim{tpvc.persistentVolumeClaim} + _, err := e2epv.WaitForPVClaimBoundPhase(context.Background(), client, pvcList, 30*time.Second) + Expect(err).To(HaveOccurred(), "Failed to wait for PVC to be in Pending state") + return + } + if t.RunningPod || n == 0 { + By("checking that the pod is running") + tpod.WaitForRunning() + } else { + By("checking that the pod is not running") + err := e2epod.WaitTimeoutForPodRunningInNamespace(context.Background(), client, tpod.pod.Name, namespace.Name, 30*time.Second) + Expect(err).To(HaveOccurred(), "Failed to wait for pod to be in a running state") + } + } +} diff --git a/tests/e2e/testsuites/pre_provisioned_multi_attach_tester.go b/tests/e2e/testsuites/pre_provisioned_multi_attach_tester.go new file mode 100644 index 000000000..7f6349697 --- /dev/null +++ b/tests/e2e/testsuites/pre_provisioned_multi_attach_tester.go @@ -0,0 +1,66 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testsuites + +import ( + "fmt" + + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/driver" + . "github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" +) + +type StaticallyProvisionedMultiAttachTest struct { + CSIDriver driver.PreProvisionedVolumeTestDriver + Pods []PodDetails + AccessMode v1.PersistentVolumeAccessMode + VolumeMode v1.PersistentVolumeMode + VolumeType string + RunningPod bool + PendingPVC bool + VolumeID string +} + +func (t *StaticallyProvisionedMultiAttachTest) Run(client clientset.Interface, namespace *v1.Namespace) { + tpvc, _ := t.Pods[0].Volumes[0].SetupPreProvisionedPersistentVolumeClaim(client, namespace, t.CSIDriver) + + for n, podDetail := range t.Pods { + tpod := NewTestPod(client, namespace, "tail -f /dev/null") + name := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeDevice.NameGenerate, n+1) + devicePath := podDetail.Volumes[0].VolumeDevice.DevicePath + + tpod.SetupRawBlockVolume(tpvc.persistentVolumeClaim, name, devicePath) + tpod.pod.ObjectMeta.Labels = map[string]string{"app": "my-service"} + tpod.pod.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "my-service"}, + }, + }, + } + + By("deploying the pod") + tpod.Create() + defer tpod.Cleanup() + + By("checking that the pods command exits with no error") + tpod.WaitForRunning() + } +} diff --git a/tests/e2e/testsuites/specs.go b/tests/e2e/testsuites/specs.go index ff068283c..f84528468 100644 --- a/tests/e2e/testsuites/specs.go +++ b/tests/e2e/testsuites/specs.go @@ -41,6 +41,7 @@ type VolumeDetails struct { ReclaimPolicy *v1.PersistentVolumeReclaimPolicy AllowVolumeExpansion *bool VolumeBindingMode *storagev1.VolumeBindingMode + AccessMode v1.PersistentVolumeAccessMode AllowedTopologyValues []string VolumeMode VolumeMode VolumeMount VolumeMountDetails @@ -125,7 +126,7 @@ func (pod *PodDetails) SetupDeployment(client clientset.Interface, namespace *v1 createdStorageClass := tsc.Create() cleanupFuncs = append(cleanupFuncs, tsc.Cleanup) By("setting up the PVC") - tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass) + tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, v1.ReadWriteOnce) tpvc.Create() tpvc.WaitForBound() tpvc.ValidateProvisionedPersistentVolume() @@ -152,9 +153,9 @@ func (volume *VolumeDetails) SetupDynamicPersistentVolumeClaim(client clientset. Kind: VolumeSnapshotKind, APIGroup: &SnapshotAPIGroup, } - tpvc = NewTestPersistentVolumeClaimWithDataSource(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, dataSource) + tpvc = NewTestPersistentVolumeClaimWithDataSource(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, dataSource, volume.AccessMode) } else { - tpvc = NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass) + tpvc = NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, volume.AccessMode) } tpvc.Create() cleanupFuncs = append(cleanupFuncs, tpvc.Cleanup) @@ -169,12 +170,16 @@ func (volume *VolumeDetails) SetupDynamicPersistentVolumeClaim(client clientset. func (volume *VolumeDetails) SetupPreProvisionedPersistentVolumeClaim(client clientset.Interface, namespace *v1.Namespace, csiDriver driver.PreProvisionedVolumeTestDriver) (*TestPersistentVolumeClaim, []func()) { cleanupFuncs := make([]func(), 0) + volumeMode := v1.PersistentVolumeFilesystem + if volume.VolumeMode == Block { + volumeMode = v1.PersistentVolumeBlock + } By("setting up the PV") - pv := csiDriver.GetPersistentVolume(volume.VolumeID, volume.FSType, volume.ClaimSize, volume.ReclaimPolicy, namespace.Name) + pv := csiDriver.GetPersistentVolume(volume.VolumeID, volume.FSType, volume.ClaimSize, volume.ReclaimPolicy, namespace.Name, volume.AccessMode, volumeMode) tpv := NewTestPreProvisionedPersistentVolume(client, pv) tpv.Create() By("setting up the PVC") - tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, nil) + tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, nil, volume.AccessMode) tpvc.Create() cleanupFuncs = append(cleanupFuncs, tpvc.DeleteBoundPersistentVolume) cleanupFuncs = append(cleanupFuncs, tpvc.Cleanup) diff --git a/tests/e2e/testsuites/testsuites.go b/tests/e2e/testsuites/testsuites.go index 2d7709c7b..e5ad0a59e 100644 --- a/tests/e2e/testsuites/testsuites.go +++ b/tests/e2e/testsuites/testsuites.go @@ -282,6 +282,7 @@ type TestPersistentVolumeClaim struct { client clientset.Interface claimSize string volumeMode v1.PersistentVolumeMode + accessMode v1.PersistentVolumeAccessMode storageClass *storagev1.StorageClass namespace *v1.Namespace persistentVolume *v1.PersistentVolume @@ -290,7 +291,7 @@ type TestPersistentVolumeClaim struct { dataSource *v1.TypedLocalObjectReference } -func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass) *TestPersistentVolumeClaim { +func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, accessMode v1.PersistentVolumeAccessMode) *TestPersistentVolumeClaim { mode := v1.PersistentVolumeFilesystem if volumeMode == Block { mode = v1.PersistentVolumeBlock @@ -301,10 +302,11 @@ func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claim volumeMode: mode, namespace: ns, storageClass: sc, + accessMode: accessMode, } } -func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, dataSource *v1.TypedLocalObjectReference) *TestPersistentVolumeClaim { +func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, dataSource *v1.TypedLocalObjectReference, accessMode v1.PersistentVolumeAccessMode) *TestPersistentVolumeClaim { mode := v1.PersistentVolumeFilesystem if volumeMode == Block { mode = v1.PersistentVolumeBlock @@ -317,6 +319,7 @@ func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Na namespace: ns, storageClass: sc, dataSource: dataSource, + accessMode: accessMode, } } @@ -328,7 +331,7 @@ func (t *TestPersistentVolumeClaim) Create() { if t.storageClass != nil { storageClassName = t.storageClass.Name } - t.requestedPersistentVolumeClaim = generatePVC(t.namespace.Name, storageClassName, t.claimSize, t.volumeMode, t.dataSource) + t.requestedPersistentVolumeClaim = generatePVC(t.namespace.Name, storageClassName, t.claimSize, t.volumeMode, t.dataSource, t.accessMode) t.persistentVolumeClaim, err = t.client.CoreV1().PersistentVolumeClaims(t.namespace.Name).Create(context.Background(), t.requestedPersistentVolumeClaim, metav1.CreateOptions{}) framework.ExpectNoError(err) } @@ -406,7 +409,10 @@ func (t *TestPersistentVolumeClaim) WaitForBound() v1.PersistentVolumeClaim { return *t.persistentVolumeClaim } -func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.PersistentVolumeMode, dataSource *v1.TypedLocalObjectReference) *v1.PersistentVolumeClaim { +func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.PersistentVolumeMode, dataSource *v1.TypedLocalObjectReference, accessMode v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim { + if accessMode == "" { + accessMode = v1.ReadWriteOnce + } return &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "pvc-", @@ -415,7 +421,7 @@ func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.Pe Spec: v1.PersistentVolumeClaimSpec{ StorageClassName: &storageClassName, AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, + accessMode, }, Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{