From 2fe69111637e1724edbb553333b903c8feecdf95 Mon Sep 17 00:00:00 2001 From: haripriya-b Date: Wed, 6 Jul 2022 13:23:22 -0700 Subject: [PATCH 1/4] add apis to retrieve session id and session app id --- .../main/java/org/apache/livy/LivyClient.java | 14 +++++++ .../org/apache/livy/TestClientFactory.java | 5 +++ .../apache/livy/client/http/HttpClient.java | 42 +++++++++---------- .../livy/client/http/HttpClientSpec.scala | 37 ++++++++++++++-- rsc/pom.xml | 6 +++ .../java/org/apache/livy/rsc/RSCClient.java | 37 +++++++++------- 6 files changed, 100 insertions(+), 41 deletions(-) diff --git a/api/src/main/java/org/apache/livy/LivyClient.java b/api/src/main/java/org/apache/livy/LivyClient.java index fc03a1f7a..5181fba45 100644 --- a/api/src/main/java/org/apache/livy/LivyClient.java +++ b/api/src/main/java/org/apache/livy/LivyClient.java @@ -107,4 +107,18 @@ public interface LivyClient { */ Future addFile(URI uri); + /** + * Retrieves the session id internally created by Livy. + * + * @return An integer representing the session id of the running session + */ + int getSessionId(); + + /** + * Retrieves the internally generated session app id. + * + * @return A string representing the livy session app id. + */ + String getSessionAppId(); + } diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java b/api/src/test/java/org/apache/livy/TestClientFactory.java index 622908c04..793a87b4f 100644 --- a/api/src/test/java/org/apache/livy/TestClientFactory.java +++ b/api/src/test/java/org/apache/livy/TestClientFactory.java @@ -91,6 +91,11 @@ public Future addFile(URI uri) { throw new UnsupportedOperationException(); } + @Override + public int getSessionId(){throw new UnsupportedOperationException();} + + @Override + public String getSessionAppId(){throw new UnsupportedOperationException();} } } diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..b054e30ff 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -17,23 +17,20 @@ package org.apache.livy.client.http; +import org.apache.livy.Job; +import org.apache.livy.JobHandle; +import org.apache.livy.LivyClient; +import org.apache.livy.client.common.Serializer; + import java.io.File; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.livy.Job; -import org.apache.livy.JobHandle; -import org.apache.livy.LivyClient; -import org.apache.livy.client.common.Serializer; import static org.apache.livy.client.common.HttpMessages.*; /** @@ -44,7 +41,7 @@ class HttpClient implements LivyClient { private final HttpConf config; private final LivyConnection conn; - private final int sessionId; + private final SessionInfo session; private final ScheduledExecutorService executor; private final Serializer serializer; @@ -66,8 +63,8 @@ class HttpClient implements LivyClient { m.group(1), uri.getQuery(), uri.getFragment()); this.conn = new LivyConnection(base, httpConf); - this.sessionId = Integer.parseInt(m.group(2)); - conn.post(null, SessionInfo.class, "/%d/connect", sessionId); + int sessionId = Integer.parseInt(m.group(2)); + session = conn.post(null, SessionInfo.class, "/%d/connect", sessionId); } else { Map sessionConf = new HashMap<>(); for (Map.Entry e : config) { @@ -76,7 +73,7 @@ class HttpClient implements LivyClient { ClientMessage create = new CreateClientRequest(sessionConf); this.conn = new LivyConnection(uri, httpConf); - this.sessionId = conn.post(create, SessionInfo.class, "/").id; + this.session = conn.post(create, SessionInfo.class, "/"); } } catch (Exception e) { throw propagate(e); @@ -87,7 +84,7 @@ class HttpClient implements LivyClient { this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "HttpClient-" + sessionId); + Thread t = new Thread(r, "HttpClient-" + session.id); t.setDaemon(true); return t; } @@ -112,7 +109,7 @@ public synchronized void stop(boolean shutdownContext) { executor.shutdownNow(); try { if (shutdownContext) { - conn.delete(Map.class, "/%s", sessionId); + conn.delete(Map.class, "/%s", session.id); } } catch (Exception e) { throw propagate(e); @@ -149,7 +146,7 @@ private Future uploadResource(final File file, final String command, final St Callable task = new Callable() { @Override public Void call() throws Exception { - conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); + conn.post(file, Void.class, paramName, "/%d/%s", session.id, command); return null; } }; @@ -161,7 +158,7 @@ private Future addResource(final String command, final URI resource) { @Override public Void call() throws Exception { ClientMessage msg = new AddResource(resource.toString()); - conn.post(msg, Void.class, "/%d/%s", sessionId, command); + conn.post(msg, Void.class, "/%d/%s", session.id, command); return null; } }; @@ -170,7 +167,7 @@ public Void call() throws Exception { private JobHandleImpl sendJob(final String command, Job job) { final ByteBuffer serializedJob = serializer.serialize(job); - JobHandleImpl handle = new JobHandleImpl(config, conn, sessionId, executor, serializer); + JobHandleImpl handle = new JobHandleImpl(config, conn, session.id, executor, serializer); handle.start(command, serializedJob); return handle; } @@ -183,9 +180,12 @@ private RuntimeException propagate(Exception cause) { } } - // For testing. - int getSessionId() { - return sessionId; + + public int getSessionId() { + return session.id; } + + public String getSessionAppId() { return session.appId;} + } diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..d4004f533 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -24,16 +24,13 @@ import java.util.concurrent.{Future => JFuture, _} import java.util.concurrent.atomic.AtomicLong import javax.servlet.ServletContext import javax.servlet.http.HttpServletRequest - import scala.concurrent.{ExecutionContext, Future} - import org.mockito.ArgumentCaptor import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, FunSpecLike} import org.scalatra.LifeCycle import org.scalatra.servlet.ScalatraListener - import org.apache.livy._ import org.apache.livy.client.common.{BufferUtils, Serializer} import org.apache.livy.client.common.HttpMessages._ @@ -43,6 +40,7 @@ import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.{InteractiveSessionManager, SessionState, Spark} import org.apache.livy.test.jobs.Echo import org.apache.livy.utils.AppInfo +import org.mockito.stubbing.OngoingStubbing /** * The test for the HTTP client is written in Scala so we can reuse the code in the livy-server @@ -181,6 +179,34 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni testJob(false, response = Some(null)) } + withClient("should retrieve session id and sessionAppID") { + var id = client.getSessionId() + Console.println("SESSION ID: " + id) + assert(id === sessionId) + + var appId = client.getSessionAppId() + Console.println("SESSION APP ID: " + appId) + assert(Some(appId) === session.appId) + + } + + withClient("should retrieve session id when reconnecting to a session") { + var sid = client.asInstanceOf[HttpClient].getSessionId() + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + + s"${LivyConnection.SESSIONS_URI}/$sid" + val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build() + newClient.stop(false) + + var id = client.getSessionId() + Console.println("SESSION ID: " + id) + assert(id === sessionId) + + var appId = client.getSessionAppId() + Console.println("SESSION APP ID: " + appId) + assert(Some(appId) === session.appId) + + } + withClient("should connect to existing sessions") { var sid = client.asInstanceOf[HttpClient].getSessionId() val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + @@ -256,6 +282,7 @@ private object HttpClientSpec { // Hack warning: keep the session object available so that individual tests can mock // the desired behavior before making requests to the server. var session: InteractiveSession = _ + var sessionId: Int = _ } @@ -272,9 +299,10 @@ private class HttpClientTestBootstrap extends LifeCycle { override protected def createSession(req: HttpServletRequest): InteractiveSession = { val session = mock(classOf[InteractiveSession]) val id = sessionManager.nextId() + val sessionAppId: String = "ASD" when(session.id).thenReturn(id) when(session.name).thenReturn(None) - when(session.appId).thenReturn(None) + when(session.appId).thenReturn(Some(sessionAppId)) when(session.appInfo).thenReturn(AppInfo()) when(session.state).thenReturn(SessionState.Idle) when(session.proxyUser).thenReturn(None) @@ -282,6 +310,7 @@ private class HttpClientTestBootstrap extends LifeCycle { when(session.stop()).thenReturn(Future.successful(())) require(HttpClientSpec.session == null, "Session already created?") HttpClientSpec.session = session + HttpClientSpec.sessionId = id session } } diff --git a/rsc/pom.xml b/rsc/pom.xml index 833ad8366..613de06f6 100644 --- a/rsc/pom.xml +++ b/rsc/pom.xml @@ -121,6 +121,12 @@ slf4j-api provided + + org.apache.livy + livy-api + 0.8.0-incubating-SNAPSHOT + compile + diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index c1c953400..47cb35ecd 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -17,26 +17,12 @@ package org.apache.livy.rsc; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.livy.Job; import org.apache.livy.JobHandle; import org.apache.livy.LivyClient; @@ -45,8 +31,22 @@ import org.apache.livy.rsc.driver.AddJarJob; import org.apache.livy.rsc.rpc.Rpc; import org.apache.livy.sessions.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.apache.livy.rsc.RSCConf.Entry.*; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.livy.rsc.RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT; +import static org.apache.livy.rsc.RSCConf.Entry.RPC_MAX_THREADS; public class RSCClient implements LivyClient { private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class); @@ -426,10 +426,15 @@ private void handle(ChannelHandlerContext ctx, ReplState msg) { LOG.trace("Received repl state for {}", msg.state); // Update last activity timestamp when state change is from busy to idle. if (SessionState.Busy$.MODULE$.state().equals(replState) && msg != null && - SessionState.Idle$.MODULE$.state().equals(msg.state)) { + SessionState.Idle$.MODULE$.state().equals(msg.state)) { replLastActivity = System.nanoTime(); } replState = msg.state; } } + @Override + public String getSessionAppId(){throw new UnsupportedOperationException();} + + @Override + public int getSessionId(){throw new UnsupportedOperationException();} } From 6494d0cc96e36576a3dbc20a652a33b68b4c5f3e Mon Sep 17 00:00:00 2001 From: haripriya-b Date: Tue, 3 Jan 2023 16:49:15 -0800 Subject: [PATCH 2/4] Importing only required classes from packages. --- .../main/java/org/apache/livy/client/http/HttpClient.java | 6 +++++- .../scala/org/apache/livy/client/http/HttpClientSpec.scala | 4 ---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index b054e30ff..7d9c148bf 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -27,7 +27,11 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Callable; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index d4004f533..82a23314f 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -181,11 +181,9 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni withClient("should retrieve session id and sessionAppID") { var id = client.getSessionId() - Console.println("SESSION ID: " + id) assert(id === sessionId) var appId = client.getSessionAppId() - Console.println("SESSION APP ID: " + appId) assert(Some(appId) === session.appId) } @@ -198,11 +196,9 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni newClient.stop(false) var id = client.getSessionId() - Console.println("SESSION ID: " + id) assert(id === sessionId) var appId = client.getSessionAppId() - Console.println("SESSION APP ID: " + appId) assert(Some(appId) === session.appId) } From d19867a8c4188ec17c84b88fa5c32db7c1356ae8 Mon Sep 17 00:00:00 2001 From: haripriya-b Date: Wed, 4 Jan 2023 12:35:06 -0800 Subject: [PATCH 3/4] Fixing order of imports --- .../apache/livy/client/http/HttpClient.java | 11 ++++---- .../java/org/apache/livy/rsc/RSCClient.java | 27 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index 7d9c148bf..7587d6ec3 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -17,24 +17,23 @@ package org.apache.livy.client.http; -import org.apache.livy.Job; -import org.apache.livy.JobHandle; -import org.apache.livy.LivyClient; -import org.apache.livy.client.common.Serializer; - import java.io.File; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.Callable; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.livy.Job; +import org.apache.livy.JobHandle; +import org.apache.livy.LivyClient; +import org.apache.livy.client.common.Serializer; import static org.apache.livy.client.common.HttpMessages.*; /** diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 47cb35ecd..cf7a92739 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -17,12 +17,26 @@ package org.apache.livy.rsc; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.livy.Job; import org.apache.livy.JobHandle; import org.apache.livy.LivyClient; @@ -31,19 +45,8 @@ import org.apache.livy.rsc.driver.AddJarJob; import org.apache.livy.rsc.rpc.Rpc; import org.apache.livy.sessions.SessionState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + import static org.apache.livy.rsc.RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT; import static org.apache.livy.rsc.RSCConf.Entry.RPC_MAX_THREADS; From 2369bdc89d7e2c3a5e0f3946ca5de6acf6117fee Mon Sep 17 00:00:00 2001 From: haripriya-b Date: Wed, 4 Jan 2023 14:13:13 -0800 Subject: [PATCH 4/4] Adding spaces b/w imports --- .../scala/org/apache/livy/client/http/HttpClientSpec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index 82a23314f..dcd46ec86 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -24,13 +24,16 @@ import java.util.concurrent.{Future => JFuture, _} import java.util.concurrent.atomic.AtomicLong import javax.servlet.ServletContext import javax.servlet.http.HttpServletRequest + import scala.concurrent.{ExecutionContext, Future} + import org.mockito.ArgumentCaptor import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, FunSpecLike} import org.scalatra.LifeCycle import org.scalatra.servlet.ScalatraListener + import org.apache.livy._ import org.apache.livy.client.common.{BufferUtils, Serializer} import org.apache.livy.client.common.HttpMessages._ @@ -40,7 +43,6 @@ import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.{InteractiveSessionManager, SessionState, Spark} import org.apache.livy.test.jobs.Echo import org.apache.livy.utils.AppInfo -import org.mockito.stubbing.OngoingStubbing /** * The test for the HTTP client is written in Scala so we can reuse the code in the livy-server