Skip to content

Commit

Permalink
Merge pull request #312 from gunjan5/double-del
Browse files Browse the repository at this point in the history
Check ContainerID before Deleting a pod
  • Loading branch information
tomdee authored Apr 24, 2017
2 parents 78a894f + 5355d6f commit 2618641
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 106 deletions.
83 changes: 20 additions & 63 deletions calico.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ import (
"fmt"
"os"
"runtime"
"strings"

"github.com/vishvananda/netlink"

"net"

log "github.com/Sirupsen/logrus"
"github.com/containernetworking/cni/pkg/ip"
"github.com/containernetworking/cni/pkg/ipam"
"github.com/containernetworking/cni/pkg/ns"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
Expand Down Expand Up @@ -306,80 +301,42 @@ func cmdDel(args *skel.CmdArgs) error {
"Node": nodename,
}).Info("Extracted identifiers")

// Always try to release the address. Don't deal with any errors till the endpoints are cleaned up.
fmt.Fprintf(os.Stderr, "Calico CNI releasing IP address\n")
logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"),
"type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths")

// We need to replace "usePodCidr" with a valid, but dummy podCidr string with "host-local" IPAM.
if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") {

// host-local IPAM releases the IP by ContainerID, so podCidr isn't really used to release the IP.
// It just needs a valid CIDR, but it doesn't have to be the CIDR associated with the host.
dummyPodCidr := "0.0.0.0/0"
var stdinData map[string]interface{}

if err := json.Unmarshal(args.StdinData, &stdinData); err != nil {
return err
}

logger.WithField("podCidr", dummyPodCidr).Info("Using a dummy podCidr to release the IP")
stdinData["ipam"].(map[string]interface{})["subnet"] = dummyPodCidr

args.StdinData, err = json.Marshal(stdinData)
if err != nil {
return err
}
logger.WithField("stdin", args.StdinData).Debug("Updated stdin data for Delete Cmd")
}

ipamErr := ipam.ExecDel(conf.IPAM.Type, args.StdinData)

if ipamErr != nil {
logger.Error(ipamErr)
}

calicoClient, err := CreateClient(conf)
if err != nil {
return err
}

