Skip to content

Commit

Permalink
Merge pull request #37 from ykulazhenkov/pr-fix-pod-watch-for-node
Browse files Browse the repository at this point in the history
Fix pod watch namespace for nv-ipam-node
  • Loading branch information
ykulazhenkov authored Mar 19, 2024
2 parents 9e198a9 + 1e29014 commit a34b4a2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 23 deletions.
15 changes: 13 additions & 2 deletions cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/google/renameio/v2"
"github.com/spf13/cobra"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

Expand Down Expand Up @@ -141,13 +143,22 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: opts.MetricsAddr},
HealthProbeBindAddress: opts.ProbeAddr,
Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}}},
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}},
ByObject: map[client.Object]cache.ByObject{
&corev1.Pod{}: {Namespaces: map[string]cache.Config{cache.AllNamespaces: {}}}},
},
})
if err != nil {
logger.Error(err, "unable to initialize manager")
return err
}

k8sClient, err := client.New(config, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
logger.Error(err, "unable to direct k8s client")
}

if err = (&ippoolctrl.IPPoolReconciler{
PoolManager: poolManager,
Client: mgr.GetClient(),
Expand Down Expand Up @@ -252,7 +263,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
return
}
c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3)
c := cleaner.New(mgr.GetClient(), k8sClient, store, poolManager, time.Minute, 3)
c.Start(innerCtx)
logger.Info("cleaner stopped")
}()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
56 changes: 41 additions & 15 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type Cleaner interface {
// New creates and initialize new cleaner instance
// "checkInterval" defines delay between checks for stale allocations.
// "checkCountBeforeRelease: defines how many check to do before remove the allocation
func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
func New(cachedClient client.Client, directClient client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
checkInterval time.Duration,
checkCountBeforeRelease int) Cleaner {
return &cleaner{
client: client,
cachedClient: cachedClient,
directClient: directClient,
store: store,
poolConfReader: poolConfReader,
checkInterval: checkInterval,
Expand All @@ -59,7 +60,8 @@ func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigR
}

type cleaner struct {
client client.Client
cachedClient client.Client
directClient client.Client
store storePkg.Store
poolConfReader pool.ConfigReader
checkInterval time.Duration
Expand Down Expand Up @@ -109,22 +111,27 @@ func (c *cleaner) loop(ctx context.Context) error {
resLogger.V(2).Info("reservation has no required metadata fields, skip")
continue
}
pod := &corev1.Pod{}
err := c.client.Get(ctx, apiTypes.NamespacedName{
Namespace: reservation.Metadata.PodNamespace,
Name: reservation.Metadata.PodName,
}, pod)
if err != nil && !apiErrors.IsNotFound(err) {
// first check in the cache
found, err := c.checkPod(ctx, c.cachedClient, reservation)
if err != nil {
session.Cancel()
return fmt.Errorf("failed to read Pod info from the cache: %v", err)
}
if apiErrors.IsNotFound(err) ||
(reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
c.staleAllocations[key]++
resLogger.V(2).Info("pod not found in the API, increase stale counter",
"value", c.staleAllocations[key])
} else {
if !found {
resLogger.V(2).Info("cache check failed for the Pod, check in the API")
// pod not found in the cache, try to query API directly to make sure
// that we use latest state
found, err = c.checkPod(ctx, c.directClient, reservation)
if err != nil {
session.Cancel()
return fmt.Errorf("failed to read Pod info from the API: %v", err)
}
}
if found {
delete(c.staleAllocations, key)
} else {
c.staleAllocations[key]++
resLogger.V(2).Info("pod not found, increase stale counter", "value", c.staleAllocations[key])
}
}
}
Expand Down Expand Up @@ -156,6 +163,25 @@ func (c *cleaner) loop(ctx context.Context) error {
return nil
}

// return true if pod exist, error in case if an unknown error occurred
func (c *cleaner) checkPod(ctx context.Context, k8sClient client.Client, reservation types.Reservation) (bool, error) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, apiTypes.NamespacedName{
Namespace: reservation.Metadata.PodNamespace,
Name: reservation.Metadata.PodName,
}, pod)
if err != nil && !apiErrors.IsNotFound(err) {
return false, fmt.Errorf("failed to read Pod info: %v", err)
}
if apiErrors.IsNotFound(err) ||
(reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
// pod not found or it has different UUID (was recreated)
return false, nil
}
// pod exist
return true, nil
}

func (c *cleaner) getStaleAllocKey(poolName string, r types.Reservation) string {
return fmt.Sprintf("%s|%s|%s", poolName, r.ContainerID, r.InterfaceName)
}
15 changes: 10 additions & 5 deletions pkg/ipam-node/cleaner/cleaner_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import (

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)

var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
cFunc context.CancelFunc
ctx context.Context
cfg *rest.Config
k8sClient client.Client
fakeClient client.Client
testEnv *envtest.Environment
cFunc context.CancelFunc
ctx context.Context
)

func TestCleaner(t *testing.T) {
Expand All @@ -56,6 +58,9 @@ var _ = BeforeSuite(func() {
k8sClient, err = client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())

fakeClient = fake.NewFakeClient()
Expect(fakeClient).NotTo(BeNil())
})

var _ = AfterSuite(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam-node/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var _ = Describe("Cleaner", func() {
// this will create empty pool config
session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100"))

cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3)
cleaner := cleanerPkg.New(fakeClient, k8sClient, store, poolManager, time.Millisecond*100, 3)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)
Expand Down

0 comments on commit a34b4a2

Please sign in to comment.