From 66d1e6247a12691a5133d9ece724537d2b5ed76a Mon Sep 17 00:00:00 2001 From: Adelbert Chang Date: Thu, 26 Jul 2018 10:42:43 -0700 Subject: [PATCH] [SPARK-24960][K8S] explicitly expose ports on driver container --- .../apache/spark/deploy/k8s/Constants.scala | 1 + .../k8s/features/BasicDriverFeatureStep.scala | 22 +++++++++++++++++++ .../BasicDriverFeatureStepSuite.scala | 18 ++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 5ecdd3a04d77b..f82cd7fd02e12 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -47,6 +47,7 @@ private[spark] object Constants { val DEFAULT_BLOCKMANAGER_PORT = 7079 val DRIVER_PORT_NAME = "driver-rpc-port" val BLOCK_MANAGER_PORT_NAME = "blockmanager" + val UI_PORT_NAME = "spark-ui" // Environment Variables val ENV_DRIVER_URL = "SPARK_DRIVER_URL" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 7e67b51de6e04..575bc54ffe2bb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ +import org.apache.spark.ui.SparkUI private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) @@ -72,10 +73,31 @@ private[spark] class BasicDriverFeatureStep( ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) } + val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) + val driverBlockManagerPort = conf.sparkConf.getInt( + DRIVER_BLOCK_MANAGER_PORT.key, + DEFAULT_BLOCKMANAGER_PORT + ) + val driverUIPort = SparkUI.getUIPort(conf.sparkConf) val driverContainer = new ContainerBuilder(pod.container) .withName(DRIVER_CONTAINER_NAME) .withImage(driverContainerImage) .withImagePullPolicy(conf.imagePullPolicy()) + .addNewPort() + .withName(DRIVER_PORT_NAME) + .withContainerPort(driverPort) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withContainerPort(driverBlockManagerPort) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName(UI_PORT_NAME) + .withContainerPort(driverUIPort) + .withProtocol("TCP") + .endPort() .addAllToEnv(driverCustomEnvs.asJava) .addNewEnv() .withName(ENV_DRIVER_BIND_ADDRESS) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 165f46a07df2f..d98e113554648 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder +import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} @@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.deploy.k8s.submit.PythonMainAppResource +import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -87,6 +88,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(configuredPod.container.getImage === "spark-driver:latest") assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) + val expectedPortNames = Set( + containerPort(DRIVER_PORT_NAME, DEFAULT_DRIVER_PORT), + containerPort(BLOCK_MANAGER_PORT_NAME, DEFAULT_BLOCKMANAGER_PORT), + containerPort(UI_PORT_NAME, SparkUI.DEFAULT_PORT) + ) + val foundPortNames = configuredPod.container.getPorts.asScala.toSet + assert(expectedPortNames === foundPortNames) + assert(configuredPod.container.getEnv.size === 3) val envs = configuredPod.container .getEnv @@ -203,4 +212,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } + + def containerPort(name: String, portNumber: Int): ContainerPort = + new ContainerPortBuilder() + .withName(name) + .withContainerPort(portNumber) + .withProtocol("TCP") + .build() }