diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
index fc2ca3075..a7df0ec2a 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
@@ -41,6 +41,12 @@ td .progress {
margin: 0;
}
+.with-scroll-bar {
+ display: block;
+ overflow-y: scroll;
+ max-height: 200px;
+}
+
#session-summary {
margin: 20px 0;
}
\ No newline at end of file
diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
index d8a84a761..fd68ff715 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
@@ -30,7 +30,7 @@ function loadSessionsTable(sessions) {
tdWrap(session.proxyUser) +
tdWrap(session.kind) +
tdWrap(session.state) +
- tdWrap(logLinks(session, "session")) +
+ tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") +
""
);
});
@@ -46,7 +46,7 @@ function loadBatchesTable(sessions) {
tdWrap(session.owner) +
tdWrap(session.proxyUser) +
tdWrap(session.state) +
- tdWrap(logLinks(session, "batch")) +
+ tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") +
""
);
});
diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
index f2d743ae6..af9352512 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
@@ -52,10 +52,23 @@ function driverLogLink(session) {
}
}
+function executorsLogLinks(session) {
+ var executorLogUrls = session.appInfo.executorLogUrls;
+ if (executorLogUrls != null) {
+ return executorLogUrls.split(";").map(function (pair) {
+ var nameAndLink = pair.split("#");
+ return divWrap(anchorLink(nameAndLink[1], nameAndLink[0]));
+ }).join("");
+ } else {
+ return "";
+ }
+}
+
function logLinks(session, kind) {
var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session"));
var driverLog = divWrap(driverLogLink(session));
- return sessionLog + driverLog;
+ var executorsLogs = executorsLogLinks(session);
+ return sessionLog + driverLog + executorsLogs;
}
function appIdLink(session) {
@@ -75,6 +88,18 @@ function tdWrap(val) {
return "" + inner + " | ";
}
+function tdWrapWithClass(val, cl) {
+ var inner = "";
+ if (val != null) {
+ inner = val;
+ }
+ var clVal = "";
+ if (cl != null) {
+ clVal = " class=\"" + cl + "\"";
+ }
+ return "" + inner + " | ";
+}
+
function preWrap(inner) {
return "" + escapeHtml(inner) + "
";
}
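
Note: the executorsLogLinks() helper above relies on a simple wire format for appInfo.executorLogUrls: semicolon-separated "name#url" pairs, produced server-side by getExecutorsLogUrls in SparkKubernetesApp.scala further down. A minimal Scala sketch of that encoding (names and URLs are placeholders, not values from this patch):

val links = Seq(
  "executor-1" -> "http://grafana.example.com/explore?left=%5B...%5D",
  "executor-2" -> "http://grafana.example.com/explore?left=%5B...%5D")
// "name#url" pairs joined with ";"; the UI splits on ";" and then on "#".
val executorLogUrls = links.map { case (name, url) => s"$name#$url" }.mkString(";")
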
diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
index c87e5ca40..3a23dc982 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
@@ -23,6 +23,18 @@ function sumWrap(name, val) {
}
}
+function sumWrapWithClass(name, val, cl) {
+ var clVal = "";
+ if (cl != null) {
+ clVal = " class=\"" + cl + "\"";
+ }
+ if (val != null) {
+ return "" + name + ": " + val + "";
+ } else {
+ return "";
+ }
+}
+
function formatError(output) {
var errStr = output.evalue + "\n";
var trace = output.traceback;
@@ -93,7 +105,7 @@ function appendSummary(session) {
sumWrap("Proxy User", session.proxyUser) +
sumWrap("Session Kind", session.kind) +
sumWrap("State", session.state) +
- sumWrap("Logs", logLinks(session, "session")) +
+ sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") +
""
);
}
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 720aa4e15..6bef09748 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -258,6 +258,63 @@ object LivyConf {
// value specifies max attempts to retry when safe mode is ON in hdfs filesystem
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)
+ // Kubernetes oauth token file path.
+ val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
+ // Kubernetes oauth token string value.
+ val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
+ // Kubernetes CA cert file path.
+ val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
+ // Kubernetes client key file path.
+ val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
+ // Kubernetes client cert file path.
+ val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")
+
+ // If Livy can't find the Kubernetes app within this time, consider it lost.
+ val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
+ // How often Livy polls Kubernetes to refresh Kubernetes app state.
+ val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")
+
+ // How long to keep checking for leaked Livy sessions before giving up.
+ val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
+ Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
+ // How often to check for leaked Livy sessions.
+ val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
+ Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")
+
+ // Whether to create a Kubernetes Nginx Ingress for the Spark UI.
+ val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false)
+ // Kubernetes Ingress class name.
+ val KUBERNETES_INGRESS_CLASS_NAME = Entry("livy.server.kubernetes.ingress.className", "")
+ // Kubernetes Nginx Ingress protocol.
+ val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http")
+ // Kubernetes Nginx Ingress host.
+ val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost")
+ // Kubernetes Nginx Ingress additional configuration snippet.
+ val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET =
+ Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "")
+ // Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... .
+ val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS =
+ Entry("livy.server.kubernetes.ingress.additionalAnnotations", "")
+ // Kubernetes secret name for Nginx Ingress TLS.
+ // Ignored unless the 'livy.server.kubernetes.ingress.protocol' value ends with 's' (e.g. https).
+ val KUBERNETES_INGRESS_TLS_SECRET_NAME =
+ Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls")
+
+ val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false)
+ val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000")
+ val KUBERNETES_GRAFANA_LOKI_DATASOURCE =
+ Entry("livy.server.kubernetes.grafana.loki.datasource", "loki")
+ val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h")
+
+ // Whether Spark pods run with sidecar containers.
+ val KUBERNETES_SPARK_SIDECAR_ENABLED =
+ Entry("livy.server.kubernetes.spark.sidecar.enabled", true)
+ // Container name that identifies the Spark container when pods run with sidecars.
+ val KUBERNETES_SPARK_CONTAINER_NAME =
+ Entry("livy.server.kubernetes.spark.container.name", "spark-container")
+
+ val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server")
+
// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
@@ -371,6 +428,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")
+ /** Return true if spark master starts with k8s. */
+ def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")
+
/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
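
The new Kubernetes entries are ordinary LivyConf settings and can be set programmatically the same way the unit tests below do. A minimal sketch (values are examples only; the "livy.spark.master" key is assumed to be the standard Livy master setting):

val conf = new LivyConf(false)
  .set(LivyConf.KUBERNETES_INGRESS_CREATE, true)
  .set(LivyConf.KUBERNETES_INGRESS_PROTOCOL, "https")
  .set(LivyConf.KUBERNETES_INGRESS_HOST, "spark.example.com")
// isRunningOnKubernetes() is driven purely by the spark master prefix.
conf.set("livy.spark.master", "k8s://https://kubernetes.default.svc:443")
assert(conf.isRunningOnKubernetes())
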
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index 3e715bdf1..c7c7fe756 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
+import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
-import org.apache.livy.utils.SparkYarnApp
class LivyServer extends Logging {
@@ -142,10 +142,12 @@ class LivyServer extends Logging {
testRecovery(livyConf)
- // Initialize YarnClient ASAP to save time.
+ // Initialize YarnClient/KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
+ } else if (livyConf.isRunningOnKubernetes()) {
+ SparkKubernetesApp.init(livyConf)
}
if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
@@ -415,10 +417,10 @@ class LivyServer extends Logging {
}
private[livy] def testRecovery(livyConf: LivyConf): Unit = {
- if (!livyConf.isRunningOnYarn()) {
- // If recovery is turned on but we are not running on YARN, quit.
+ if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
+ // If recovery is turned on, and we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
- "Session recovery requires YARN.")
+ "Session recovery requires YARN or Kubernetes.")
}
}
}
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 34499d34c..d8c4a16b5 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -462,11 +462,11 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
-
- if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
- Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+ if (!livyConf.isRunningOnKubernetes()) {
+ driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
- None
+ // Create SparkApp for Kubernetes anyway
+ Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
}
}
@@ -547,6 +547,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
+ // Call #kill explicitly here to delete the interactive session's pods from the cluster.
+ if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index b81bbc036..8742f65e7 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -192,10 +192,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
Future.sequence(all().filter(expired).map { s =>
s.state match {
- case st: FinishedSessionState =>
+ case _: FinishedSessionState =>
info(s"Deleting $s because it finished before ${sessionStateRetainedInSec / 1e9} secs.")
case _ =>
- info(s"Deleting $s because it was inactive or the time to leave the period is over.")
+ info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e9} secs.")
}
delete(s)
})
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
index 9afe28162..e424f80fc 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -24,12 +24,20 @@ import org.apache.livy.LivyConf
object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
val SPARK_UI_URL_NAME = "sparkUiUrl"
+ val EXECUTORS_LOG_URLS_NAME = "executorLogUrls"
}
-case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) {
+case class AppInfo(
+ var driverLogUrl: Option[String] = None,
+ var sparkUiUrl: Option[String] = None,
+ var executorLogUrls: Option[String] = None) {
import AppInfo._
def asJavaMap: java.util.Map[String, String] =
- Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava
+ Map(
+ DRIVER_LOG_URL_NAME -> driverLogUrl.orNull,
+ SPARK_UI_URL_NAME -> sparkUiUrl.orNull,
+ EXECUTORS_LOG_URLS_NAME -> executorLogUrls.orNull
+ ).asJava
}
trait SparkAppListener {
@@ -71,13 +79,21 @@ object SparkApp {
sparkConf ++ Map(
SPARK_YARN_TAG_KEY -> mergedYarnTags,
"spark.yarn.submit.waitAppCompletion" -> "false")
+ } else if (livyConf.isRunningOnKubernetes()) {
+ import KubernetesConstants._
+ sparkConf ++ Map(
+ s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
+ s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
+ "spark.kubernetes.submission.waitAppCompletion" -> "false",
+ "spark.ui.proxyBase" -> s"/$uniqueAppTag")
} else {
sparkConf
}
}
/**
- * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit.
+ * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes
+ * or spark-submit.
*
* @param uniqueAppTag A tag that can uniquely identify the application.
*/
@@ -89,8 +105,11 @@ object SparkApp {
listener: Option[SparkAppListener]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
+ } else if (livyConf.isRunningOnKubernetes()) {
+ new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
- require(process.isDefined, "process must not be None when Livy master is not YARN.")
+ require(process.isDefined, "process must not be None when Livy master is not YARN or " +
+ "Kubernetes.")
new SparkProcApp(process.get, listener)
}
}
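
For context, the Kubernetes branch added above tags both the driver and executor pods with the unique app tag; this is the label that getApplications() in SparkKubernetesApp.scala later filters on. A sketch of the resulting extra Spark conf for a hypothetical tag:

// Illustrative only; mirrors the Kubernetes branch above.
val uniqueAppTag = "livy-batch-0-abcd1234"   // placeholder tag
val kubernetesConf = Map(
  "spark.kubernetes.driver.label.spark-app-tag" -> uniqueAppTag,
  "spark.kubernetes.executor.label.spark-app-tag" -> uniqueAppTag,
  "spark.kubernetes.submission.waitAppCompletion" -> "false",
  "spark.ui.proxyBase" -> s"/$uniqueAppTag")
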
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
new file mode 100644
index 000000000..b5160aaf1
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.net.URLEncoder
+import java.util.Collections
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
+import org.apache.commons.lang.StringUtils
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+
+object SparkKubernetesApp extends Logging {
+
+ private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+ private val leakedAppsGCThread = new Thread() {
+ override def run(): Unit = {
+ import KubernetesExtensions._
+ while (true) {
+ if (!leakedAppTags.isEmpty) {
+ // Kill the app if it is found, and drop the tag once it has been retained longer than the timeout.
+ val iter = leakedAppTags.entrySet().iterator()
+ var isRemoved = false
+ val now = System.currentTimeMillis()
+ val apps = withRetry(kubernetesClient.getApplications())
+ while (iter.hasNext) {
+ val entry = iter.next()
+ apps.find(_.getApplicationTag.contains(entry.getKey))
+ .foreach({
+ app =>
+ info(s"Kill leaked app ${app.getApplicationId}")
+ withRetry(kubernetesClient.killApplication(app))
+ iter.remove()
+ isRemoved = true
+ })
+ if (!isRemoved) {
+ if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
+ iter.remove()
+ info(s"Remove leaked Kubernetes app tag ${entry.getKey}")
+ }
+ }
+ }
+ }
+ Thread.sleep(sessionLeakageCheckInterval)
+ }
+ }
+ }
+
+ val RefreshServiceAccountTokenThread = new Thread() {
+ override def run(): Unit = {
+ while (true) {
+ var currentContext = new Context()
+ var currentContextName = new String
+ val config = kubernetesClient.getConfiguration
+ if (config.getCurrentContext != null) {
+ currentContext = config.getCurrentContext.getContext
+ currentContextName = config.getCurrentContext.getName
+ }
+
+ val newestConfig = Config.autoConfigure(currentContextName)
+ val newAccessToken = newestConfig.getOauthToken
+ info("Refreshed the Kubernetes service account OAuth token")
+
+ config.setOauthToken(newAccessToken)
+ kubernetesClient = new DefaultKubernetesClient(config)
+
+ // Tokens expire after 1 hour by default; the community recommends refreshing every 5 minutes.
+ Thread.sleep(300000)
+ }
+ }
+ }
+
+ private var livyConf: LivyConf = _
+
+ private var cacheLogSize: Int = _
+ private var appLookupTimeout: FiniteDuration = _
+ private var pollInterval: FiniteDuration = _
+
+ private var sessionLeakageCheckTimeout: Long = _
+ private var sessionLeakageCheckInterval: Long = _
+
+ var kubernetesClient: DefaultKubernetesClient = _
+
+ def init(livyConf: LivyConf): Unit = {
+ this.livyConf = livyConf
+
+ // KubernetesClient is thread safe. Create once, share it across threads.
+ kubernetesClient =
+ KubernetesClientFactory.createKubernetesClient(livyConf)
+
+ cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
+ appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
+ pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+
+ sessionLeakageCheckInterval =
+ livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
+ sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+
+ leakedAppsGCThread.setDaemon(true)
+ leakedAppsGCThread.setName("LeakedAppsGCThread")
+ leakedAppsGCThread.start()
+
+ RefreshServiceAccountTokenThread.
+ setName("RefreshServiceAccountTokenThread")
+ RefreshServiceAccountTokenThread.setDaemon(true)
+ RefreshServiceAccountTokenThread.start()
+ }
+
+ // Returns T, rethrowing the exception once all retries have failed.
+ // When istio-proxy restarts, access to the K8s API from Livy can be down
+ // until Envoy comes back, which can take up to 30 seconds.
+ @tailrec
+ private def withRetry[T](fn: => T, n: Int = 10, retryBackoff: Long = 3000): T = {
+ Try { fn } match {
+ case Success(x) => x
+ case _ if n > 1 =>
+ Thread.sleep(Math.max(retryBackoff, 3000))
+ withRetry(fn, n - 1, retryBackoff)
+ case Failure(e) => throw e
+ }
+ }
+
+}
+
+class SparkKubernetesApp private[utils] (
+ appTag: String,
+ appIdOption: Option[String],
+ process: Option[LineBufferedProcess],
+ listener: Option[SparkAppListener],
+ livyConf: LivyConf,
+ kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test.
+ extends SparkApp
+ with Logging {
+
+ import KubernetesExtensions._
+ import SparkKubernetesApp._
+
+ private val appPromise: Promise[KubernetesApplication] = Promise()
+ private[utils] var state: SparkApp.State = SparkApp.State.STARTING
+ private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+ private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
+
+ // Exposed for unit test.
+ // TODO Instead of spawning a thread for every session, create a centralized thread and
+ // batch Kubernetes queries.
+ private[utils] val kubernetesAppMonitorThread = Utils
+ .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+ try {
+ // Get KubernetesApplication by appTag.
+ val app: KubernetesApplication = try {
+ getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+ } catch {
+ case e: Exception =>
+ appPromise.failure(e)
+ throw e
+ }
+ appPromise.success(app)
+ val appId = app.getApplicationId
+
+ Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
+ listener.foreach(_.appIdKnown(appId))
+
+ if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
+ withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
+ }
+
+ var appInfo = AppInfo()
+ while (isRunning) {
+ try {
+ Clock.sleep(pollInterval.toMillis)
+
+ // Refresh application state
+ val appReport = withRetry {
+ debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " +
+ s"namespace: ${app.getApplicationNamespace} " +
+ s"applicationTag: ${app.getApplicationTag}")
+ val report = kubernetesClient.getApplicationReport(livyConf, app,
+ cacheLogSize = cacheLogSize)
+ report
+ }
+
+ kubernetesAppLog = appReport.getApplicationLog
+ kubernetesDiagnostics = appReport.getApplicationDiagnostics
+ changeState(mapKubernetesState(appReport.getApplicationState, appTag))
+
+ val latestAppInfo = AppInfo(
+ appReport.getDriverLogUrl,
+ appReport.getTrackingUrl,
+ appReport.getExecutorsLogUrls
+ )
+ if (appInfo != latestAppInfo) {
+ listener.foreach(_.infoChanged(latestAppInfo))
+ appInfo = latestAppInfo
+ }
+ } catch {
+ // TODO analyse available exceptions
+ case e: Throwable =>
+ throw e
+ }
+ }
+ debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
+ } catch {
+ case _: InterruptedException =>
+ kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
+ changeState(SparkApp.State.KILLED)
+ case NonFatal(e) =>
+ error(s"Error while refreshing Kubernetes state", e)
+ kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+ changeState(SparkApp.State.FAILED)
+ } finally {
+ if (!isRunning) {
+ listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl(
+ livyConf, Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown")
+ )))))
+ }
+ }
+ }
+
+ override def log(): IndexedSeq[String] =
+ ("stdout: " +: kubernetesAppLog) ++
+ ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++
+ process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
+ ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
+
+ override def kill(): Unit = synchronized {
+ try {
+ withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)))
+ } catch {
+ // We cannot kill the Kubernetes app without the appTag.
+ // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
+ // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+ case _: TimeoutException | _: InterruptedException =>
+ warn("Deleting a session while its Kubernetes application is not found.")
+ kubernetesAppMonitorThread.interrupt()
+ } finally {
+ process.foreach(_.destroy())
+ }
+ }
+
+ private def isRunning: Boolean = {
+ state != SparkApp.State.FAILED &&
+ state != SparkApp.State.FINISHED &&
+ state != SparkApp.State.KILLED
+ }
+
+ private def changeState(newState: SparkApp.State.Value): Unit = {
+ if (state != newState) {
+ listener.foreach(_.stateChanged(state, newState))
+ state = newState
+ }
+ }
+
+ /**
+ * Find the corresponding KubernetesApplication from an application tag.
+ *
+ * @param appTag The application tag tagged on the target application.
+ * If the tag is not unique, it returns the first application it found.
+ * @return KubernetesApplication or the failure.
+ */
+ @tailrec
+ private def getAppFromTag(
+ appTag: String,
+ pollInterval: duration.Duration,
+ deadline: Deadline): KubernetesApplication = {
+ import KubernetesExtensions._
+
+ withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
+ match {
+ case Some(app) => app
+ case None =>
+ if (deadline.isOverdue) {
+ process.foreach(_.destroy())
+ leakedAppTags.put(appTag, System.currentTimeMillis())
+ throw new IllegalStateException(s"No Kubernetes application is found with tag" +
+ s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" +
+ " seconds. This may be because 1) spark-submit failed to submit application to " +
+ "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " +
+ "application in time. Please check Livy log and Kubernetes log to know the details.")
+ } else if (process.exists(p => !p.isAlive && p.exitValue() != 0)) {
+ throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" +
+ s" $appTag. 'spark-submit' exited with non-zero status. " +
+ s"Please check Livy log and Kubernetes log to know the details.")
+ } else {
+ Clock.sleep(pollInterval.toMillis)
+ getAppFromTag(appTag, pollInterval, deadline)
+ }
+ }
+ }
+
+ // Exposed for unit test.
+ private[utils] def mapKubernetesState(
+ kubernetesAppState: String,
+ appTag: String
+ ): SparkApp.State.Value = {
+ import KubernetesApplicationState._
+ kubernetesAppState.toLowerCase match {
+ case PENDING | CONTAINER_CREATING =>
+ SparkApp.State.STARTING
+ case RUNNING =>
+ SparkApp.State.RUNNING
+ case COMPLETED | SUCCEEDED =>
+ SparkApp.State.FINISHED
+ case FAILED | ERROR =>
+ SparkApp.State.FAILED
+ case other => // any other combination is invalid, so FAIL the application.
+ error(s"Unknown Kubernetes state $other for app with tag $appTag.")
+ SparkApp.State.FAILED
+ }
+ }
+
+ private def buildHistoryServerUiUrl(livyConf: LivyConf, appId: String): String =
+ s"${livyConf.get(LivyConf.UI_HISTORY_SERVER_URL)}/history/$appId/jobs/"
+
+}
+
+object KubernetesApplicationState {
+ val PENDING = "pending"
+ val CONTAINER_CREATING = "containercreating"
+ val RUNNING = "running"
+ val COMPLETED = "completed"
+ val SUCCEEDED = "succeeded"
+ val FAILED = "failed"
+ val ERROR = "error"
+}
+
+object KubernetesConstants {
+ val SPARK_APP_ID_LABEL = "spark-app-selector"
+ val SPARK_APP_TAG_LABEL = "spark-app-tag"
+ val SPARK_ROLE_LABEL = "spark-role"
+ val SPARK_EXEC_ID_LABEL = "spark-exec-id"
+ val SPARK_ROLE_DRIVER = "driver"
+ val SPARK_ROLE_EXECUTOR = "executor"
+ val SPARK_UI_PORT_NAME = "spark-ui"
+ val CREATED_BY_LIVY_LABEL = Map("created-by" -> "livy")
+}
+
+class KubernetesApplication(driverPod: Pod) {
+
+ import KubernetesConstants._
+
+ private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)
+ private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
+ private val namespace = driverPod.getMetadata.getNamespace
+
+ def getApplicationTag: String = appTag
+
+ def getApplicationId: String = appId
+
+ def getApplicationNamespace: String = namespace
+
+ def getApplicationPod: Pod = driverPod
+}
+
+private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: Seq[Pod],
+ appLog: IndexedSeq[String], ingress: Option[Ingress], livyConf: LivyConf) {
+
+ import KubernetesConstants._
+
+ private val grafanaUrl = livyConf.get(LivyConf.KUBERNETES_GRAFANA_URL)
+ private val timeRange = livyConf.get(LivyConf.KUBERNETES_GRAFANA_TIME_RANGE)
+ private val lokiDatasource = livyConf.get(LivyConf.KUBERNETES_GRAFANA_LOKI_DATASOURCE)
+ private val sparkAppTagLogLabel = SPARK_APP_TAG_LABEL.replaceAll("-", "_")
+ private val sparkRoleLogLabel = SPARK_ROLE_LABEL.replaceAll("-", "_")
+ private val sparkExecIdLogLabel = SPARK_EXEC_ID_LABEL.replaceAll("-", "_")
+
+ def getApplicationState: String =
+ driver.map(getDriverState).getOrElse("unknown")
+
+ // If 'KUBERNETES_SPARK_SIDECAR_ENABLED' is set, inspect the Spark container status to
+ // figure out the termination status; if the Spark container cannot be detected,
+ // default to the pod phase.
+ def getDriverState(driverPod: Pod): String = {
+ val podStatus = driverPod.getStatus
+ val phase = podStatus.getPhase.toLowerCase
+ // if not running with sidecars, just return the pod phase
+ if (!livyConf.getBoolean(LivyConf.KUBERNETES_SPARK_SIDECAR_ENABLED)) {
+ return phase
+ }
+ if (phase != KubernetesApplicationState.RUNNING) {
+ return phase
+ }
+ // if the POD is still running, check spark container termination status
+ // default to pod phase if container state is indeterminate.
+ getTerminalState(podStatus).getOrElse(phase)
+ }
+
+ // If the Spark container has terminated, derive the application state
+ // from its termination exit code.
+ def getTerminalState(podStatus: PodStatus): Option[String] = {
+ import scala.collection.JavaConverters._
+ val sparkContainerName = livyConf.get(LivyConf.KUBERNETES_SPARK_CONTAINER_NAME)
+ for (c <- podStatus.getContainerStatuses.asScala) {
+ if (c.getName == sparkContainerName && c.getState.getTerminated != null) {
+ val exitCode = c.getState.getTerminated.getExitCode
+ if (exitCode == 0) {
+ return Some(KubernetesApplicationState.SUCCEEDED)
+ } else {
+ return Some(KubernetesApplicationState.FAILED)
+ }
+ }
+ }
+ None
+ }
+
+ def getApplicationLog: IndexedSeq[String] = appLog
+
+ def getDriverLogUrl: Option[String] = {
+ if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) {
+ val appTag = driver.map(_.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL))
+ if (appTag.isDefined && appTag.get != null) {
+ return Some(
+ s"""$grafanaUrl/explore?left=""" + URLEncoder.encode(
+ s"""["now-$timeRange","now","$lokiDatasource",""" +
+ s"""{"expr":"{$sparkAppTagLogLabel=\\"${appTag.get}\\",""" +
+ s"""$sparkRoleLogLabel=\\"$SPARK_ROLE_DRIVER\\"}"},""" +
+ s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8")
+ )
+ }
+ }
+ None
+ }
+
+ def getExecutorsLogUrls: Option[String] = {
+ if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) {
+ val urls = executors.map(_.getMetadata.getLabels).flatMap(labels => {
+ val sparkAppTag = labels.get(SPARK_APP_TAG_LABEL)
+ val sparkExecId = labels.get(SPARK_EXEC_ID_LABEL)
+ if (sparkAppTag != null && sparkExecId != null) {
+ val sparkRole = labels.getOrDefault(SPARK_ROLE_LABEL, SPARK_ROLE_EXECUTOR)
+ Some(s"executor-$sparkExecId#$grafanaUrl/explore?left=" + URLEncoder.encode(
+ s"""["now-$timeRange","now","$lokiDatasource",""" +
+ s"""{"expr":"{$sparkAppTagLogLabel=\\"$sparkAppTag\\",""" +
+ s"""$sparkRoleLogLabel=\\"$sparkRole\\",""" +
+ s"""$sparkExecIdLogLabel=\\"$sparkExecId\\"}"},""" +
+ s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8"))
+ } else {
+ None
+ }
+ })
+ if (urls.nonEmpty) return Some(urls.mkString(";"))
+ }
+ None
+ }
+
+ def getTrackingUrl: Option[String] = {
+ val host = ingress.flatMap(i => Try(i.getSpec.getRules.get(0).getHost).toOption)
+ val path = driver
+ .map(_.getMetadata.getLabels.getOrDefault(SPARK_APP_TAG_LABEL, "unknown"))
+ val protocol = livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL)
+ if (host.isDefined && path.isDefined) Some(s"$protocol://${host.get}/${path.get}")
+ else None
+ }
+
+ def getApplicationDiagnostics: IndexedSeq[String] = {
+ (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_)))
+ .filter(_.nonEmpty)
+ .map(opt => buildSparkPodDiagnosticsPrettyString(opt.get))
+ .flatMap(_.split("\n")).toIndexedSeq
+ }
+
+ private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = {
+ import scala.collection.JavaConverters._
+ def printMap(map: Map[_, _]): String = map.map {
+ case (key, value) => s"$key=$value"
+ }.mkString(", ")
+
+ if (pod == null) return "unknown"
+
+ s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" +
+ s"\n\tnode: ${pod.getSpec.getNodeName}" +
+ s"\n\thostname: ${pod.getSpec.getHostname}" +
+ s"\n\tpodIp: ${pod.getStatus.getPodIP}" +
+ s"\n\tstartTime: ${pod.getStatus.getStartTime}" +
+ s"\n\tphase: ${pod.getStatus.getPhase}" +
+ s"\n\treason: ${pod.getStatus.getReason}" +
+ s"\n\tmessage: ${pod.getStatus.getMessage}" +
+ s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" +
+ s"\n\tcontainers:" +
+ s"\n\t\t${
+ pod.getSpec.getContainers.asScala.map(container =>
+ s"${container.getName}:" +
+ s"\n\t\t\timage: ${container.getImage}" +
+ s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" +
+ s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" +
+ s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}"
+ ).mkString("\n\t\t")
+ }" +
+ s"\n\tconditions:" +
+ s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}"
+ }
+
+}
+
+private[utils] object KubernetesExtensions {
+ import KubernetesConstants._
+
+ implicit class KubernetesClientExtensions(client: KubernetesClient) {
+ import scala.collection.JavaConverters._
+
+ private val NGINX_CONFIG_SNIPPET: String =
+ """
+ |proxy_set_header Accept-Encoding "";
+ |sub_filter_last_modified off;
+ |sub_filter_once off;
+ |sub_filter_types text/html text/css text/javascript application/javascript;
+ """.stripMargin
+
+ def getApplications(
+ labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER),
+ appTagLabel: String = SPARK_APP_TAG_LABEL,
+ appIdLabel: String = SPARK_APP_ID_LABEL
+ ): Seq[KubernetesApplication] = {
+ client.pods.inAnyNamespace
+ .withLabels(labels.asJava)
+ .withLabel(appTagLabel)
+ .withLabel(appIdLabel)
+ .list.getItems.asScala.map(new KubernetesApplication(_))
+ }
+
+ def killApplication(app: KubernetesApplication): Boolean = {
+ client.pods.inAnyNamespace.delete(app.getApplicationPod)
+ }
+
+ def getApplicationReport(
+ livyConf: LivyConf,
+ app: KubernetesApplication,
+ cacheLogSize: Int,
+ appTagLabel: String = SPARK_APP_TAG_LABEL
+ ): KubernetesAppReport = {
+ val pods = client.pods.inNamespace(app.getApplicationNamespace)
+ .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava)
+ .list.getItems.asScala
+ val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER)
+ val executors =
+ pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR)
+ val appLog = Try(
+ client.pods.inNamespace(app.getApplicationNamespace)
+ .withName(app.getApplicationPod.getMetadata.getName)
+ .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq
+ ).getOrElse(IndexedSeq.empty)
+ val ingress = client.network.v1.ingresses.inNamespace(app.getApplicationNamespace)
+ .withLabel(SPARK_APP_TAG_LABEL, app.getApplicationTag)
+ .list.getItems.asScala.headOption
+ KubernetesAppReport(driver, executors, appLog, ingress, livyConf)
+ }
+
+ def createSparkUIIngress(app: KubernetesApplication, livyConf: LivyConf): Unit = {
+ val annotationsString = livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS)
+ var annotations: Seq[(String, String)] = Seq.empty
+ if (annotationsString != null && annotationsString.trim.nonEmpty) {
+ annotations = annotationsString
+ .split(";").map(_.split("="))
+ .map(array => array.head -> array.tail.mkString("=")).toSeq
+ }
+
+ val sparkUIIngress = buildSparkUIIngress(
+ app,
+ livyConf.get(LivyConf.KUBERNETES_INGRESS_CLASS_NAME),
+ livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL),
+ livyConf.get(LivyConf.KUBERNETES_INGRESS_HOST),
+ livyConf.get(LivyConf.KUBERNETES_INGRESS_TLS_SECRET_NAME),
+ livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET),
+ annotations: _*
+ )
+ val resources: Seq[HasMetadata] = Seq(sparkUIIngress)
+ addOwnerReference(app.getApplicationPod, resources: _*)
+ client.network.v1.ingresses.inNamespace(app.getApplicationNamespace).
+ createOrReplace(sparkUIIngress)
+ }
+
+ private[utils] def buildSparkUIIngress(
+ app: KubernetesApplication, className: String, protocol: String, host: String,
+ tlsSecretName: String, additionalConfSnippet: String, additionalAnnotations: (String, String)*
+ ): Ingress = {
+ val appTag = app.getApplicationTag
+ val serviceHost = s"${getServiceName(app)}.${app.getApplicationNamespace}.svc.cluster.local"
+
+ // Common annotations
+ val annotations = Map(
+ "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+ "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/$appTag/",
+ "nginx.ingress.kubernetes.io/proxy-redirect-from" -> s"http://$serviceHost/",
+ "nginx.ingress.kubernetes.io/upstream-vhost" -> s"$serviceHost",
+ "nginx.ingress.kubernetes.io/service-upstream" -> "true",
+ "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/$appTag",
+ "nginx.ingress.kubernetes.io/configuration-snippet" ->
+ NGINX_CONFIG_SNIPPET.concat(additionalConfSnippet)
+ ) ++ additionalAnnotations
+
+ val builder = new IngressBuilder()
+ .withApiVersion("networking.k8s.io/v1")
+ .withNewMetadata()
+ .withName(getServiceName(app))
+ .withNamespace(app.getApplicationNamespace)
+ .addToAnnotations(annotations.asJava)
+ .addToLabels(SPARK_APP_TAG_LABEL, appTag)
+ .addToLabels(CREATED_BY_LIVY_LABEL.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withIngressClassName(className)
+ .addNewRule()
+ .withHost(host)
+ .withNewHttp()
+ .addNewPath()
+ .withPath(s"/$appTag/?(.*)")
+ .withPathType("ImplementationSpecific")
+ .withNewBackend()
+ .withNewService()
+ .withName(getServiceName(app))
+ .withNewPort()
+ .withName(SPARK_UI_PORT_NAME).endPort()
+ .endService()
+ .endBackend()
+ .endPath()
+ .endHttp()
+ .endRule()
+ if (protocol.endsWith("s") && tlsSecretName != null && tlsSecretName.nonEmpty) {
+ builder.addNewTl().withSecretName(tlsSecretName).addToHosts(host).endTl()
+ }
+ builder.endSpec().build()
+ }
+
+ private def getServiceName(app: KubernetesApplication): String =
+ StringUtils.stripEnd(
+ StringUtils.left(s"${app.getApplicationPod.getMetadata.getName}-svc", 63), "-"
+ ).toLowerCase
+
+ // Add an OwnerReference to the given resources, making the driver pod their owner, so that
+ // when the driver pod is deleted the resources are garbage collected.
+ private def addOwnerReference(owner: Pod, resources: HasMetadata*): Unit = {
+ val driverPodOwnerReference = new OwnerReferenceBuilder()
+ .withName(owner.getMetadata.getName)
+ .withApiVersion(owner.getApiVersion)
+ .withUid(owner.getMetadata.getUid)
+ .withKind(owner.getKind)
+ .withController(true)
+ .build()
+ resources.foreach {
+ resource =>
+ val originalMetadata = resource.getMetadata
+ originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+ }
+ }
+
+ }
+
+}
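
A sketch of how these extension methods are meant to be used (it mirrors the calls made by the monitor thread above; livyConf is assumed to be in scope, and the client comes from the factory defined below):

import KubernetesExtensions._
val client = KubernetesClientFactory.createKubernetesClient(livyConf)
// List driver pods across namespaces and fetch a report for the first one.
val apps = client.getApplications()
apps.headOption.foreach { app =>
  val report = client.getApplicationReport(livyConf, app, cacheLogSize = 100)
  println(s"${app.getApplicationId}: ${report.getApplicationState}")
}
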
+
+private[utils] object KubernetesClientFactory {
+ import java.io.File
+ import com.google.common.base.Charsets
+ import com.google.common.io.Files
+
+ private implicit class OptionString(val string: String) extends AnyVal {
+ def toOption: Option[String] = if (string == null || string.isEmpty) None else Option(string)
+ }
+
+ def createKubernetesClient(livyConf: LivyConf): DefaultKubernetesClient = {
+ val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster())
+
+ val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption
+ val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption
+ require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty,
+ s"Cannot specify OAuth token through both " +
+ s"a file $oauthTokenFile and a value $oauthTokenValue.")
+
+ val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption
+ val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption
+ val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption
+
+ val config = new ConfigBuilder()
+ .withApiVersion("v1")
+ .withMasterUrl(masterUrl)
+ .withOption(oauthTokenValue) {
+ (token, configBuilder) => configBuilder.withOauthToken(token)
+ }
+ .withOption(oauthTokenFile) {
+ (file, configBuilder) =>
+ configBuilder
+ .withOauthToken(Files.toString(new File(file), Charsets.UTF_8))
+ }
+ .withOption(caCertFile) {
+ (file, configBuilder) => configBuilder.withCaCertFile(file)
+ }
+ .withOption(clientKeyFile) {
+ (file, configBuilder) => configBuilder.withClientKeyFile(file)
+ }
+ .withOption(clientCertFile) {
+ (file, configBuilder) => configBuilder.withClientCertFile(file)
+ }
+ .build()
+ new DefaultKubernetesClient(config)
+ }
+
+ def sparkMasterToKubernetesApi(sparkMaster: String): String = {
+ val replaced = sparkMaster.replaceFirst("k8s://", "")
+ if (!replaced.startsWith("http")) s"https://$replaced"
+ else replaced
+ }
+
+ private implicit class OptionConfigurableConfigBuilder(
+ val configBuilder: ConfigBuilder) extends AnyVal {
+ def withOption[T]
+ (option: Option[T])
+ (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
+ option.map {
+ opt => configurator(opt, configBuilder)
+ }.getOrElse(configBuilder)
+ }
+ }
+
+}
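
The Spark UI link that getTrackingUrl reports is just the ingress protocol, the ingress host, and the per-application path created by createSparkUIIngress. A sketch with placeholder values:

// Placeholder values; mirrors getTrackingUrl above.
val protocol = "https"                   // livy.server.kubernetes.ingress.protocol
val host = "spark.example.com"           // livy.server.kubernetes.ingress.host
val appTag = "livy-session-0-abcd1234"   // spark-app-tag label on the driver pod
val trackingUrl = s"$protocol://$host/$appTag"   // e.g. https://spark.example.com/livy-session-0-abcd1234
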
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
new file mode 100644
index 000000000..00257acd8
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.util.Objects._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
+import org.mockito.Mockito.when
+import org.scalatest.FunSpec
+import org.scalatestplus.mockito.MockitoSugar._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL
+
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+
+ describe("KubernetesAppReport") {
+ import scala.collection.JavaConverters._
+
+ it("should return application state") {
+ val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus]
+ val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod]
+ assertResult("status") {
+ KubernetesAppReport(Option(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false))
+ .getApplicationState
+ }
+ assertResult("unknown") {
+ KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, new LivyConf(false))
+ .getApplicationState
+ }
+ }
+
+ def livyConf(lokiEnabled: Boolean): LivyConf = new LivyConf(false)
+ .set(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED, lokiEnabled)
+
+ def podMockWithLabels(labelMap: Map[String, String]): Pod = {
+ val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava)
+ .getMock[ObjectMeta]
+ when(mock[Pod].getMetadata).thenReturn(metaWithLabel).getMock[Pod]
+ }
+
+ def driverMock(labelExists: Boolean): Option[Pod] = {
+ val labels = if (labelExists) Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag")
+ else Map.empty[String, String]
+ Some(podMockWithLabels(labels))
+ }
+
+ it("should return driver log url") {
+
+ def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit =
+ assertResult(shouldBeDefined) {
+ KubernetesAppReport(
+ driverMock(labelExists), Seq.empty, IndexedSeq.empty, None, livyConf(lokiEnabled)
+ ).getDriverLogUrl.isDefined
+ }
+
+ test(labelExists = false, lokiEnabled = false, shouldBeDefined = false)
+ test(labelExists = false, lokiEnabled = true, shouldBeDefined = false)
+ test(labelExists = true, lokiEnabled = false, shouldBeDefined = false)
+ test(labelExists = true, lokiEnabled = true, shouldBeDefined = true)
+ assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true))
+ .getDriverLogUrl.isEmpty)
+ }
+
+ it("should return executors log urls") {
+ def executorMock(labelsExist: Boolean): Option[Pod] = {
+ val labels = if (labelsExist) {
+ Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag",
+ KubernetesConstants.SPARK_EXEC_ID_LABEL -> "exec-1")
+ } else {
+ Map.empty[String, String]
+ }
+ Some(podMockWithLabels(labels))
+ }
+
+ def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit =
+ assertResult(shouldBeDefined) {
+ KubernetesAppReport(
+ None, Seq(executorMock(labelExists).get), IndexedSeq.empty, None, livyConf(lokiEnabled)
+ ).getExecutorsLogUrls.isDefined
+ }
+
+ test(labelExists = false, lokiEnabled = false, shouldBeDefined = false)
+ test(labelExists = false, lokiEnabled = true, shouldBeDefined = false)
+ test(labelExists = true, lokiEnabled = false, shouldBeDefined = false)
+ test(labelExists = true, lokiEnabled = true, shouldBeDefined = true)
+ assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true))
+ .getExecutorsLogUrls.isEmpty)
+ }
+
+ it("should return driver ingress url") {
+
+ def livyConf(protocol: Option[String]): LivyConf = {
+ val conf = new LivyConf()
+ protocol.map(conf.set(LivyConf.KUBERNETES_INGRESS_PROTOCOL, _)).getOrElse(conf)
+ }
+
+ def ingressMock(host: Option[String]): Ingress = {
+ val ingressRules = host.map(h =>
+ List(when(mock[IngressRule].getHost).thenReturn(h).getMock[IngressRule]))
+ .getOrElse(List.empty).asJava
+ val ingressSpec = when(mock[IngressSpec].getRules)
+ .thenReturn(ingressRules).getMock[IngressSpec]
+ when(mock[Ingress].getSpec).thenReturn(ingressSpec).getMock[Ingress]
+ }
+
+ def test(driver: Option[Pod], ingress: Option[Ingress],
+ protocol: Option[String], shouldBeDefined: Boolean): Unit = {
+ assertResult(shouldBeDefined) {
+ KubernetesAppReport(driver, Seq.empty, IndexedSeq.empty, ingress, livyConf(protocol))
+ .getTrackingUrl.isDefined
+ }
+ }
+
+ val hostname = Some("hostname")
+ val protocol = Some("protocol")
+
+ test(None, None, None, shouldBeDefined = false)
+ test(None, None, protocol, shouldBeDefined = false)
+ test(None, Some(ingressMock(None)), None, shouldBeDefined = false)
+ test(None, Some(ingressMock(None)), protocol, shouldBeDefined = false)
+ test(None, Some(ingressMock(hostname)), None, shouldBeDefined = false)
+ test(None, Some(ingressMock(hostname)), protocol, shouldBeDefined = false)
+
+ test(driverMock(true), None, None, shouldBeDefined = false)
+ test(driverMock(true), None, protocol, shouldBeDefined = false)
+ test(driverMock(true), Some(ingressMock(None)), None, shouldBeDefined = false)
+ test(driverMock(true), Some(ingressMock(None)), protocol, shouldBeDefined = false)
+ test(driverMock(true), Some(ingressMock(hostname)), None, shouldBeDefined = true)
+ test(driverMock(true), Some(ingressMock(hostname)), protocol, shouldBeDefined = true)
+
+ test(driverMock(false), None, None, shouldBeDefined = false)
+ test(driverMock(false), None, protocol, shouldBeDefined = false)
+ test(driverMock(false), Some(ingressMock(None)), None, shouldBeDefined = false)
+ test(driverMock(false), Some(ingressMock(None)), protocol, shouldBeDefined = false)
+ test(driverMock(false), Some(ingressMock(hostname)), None, shouldBeDefined = true)
+ test(driverMock(false), Some(ingressMock(hostname)), protocol, shouldBeDefined = true)
+
+ assertResult(s"${protocol.get}://${hostname.get}/app_tag") {
+ KubernetesAppReport(driverMock(true), Seq.empty, IndexedSeq.empty,
+ Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get
+ }
+ assertResult(s"${protocol.get}://${hostname.get}/unknown") {
+ KubernetesAppReport(driverMock(false), Seq.empty, IndexedSeq.empty,
+ Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get
+ }
+ }
+
+ }
+
+ describe("KubernetesClientFactory") {
+ it("should build KubernetesApi url from LivyConf masterUrl") {
+ def actual(sparkMaster: String): String =
+ KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster)
+
+ val masterUrl = "kubernetes.default.svc:443"
+
+ assertResult(s"https://local")(actual(s"https://local"))
+ assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl"))
+ assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl"))
+ assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl"))
+ assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl"))
+ assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl"))
+ }
+
+ it("should create KubernetesClient with default configs") {
+ assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false))))
+ }
+
+ it("should throw IllegalArgumentException in both oauth file and token provided") {
+ val conf = new LivyConf(false)
+ .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path")
+ .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value")
+ intercept[IllegalArgumentException] {
+ KubernetesClientFactory.createKubernetesClient(conf)
+ }
+ }
+ }
+
+ describe("KubernetesClientExtensions") {
+ it("should build an ingress from the supplied KubernetesApplication") {
+ def test(app: KubernetesApplication, expectedAnnotations: Map[String, String]): Unit = {
+ import scala.collection.JavaConverters._
+ val livyConf = new LivyConf(false)
+ val client = KubernetesClientFactory.createKubernetesClient(livyConf)
+ val clientExtensions = KubernetesExtensions.KubernetesClientExtensions(client)
+ val ingress = clientExtensions.buildSparkUIIngress(app, "ingress-class", "https",
+ "cluster.example.com", "tlsSecret", "")
+ val diff = expectedAnnotations.toSet diff ingress.getMetadata.getAnnotations.asScala.toSet
+ assert(ingress.getMetadata.getName == s"${app.getApplicationPod.getMetadata.getName}-svc")
+ assert(diff.isEmpty)
+ }
+
+ def mockPod(name: String, namespace: String, tag: String): Pod = {
+ new PodBuilder().withNewMetadata().withName(name).withNamespace(namespace).
+ addToLabels(SPARK_APP_TAG_LABEL, tag).endMetadata().withNewSpec().endSpec().build()
+ }
+
+ def app(name: String, namespace: String, tag: String): KubernetesApplication = {
+ new KubernetesApplication(mockPod(name, namespace, tag))
+ }
+
+ test(app("app1", "ns-1", "tag-1"), Map(
+ "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+ "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-1/",
+ "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-1",
+ "nginx.ingress.kubernetes.io/proxy-redirect-from" ->
+ s"http://app1-svc.ns-1.svc.cluster.local/",
+ "nginx.ingress.kubernetes.io/upstream-vhost" ->
+ s"app1-svc.ns-1.svc.cluster.local",
+ "nginx.ingress.kubernetes.io/service-upstream" -> "true"
+ ))
+
+ test(app("app2", "ns-2", "tag-2"), Map(
+ "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+ "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-2/",
+ "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-2",
+ "nginx.ingress.kubernetes.io/proxy-redirect-from" ->
+ s"http://app2-svc.ns-2.svc.cluster.local/",
+ "nginx.ingress.kubernetes.io/upstream-vhost" ->
+ s"app2-svc.ns-2.svc.cluster.local",
+ "nginx.ingress.kubernetes.io/service-upstream" -> "true"
+ ))
+ }
+ }
+
+}