From ac558a7f8728aa837b6627464c728846b02fdf81 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Fri, 20 May 2016 13:56:06 -0700 Subject: [PATCH 1/3] Support kubeconfig for policy. --- Makefile | 2 +- calico.py | 6 ++-- calico_cni/policy_drivers.py | 58 ++++++++++++++++++++++++++---------- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index 876074717..4dbb844cb 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ dist/calico: $(SRCFILES) docker run --rm \ -v `pwd`:/code \ calico/build:$(BUILD_VERSION) \ - pyinstaller calico.py -ayF + /bin/sh -c "pip install pykube && pyinstaller calico.py -ayF" # Makes the IPAM plugin. dist/calico-ipam: $(SRCFILES) diff --git a/calico.py b/calico.py index 772ea39a4..b4686ccd8 100755 --- a/calico.py +++ b/calico.py @@ -35,7 +35,7 @@ from calico_cni.container_engines import get_container_engine from calico_cni.constants import * -from calico_cni.policy_drivers import ApplyProfileError, get_policy_driver +from calico_cni.policy_drivers import PolicyException, get_policy_driver from ipam import IpamPlugin # Logging configuration. @@ -259,7 +259,7 @@ def _add_new_endpoint(self): # Provision / apply profile on the created endpoint. try: self.policy_driver.apply_profile(endpoint) - except ApplyProfileError as e: + except PolicyException as e: _log.error("Failed to apply profile to endpoint %s", endpoint.name) self._remove_veth(endpoint) @@ -297,7 +297,7 @@ def _add_existing_endpoint(self, endpoint): # Apply a new profile to this endpoint. try: self.policy_driver.apply_profile(endpoint) - except ApplyProfileError as e: + except PolicyException as e: # Hit an exception applying the profile. We haven't configured # anything, so we don't need to clean anything up. Just exit. _log.error("Failed to apply profile to endpoint %s", diff --git a/calico_cni/policy_drivers.py b/calico_cni/policy_drivers.py index 8bbc91b4a..c7be6082c 100644 --- a/calico_cni/policy_drivers.py +++ b/calico_cni/policy_drivers.py @@ -23,6 +23,11 @@ from pycalico.datastore_errors import MultipleEndpointsMatch from pycalico.util import validate_characters +from pykube.config import KubeConfig +from pykube.http import HTTPClient +from pykube.objects import Pod +from pykube.query import Query + from calico_cni.constants import * import calico_cni.policy_parser @@ -78,7 +83,7 @@ def apply_profile(self, endpoint): # Check if the profile has already been applied. if self.profile_name in endpoint.profile_ids: - _log.warning("Endpoint already in profile %s", + _log.warning("Endpoint already in profile %s", self.profile_name) return @@ -131,7 +136,7 @@ class KubernetesNoPolicyDriver(DefaultPolicyDriver): """ Implements default network policy for a Kubernetes container manager. - The different between this an the DefaultPolicyDriver is that this + The different between this an the DefaultPolicyDriver is that this engine creates profiles which allow all incoming traffic. """ def generate_rules(self): @@ -156,14 +161,15 @@ class KubernetesAnnotationDriver(DefaultPolicyDriver): """ def __init__(self, pod_name, namespace, auth_token, api_root, - client_certificate, client_key, certificate_authority): + client_certificate, client_key, certificate_authority, kubeconfig): self.pod_name = pod_name - self.namespace = namespace + self.namespace = namespace self.policy_parser = calico_cni.policy_parser.PolicyParser(namespace) self.auth_token = auth_token self.client_certificate = client_certificate self.client_key = client_key self.certificate_authority = certificate_authority or False + self.kubeconfig_path = kubeconfig self.api_root = api_root self.profile_name = "%s_%s" % (namespace, pod_name) self._annotation_key = "projectcalico.org/policy" @@ -180,11 +186,11 @@ def remove_profile(self): _log.info("Deleting Calico profile: %s", self.profile_name) self._client.remove_profile(self.profile_name) except KeyError: - _log.warning("Profile %s does not exist, cannot delete", + _log.warning("Profile %s does not exist, cannot delete", self.profile_name) def generate_rules(self): - """Generates rules based on Kubernetes annotations. + """Generates rules based on Kubernetes annotations. """ # Get the pod from the API. if self.namespace != "kube-system": @@ -213,8 +219,8 @@ def generate_rules(self): except ValueError: # Invalid rule specified. _log.error("Invalid policy defined: %s", rule) - raise ApplyProfileError("Invalid policy defined", - details=rule) + raise ApplyProfileError("Invalid policy defined", + details=rule) else: # Rule was valid - append. inbound_rules.append(parsed_rule) @@ -222,7 +228,7 @@ def generate_rules(self): # Isolate on namespace boundaries by default. _log.info("No policy annotations - apply namespace isolation") inbound_rules = [Rule(action="allow", src_tag=self.ns_tag)] - + return Rules(id=self.profile_name, inbound_rules=inbound_rules, outbound_rules=outbound_rules) @@ -248,6 +254,20 @@ def _get_api_pod(self): :return: JSON object containing the pod spec """ + # If kubeconfig was specified, use the pykube library. + if self.kubeconfig_path: + _log.info("Using kubeconfig at %s", self.kubeconfig_path) + try: + api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path)) + pod = Query(api, Pod, self.namespace).get_by_name(self.pod_name) + _log.info("Found pod: %s: ", pod.obj) + except Exception as e: + raise PolicyException("Error querying Kubernetes API", + details=str(e.message)) + else: + return pod.obj + + # Otherwise, use direct HTTP query to get pod. with requests.Session() as session: if self.auth_token: _log.debug('Updating header with Token %s', self.auth_token) @@ -277,7 +297,7 @@ def _get_api_pod(self): verify=self.certificate_authority) except BaseException, e: _log.exception("Exception hitting Kubernetes API") - raise ApplyProfileError("Error querying Kubernetes API", + raise ApplyProfileError("Error querying Kubernetes API", details=str(e.message)) else: # Check the HTTP response code for errors. @@ -356,19 +376,25 @@ def remove_profile(self): _log.debug("No profile to remove for pod %s", self.pod_name) -class ApplyProfileError(Exception): +class PolicyException(Exception): """ - Attempting to apply a profile to an endpoint that does not exist. + Generic base class for policy errors. """ def __init__(self, msg=None, details=None): Exception.__init__(self, msg) self.details = details +class ApplyProfileError(PolicyException): + """ + Attempting to apply a profile to an endpoint that does not exist. + """ + pass + def get_policy_driver(cni_plugin): """Returns a policy driver based on CNI configuration arguments. - :return: a policy driver + :return: a policy driver """ # Extract policy config and network name. policy_config = cni_plugin.network_config.get(POLICY_KEY, {}) @@ -397,6 +423,7 @@ def get_policy_driver(cni_plugin): client_key = policy_config.get(K8S_CLIENT_KEY_VAR) certificate_authority = policy_config.get( K8S_CERTIFICATE_AUTHORITY_VAR) + kubeconfig_path = policy_config.get("kubeconfig") if (client_key and not os.path.isfile(client_key)) or \ (client_certificate and not os.path.isfile(client_certificate)) or \ @@ -418,7 +445,8 @@ def get_policy_driver(cni_plugin): api_root, client_certificate, client_key, - certificate_authority] + certificate_authority, + kubeconfig_path] else: _log.debug("Using Kubernetes Driver - no policy") driver_cls = KubernetesNoPolicyDriver @@ -430,7 +458,7 @@ def get_policy_driver(cni_plugin): # Create an instance of the driver class. try: - _log.debug("Creating instance of %s with args %s", + _log.debug("Creating instance of %s with args %s", driver_cls, driver_args) driver = driver_cls(*driver_args) except ValueError as e: From 5ff3bbf4de9159421db21b44eb92eb9481482661 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Fri, 20 May 2016 18:43:18 -0700 Subject: [PATCH 2/3] Read podCidr from k8s API when using host-local --- Makefile | 7 ++-- calico.py | 65 +++++++++++++++++++++++++++++++ calico_cni/constants.py | 7 +++- calico_cni/policy_drivers.py | 7 ++-- configuration.md | 3 ++ tests/unit/test_policy_drivers.py | 39 ++++++++++++++----- 6 files changed, 111 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index 4dbb844cb..23c8b64e9 100644 --- a/Makefile +++ b/Makefile @@ -46,13 +46,13 @@ deploy-rkt: dist/calicoctl ut: update-version docker run --rm -v `pwd`:/code \ calico/test \ - nosetests tests/unit -c nose.cfg + sh -c "pip install pykube && nosetests tests/unit -c nose.cfg" # Run the fv tests. fv: update-version docker run --rm -v `pwd`:/code \ calico/test \ - nosetests tests/fv -c nose.cfg + sh -c "pip install pykube && nosetests tests/fv -c nose.cfg" # Makes tests on Circle CI. test-circle: update-version dist/calico dist/calico-ipam @@ -63,7 +63,8 @@ test-circle: update-version dist/calico dist/calico-ipam -v $(CIRCLE_TEST_REPORTS):/circle_output \ -e COVERALLS_REPO_TOKEN=$(COVERALLS_REPO_TOKEN) \ calico/test sh -c \ - 'nosetests tests -c nose.cfg \ + 'pip install pykube && \ + nosetests tests -c nose.cfg \ --with-xunit --xunit-file=/circle_output/output.xml; RC=$$?;\ [[ ! -z "$$COVERALLS_REPO_TOKEN" ]] && coveralls || true; exit $$RC' diff --git a/calico.py b/calico.py index b4686ccd8..9aecd0874 100755 --- a/calico.py +++ b/calico.py @@ -29,6 +29,11 @@ ETCD_ENDPOINTS_ENV) from pycalico.datastore_errors import MultipleEndpointsMatch +from pykube.config import KubeConfig +from pykube.http import HTTPClient +from pykube.objects import Node +from pykube.query import Query + from calico_cni import __version__, __commit__, __branch__ from calico_cni.util import (configure_logging, parse_cni_args, print_cni_error, handle_datastore_error, CniError) @@ -143,6 +148,9 @@ def __init__(self, network_config, env): else: self.workload_id = self.container_id self.orchestrator_id = "cni" + kubernetes_config = network_config.get("kubernetes", {}) + self.kubeconfig_path = kubernetes_config.get("kubeconfig") + self.k8s_node_name = kubernetes_config.get("node_name", socket.gethostname()) """ Configure orchestrator specific settings. workload_id: In Kubernetes, this is the pod's namespace and name. @@ -441,7 +449,24 @@ def _call_ipam_plugin(self, env): "msg": e.msg, "details": e.details}) code = e.code + elif self.ipam_type == "host-local": + # We've been told to use the "host-local" IPAM plugin. + # Check if we need to use the Kubernetes podCidr for this node, and + # if so replace the subnet field with the correct value. + if self.network_config["ipam"].get("subnet") == "usePodCidr": + if not self.running_under_k8s: + print_cni_error(ERR_CODE_GENERIC, "Invalid network config", + "Must be running under Kubernetes to use \ + 'subnet: usePodCidr'") + _log.info("Using Kubernetes podCIDR for node: %s", self.k8s_node_name) + pod_cidr = self._get_kubernetes_pod_cidr() + self.network_config["ipam"]["subnet"] = str(pod_cidr) + + # Call the IPAM plugin. + _log.debug("Calling host-local IPAM plugin") + code, response = self._call_binary_ipam_plugin(env) else: + # Using some other IPAM plugin - call it. _log.debug("Using binary plugin") code, response = self._call_binary_ipam_plugin(env) @@ -449,6 +474,46 @@ def _call_ipam_plugin(self, env): _log.debug("IPAM response (rc=%s): %s", code, response) return code, response + def _get_kubernetes_pod_cidr(self): + """ + Attempt to get the Kubernetes pod CIDR for this node. + First check if we've written it to disk. If so, use that value. If + not, then query the Kubernetes API for it. + """ + _log.info("Getting node.spec.podCidr from API, kubeconfig: %s", + self.kubeconfig_path) + if not self.kubeconfig_path: + # For now, kubeconfig is the only supported auth method. + print_cni_error(ERR_CODE_GENERIC, "Missing kubeconfig", + "usePodCidr requires specification of kubeconfig file") + sys.exit(ERR_CODE_GENERIC) + + # Query the API for this node. Default node name to the hostname. + try: + api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path)) + node = None + for n in Node.objects(api): + if n.obj["metadata"]["name"] == self.k8s_node_name: + _log.debug("Checking node: %s", n.obj["metadata"]["name"]) + node = n + break + if not node: + raise KeyError("Unable to find node in API: %s", self.k8s_node_name) + _log.debug("Found node %s: %s: ", node.obj["metadata"]["name"], + node.obj["spec"]) + except Exception: + print_cni_error(ERR_CODE_GENERIC, "Error querying Kubernetes API", + "Failed to get podCidr from Kubernetes API") + sys.exit(ERR_CODE_GENERIC) + else: + pod_cidr = node.obj["spec"].get("podCIDR") + if not pod_cidr: + print_cni_error(ERR_CODE_GENERIC, "Missing podCidr", + "No podCidr for node %s" % self.k8s_node_name) + sys.exit(ERR_CODE_GENERIC) + _log.debug("Using podCidr: %s", pod_cidr) + return pod_cidr + def _call_binary_ipam_plugin(self, env): """Calls through to the specified IPAM plugin binary. diff --git a/calico_cni/constants.py b/calico_cni/constants.py index 29dd1ec72..49ef03d26 100644 --- a/calico_cni/constants.py +++ b/calico_cni/constants.py @@ -15,8 +15,11 @@ import re import socket +# The hostname of the current node. +HOSTNAME = socket.gethostname() + # Regex to parse CNI_ARGS. Looks for key value pairs separated by an equals -# sign and followed either the end of the string, or a colon (indicating +# sign and followed either the end of the string, or a colon (indicating # that there is another CNI_ARG key/value pair. CNI_ARGS_RE = re.compile("([a-zA-Z0-9/\.\-\_ ]+)=([a-zA-Z0-9/\.\-\_ ]+)(?:;|$)") @@ -54,7 +57,7 @@ ASSIGN_IPV4_KEY = "assign_ipv4" ASSIGN_IPV6_KEY = "assign_ipv6" -# Constants for getting policy specific information +# Constants for getting policy specific information # from the policy dictionary in the network config file. API_ROOT_KEY = "k8s_api_root" AUTH_TOKEN_KEY = "k8s_auth_token" diff --git a/calico_cni/policy_drivers.py b/calico_cni/policy_drivers.py index c7be6082c..3c0790a64 100644 --- a/calico_cni/policy_drivers.py +++ b/calico_cni/policy_drivers.py @@ -252,7 +252,7 @@ def generate_tags(self): def _get_api_pod(self): """Get the pod resource from the API. - :return: JSON object containing the pod spec + :return: Dictionary representation of Pod from k8s API. """ # If kubeconfig was specified, use the pykube library. if self.kubeconfig_path: @@ -260,7 +260,7 @@ def _get_api_pod(self): try: api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path)) pod = Query(api, Pod, self.namespace).get_by_name(self.pod_name) - _log.info("Found pod: %s: ", pod.obj) + _log.debug("Found pod: %s: ", pod.obj) except Exception as e: raise PolicyException("Error querying Kubernetes API", details=str(e.message)) @@ -400,6 +400,7 @@ def get_policy_driver(cni_plugin): policy_config = cni_plugin.network_config.get(POLICY_KEY, {}) network_name = cni_plugin.network_config["name"] policy_type = policy_config.get("type") + k8s_config = cni_plugin.network_config.get("kubernetes", {}) supported_policy_types = [None, POLICY_MODE_KUBERNETES, POLICY_MODE_KUBERNETES_ANNOTATIONS] @@ -423,7 +424,7 @@ def get_policy_driver(cni_plugin): client_key = policy_config.get(K8S_CLIENT_KEY_VAR) certificate_authority = policy_config.get( K8S_CERTIFICATE_AUTHORITY_VAR) - kubeconfig_path = policy_config.get("kubeconfig") + kubeconfig_path = k8s_config.get("kubeconfig") if (client_key and not os.path.isfile(client_key)) or \ (client_certificate and not os.path.isfile(client_certificate)) or \ diff --git a/configuration.md b/configuration.md index b2e0d0635..b143ae9a5 100644 --- a/configuration.md +++ b/configuration.md @@ -44,6 +44,8 @@ When using Calico IPAM, the following flags determine what IP addresses should b A specific IP address can be chosen by using [`CNI_ARGS`](https://github.com/appc/cni/blob/master/SPEC.md#parameters) and setting `IP` to the desired value. +When using the CNI `host-local` IPAM plugin, a special value `usePodCidr` is allowed for the subnet field. This tells the plugin to determine the subnet to use from the Kubernetes API based on the Node.podCIDR field. This is currently only supported when using `kubeconfig` for accessing the API. + ## Kubernetes specific When using the Calico CNI plugin with Kubernetes, an additional config block can be specified to control how network policy is configured. The required config block is `policy`. See the [Calico Kubernetes documentation](https://github.com/projectcalico/calico-containers/tree/master/docs/cni/kubernetes) for more information. @@ -63,6 +65,7 @@ The CNI plugin may need to authenticate with the Kubernetes API server. The foll * `k8s_client_key` * `k8s_certificate_authority` * Verifying the API certificate against a CA only works if connecting to the API server using a hostname. +* `kubeconfig` [![Analytics](https://calico-ga-beacon.appspot.com/UA-52125893-3/calico-cni/configuration.md?pixel)](https://github.com/igrigorik/ga-beacon) diff --git a/tests/unit/test_policy_drivers.py b/tests/unit/test_policy_drivers.py index da1fab7b4..219f0f709 100644 --- a/tests/unit/test_policy_drivers.py +++ b/tests/unit/test_policy_drivers.py @@ -135,7 +135,7 @@ def setUp(self): "policy": {} } self.driver = KubernetesAnnotationDriver(self.pod_name, self.namespace, - self.auth_token, self.api_root, None, None, None) + self.auth_token, self.api_root, None, None, None, None) assert_equal(self.driver.profile_name, self.profile_name) # Mock the DatastoreClient @@ -161,7 +161,7 @@ def test_generate_rules_kube_system(self, m_get_pod): # Generate rules rules = self.driver.generate_rules() - # Assert correct. Should allow all. + # Assert correct. Should allow all. expected = Rules(id=self.profile_name, inbound_rules=[Rule(action="allow")], outbound_rules=[Rule(action="allow")]) @@ -187,7 +187,7 @@ def test_generate_rules_annotations(self, m_get_pod): # Generate rules rules = self.driver.generate_rules() - # Assert correct. Should allow all. + # Assert correct. Should allow all. expected = Rules(id=self.profile_name, inbound_rules=[Rule(action="allow", protocol="tcp")], outbound_rules=[Rule(action="allow")]) @@ -202,7 +202,7 @@ def test_generate_rules_parse_error(self, m_get_pod): # Mock to raise error self.driver.policy_parser = MagicMock(spec=self.driver.policy_parser) - self.driver.policy_parser.parse_line.side_effect = ValueError + self.driver.policy_parser.parse_line.side_effect = ValueError # Generate rules assert_raises(ApplyProfileError, self.driver.generate_rules) @@ -211,7 +211,7 @@ def test_generate_tags(self): # Mock get_metadata to return labels. labels = {"key": "value"} self.driver._get_metadata = MagicMock(spec=self.driver._get_metadata) - self.driver._get_metadata.return_value = labels + self.driver._get_metadata.return_value = labels # Call tags = self.driver.generate_tags() @@ -257,6 +257,28 @@ def test_get_api_pod(self, m_json_load, m_session): verify=False) m_json_load.assert_called_once_with(pod1) + @patch("calico_cni.policy_drivers.HTTPClient", autospec=True) + @patch("calico_cni.policy_drivers.Query", autospec=True) + @patch("calico_cni.policy_drivers.KubeConfig", autospec=True) + def test_get_api_pod_kubeconfig(self, m_kcfg, m_query, m_http): + # Set up driver. + self.driver.pod_name = 'pod-1' + self.driver.namespace = 'a' + + pod = Mock() + pod.obj = '{"metadata": {"namespace": "a", "name": "pod-1"}}' + m_query(1, 2, 3).get_by_name.return_value = pod + + api_root = "http://kubernetesapi:8080/api/v1/" + self.driver.api_root = api_root + self.driver.kubeconfig_path = "/path/to/kubeconfig" + + # Call method under test + p = self.driver._get_api_pod() + + # Assert + assert_equal(p, pod.obj) + @patch('calico_cni.policy_drivers.requests.Session', autospec=True) @patch('json.loads', autospec=True) def test_get_api_pod_with_client_certs(self, m_json_load, m_session): @@ -271,7 +293,6 @@ def test_get_api_pod_with_client_certs(self, m_json_load, m_session): self.driver.client_key = "key.pem" self.driver.certificate_authority = "ca.pem" - get_obj = Mock() get_obj.status_code = 200 get_obj.text = pod1 @@ -353,7 +374,7 @@ def test_get_api_pod_parse_error(self, m_json_load, m_session): # Set up mock objects self.driver.auth_token = 'TOKEN' - m_json_load.side_effect = TypeError + m_json_load.side_effect = TypeError get_obj = Mock() get_obj.status_code = 200 @@ -388,7 +409,7 @@ def setUp(self): self.network_name = "net-name" self.namespace = "default" self.driver = KubernetesPolicyDriver(self.network_name, self.namespace, - None, None, None, None, None) + None, None, None, None, None, None) # Mock the DatastoreClient self.client = MagicMock(spec=DatastoreClient) @@ -417,7 +438,7 @@ def test_remove_profile(self): self.driver.remove_profile() -class GetPolicyDriverTest(unittest.TestCase): +class GetPolicyDriverTest(unittest.TestCase): def test_get_policy_driver_default_k8s(self): cni_plugin = Mock(spec=CniPlugin) From d5da9da69a2350e96cc227961050e97cea34cee5 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Mon, 23 May 2016 13:16:28 -0700 Subject: [PATCH 3/3] 100% code coverage of kubeconfig code. --- calico.py | 7 +- calico_cni/constants.py | 3 - configuration.md | 1 + tests/unit/test_cni_plugin.py | 209 +++++++++++++++++++++++------- tests/unit/test_policy_drivers.py | 21 +++ 5 files changed, 189 insertions(+), 52 deletions(-) diff --git a/calico.py b/calico.py index 9aecd0874..d1794cdae 100755 --- a/calico.py +++ b/calico.py @@ -32,7 +32,6 @@ from pykube.config import KubeConfig from pykube.http import HTTPClient from pykube.objects import Node -from pykube.query import Query from calico_cni import __version__, __commit__, __branch__ from calico_cni.util import (configure_logging, parse_cni_args, print_cni_error, @@ -456,8 +455,8 @@ def _call_ipam_plugin(self, env): if self.network_config["ipam"].get("subnet") == "usePodCidr": if not self.running_under_k8s: print_cni_error(ERR_CODE_GENERIC, "Invalid network config", - "Must be running under Kubernetes to use \ - 'subnet: usePodCidr'") + "Must be running under Kubernetes to use 'subnet: usePodCidr'") + sys.exit(ERR_CODE_GENERIC) _log.info("Using Kubernetes podCIDR for node: %s", self.k8s_node_name) pod_cidr = self._get_kubernetes_pod_cidr() self.network_config["ipam"]["subnet"] = str(pod_cidr) @@ -493,8 +492,8 @@ def _get_kubernetes_pod_cidr(self): api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path)) node = None for n in Node.objects(api): + _log.debug("Checking node: %s", n.obj["metadata"]["name"]) if n.obj["metadata"]["name"] == self.k8s_node_name: - _log.debug("Checking node: %s", n.obj["metadata"]["name"]) node = n break if not node: diff --git a/calico_cni/constants.py b/calico_cni/constants.py index 49ef03d26..6f926650d 100644 --- a/calico_cni/constants.py +++ b/calico_cni/constants.py @@ -15,9 +15,6 @@ import re import socket -# The hostname of the current node. -HOSTNAME = socket.gethostname() - # Regex to parse CNI_ARGS. Looks for key value pairs separated by an equals # sign and followed either the end of the string, or a colon (indicating # that there is another CNI_ARG key/value pair. diff --git a/configuration.md b/configuration.md index b143ae9a5..766c87daa 100644 --- a/configuration.md +++ b/configuration.md @@ -66,6 +66,7 @@ The CNI plugin may need to authenticate with the Kubernetes API server. The foll * `k8s_certificate_authority` * Verifying the API certificate against a CA only works if connecting to the API server using a hostname. * `kubeconfig` + * Path to a Kubernetes `kubeconfig` file. [![Analytics](https://calico-ga-beacon.appspot.com/UA-52125893-3/calico-cni/configuration.md?pixel)](https://github.com/igrigorik/ga-beacon) diff --git a/tests/unit/test_cni_plugin.py b/tests/unit/test_cni_plugin.py index dda82e112..bab80cdaf 100644 --- a/tests/unit/test_cni_plugin.py +++ b/tests/unit/test_cni_plugin.py @@ -26,7 +26,7 @@ from calico_cni.constants import * from calico_cni.policy_drivers import (DefaultPolicyDriver, ApplyProfileError) -from calico_cni.container_engines import DockerEngine +from calico_cni.container_engines import DockerEngine from calico_cni.util import CniError from calico import main, CniPlugin @@ -58,7 +58,7 @@ def setUp(self): CNI_CONTAINERID_ENV: self.container_id, CNI_IFNAME_ENV: "eth0", CNI_ARGS_ENV: "IP=1.2.3.4", - CNI_COMMAND_ENV: CNI_CMD_ADD, + CNI_COMMAND_ENV: CNI_CMD_ADD, CNI_PATH_ENV: "/usr/bin/rkt/", CNI_NETNS_ENV: "netns", } @@ -66,7 +66,7 @@ def setUp(self): # Create the CniPlugin to test. self.plugin = CniPlugin(self.network_config, self.env) - # Mock out policy driver. + # Mock out policy driver. self.plugin.policy_driver = MagicMock(spec=DefaultPolicyDriver) # Mock out the datastore client. @@ -98,7 +98,7 @@ def test_execute_del_mainline(self): self.plugin.delete.assert_called_once_with() @patch("calico.json", autospec=True) - def test_add_mainline(self, m_json): + def test_add_mainline(self, m_json): # Mock out _assign_ips. ip4 = IPNetwork("10.0.0.1/32") ip6 = IPNetwork("0:0:0:0:0:ffff:a00:1/128") @@ -112,7 +112,7 @@ def test_add_mainline(self, m_json): self.plugin._create_endpoint = MagicMock(spec=self.plugin._create_endpoint) self.plugin._create_endpoint.return_value = endpoint - # Mock out _provision_veth. + # Mock out _provision_veth. self.plugin._provision_veth = MagicMock(spec=self.plugin._provision_veth) self.plugin._provision_veth.return_value = endpoint @@ -131,7 +131,7 @@ def test_add_mainline(self, m_json): m_json.dumps.assert_called_once_with(ipam_response) @patch("calico.json", autospec=True) - def test_add_host_networking(self, m_json): + def test_add_host_networking(self, m_json): # Mock out. self.plugin.container_engine.uses_host_networking = MagicMock(return_value=True) @@ -139,9 +139,9 @@ def test_add_host_networking(self, m_json): assert_raises(SystemExit, self.plugin.add) @patch("calico.json", autospec=True) - def test_add_exists_new_network(self, m_json): + def test_add_exists_new_network(self, m_json): """ - Test add when the endpoint already exists, adding to a new + Test add when the endpoint already exists, adding to a new network. """ # Mock out methods that should not be called. @@ -172,7 +172,7 @@ def test_add_exists_new_network(self, m_json): m_json.dumps.assert_called_once_with(expected) @patch("calico.json", autospec=True) - def test_add_profile_error(self, m_json): + def test_add_profile_error(self, m_json): """ Test add when the endpoint does not exist, error applying profile. """ @@ -197,26 +197,26 @@ def test_add_profile_error(self, m_json): self.plugin._create_endpoint = MagicMock(spec=self.plugin._create_endpoint) self.plugin._create_endpoint.return_value = endpoint - # Mock out _provision_veth. + # Mock out _provision_veth. self.plugin._provision_veth = MagicMock(spec=self.plugin._provision_veth) self.plugin._provision_veth.return_value = endpoint # Mock out apply_profile to throw error. msg = "Apply Profile Error Message" error = ApplyProfileError(msg) - self.plugin.policy_driver.apply_profile.side_effect = error + self.plugin.policy_driver.apply_profile.side_effect = error # Mock out _get_endpoint - endpoint exists. self.plugin._get_endpoint = MagicMock(spec=self.plugin._get_endpoint) - self.plugin._get_endpoint.return_value = None + self.plugin._get_endpoint.return_value = None # Call method. assert_raises(SystemExit, self.plugin.add) @patch("calico.json", autospec=True) - def test_add_exists_new_network_profile_error(self, m_json): + def test_add_exists_new_network_profile_error(self, m_json): """ - Test add when the endpoint already exists, adding to a new + Test add when the endpoint already exists, adding to a new network, error applying profile. """ # Mock out apply_profile to throw error. @@ -236,10 +236,10 @@ def test_add_exists_new_network_profile_error(self, m_json): assert_raises(SystemExit, self.plugin.add) @patch("calico.json", autospec=True) - def test_add_exists_no_ips(self, m_json): + def test_add_exists_no_ips(self, m_json): """ Tests add to new network when endpoint exists, - no IP addresses are assigned. + no IP addresses are assigned. """ # Mock out _get_endpoint - endpoint exists. endpoint = MagicMock(spec=Endpoint) @@ -289,7 +289,7 @@ def test_delete_no_endpoint(self, m_netns): # Mock out _get_endpoint. self.plugin._get_endpoint = MagicMock(spec=self.plugin._get_endpoint) - self.plugin._get_endpoint.return_value = None + self.plugin._get_endpoint.return_value = None # Call delete() assert_raises(SystemExit, self.plugin.delete) @@ -321,7 +321,7 @@ def test_assign_ip_mainline(self): def test_assign_ip_invalid_response(self): # Mock _call_ipam_plugin. rc = 1 - ipam_result = "Invalid json" + ipam_result = "Invalid json" self.plugin._call_ipam_plugin = MagicMock(spec=self.plugin._call_ipam_plugin) self.plugin._call_ipam_plugin.return_value = rc, ipam_result env = {CNI_COMMAND_ENV: CNI_CMD_ADD} @@ -337,8 +337,8 @@ def test_assign_ip_bad_rc(self): rc = ERR_CODE_GENERIC msg = "Message" details = "Details" - ipam_result = json.dumps({"code": rc, - "msg": msg, + ipam_result = json.dumps({"code": rc, + "msg": msg, "details": details}) self.plugin._call_ipam_plugin = MagicMock(spec=self.plugin._call_ipam_plugin) self.plugin._call_ipam_plugin.return_value = rc, ipam_result @@ -403,6 +403,7 @@ def test_release_ip_failed(self): # Call _release_ip. self.plugin._release_ip(env) + @patch("calico.IpamPlugin", autospec=True) def test_call_ipam_plugin_calico_mainline(self, m_ipam_plugin): # Mock _find_ipam_plugin. @@ -437,7 +438,7 @@ def test_call_ipam_plugin_calico_error(self, m_ipam_plugin): # Mock out return values. env = {} err = CniError(150, "message", "details") - m_ipam_plugin(env, self.network_config).execute.side_effect = err + m_ipam_plugin(env, self.network_config).execute.side_effect = err # Set IPAM type. self.plugin.ipam_type = "calico-ipam" @@ -450,6 +451,31 @@ def test_call_ipam_plugin_calico_error(self, m_ipam_plugin): assert_equal(rc, 150) assert_equal(result, expected) + @patch("calico.CniPlugin._call_binary_ipam_plugin", autospec=True) + def test_call_ipam_plugin_host_local_podcidr(self, m_call_bin): + # Mock _find_ipam_plugin. + plugin_path = "/opt/bin/cni/host-local" + self.plugin._find_ipam_plugin = MagicMock(spec=self.plugin._find_ipam_plugin) + self.plugin._find_ipam_plugin.return_value = plugin_path + + # Mock out return values. + ip4 = "10.0.0.1/32" + ip6 = "0:0:0:0:0:ffff:a00:1" + env = {} + out = json.dumps({"ip4": {"ip": ip4}, "ip6": {"ip": ip6}}) + m_call_bin.return_value = 0, out + + # Set IPAM type. + self.plugin.ipam_type = "host-local" + self.plugin.network_config["ipam"]["subnet"] = "usePodCidr" + self.plugin.network_config["kubernetes"] = {"kubeconfig": "/path/to/kubeconfig"} + + # This is not a valid configuration when not running under Kubernetes. + with assert_raises(SystemExit) as err: + self.plugin._call_ipam_plugin(env) + e = err.exception + assert_equal(e.code, ERR_CODE_GENERIC) + @patch("calico.Popen", autospec=True) def test_call_ipam_plugin_binary_mainline(self, m_popen): # Mock _find_ipam_plugin. @@ -476,9 +502,9 @@ def test_call_ipam_plugin_binary_mainline(self, m_popen): # Assert. assert_equal(rc, 0) - m_popen.assert_called_once_with(plugin_path, - stdin=PIPE, - stdout=PIPE, + m_popen.assert_called_once_with(plugin_path, + stdin=PIPE, + stdout=PIPE, stderr=PIPE, env=env) m_proc.communicate.assert_called_once_with(json.dumps(self.plugin.network_config)) @@ -491,10 +517,10 @@ def test_call_ipam_plugin_binary_missing(self, m_popen): """ # Mock _find_ipam_plugin. self.plugin._find_ipam_plugin = MagicMock(spec=self.plugin._find_ipam_plugin) - self.plugin._find_ipam_plugin.return_value = None + self.plugin._find_ipam_plugin.return_value = None env = {} - # Set IPAM type. + # Set IPAM type. self.plugin.ipam_type = "not-calico" # Call method. @@ -514,7 +540,7 @@ def test_create_endpoint_mainline(self): ep = self.plugin._create_endpoint(ip_list) # Assert. - self.plugin._client.create_endpoint.assert_called_once_with(ANY, + self.plugin._client.create_endpoint.assert_called_once_with(ANY, self.expected_orch_id, self.expected_workload_id, ip_list) assert_equal(ep, endpoint) @@ -541,14 +567,14 @@ def test_remove_workload_mainline(self): # Assert self.plugin._client.remove_workload.assert_called_once_with(hostname=ANY, - workload_id=self.expected_workload_id, + workload_id=self.expected_workload_id, orchestrator_id=self.expected_orch_id) def test_remove_workload_does_not_exist(self): """ Make sure we handle this case gracefully - no exception raised. """ - self.plugin._client.remove_workload.side_effect = KeyError + self.plugin._client.remove_workload.side_effect = KeyError self.plugin._remove_workload() @patch("calico.os", autospec=True) @@ -631,8 +657,8 @@ def test_get_endpoint_mainline(self): # Assert assert_equal(ep, endpoint) - self.plugin._client.get_endpoint.assert_called_once_with(hostname=ANY, - orchestrator_id=self.expected_orch_id, + self.plugin._client.get_endpoint.assert_called_once_with(hostname=ANY, + orchestrator_id=self.expected_orch_id, workload_id=self.expected_workload_id) def test_get_endpoint_no_endpoint(self): @@ -644,15 +670,15 @@ def test_get_endpoint_no_endpoint(self): # Assert assert_equal(ep, None) - calls = [call(hostname=ANY, orchestrator_id=self.expected_orch_id, + calls = [call(hostname=ANY, orchestrator_id=self.expected_orch_id, workload_id=self.expected_workload_id), - call(hostname=ANY, orchestrator_id="cni", + call(hostname=ANY, orchestrator_id="cni", workload_id=self.container_id)] self.plugin._client.get_endpoint.assert_has_calls(calls) def test_get_endpoint_multiple_endpoints(self): # Mock - self.plugin._client.get_endpoint.side_effect = MultipleEndpointsMatch + self.plugin._client.get_endpoint.side_effect = MultipleEndpointsMatch # Call with assert_raises(SystemExit) as err: @@ -661,8 +687,8 @@ def test_get_endpoint_multiple_endpoints(self): assert_equal(e.code, ERR_CODE_GENERIC) # Assert - self.plugin._client.get_endpoint.assert_called_once_with(hostname=ANY, - orchestrator_id=self.expected_orch_id, + self.plugin._client.get_endpoint.assert_called_once_with(hostname=ANY, + orchestrator_id=self.expected_orch_id, workload_id=self.expected_workload_id) def test_remove_stale_endpoint(self): @@ -757,7 +783,7 @@ def test_main_unhandled_exception(self, m_conf_log, m_plugin, m_sys, m_os): class CniPluginKubernetesTest(CniPluginTest): """ - Test class for CniPlugin class when running under Kubernetes. Runs all + Test class for CniPlugin class when running under Kubernetes. Runs all of the CniPluginTest cases with Kubernetes specific parameters specified. """ def setUp(self): @@ -772,7 +798,7 @@ def setUp(self): CNI_CONTAINERID_ENV: self.container_id, CNI_IFNAME_ENV: "eth0", CNI_ARGS_ENV: "K8S_POD_NAME=testpod;K8S_POD_NAMESPACE=k8sns", - CNI_COMMAND_ENV: CNI_CMD_ADD, + CNI_COMMAND_ENV: CNI_CMD_ADD, CNI_PATH_ENV: "/opt/cni/bin", CNI_NETNS_ENV: "netns", } @@ -781,10 +807,10 @@ def setUp(self): # config. self.plugin = CniPlugin(self.network_config, self.env) - # Mock out policy driver. + # Mock out policy driver. self.plugin.policy_driver = MagicMock(spec=DefaultPolicyDriver) - # Mock out container engine + # Mock out container engine self.plugin.container_engine = MagicMock(spec=DockerEngine) self.plugin.container_engine.uses_host_networking.return_value = False @@ -794,11 +820,11 @@ def setUp(self): # Set the expected values. self.expected_orch_id = "k8s" - self.expected_workload_id = "k8sns.testpod" + self.expected_workload_id = "k8sns.testpod" @patch("calico.json", autospec=True) @patch("calico.IpamPlugin", autospec=True) - def test_add_exists_no_ips(self, m_ipam, m_json): + def test_add_exists_no_ips(self, m_ipam, m_json): """ In k8s, if an endpoint exists already, we must clean it up. """ @@ -830,17 +856,110 @@ def test_add_exists_no_ips(self, m_ipam, m_json): # Assert we clean up policy. self.plugin.policy_driver.remove_profile.assert_called_once_with() - + # Assert we add a new endpoint. self.plugin._add_new_endpoint.assert_called_once_with() @patch("calico.json", autospec=True) - def test_add_exists_new_network(self, m_json): + def test_add_exists_new_network(self, m_json): """ In k8s, we never add a new network to an existing endpoint. """ pass @patch("calico.json", autospec=True) - def test_add_exists_new_network_profile_error(self, m_json): + def test_add_exists_new_network_profile_error(self, m_json): pass + + @patch("calico.CniPlugin._call_binary_ipam_plugin", autospec=True) + @patch("calico.HTTPClient", autospec=True) + @patch("calico.Node", autospec=True) + @patch("calico.KubeConfig", autospec=True) + def test_call_ipam_plugin_host_local_podcidr(self, m_kcfg, m_node, m_http, m_call_bin): + # Mock _find_ipam_plugin. + plugin_path = "/opt/bin/cni/host-local" + self.plugin._find_ipam_plugin = MagicMock(spec=self.plugin._find_ipam_plugin) + self.plugin._find_ipam_plugin.return_value = plugin_path + + # Mock out return values. + ip4 = "10.0.0.1/32" + ip6 = "0:0:0:0:0:ffff:a00:1" + env = {} + out = json.dumps({"ip4": {"ip": ip4}, "ip6": {"ip": ip6}}) + m_call_bin.return_value = 0, out + + # Set IPAM type. + self.plugin.ipam_type = "host-local" + self.plugin.network_config["ipam"]["subnet"] = "usePodCidr" + self.plugin.kubeconfig_path = "/path/to/kubeconfig" + self.plugin.k8s_node_name = "nodename" + + # Setup response. + node = MagicMock(obj={"metadata": {"name": "nodename"}, "spec":{"podCIDR": "1.2.3.4"}}) + nodes = [node] + m_node.objects.return_value = nodes + + # Call _call_ipam_plugin. + rc, result = self.plugin._call_ipam_plugin(env) + + # Assert. + assert_equal(rc, 0) + assert_equal(result, out) + + + @patch("calico.CniPlugin._call_binary_ipam_plugin", autospec=True) + @patch("calico.HTTPClient", autospec=True) + @patch("calico.Node", autospec=True) + @patch("calico.KubeConfig", autospec=True) + def test_call_ipam_plugin_host_local_podcidr_no_podcidr(self, m_kcfg, m_node, m_http, m_call_bin): + # Mock _find_ipam_plugin. + plugin_path = "/opt/bin/cni/host-local" + self.plugin._find_ipam_plugin = MagicMock(spec=self.plugin._find_ipam_plugin) + self.plugin._find_ipam_plugin.return_value = plugin_path + + # Mock out return values. + ip4 = "10.0.0.1/32" + ip6 = "0:0:0:0:0:ffff:a00:1" + env = {} + out = json.dumps({"ip4": {"ip": ip4}, "ip6": {"ip": ip6}}) + m_call_bin.return_value = 0, out + + # Set IPAM type. + self.plugin.ipam_type = "host-local" + self.plugin.network_config["ipam"]["subnet"] = "usePodCidr" + self.plugin.kubeconfig_path = "/path/to/kubeconfig" + self.plugin.k8s_node_name = "nodename" + + # Setup response. + node = MagicMock(obj={"metadata": {"name": "nodename"}, "spec":{"podCIDR": ""}}) + nodes = [node] + m_node.objects.return_value = nodes + + with assert_raises(SystemExit) as err: + self.plugin._call_ipam_plugin(env) + e = err.exception + assert_equal(e.code, ERR_CODE_GENERIC) + + def test_get_pod_cidr_no_kcfg(self): + with assert_raises(SystemExit) as err: + self.plugin._get_kubernetes_pod_cidr() + e = err.exception + assert_equal(e.code, ERR_CODE_GENERIC) + + @patch("calico.HTTPClient", autospec=True) + @patch("calico.Node", autospec=True) + @patch("calico.KubeConfig", autospec=True) + def test_get_pod_cidr_no_node_in_api(self, m_kcfg, m_node, m_http): + # Set IPAM type. + self.plugin.ipam_type = "host-local" + self.plugin.network_config["ipam"]["subnet"] = "usePodCidr" + self.plugin.kubeconfig_path = "/path/to/kubeconfig" + self.plugin.k8s_node_name = "nodename" + + # Setup response. + m_node.objects.return_value = [] + + with assert_raises(SystemExit) as err: + self.plugin._get_kubernetes_pod_cidr() + e = err.exception + assert_equal(e.code, ERR_CODE_GENERIC) diff --git a/tests/unit/test_policy_drivers.py b/tests/unit/test_policy_drivers.py index 219f0f709..57c53d6ae 100644 --- a/tests/unit/test_policy_drivers.py +++ b/tests/unit/test_policy_drivers.py @@ -24,6 +24,7 @@ from calico_cni.constants import ERR_CODE_GENERIC from calico_cni.policy_drivers import (ApplyProfileError, get_policy_driver, + PolicyException, DefaultPolicyDriver, KubernetesNoPolicyDriver, KubernetesAnnotationDriver, @@ -279,6 +280,26 @@ def test_get_api_pod_kubeconfig(self, m_kcfg, m_query, m_http): # Assert assert_equal(p, pod.obj) + @patch("calico_cni.policy_drivers.HTTPClient", autospec=True) + @patch("calico_cni.policy_drivers.Query", autospec=True) + @patch("calico_cni.policy_drivers.KubeConfig", autospec=True) + def test_get_api_pod_kubeconfig_error(self, m_kcfg, m_query, m_http): + # Set up driver. + self.driver.pod_name = 'pod-1' + self.driver.namespace = 'a' + + pod = Mock() + pod.obj = '{"metadata": {"namespace": "a", "name": "pod-1"}}' + m_query(1, 2, 3).get_by_name.side_effect = KeyError + + api_root = "http://kubernetesapi:8080/api/v1/" + self.driver.api_root = api_root + self.driver.kubeconfig_path = "/path/to/kubeconfig" + + # Call method under test + with assert_raises(PolicyException) as err: + self.driver._get_api_pod() + @patch('calico_cni.policy_drivers.requests.Session', autospec=True) @patch('json.loads', autospec=True) def test_get_api_pod_with_client_certs(self, m_json_load, m_session):