From 5355d6f114dceedafec44d65af650966d1415199 Mon Sep 17 00:00:00 2001 From: gunjan5 Date: Fri, 14 Apr 2017 11:03:24 -0700 Subject: [PATCH] Pass containerID wo WEP and check it before deleting --- calico.go | 83 ++++------------- calico_cni_k8s_test.go | 203 ++++++++++++++++++++++++++++++++++++----- calico_cni_test.go | 44 +++++---- glide.lock | 13 ++- glide.yaml | 2 +- k8s/k8s.go | 63 +++++++++++++ utils/utils.go | 70 ++++++++++++++ 7 files changed, 372 insertions(+), 106 deletions(-) diff --git a/calico.go b/calico.go index dc0516254..2199661d1 100644 --- a/calico.go +++ b/calico.go @@ -20,16 +20,11 @@ import ( "fmt" "os" "runtime" - "strings" - - "github.com/vishvananda/netlink" "net" log "github.com/Sirupsen/logrus" - "github.com/containernetworking/cni/pkg/ip" "github.com/containernetworking/cni/pkg/ipam" - "github.com/containernetworking/cni/pkg/ns" "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" @@ -306,80 +301,42 @@ func cmdDel(args *skel.CmdArgs) error { "Node": nodename, }).Info("Extracted identifiers") - // Always try to release the address. Don't deal with any errors till the endpoints are cleaned up. - fmt.Fprintf(os.Stderr, "Calico CNI releasing IP address\n") - logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"), - "type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths") - - // We need to replace "usePodCidr" with a valid, but dummy podCidr string with "host-local" IPAM. - if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") { - - // host-local IPAM releases the IP by ContainerID, so podCidr isn't really used to release the IP. - // It just needs a valid CIDR, but it doesn't have to be the CIDR associated with the host. - dummyPodCidr := "0.0.0.0/0" - var stdinData map[string]interface{} - - if err := json.Unmarshal(args.StdinData, &stdinData); err != nil { - return err - } - - logger.WithField("podCidr", dummyPodCidr).Info("Using a dummy podCidr to release the IP") - stdinData["ipam"].(map[string]interface{})["subnet"] = dummyPodCidr - - args.StdinData, err = json.Marshal(stdinData) - if err != nil { - return err - } - logger.WithField("stdin", args.StdinData).Debug("Updated stdin data for Delete Cmd") - } - - ipamErr := ipam.ExecDel(conf.IPAM.Type, args.StdinData) - - if ipamErr != nil { - logger.Error(ipamErr) - } - calicoClient, err := CreateClient(conf) if err != nil { return err } - ep := api.WorkloadEndpointMetadata{ + wep := api.WorkloadEndpointMetadata{ Name: args.IfName, Node: nodename, Orchestrator: orchestrator, - Workload: workload} - if err = calicoClient.WorkloadEndpoints().Delete(ep); err != nil { - if _, ok := err.(errors.ErrorResourceDoesNotExist); ok { - logger.WithField("endpoint", ep).Info("Endpoint object does not exist, no need to clean up.") - } else { - return err - } + Workload: workload, } - // Only try to delete the device if a namespace was passed in. - if args.Netns != "" { - logger.Debug("Checking namespace & device exist.") - devErr := ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error { - _, err := netlink.LinkByName(args.IfName) - return err - }) + // Handle k8s specific bits of handling the DEL. + if orchestrator == "k8s" { + return k8s.CmdDelK8s(calicoClient, wep, args, conf, logger) + } - if devErr == nil { - fmt.Fprintf(os.Stderr, "Calico CNI deleting device in netns %s\n", args.Netns) - err = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error { - _, err = ip.DelLinkByNameAddr(args.IfName, netlink.FAMILY_V4) - return err - }) + // Release the IP address by calling the configured IPAM plugin. + ipamErr := CleanUpIPAM(conf, args, logger) - if err != nil { - return err - } + // Delete the WorkloadEndpoint object from the datastore. + if err = calicoClient.WorkloadEndpoints().Delete(wep); err != nil { + if _, ok := err.(errors.ErrorResourceDoesNotExist); ok { + // Log and proceed with the clean up if WEP doesn't exist. + logger.WithField("endpoint", wep).Info("Endpoint object does not exist, no need to clean up.") } else { - logger.Info("veth does not exist, no need to clean up.") + return err } } + // Clean up namespace by removing the interfaces. + err = CleanUpNamespace(args, logger) + if err != nil { + return err + } + // Return the IPAM error if there was one. The IPAM error will be lost if there was also an error in cleaning up // the device or endpoint, but crucially, the user will know the overall operation failed. return ipamErr diff --git a/calico_cni_k8s_test.go b/calico_cni_k8s_test.go index 8d1c043bc..3cffa0e1e 100644 --- a/calico_cni_k8s_test.go +++ b/calico_cni_k8s_test.go @@ -114,12 +114,16 @@ var _ = Describe("CalicoCni", func() { endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: hostname, - Name: "eth0", - Workload: fmt.Sprintf("test.%s", name), - Orchestrator: "k8s", - Labels: map[string]string{"calico/k8s_ns": "test"}, + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: containerID, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, })) Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{ InterfaceName: interfaceName, @@ -312,7 +316,7 @@ var _ = Describe("CalicoCni", func() { logger.Infof("Created POD object: %v", pod) - _, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") + containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") Expect(err).NotTo(HaveOccurred()) mac := contVeth.Attrs().HardwareAddr @@ -327,12 +331,16 @@ var _ = Describe("CalicoCni", func() { endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: hostname, - Name: "eth0", - Workload: fmt.Sprintf("test.%s", name), - Orchestrator: "k8s", - Labels: map[string]string{"calico/k8s_ns": "test"}, + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: containerID, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, })) Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{ InterfaceName: interfaceName, @@ -400,7 +408,7 @@ var _ = Describe("CalicoCni", func() { logger.Infof("Created POD object: %v", pod) - _, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") + containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") Expect(err).NotTo(HaveOccurred()) mac := contVeth.Attrs().HardwareAddr @@ -415,12 +423,16 @@ var _ = Describe("CalicoCni", func() { endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: hostname, - Name: "eth0", - Workload: fmt.Sprintf("test.%s", name), - Orchestrator: "k8s", - Labels: map[string]string{"calico/k8s_ns": "test"}, + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: containerID, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, })) Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{ InterfaceName: interfaceName, @@ -486,7 +498,7 @@ var _ = Describe("CalicoCni", func() { logger.Infof("Created POD object: %v", pod) - _, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") + containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "") Expect(err).NotTo(HaveOccurred()) mac := contVeth.Attrs().HardwareAddr @@ -501,12 +513,16 @@ var _ = Describe("CalicoCni", func() { endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: hostname, - Name: "eth0", - Workload: fmt.Sprintf("test.%s", name), - Orchestrator: "k8s", - Labels: map[string]string{"calico/k8s_ns": "test"}, + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: containerID, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, })) Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{ InterfaceName: interfaceName, @@ -602,6 +618,147 @@ var _ = Describe("CalicoCni", func() { Expect(pod2IP).Should(Equal(expectedIP)) }) }) + + // This specific test case is for an issue where k8s would send extra DELs being "aggressive". See: https://github.com/kubernetes/kubernetes/issues/44100 + Context("ADD a container with a ContainerID and DEL it with the same ContainerID then ADD a new container with a different ContainerID, and send a DEL for the old ContainerID", func() { + It("Use different CNI_ContainerIDs to ADD and DEL the container", func() { + netconf := fmt.Sprintf(` + { + "cniVersion": "%s", + "name": "net7", + "type": "calico", + "etcd_endpoints": "http://%s:2379", + "ipam": { + "type": "calico-ipam" + }, + "kubernetes": { + "k8s_api_root": "http://127.0.0.1:8080" + }, + "policy": {"type": "k8s"}, + "log_level":"info" + }`, cniVersion, os.Getenv("ETCD_IP")) + + // Create a new ipPool. + ipPool := "10.0.0.0/24" + c, _ := testutils.NewClient("") + testutils.CreateNewIPPool(*c, ipPool, false, false, true) + + config, err := clientcmd.DefaultClientConfig.ClientConfig() + Expect(err).NotTo(HaveOccurred()) + + clientset, err := kubernetes.NewForConfig(config) + Expect(err).NotTo(HaveOccurred()) + + // Now create a K8s pod. + name := fmt.Sprintf("run%d-pool", rand.Uint32()) + + cniContainerIDX := "container-id-00X" + cniContainerIDY := "container-id-00Y" + + pod, err := clientset.Pods(K8S_TEST_NS).Create( + &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + }, + Spec: v1.PodSpec{Containers: []v1.Container{{ + Name: fmt.Sprintf("container-%s", name), + Image: "ignore", + }}}, + }) + + Expect(err).NotTo(HaveOccurred()) + + logger.Infof("Created POD object: %v", pod) + + // ADD the container with passing a CNI_ContainerID "X". + _, netnspath, session, _, _, _, err := CreateContainerWithId(netconf, name, "", cniContainerIDX) + Expect(err).ShouldNot(HaveOccurred()) + Eventually(session).Should(gexec.Exit()) + + result, err := GetResultForCurrent(session, cniVersion) + if err != nil { + log.Fatalf("Error getting result from the session: %v\n", err) + } + + log.Printf("Unmarshaled result: %v\n", result) + + // Assert that the endpoint is created in the backend datastore with ContainerID "X". + endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil + Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: cniContainerIDX, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, + })) + + // Delete the container with the CNI_ContainerID "X". + exitCode, err := DeleteContainerWithId(netconf, netnspath, name, cniContainerIDX) + Expect(err).ShouldNot(HaveOccurred()) + Expect(exitCode).Should(Equal(0)) + + // The endpoint for ContainerID "X" should not exist in the backend datastore. + endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(endpoints.Items).Should(HaveLen(0)) + + // ADD a new container with passing a CNI_ContainerID "Y". + _, netnspath, session, _, _, _, err = CreateContainerWithId(netconf, name, "", cniContainerIDY) + Expect(err).ShouldNot(HaveOccurred()) + Eventually(session).Should(gexec.Exit()) + + result, err = GetResultForCurrent(session, cniVersion) + if err != nil { + log.Fatalf("Error getting result from the session: %v\n", err) + } + + log.Printf("Unmarshaled result: %v\n", result) + + // Assert that the endpoint is created in the backend datastore with ContainerID "Y". + endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil + Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: cniContainerIDY, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, + })) + + // Delete the container with the CNI_ContainerID "X" again. + exitCode, err = DeleteContainerWithId(netconf, netnspath, name, cniContainerIDX) + Expect(err).ShouldNot(HaveOccurred()) + Expect(exitCode).Should(Equal(0)) + + // Assert that the endpoint with ContainerID "Y" is still in the datastore. + endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil + Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ + Node: hostname, + Name: "eth0", + Workload: fmt.Sprintf("test.%s", name), + ActiveInstanceID: cniContainerIDY, + Orchestrator: "k8s", + Labels: map[string]string{"calico/k8s_ns": "test"}, + })) + + }) + }) }) }) }) diff --git a/calico_cni_test.go b/calico_cni_test.go index 156dd6f76..c2c755c05 100644 --- a/calico_cni_test.go +++ b/calico_cni_test.go @@ -92,11 +92,15 @@ var _ = Describe("CalicoCni", func() { endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: hostname, - Name: "eth0", - Workload: containerID, - Orchestrator: "cni", + Node: hostname, + Name: "eth0", + Workload: containerID, + ActiveInstanceID: "", + Orchestrator: "cni", })) Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{ @@ -176,7 +180,7 @@ var _ = Describe("CalicoCni", func() { }) Describe("Run Calico CNI plugin", func() { - Context("depricate Hostname for nodename", func() { + Context("deprecate Hostname for nodename", func() { netconf := fmt.Sprintf(` { "cniVersion": "%s", @@ -200,17 +204,21 @@ var _ = Describe("CalicoCni", func() { log.Fatalf("Error getting result from the session: %v\n", err) } - log.Printf("Unmarshaled result: %v\n", result) + log.Printf("Unmarshalled result: %v\n", result) // The endpoint is created in etcd endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: "namedHostname", - Name: "eth0", - Workload: containerID, - Orchestrator: "cni", + Node: "namedHostname", + Name: "eth0", + Workload: containerID, + ActiveInstanceID: "", + Orchestrator: "cni", })) _, err = DeleteContainer(netconf, netnspath, "") @@ -220,7 +228,7 @@ var _ = Describe("CalicoCni", func() { }) Describe("Run Calico CNI plugin", func() { - Context("depricate Hostname for nodename", func() { + Context("deprecate Hostname for nodename", func() { netconf := fmt.Sprintf(` { "cniVersion": "%s", @@ -245,17 +253,21 @@ var _ = Describe("CalicoCni", func() { log.Fatalf("Error getting result from the session: %v\n", err) } - log.Printf("Unmarshaled result: %v\n", result) + log.Printf("Unmarshalled result: %v\n", result) // The endpoint is created in etcd endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{}) Expect(err).ShouldNot(HaveOccurred()) Expect(endpoints.Items).Should(HaveLen(1)) + + // Set the Revision to nil since we can't assert it's exact value. + endpoints.Items[0].Metadata.Revision = nil Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{ - Node: "namedNodename", - Name: "eth0", - Workload: containerID, - Orchestrator: "cni", + Node: "namedNodename", + Name: "eth0", + Workload: containerID, + ActiveInstanceID: "", + Orchestrator: "cni", })) _, err = DeleteContainer(netconf, netnspath, "") diff --git a/glide.lock b/glide.lock index 8626834f4..9ec6d3ff9 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 8004af46ac86678ab2d3788808fd138bc43e5bc67bf54747f5af99bf8b4f9ad3 -updated: 2017-04-10T13:05:24.123332168-07:00 +hash: a92eaff56ed933f7b3f4880111e995609207a3598d605c6e3bb9844bc2ec528c +updated: 2017-04-24T14:11:45.254478307-07:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -154,7 +154,7 @@ imports: - name: github.com/projectcalico/go-yaml-wrapper version: 598e54215bee41a19677faa4f0c32acd2a87eb56 - name: github.com/projectcalico/libcalico-go - version: 83090c92db7fde08538caea98d766464f94aadd0 + version: 7b7ca3676570fd4652ece26c734e3b02b8a13228 subpackages: - lib/api - lib/api/unversioned @@ -168,12 +168,17 @@ imports: - lib/backend/model - lib/client - lib/errors + - lib/hash - lib/hwm - lib/ipip - lib/net - lib/numorstring - lib/scope + - lib/selector + - lib/selector/parser + - lib/selector/tokenizer - lib/testutils + - lib/validator - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc @@ -243,6 +248,8 @@ imports: - internal/remote_api - internal/urlfetch - urlfetch +- name: gopkg.in/go-playground/validator.v8 + version: 5f57d2222ad794d0dffb07e664ea05e2ee07d60c - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 - name: gopkg.in/tchap/go-patricia.v2 diff --git a/glide.yaml b/glide.yaml index 38e90e72f..8e9f2a3ff 100644 --- a/glide.yaml +++ b/glide.yaml @@ -15,7 +15,7 @@ import: subpackages: - gexec - package: github.com/projectcalico/libcalico-go - version: v1.1.0 + version: v1.1.5 subpackages: - lib/api - lib/client diff --git a/k8s/k8s.go b/k8s/k8s.go index 6581a5eb0..0401f8e72 100644 --- a/k8s/k8s.go +++ b/k8s/k8s.go @@ -28,6 +28,7 @@ import ( "github.com/projectcalico/cni-plugin/utils" "github.com/projectcalico/libcalico-go/lib/api" k8sbackend "github.com/projectcalico/libcalico-go/lib/backend/k8s" + cerrors "github.com/projectcalico/libcalico-go/lib/errors" cnet "github.com/projectcalico/libcalico-go/lib/net" "encoding/json" @@ -239,6 +240,7 @@ func CmdAddK8s(args *skel.CmdArgs, conf utils.NetConf, nodename string, calicoCl endpoint = api.NewWorkloadEndpoint() endpoint.Metadata.Name = args.IfName endpoint.Metadata.Node = nodename + endpoint.Metadata.ActiveInstanceID = args.ContainerID endpoint.Metadata.Orchestrator = orchestrator endpoint.Metadata.Workload = workload endpoint.Metadata.Labels = labels @@ -294,6 +296,67 @@ func CmdAddK8s(args *skel.CmdArgs, conf utils.NetConf, nodename string, calicoCl return result, nil } +// CmdDelK8s performs the "DEL" operation on a kubernetes pod. +// The following logic only applies to kubernetes since it sends multiple DELs for the same +// endpoint. See: https://github.com/kubernetes/kubernetes/issues/44100 +// We store CNI_CONTAINERID as ActiveInstanceID in WEP Metadata for k8s, +// so in this function we need to get the WEP and make sure we check if ContainerID and ActiveInstanceID +// are the same before deleting the pod being deleted. +func CmdDelK8s(c *calicoclient.Client, ep api.WorkloadEndpointMetadata, args *skel.CmdArgs, conf utils.NetConf, logger *log.Entry) error { + wep, err := c.WorkloadEndpoints().Get(ep) + if err != nil { + if _, ok := err.(cerrors.ErrorResourceDoesNotExist); ok { + // We can talk to the datastore but WEP doesn't exist in there, + // but we still want to go ahead with the clean up. So, log a warning and continue with the clean up. + logger.WithField("WorkloadEndpoint", ep).Warning("WorkloadEndpoint does not exist in the datastore, moving forward with the clean up") + } else { + // Could not connect to datastore (connection refused, unauthorized, etc.) + // so we have no way of knowing/checking ActiveInstanceID. To protect the endpoint + // from false DEL, we return the error without deleting/cleaning up. + return err + } + + // Check if ActiveInstanceID is populated (it will be an empty string "" if it was populated + // before this field was added to the API), and if it is there then compare it with ContainerID + // passed by the orchestrator to make sure they are the same, return without deleting if they aren't. + } else if wep.Metadata.ActiveInstanceID != "" && args.ContainerID != wep.Metadata.ActiveInstanceID { + logger.WithField("WorkloadEndpoint", wep).Warning("CNI_ContainerID does not match WorkloadEndpoint ActiveInstanceID so ignoring the DELETE cmd.") + return nil + + // Delete the WorkloadEndpoint object from the datastore. + // In case of k8s, where we are deleting the WEP we got from the Datastore, + // this Delete is a Compare-and-Delete, so if *any* field in the WEP changed from + // the time we get WEP until here then the Delete operation will fail. + } else if err = c.WorkloadEndpoints().Delete(wep.Metadata); err != nil { + switch err := err.(type) { + case cerrors.ErrorResourceDoesNotExist: + // Log and proceed with the clean up if WEP doesn't exist. + logger.WithField("endpoint", wep).Info("Endpoint object does not exist, no need to clean up.") + case cerrors.ErrorResourceUpdateConflict: + // This case means the WEP object was modified between the time we did the Get and now, + // so it's not a safe Compare-and-Delete operation, so log and abort with the error. + // Returning an error here is with the assumption that k8s (kubelet) retries deleting again. + logger.WithField("endpoint", wep).Warning("Error deleting endpoint: endpoint was modified before it could be deleted.") + return fmt.Errorf("Error deleting endpoint: endpoint was modified before it could be deleted: %v", err) + default: + return err + } + } + + // Release the IP address by calling the configured IPAM plugin. + ipamErr := utils.CleanUpIPAM(conf, args, logger) + + // Clean up namespace by removing the interfaces. + err = utils.CleanUpNamespace(args, logger) + if err != nil { + return err + } + + // Return the IPAM error if there was one. The IPAM error will be lost if there was also an error in cleaning up + // the device or endpoint, but crucially, the user will know the overall operation failed. + return ipamErr +} + // ipAddrsResult parses the ipAddrs annotation and calls the configured IPAM plugin for // each IP passed to it by setting the IP field in CNI_ARGS, and returns the result of calling the IPAM plugin. // Example annotation value string: "[\"10.0.0.1\", \"2001:db8::1\"]" diff --git a/utils/utils.go b/utils/utils.go index 48d76a35f..60c6cfa9d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -14,6 +14,7 @@ package utils import ( + "encoding/json" "errors" "fmt" "net" @@ -24,13 +25,16 @@ import ( "strings" + "github.com/containernetworking/cni/pkg/ip" "github.com/containernetworking/cni/pkg/ipam" + "github.com/containernetworking/cni/pkg/ns" "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" "github.com/projectcalico/libcalico-go/lib/api" "github.com/projectcalico/libcalico-go/lib/client" cnet "github.com/projectcalico/libcalico-go/lib/net" + "github.com/vishvananda/netlink" ) func min(a, b int) int { @@ -40,6 +44,72 @@ func min(a, b int) int { return b } +// CleanUpNamespace deletes the devices in the network namespace. +func CleanUpNamespace(args *skel.CmdArgs, logger *log.Entry) error { + // Only try to delete the device if a namespace was passed in. + if args.Netns != "" { + logger.Debug("Checking namespace & device exist.") + devErr := ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error { + _, err := netlink.LinkByName(args.IfName) + return err + }) + + if devErr == nil { + fmt.Fprintf(os.Stderr, "Calico CNI deleting device in netns %s\n", args.Netns) + err := ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error { + _, err := ip.DelLinkByNameAddr(args.IfName, netlink.FAMILY_V4) + return err + }) + + if err != nil { + return err + } + } else { + logger.Info("veth does not exist, no need to clean up.") + } + } + + return nil +} + +// CleanUpIPAM calls IPAM plugin to release the IP address. +// It also contains IPAM plugin specific changes needed before calling the plugin. +func CleanUpIPAM(conf NetConf, args *skel.CmdArgs, logger *log.Entry) error { + fmt.Fprintf(os.Stderr, "Calico CNI releasing IP address\n") + logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"), + "type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths") + + // We need to replace "usePodCidr" with a valid, but dummy podCidr string with "host-local" IPAM. + if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") { + // host-local IPAM releases the IP by ContainerID, so podCidr isn't really used to release the IP. + // It just needs a valid CIDR, but it doesn't have to be the CIDR associated with the host. + dummyPodCidr := "0.0.0.0/0" + var stdinData map[string]interface{} + + err := json.Unmarshal(args.StdinData, &stdinData) + if err != nil { + return err + } + + logger.WithField("podCidr", dummyPodCidr).Info("Using a dummy podCidr to release the IP") + stdinData["ipam"].(map[string]interface{})["subnet"] = dummyPodCidr + + args.StdinData, err = json.Marshal(stdinData) + if err != nil { + return err + } + logger.WithField("stdin", args.StdinData).Debug("Updated stdin data for Delete Cmd") + } + + err := ipam.ExecDel(conf.IPAM.Type, args.StdinData) + + if err != nil { + logger.Error(err) + } + + return err +} + // ValidateNetworkName checks that the network name meets felix's expectations func ValidateNetworkName(name string) error { matched, err := regexp.MatchString(`^[a-zA-Z0-9_\.\-]+$`, name)