From ba2217bdb98c5de979a29463ed0e085038627ccf Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 9 Jan 2020 04:38:09 +0000 Subject: [PATCH] [AIRFLOW-6516] BugFix: airflow.cfg does not exist in Volume Mounts --- .../kubernetes/worker_configuration.py | 43 ++-- .../executors/test_kubernetes_executor.py | 185 +++++++++++++++++- 2 files changed, 211 insertions(+), 17 deletions(-) diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index f2740f194717b..337d92854ff0a 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -311,6 +311,33 @@ def _construct_volume(name, claim, host): 'mode': 0o440 } + if self.kube_config.airflow_local_settings_configmap: + config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home) + + if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap: + config_volume_name = 'airflow-local-settings' + volumes[config_volume_name] = { + 'name': config_volume_name, + 'configMap': { + 'name': self.kube_config.airflow_local_settings_configmap + } + } + + volume_mounts[config_volume_name] = { + 'name': config_volume_name, + 'mountPath': config_path, + 'subPath': 'airflow_local_settings.py', + 'readOnly': True + } + + else: + volume_mounts['airflow-local-settings'] = { + 'name': 'airflow-config', + 'mountPath': config_path, + 'subPath': 'airflow_local_settings.py', + 'readOnly': True + } + # Mount the airflow.cfg file via a configmap the user has specified if self.kube_config.airflow_configmap: config_volume_name = 'airflow-config' @@ -328,22 +355,6 @@ def _construct_volume(name, claim, host): 'readOnly': True } - if self.kube_config.airflow_local_settings_configmap: - config_volume_name = 'airflow-config' - config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home) - volumes[config_volume_name] = { - 'name': config_volume_name, - 'configMap': { - 'name': self.kube_config.airflow_local_settings_configmap - } - } - volume_mounts[config_volume_name] = { - 'name': config_volume_name, - 'mountPath': config_path, - 'subPath': 'airflow_local_settings.py', - 'readOnly': True - } - return volumes, volume_mounts def generate_dag_volume_mount_path(self): diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 64fc6d08414c5..bbcad9a2ba4c9 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -183,13 +183,16 @@ def setUp(self): self.kube_config.git_dags_folder_mount_point = None self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'} + def tearDown(self): + self.kube_config = None + def test_worker_configuration_no_subpaths(self): worker_config = WorkerConfiguration(self.kube_config) volumes, volume_mounts = worker_config._get_volumes_and_mounts() volumes_list = [value for value in volumes.values()] volume_mounts_list = [value for value in volume_mounts.values()] for volume_or_mount in volumes_list + volume_mounts_list: - if volume_or_mount['name'] != 'airflow-config': + if volume_or_mount['name'] not in ['airflow-config', 'airflow-local-settings']: self.assertNotIn( 'subPath', volume_or_mount, "subPath shouldn't be defined" @@ -635,6 +638,186 @@ def test_worker_container_dags(self): self.assertEqual(0, len(dag_volume_mount)) self.assertEqual(0, len(init_containers)) + def test_set_airflow_config_configmap(self): + """ + Test that airflow.cfg can be set via configmap by + checking volume & volume-mounts are set correctly. + """ + self.kube_config.airflow_home = '/usr/local/airflow' + self.kube_config.airflow_configmap = 'airflow-configmap' + self.kube_config.airflow_local_settings_configmap = None + self.kube_config.dags_folder = '/workers/path/to/dags' + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[]) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'", + kube_executor_config) + + airflow_config_volume = [ + volume for volume in pod.volumes if volume["name"] == 'airflow-config' + ] + # Test that volume_name is found + self.assertEqual(1, len(airflow_config_volume)) + + # Test that config map exists + self.assertEqual( + {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'}, + airflow_config_volume[0] + ) + + # Test that only 1 Volume Mounts exists with 'airflow-config' name + # One for airflow.cfg + volume_mounts = [ + volume_mount for volume_mount in pod.volume_mounts + if volume_mount['name'] == 'airflow-config' + ] + + self.assertEqual([ + { + 'mountPath': '/usr/local/airflow/airflow.cfg', + 'name': 'airflow-config', + 'readOnly': True, + 'subPath': 'airflow.cfg', + } + ], + volume_mounts + ) + + def test_set_airflow_local_settings_configmap(self): + """ + Test that airflow_local_settings.py can be set via configmap by + checking volume & volume-mounts are set correctly. + """ + self.kube_config.airflow_home = '/usr/local/airflow' + self.kube_config.airflow_configmap = 'airflow-configmap' + self.kube_config.airflow_local_settings_configmap = 'airflow-configmap' + self.kube_config.dags_folder = '/workers/path/to/dags' + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[]) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'", + kube_executor_config) + + airflow_config_volume = [ + volume for volume in pod.volumes if volume["name"] == 'airflow-config' + ] + # Test that volume_name is found + self.assertEqual(1, len(airflow_config_volume)) + + # Test that config map exists + self.assertEqual( + {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'}, + airflow_config_volume[0] + ) + + # Test that 2 Volume Mounts exists and has 2 different mount-paths + # One for airflow.cfg + # Second for airflow_local_settings.py + volume_mounts = [ + volume_mount for volume_mount in pod.volume_mounts + if volume_mount['name'] == 'airflow-config' + ] + self.assertEqual(2, len(volume_mounts)) + + self.assertCountEqual( + [ + { + 'mountPath': '/usr/local/airflow/airflow.cfg', + 'name': 'airflow-config', + 'readOnly': True, + 'subPath': 'airflow.cfg', + }, + { + 'mountPath': '/usr/local/airflow/config/airflow_local_settings.py', + 'name': 'airflow-config', + 'readOnly': True, + 'subPath': 'airflow_local_settings.py', + } + ], + volume_mounts + ) + + def test_set_airflow_configmap_different_for_local_setting(self): + """ + Test that airflow_local_settings.py can be set via configmap by + checking volume & volume-mounts are set correctly. + """ + self.kube_config.airflow_home = '/usr/local/airflow' + self.kube_config.airflow_configmap = 'airflow-configmap' + self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap' + self.kube_config.dags_folder = '/workers/path/to/dags' + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[]) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'", + kube_executor_config) + + airflow_local_settings_volume = [ + volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings' + ] + # Test that volume_name is found + self.assertEqual(1, len(airflow_local_settings_volume)) + + # Test that config map exists + self.assertEqual( + [{'configMap': {'name': 'airflow-ls-configmap'}, 'name': 'airflow-local-settings'}], + airflow_local_settings_volume + ) + + # Test that 2 Volume Mounts exists and has 2 different mount-paths + # One for airflow.cfg + # Second for airflow_local_settings.py + airflow_cfg_volume_mount = [ + volume_mount for volume_mount in pod.volume_mounts + if volume_mount['name'] == 'airflow-config' + ] + + local_setting_volume_mount = [ + volume_mount for volume_mount in pod.volume_mounts + if volume_mount['name'] == 'airflow-local-settings' + ] + self.assertEqual(1, len(airflow_cfg_volume_mount)) + self.assertEqual(1, len(local_setting_volume_mount)) + + self.assertEqual( + [ + { + 'mountPath': '/usr/local/airflow/config/airflow_local_settings.py', + 'name': 'airflow-local-settings', + 'readOnly': True, + 'subPath': 'airflow_local_settings.py', + } + ], + local_setting_volume_mount + ) + + print(airflow_cfg_volume_mount) + + self.assertEqual( + [ + { + 'mountPath': '/usr/local/airflow/airflow.cfg', + 'name': 'airflow-config', + 'readOnly': True, + 'subPath': 'airflow.cfg', + } + ], + airflow_cfg_volume_mount + ) + + def test_kubernetes_environment_variables(self): # Tests the kubernetes environment variables get copied into the worker pods input_environment = {