From dadb2efa4537c4bccd23d2986657cca52d0d74b7 Mon Sep 17 00:00:00 2001
From: Connor Catlett
Date: Tue, 25 Jun 2024 19:42:49 +0000
Subject: [PATCH] Migrate likelyBadNames to ExpiringCache

Signed-off-by: Connor Catlett
---
 pkg/cloud/cloud.go                   | 46 ++++++-------------
 pkg/cloud/cloud_test.go              | 68 +++++++++++-----------------
 pkg/cloud/devicemanager/allocator.go | 22 ++++++---
 pkg/cloud/devicemanager/manager.go   |  4 +-
 4 files changed, 59 insertions(+), 81 deletions(-)

diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index 91a8d377f..67bb5f16f 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -91,6 +91,7 @@ var (
 const (
 	volumeDetachedState = "detached"
 	volumeAttachedState = "attached"
+	cacheForgetDelay    = 1 * time.Hour
 )
 
 // AWS provisioning limits.
@@ -316,7 +317,7 @@ type cloud struct {
 	bm             *batcherManager
 	rm             *retryManager
 	vwp            volumeWaitParameters
-	likelyBadNames util.ExpiringCache[string, map[string]struct{}]
+	likelyBadNames util.ExpiringCache[string, sync.Map]
 }
 
 var _ Cloud = &cloud{}
@@ -371,7 +372,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
 		bm:             bm,
 		rm:             newRetryManager(),
 		vwp:            vwp,
-		likelyBadNames: util.NewExpiringCache[string, map[string]struct{}](cacheForgetDelay),
+		likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
 	}
 }
 
@@ -855,7 +856,11 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
 		return "", err
 	}
 
-	likelyBadNames, _ := c.likelyBadNames[nodeID]
+	likelyBadNames, ok := c.likelyBadNames.Get(nodeID)
+	if !ok {
+		likelyBadNames = new(sync.Map)
+		c.likelyBadNames.Set(nodeID, likelyBadNames)
+	}
 
 	device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
 	if err != nil {
@@ -875,37 +880,16 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
 	})
 	if attachErr != nil {
 		if isAWSErrorBlockDeviceInUse(attachErr) {
-			cacheMutex.Lock()
-			if node, ok := nodeDeviceCache[nodeID]; ok {
-				// Node already had existing cached bad names, add on to the list
-				node.likelyBadNames[device.Path] = struct{}{}
-				node.timer.Reset(cacheForgetDelay)
-			} else {
-				// Node has no existing cached bad device names, setup a new struct instance
-				nodeDeviceCache[nodeID] = cachedNode{
-					timer: time.AfterFunc(cacheForgetDelay, func() {
-						// If this ever fires, the node has not had a volume attached for an hour
-						// In order to prevent a semi-permanent memory leak, delete it from the map
-						cacheMutex.Lock()
-						delete(nodeDeviceCache, nodeID)
-						cacheMutex.Unlock()
-					}),
-					likelyBadNames: map[string]struct{}{
-						device.Path: {},
-					},
-				}
-			}
-			cacheMutex.Unlock()
+			// If block device is "in use", that likely indicates a bad name that is in use by a block
+			// device that we do not know about (example: block devices attached in the AMI, which are
+			// not reported in DescribeInstance's block device map)
+			//
+			// Store such bad names in the "likely bad" map to be considered last in future attempts
+			likelyBadNames.Store(device.Path, struct{}{})
 		}
 		return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
 	}
-	cacheMutex.Lock()
-	if node, ok := nodeDeviceCache[nodeID]; ok {
-		// Remove succesfully attached devices from the "likely bad" list
-		delete(node.likelyBadNames, device.Path)
-		node.timer.Reset(cacheForgetDelay)
-	}
-	cacheMutex.Unlock()
+	likelyBadNames.Delete(device.Path)
 
 	klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
 }
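The cloud.go changes above replace the hand-rolled nodeDeviceCache/timer bookkeeping with a per-node entry in an expiring cache: AttachDisk looks up (or lazily creates) a *sync.Map of "likely bad" device names for the node, records a name when EC2 reports the block device as already in use, and deletes it again after a successful attach. The sketch below illustrates that get-or-create and record/forget pattern in isolation; the toy cache type, node ID, and device name are hypothetical stand-ins, and the real util.ExpiringCache additionally expires idle entries after cacheForgetDelay, which is omitted here.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // toyCache is a stand-in for util.ExpiringCache[string, sync.Map];
    // entry expiry is omitted for brevity.
    type toyCache struct {
        mu    sync.Mutex
        items map[string]*sync.Map
    }

    func newToyCache(_ time.Duration) *toyCache {
        return &toyCache{items: make(map[string]*sync.Map)}
    }

    func (c *toyCache) Get(key string) (*sync.Map, bool) {
        c.mu.Lock()
        defer c.mu.Unlock()
        v, ok := c.items[key]
        return v, ok
    }

    func (c *toyCache) Set(key string, value *sync.Map) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.items[key] = value
    }

    func main() {
        cache := newToyCache(1 * time.Hour)
        nodeID := "i-0123456789abcdef0" // hypothetical node ID

        // Get-or-create the per-node set of likely bad device names,
        // mirroring the lookup at the top of AttachDisk.
        badNames, ok := cache.Get(nodeID)
        if !ok {
            badNames = new(sync.Map)
            cache.Set(nodeID, badNames)
        }

        // Attach failed with "already in use": remember the name so the
        // allocator tries it last next time.
        badNames.Store("/dev/xvdba", struct{}{})

        // Attach succeeded on a later try: the name is fine after all.
        badNames.Delete("/dev/xvdba")

        _, stillBad := badNames.Load("/dev/xvdba")
        fmt.Println("still considered bad:", stillBad) // false
    }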
diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go
index 99b21ed81..2fabc44d3 100644
--- a/pkg/cloud/cloud_test.go
+++ b/pkg/cloud/cloud_test.go
@@ -20,13 +20,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"reflect"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/wait"
+
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/ec2"
 	"github.com/aws/aws-sdk-go-v2/service/ec2/types"
@@ -1341,14 +1342,13 @@ func TestAttachDisk(t *testing.T) {
 	}
 
 	testCases := []struct {
-		name         string
-		volumeID     string
-		nodeID       string
-		nodeID2      string
-		path         string
-		expErr       error
-		mockFunc     func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
-		validateFunc func(t *testing.T)
+		name     string
+		volumeID string
+		nodeID   string
+		nodeID2  string
+		path     string
+		expErr   error
+		mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
 	}{
 		{
 			name:     "success: AttachVolume normal",
@@ -1377,16 +1377,23 @@ func TestAttachDisk(t *testing.T) {
 			name:     "success: AttachVolume skip likely bad name",
 			volumeID: defaultVolumeID,
 			nodeID:   defaultNodeID,
+			nodeID2:  defaultNodeID, // Induce second attach
 			path:     "/dev/xvdab",
-			expErr:   nil,
+			expErr:   fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, blockDeviceInUseErr),
 			mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
 				volumeRequest := createVolumeRequest(volumeID)
 				instanceRequest := createInstanceRequest(nodeID)
-				attachRequest := createAttachRequest(volumeID, nodeID, path)
+				attachRequest1 := createAttachRequest(volumeID, nodeID, defaultPath)
+				attachRequest2 := createAttachRequest(volumeID, nodeID, path)
 
 				gomock.InOrder(
+					// First call - fail with "already in use" error
 					mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
-					mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
+					mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest1), gomock.Any()).Return(nil, blockDeviceInUseErr),
+
+					// Second call - succeed, expect bad device name to be skipped
+					mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
+					mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest2), gomock.Any()).Return(&ec2.AttachVolumeOutput{
 						Device:     aws.String(path),
 						InstanceId: aws.String(nodeID),
 						VolumeId:   aws.String(volumeID),
@@ -1394,15 +1401,6 @@ func TestAttachDisk(t *testing.T) {
 					}, nil),
 					mockEC2.EXPECT().DescribeVolumes(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
 				)
-
-				nodeDeviceCache = map[string]cachedNode{
-					defaultNodeID: {
-						timer: time.NewTimer(1 * time.Hour),
-						likelyBadNames: map[string]struct{}{
-							defaultPath: {},
-						},
-					},
-				}
 			},
 		},
 		{
@@ -1416,7 +1414,7 @@ func TestAttachDisk(t *testing.T) {
 				instanceRequest := createInstanceRequest(nodeID)
 				fakeInstance := newFakeInstance(nodeID, volumeID, path)
 
-				_, err := dm.NewDevice(&fakeInstance, volumeID, map[string]struct{}{})
+				_, err := dm.NewDevice(&fakeInstance, volumeID, new(sync.Map))
 				require.NoError(t, err)
 
 				gomock.InOrder(
@@ -1439,9 +1437,6 @@ func TestAttachDisk(t *testing.T) {
 					mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest, gomock.Any()).Return(nil, errors.New("AttachVolume error")),
 				)
 			},
-			validateFunc: func(t *testing.T) {
-				assert.NotContains(t, nodeDeviceCache, defaultNodeID)
-			},
 		},
 		{
 			name: "fail: AttachVolume returned block device already in use error",
@@ -1458,11 +1453,6 @@ func TestAttachDisk(t *testing.T) {
 					mockEC2.EXPECT().AttachVolume(ctx, attachRequest, gomock.Any()).Return(nil, blockDeviceInUseErr),
 				)
 			},
-			validateFunc: func(t *testing.T) {
-				assert.Contains(t, nodeDeviceCache, defaultNodeID)
-				assert.NotNil(t, nodeDeviceCache[defaultNodeID].timer)
-				assert.Contains(t, nodeDeviceCache[defaultNodeID].likelyBadNames, defaultPath)
-			},
 		},
 		{
 			name: "success: AttachVolume multi-attach",
@@ -1524,9 +1514,6 @@ func TestAttachDisk(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			// Reset node likely bad names cache
-			nodeDeviceCache = map[string]cachedNode{}
-
 			mockCtrl := gomock.NewController(t)
 			mockEC2 := NewMockEC2API(mockCtrl)
 			c := newCloud(mockEC2)
@@ -1552,10 +1539,6 @@ func TestAttachDisk(t *testing.T) {
 				assert.Equal(t, tc.path, devicePath)
 			}
 
-			if tc.validateFunc != nil {
-				tc.validateFunc(t)
-			}
-
 			mockCtrl.Finish()
 		})
 	}
@@ -3086,11 +3069,12 @@ func testVolumeWaitParameters() volumeWaitParameters {
 
 func newCloud(mockEC2 EC2API) Cloud {
 	c := &cloud{
-		region: "test-region",
-		dm:     dm.NewDeviceManager(),
-		ec2:    mockEC2,
-		rm:     newRetryManager(),
-		vwp:    testVolumeWaitParameters(),
+		region:         "test-region",
+		dm:             dm.NewDeviceManager(),
+		ec2:            mockEC2,
+		rm:             newRetryManager(),
+		vwp:            testVolumeWaitParameters(),
+		likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
 	}
 	return c
 }
diff --git a/pkg/cloud/devicemanager/allocator.go b/pkg/cloud/devicemanager/allocator.go
index 52ebb7829..a819aec67 100644
--- a/pkg/cloud/devicemanager/allocator.go
+++ b/pkg/cloud/devicemanager/allocator.go
@@ -18,6 +18,7 @@ package devicemanager
 
 import (
 	"fmt"
+	"sync"
 )
 
 // ExistingNames is a map of assigned device names. Presence of a key with a device
@@ -34,7 +35,7 @@ type ExistingNames map[string]string
 // call), so all available device names are used eventually and it minimizes
 // device name reuse.
 type NameAllocator interface {
-	GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (name string, err error)
+	GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (name string, err error)
 }
 
 type nameAllocator struct{}
@@ -46,18 +47,27 @@ var _ NameAllocator = &nameAllocator{}
 //
 // likelyBadNames is a map of names that have previously returned an "in use" error when attempting to mount to them
 // These names are unlikely to result in a successful mount, and may be permanently unavailable, so use them last
-func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (string, error) {
+func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (string, error) {
 	for _, name := range deviceNames {
 		_, existing := existingNames[name]
-		_, likelyBad := likelyBadNames[name]
+		_, likelyBad := likelyBadNames.Load(name)
 		if !existing && !likelyBad {
 			return name, nil
 		}
 	}
-	for name := range likelyBadNames {
-		if _, existing := existingNames[name]; !existing {
-			return name, nil
+
+	finalResortName := ""
+	likelyBadNames.Range(func(name, _ interface{}) bool {
+		if name, ok := name.(string); ok {
+			if _, existing := existingNames[name]; !existing {
+				finalResortName = name
+				return false
+			}
 		}
+		return true
+	})
+	if finalResortName != "" {
+		return finalResortName, nil
 	}
 
 	return "", fmt.Errorf("there are no names available")
diff --git a/pkg/cloud/devicemanager/manager.go b/pkg/cloud/devicemanager/manager.go
index 784ac252b..76fa70367 100644
--- a/pkg/cloud/devicemanager/manager.go
+++ b/pkg/cloud/devicemanager/manager.go
@@ -52,7 +52,7 @@ type DeviceManager interface {
 	// NewDevice retrieves the device if the device is already assigned.
 	// Otherwise it creates a new device with next available device name
 	// and mark it as unassigned device.
-	NewDevice(instance *types.Instance, volumeID string, likelyBadNames map[string]struct{}) (device *Device, err error)
+	NewDevice(instance *types.Instance, volumeID string, likelyBadNames *sync.Map) (device *Device, err error)
 
 	// GetDevice returns the device already assigned to the volume.
 	GetDevice(instance *types.Instance, volumeID string) (device *Device, err error)
@@ -103,7 +103,7 @@ func NewDeviceManager() DeviceManager {
 	}
 }
 
-func (d *deviceManager) NewDevice(instance *types.Instance, volumeID string, likelyBadNames map[string]struct{}) (*Device, error) {
+func (d *deviceManager) NewDevice(instance *types.Instance, volumeID string, likelyBadNames *sync.Map) (*Device, error) {
 	d.mux.Lock()
 	defer d.mux.Unlock()
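In the reworked GetNext above, every unused name from deviceNames is preferred first, and the sync.Map of likely bad names is only scanned as a last resort. sync.Map.Range keeps iterating while the callback returns true and stops as soon as it returns false, so the callback returns false once a usable fallback has been found and true otherwise. The sketch below isolates that scan; the device names, the existing-names map, and the pickLastResort helper are illustrative, not part of the patch.

    package main

    import (
        "fmt"
        "sync"
    )

    // pickLastResort returns some name from likelyBad that is not already
    // assigned in existing, mirroring the fallback scan in GetNext.
    func pickLastResort(existing map[string]string, likelyBad *sync.Map) (string, bool) {
        found := ""
        likelyBad.Range(func(key, _ interface{}) bool {
            name, ok := key.(string)
            if !ok {
                return true // unexpected key type: skip it, keep scanning
            }
            if _, used := existing[name]; !used {
                found = name
                return false // stop the scan: one usable name is enough
            }
            return true // name is already assigned, keep scanning
        })
        return found, found != ""
    }

    func main() {
        likelyBad := new(sync.Map)
        likelyBad.Store("/dev/xvdaa", struct{}{})
        likelyBad.Store("/dev/xvdab", struct{}{})

        // Names already assigned on the instance (illustrative).
        existing := map[string]string{"/dev/xvdaa": "vol-1"}

        if name, ok := pickLastResort(existing, likelyBad); ok {
            fmt.Println("falling back to:", name)
        }
    }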