ep := api.WorkloadEndpointMetadata{
wep := api.WorkloadEndpointMetadata{
Name: args.IfName,
Node: nodename,
Orchestrator: orchestrator,
Workload: workload}
if err = calicoClient.WorkloadEndpoints().Delete(ep); err != nil {
if _, ok := err.(errors.ErrorResourceDoesNotExist); ok {
logger.WithField("endpoint", ep).Info("Endpoint object does not exist, no need to clean up.")
} else {
return err
}
Workload: workload,
}

// Only try to delete the device if a namespace was passed in.
if args.Netns != "" {
logger.Debug("Checking namespace & device exist.")
devErr := ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error {
_, err := netlink.LinkByName(args.IfName)
return err
})
// Handle k8s specific bits of handling the DEL.
if orchestrator == "k8s" {
return k8s.CmdDelK8s(calicoClient, wep, args, conf, logger)
}

if devErr == nil {
fmt.Fprintf(os.Stderr, "Calico CNI deleting device in netns %s\n", args.Netns)
err = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error {
_, err = ip.DelLinkByNameAddr(args.IfName, netlink.FAMILY_V4)
return err
})
// Release the IP address by calling the configured IPAM plugin.
ipamErr := CleanUpIPAM(conf, args, logger)

if err != nil {
return err
}
// Delete the WorkloadEndpoint object from the datastore.
if err = calicoClient.WorkloadEndpoints().Delete(wep); err != nil {
if _, ok := err.(errors.ErrorResourceDoesNotExist); ok {
// Log and proceed with the clean up if WEP doesn't exist.
logger.WithField("endpoint", wep).Info("Endpoint object does not exist, no need to clean up.")
} else {
logger.Info("veth does not exist, no need to clean up.")
return err
}
}

// Clean up namespace by removing the interfaces.
err = CleanUpNamespace(args, logger)
if err != nil {
return err
}

// Return the IPAM error if there was one. The IPAM error will be lost if there was also an error in cleaning up
// the device or endpoint, but crucially, the user will know the overall operation failed.
return ipamErr
Expand Down
203 changes: 180 additions & 23 deletions calico_cni_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ var _ = Describe("CalicoCni", func() {
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: containerID,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))
Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{
InterfaceName: interfaceName,
Expand Down Expand Up @@ -312,7 +316,7 @@ var _ = Describe("CalicoCni", func() {

logger.Infof("Created POD object: %v", pod)

_, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
Expect(err).NotTo(HaveOccurred())
mac := contVeth.Attrs().HardwareAddr

Expand All @@ -327,12 +331,16 @@ var _ = Describe("CalicoCni", func() {
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: containerID,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))
Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{
InterfaceName: interfaceName,
Expand Down Expand Up @@ -400,7 +408,7 @@ var _ = Describe("CalicoCni", func() {

logger.Infof("Created POD object: %v", pod)

_, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
Expect(err).NotTo(HaveOccurred())
mac := contVeth.Attrs().HardwareAddr

Expand All @@ -415,12 +423,16 @@ var _ = Describe("CalicoCni", func() {
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: containerID,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))
Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{
InterfaceName: interfaceName,
Expand Down Expand Up @@ -486,7 +498,7 @@ var _ = Describe("CalicoCni", func() {

logger.Infof("Created POD object: %v", pod)

_, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
containerID, _, _, contVeth, contAddresses, _, err := CreateContainer(netconfCalicoIPAM, name, "")
Expect(err).NotTo(HaveOccurred())
mac := contVeth.Attrs().HardwareAddr

Expand All @@ -501,12 +513,16 @@ var _ = Describe("CalicoCni", func() {
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: containerID,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))
Expect(endpoints.Items[0].Spec).Should(Equal(api.WorkloadEndpointSpec{
InterfaceName: interfaceName,
Expand Down Expand Up @@ -602,6 +618,147 @@ var _ = Describe("CalicoCni", func() {
Expect(pod2IP).Should(Equal(expectedIP))
})
})

// This specific test case is for an issue where k8s would send extra DELs being "aggressive". See: https://github.com/kubernetes/kubernetes/issues/44100
Context("ADD a container with a ContainerID and DEL it with the same ContainerID then ADD a new container with a different ContainerID, and send a DEL for the old ContainerID", func() {
It("Use different CNI_ContainerIDs to ADD and DEL the container", func() {
netconf := fmt.Sprintf(`
{
"cniVersion": "%s",
"name": "net7",
"type": "calico",
"etcd_endpoints": "http://%s:2379",
"ipam": {
"type": "calico-ipam"
},
"kubernetes": {
"k8s_api_root": "http://127.0.0.1:8080"
},
"policy": {"type": "k8s"},
"log_level":"info"
}`, cniVersion, os.Getenv("ETCD_IP"))

// Create a new ipPool.
ipPool := "10.0.0.0/24"
c, _ := testutils.NewClient("")
testutils.CreateNewIPPool(*c, ipPool, false, false, true)

config, err := clientcmd.DefaultClientConfig.ClientConfig()
Expect(err).NotTo(HaveOccurred())

clientset, err := kubernetes.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())

// Now create a K8s pod.
name := fmt.Sprintf("run%d-pool", rand.Uint32())

cniContainerIDX := "container-id-00X"
cniContainerIDY := "container-id-00Y"

pod, err := clientset.Pods(K8S_TEST_NS).Create(
&v1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{Containers: []v1.Container{{
Name: fmt.Sprintf("container-%s", name),
Image: "ignore",
}}},
})

Expect(err).NotTo(HaveOccurred())

logger.Infof("Created POD object: %v", pod)

// ADD the container with passing a CNI_ContainerID "X".
_, netnspath, session, _, _, _, err := CreateContainerWithId(netconf, name, "", cniContainerIDX)
Expect(err).ShouldNot(HaveOccurred())
Eventually(session).Should(gexec.Exit())

result, err := GetResultForCurrent(session, cniVersion)
if err != nil {
log.Fatalf("Error getting result from the session: %v\n", err)
}

log.Printf("Unmarshaled result: %v\n", result)

// Assert that the endpoint is created in the backend datastore with ContainerID "X".
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: cniContainerIDX,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))

// Delete the container with the CNI_ContainerID "X".
exitCode, err := DeleteContainerWithId(netconf, netnspath, name, cniContainerIDX)
Expect(err).ShouldNot(HaveOccurred())
Expect(exitCode).Should(Equal(0))

// The endpoint for ContainerID "X" should not exist in the backend datastore.
endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(0))

// ADD a new container with passing a CNI_ContainerID "Y".
_, netnspath, session, _, _, _, err = CreateContainerWithId(netconf, name, "", cniContainerIDY)
Expect(err).ShouldNot(HaveOccurred())
Eventually(session).Should(gexec.Exit())

result, err = GetResultForCurrent(session, cniVersion)
if err != nil {
log.Fatalf("Error getting result from the session: %v\n", err)
}

log.Printf("Unmarshaled result: %v\n", result)

// Assert that the endpoint is created in the backend datastore with ContainerID "Y".
endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: cniContainerIDY,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))

// Delete the container with the CNI_ContainerID "X" again.
exitCode, err = DeleteContainerWithId(netconf, netnspath, name, cniContainerIDX)
Expect(err).ShouldNot(HaveOccurred())
Expect(exitCode).Should(Equal(0))

// Assert that the endpoint with ContainerID "Y" is still in the datastore.
endpoints, err = calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{})
Expect(err).ShouldNot(HaveOccurred())
Expect(endpoints.Items).Should(HaveLen(1))

// Set the Revision to nil since we can't assert it's exact value.
endpoints.Items[0].Metadata.Revision = nil
Expect(endpoints.Items[0].Metadata).Should(Equal(api.WorkloadEndpointMetadata{
Node: hostname,
Name: "eth0",
Workload: fmt.Sprintf("test.%s", name),
ActiveInstanceID: cniContainerIDY,
Orchestrator: "k8s",
Labels: map[string]string{"calico/k8s_ns": "test"},
}))

})
})
})
})
})
Loading

0 comments on commit 2618641

Please sign in to comment.