diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java index 1404cbc5d6bb..5ab6d604b40a 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java @@ -36,6 +36,7 @@ import io.cdap.common.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; +import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URI; @@ -44,6 +45,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -60,6 +62,7 @@ * Unit test for {@link TaskWorkerService}. */ public class TaskWorkerServiceTest { + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @@ -91,9 +94,11 @@ public void beforeTest() { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); TaskWorkerService taskWorkerService = new TaskWorkerService( - cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); - serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); + cConf, sConf, discoveryService, discoveryService, + metricsCollectionService, + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); // start the service taskWorkerService.startAndWait(); this.taskWorkerService = taskWorkerService; @@ -116,9 +121,11 @@ public void testPeriodicRestart() { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); TaskWorkerService taskWorkerService = new TaskWorkerService( - cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); - serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); + cConf, sConf, discoveryService, discoveryService, + metricsCollectionService, + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); // start the service taskWorkerService.startAndWait(); @@ -135,24 +142,28 @@ public void testPeriodicRestartWithInflightRequest() throws IOException { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); TaskWorkerService taskWorkerService = new TaskWorkerService( - cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); - serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); + cConf, sConf, discoveryService, discoveryService, + metricsCollectionService, + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); // start the service taskWorkerService.startAndWait(); InetSocketAddress addr = taskWorkerService.getBindAddress(); - URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + URI uri = URI.create( + String.format("http://%s:%s", addr.getHostName(), addr.getPort())); // Post valid request String want = "5000"; - RunnableTaskRequest req = RunnableTaskRequest.getBuilder(TestRunnableClass.class.getName()) + RunnableTaskRequest req = RunnableTaskRequest.getBuilder( + TestRunnableClass.class.getName()) .withParam(want).withNamespace("testNamespace").build(); String reqBody = GSON.toJson(req); HttpResponse response = HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)); + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)); Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); Assert.assertEquals(want, response.getResponseBodyAsString()); @@ -161,11 +172,12 @@ public void testPeriodicRestartWithInflightRequest() throws IOException { } @Test - public void testPeriodicRestartWithNeverEndingInflightRequest() throws IOException { + public void testPeriodicRestartWithNeverEndingInflightRequest() { CConfiguration cConf = createCConf(); SConfiguration sConf = createSConf(); cConf.setInt(Constants.TaskWorker.CONTAINER_KILL_AFTER_REQUEST_COUNT, 10); cConf.setInt(Constants.TaskWorker.CONTAINER_KILL_AFTER_DURATION_SECOND, 2); + cConf.setInt(TaskWorker.TASK_EXECUTION_DEADLINE_SECOND, -1); InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); TaskWorkerService taskWorkerService = @@ -176,18 +188,21 @@ public void testPeriodicRestartWithNeverEndingInflightRequest() throws IOExcepti discoveryService, metricsCollectionService, new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); - serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); // start the service taskWorkerService.startAndWait(); new Thread( () -> { InetSocketAddress addr = taskWorkerService.getBindAddress(); - URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), + addr.getPort())); // Post valid request RunnableTaskRequest req = RunnableTaskRequest.getBuilder(TestRunnableClass.class.getName()) .withParam("200000") + .withNamespace("testNamespace") .build(); String reqBody = GSON.toJson(req); try { @@ -205,6 +220,7 @@ public void testPeriodicRestartWithNeverEndingInflightRequest() throws IOExcepti TaskWorkerTestUtil.waitForServiceCompletion(serviceCompletionFuture); Assert.assertEquals(Service.State.TERMINATED, taskWorkerService.state()); } + @Test public void testRestartAfterMultipleExecutions() throws IOException { CConfiguration cConf = createCConf(); @@ -214,29 +230,33 @@ public void testRestartAfterMultipleExecutions() throws IOException { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); TaskWorkerService taskWorkerService = new TaskWorkerService( - cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); - serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); + cConf, sConf, discoveryService, discoveryService, + metricsCollectionService, + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); // start the service taskWorkerService.startAndWait(); InetSocketAddress addr = taskWorkerService.getBindAddress(); - URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + URI uri = URI.create( + String.format("http://%s:%s", addr.getHostName(), addr.getPort())); // Post valid request String want = "100"; - RunnableTaskRequest req = RunnableTaskRequest.getBuilder(TestRunnableClass.class.getName()) + RunnableTaskRequest req = RunnableTaskRequest.getBuilder( + TestRunnableClass.class.getName()) .withParam(want).withNamespace("testNamespace").build(); String reqBody = GSON.toJson(req); - HttpResponse response = HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)); + HttpRequests.execute( + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)); - response = HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)); + HttpRequests.execute( + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)); TaskWorkerTestUtil.waitForServiceCompletion(serviceCompletionFuture); Assert.assertEquals(Service.State.TERMINATED, taskWorkerService.state()); @@ -245,17 +265,19 @@ public void testRestartAfterMultipleExecutions() throws IOException { @Test public void testStartAndStopWithValidRequest() throws IOException { InetSocketAddress addr = taskWorkerService.getBindAddress(); - URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + URI uri = URI.create( + String.format("http://%s:%s", addr.getHostName(), addr.getPort())); // Post valid request String want = "100"; - RunnableTaskRequest req = RunnableTaskRequest.getBuilder(TestRunnableClass.class.getName()) + RunnableTaskRequest req = RunnableTaskRequest.getBuilder( + TestRunnableClass.class.getName()) .withParam(want).withNamespace("testNamespace").build(); String reqBody = GSON.toJson(req); HttpResponse response = HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)); + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)); TaskWorkerTestUtil.waitForServiceCompletion(serviceCompletionFuture); Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); Assert.assertEquals(want, response.getResponseBodyAsString()); @@ -265,20 +287,24 @@ public void testStartAndStopWithValidRequest() throws IOException { @Test public void testStartAndStopWithInvalidRequest() throws Exception { InetSocketAddress addr = taskWorkerService.getBindAddress(); - URI uri = URI.create(String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + URI uri = URI.create( + String.format("http://%s:%s", addr.getHostName(), addr.getPort())); // Post invalid request RunnableTaskRequest noClassReq = RunnableTaskRequest.getBuilder("NoClass") .withNamespace("testNamespace").withParam("100").build(); String reqBody = GSON.toJson(noClassReq); HttpResponse response = HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)); - Assert.assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, response.getResponseCode()); + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)); + Assert.assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, + response.getResponseCode()); BasicThrowable basicThrowable; - basicThrowable = GSON.fromJson(response.getResponseBodyAsString(), BasicThrowable.class); - Assert.assertTrue(basicThrowable.getClassName().contains("java.lang.ClassNotFoundException")); + basicThrowable = GSON.fromJson(response.getResponseBodyAsString(), + BasicThrowable.class); + Assert.assertTrue(basicThrowable.getClassName() + .contains("java.lang.ClassNotFoundException")); Assert.assertNotNull(basicThrowable.getMessage()); Assert.assertTrue(basicThrowable.getMessage().contains("NoClass")); Assert.assertNotEquals(basicThrowable.getStackTraces().length, 0); @@ -300,18 +326,20 @@ public void testConcurrentRequestsWithIsolationEnabled() throws Exception { for (int i = 0; i < concurrentRequests; i++) { calls.add( - () -> HttpRequests.execute( - HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) - .withBody(reqBody).build(), - new DefaultHttpRequestConfig(false)) + () -> HttpRequests.execute( + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(reqBody).build(), + new DefaultHttpRequestConfig(false)) ); } - List> responses = Executors.newFixedThreadPool(concurrentRequests).invokeAll(calls); + List> responses = Executors.newFixedThreadPool( + concurrentRequests).invokeAll(calls); int okResponse = 0; int conflictResponse = 0; for (int i = 0; i < concurrentRequests; i++) { - if (responses.get(i).get().getResponseCode() == HttpResponseStatus.OK.code()) { + if (responses.get(i).get().getResponseCode() + == HttpResponseStatus.OK.code()) { okResponse++; } else if (responses.get(i).get().getResponseCode() == HttpResponseStatus.TOO_MANY_REQUESTS.code()) { @@ -371,7 +399,7 @@ public void testConcurrentRequestsWithIsolationDisabled() throws Exception { } // Verify that the task worker service doesn't stop automatically. try { - Tasks.waitFor(false, () -> taskWorkerService.isRunning(), 1, + Tasks.waitFor(false, taskWorkerService::isRunning, 1, TimeUnit.SECONDS); Assert.fail(); } catch (TimeoutException e) { @@ -383,6 +411,81 @@ public void testConcurrentRequestsWithIsolationDisabled() throws Exception { Assert.assertEquals(Service.State.TERMINATED, taskWorkerService.state()); } + @Test + public void testRestartWithConcurrentRequests() throws Exception { + CConfiguration cConf = createCConf(); + cConf.setInt(TaskWorker.REQUEST_LIMIT, 3); + cConf.setBoolean(TaskWorker.USER_CODE_ISOLATION_ENABLED, false); + cConf.setInt(Constants.TaskWorker.CONTAINER_KILL_AFTER_DURATION_SECOND, 2); + cConf.setInt(Constants.TaskWorker.TASK_EXECUTION_DEADLINE_SECOND, 1); + InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); + TaskWorkerService taskWorkerService = new TaskWorkerService(cConf, + createSConf(), discoveryService, discoveryService, + metricsCollectionService, + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( + taskWorkerService); + taskWorkerService.startAndWait(); + InetSocketAddress addr = taskWorkerService.getBindAddress(); + URI uri = URI.create( + String.format("http://%s:%s", addr.getHostName(), addr.getPort())); + + List> calls = new ArrayList<>(); + int concurrentRequests = 2; + + for (int i = 0; i < concurrentRequests; i++) { + RunnableTaskRequest request = RunnableTaskRequest.getBuilder( + TestRunnableClass.class.getName()) + .withParam("100").withNamespace("testNamespace").build(); + calls.add( + () -> HttpRequests.execute( + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(GSON.toJson(request)).build(), + new DefaultHttpRequestConfig(false)) + ); + } + + // Send a request that never ends. + RunnableTaskRequest slowRequest = RunnableTaskRequest.getBuilder( + TestRunnableClass.class.getName()) + .withParam("1000000").withNamespace("testNamespace").build(); + + calls.add( + () -> HttpRequests.execute( + HttpRequest.post(uri.resolve("/v3Internal/worker/run").toURL()) + .withBody(GSON.toJson(slowRequest)).build(), + new DefaultHttpRequestConfig(false)) + ); + + List> responses = Executors.newFixedThreadPool( + concurrentRequests).invokeAll(calls); + + int okResponse = 0; + int connectionRefusedCount = 0; + for (Future response : responses) { + try { + final int responseCode = response.get().getResponseCode(); + if (responseCode == HttpResponseStatus.OK.code()) { + okResponse++; + } + } catch (ExecutionException ex) { + if (ex.getCause() instanceof ConnectException) { + connectionRefusedCount++; + } else { + throw ex; + } + } + } + + // Verify that the task worker service has stopped automatically. + Assert.assertEquals(2, okResponse); + // The slow request will receive a "connection refused" response once the task + // worker service stops. + Assert.assertEquals(connectionRefusedCount, 1); + TaskWorkerTestUtil.waitForServiceCompletion(serviceCompletionFuture); + Assert.assertEquals(Service.State.TERMINATED, taskWorkerService.state()); + } + public static class TestRunnableClass implements RunnableTask { @Override diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 92677c37e6e5..5ad68da11834 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -506,6 +506,8 @@ public static final class TaskWorker { "task.worker.container.kill.after.duration.second"; public static final String REQUEST_LIMIT = "task.worker.request.limit"; public static final String USER_CODE_ISOLATION_ENABLED = "task.worker.request.userCodeIsolation.enabled"; + public static final String TASK_EXECUTION_DEADLINE_SECOND = + "task.worker.taskExecutionDeadline.second"; public static final String CONTAINER_RUN_AS_USER = "task.worker.container.run.as.user"; public static final String CONTAINER_RUN_AS_GROUP = "task.worker.container.run.as.group"; public static final String CONTAINER_DISK_READONLY = "task.worker.container.disk.readonly"; diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/TaskWorkerHttpHandlerInternal.java b/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/TaskWorkerHttpHandlerInternal.java index 76ab2d949f9b..a098e752dad1 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/TaskWorkerHttpHandlerInternal.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/TaskWorkerHttpHandlerInternal.java @@ -46,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.util.Random; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -77,8 +78,7 @@ public class TaskWorkerHttpHandlerInternal extends AbstractHttpHandler { private static final Logger LOG = LoggerFactory.getLogger( TaskWorkerHttpHandlerInternal.class); private static final Gson GSON = new GsonBuilder().registerTypeAdapter( - BasicThrowable.class, - new BasicThrowableCodec()).create(); + BasicThrowable.class, new BasicThrowableCodec()).create(); private final RunnableTaskLauncher runnableTaskLauncher; private final BiConsumer taskCompletionConsumer; @@ -98,7 +98,7 @@ public class TaskWorkerHttpHandlerInternal extends AbstractHttpHandler { * If true, pod will restart once an operation finish its execution. */ private final AtomicBoolean mustRestart = new AtomicBoolean(false); - private final int requestLimit; + private final int concurrentRequestLimit; /** * Constructs the {@link TaskWorkerHttpHandlerInternal}. @@ -119,40 +119,37 @@ public TaskWorkerHttpHandlerInternal(CConfiguration cConf, TaskWorker.USER_CODE_ISOLATION_ENABLED); if (enableUserCodeIsolationEnabled) { // Run only one request at a time in user code isolation mode. - this.requestLimit = 1; - // Restart the service to clean up and re-claim resources after user code - // execution. - this.taskCompletionConsumer = (succeeded, taskDetails) -> { - taskDetails.emitMetrics(succeeded); - runningRequestCount.decrementAndGet(); - requestProcessedCount.incrementAndGet(); - - String className = taskDetails.getClassName(); - - if (mustRestart.get()) { - stopper.accept(className); - return; - } + this.concurrentRequestLimit = 1; + } else { + this.concurrentRequestLimit = cConf.getInt(TaskWorker.REQUEST_LIMIT); + } - if (!taskDetails.isTerminateOnComplete() || className == null - || killAfterRequestCount <= 0) { - // No need to restart. - return; - } + // Restart the service to clean up and re-claim resources after user code + // execution. + this.taskCompletionConsumer = (succeeded, taskDetails) -> { + taskDetails.emitMetrics(succeeded); + final int pendingRequests = runningRequestCount.decrementAndGet(); + requestProcessedCount.incrementAndGet(); - if (requestProcessedCount.get() >= killAfterRequestCount) { - stopper.accept(className); - } - }; + String className = taskDetails.getClassName(); + if (mustRestart.get() && pendingRequests == 0) { + stopper.accept(className); + return; + } - enablePeriodicRestart(cConf, stopper); - } else { - this.requestLimit = cConf.getInt(TaskWorker.REQUEST_LIMIT); - this.taskCompletionConsumer = (succeeded, taskDetails) -> { - taskDetails.emitMetrics(succeeded); - runningRequestCount.decrementAndGet(); - }; - } + if (!enableUserCodeIsolationEnabled + || !taskDetails.isTerminateOnComplete() + || className == null || killAfterRequestCount <= 0) { + // No need to restart. + return; + } + + if (requestProcessedCount.get() >= killAfterRequestCount) { + stopper.accept(className); + } + }; + + enablePeriodicRestart(cConf, stopper); } /** @@ -169,31 +166,37 @@ private void enablePeriodicRestart(CConfiguration cConf, Constants.TaskWorker.CONTAINER_KILL_AFTER_DURATION_SECOND, 0); int lowerBound = (int) (duration - duration * DURATION_FRACTION); int upperBound = (int) (duration + duration * DURATION_FRACTION); - if (lowerBound > 0) { - int waitTime = - (new Random()).nextInt(upperBound - lowerBound) + lowerBound; - Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory("task-worker-restart")) - .scheduleWithFixedDelay( - () -> { - if (mustRestart.get()) { - // We force pod restart as the ongoing request has not finished since last - // periodic restart check. - stopper.accept(""); - return; - } - // we restart once ongoing request (which has set runningRequestCount to 1) - // finishes. - mustRestart.set(true); - if (runningRequestCount.compareAndSet(0, 1)) { - // there is no ongoing request. pod gets restarted. - stopper.accept(""); - } - }, - waitTime, - waitTime, - TimeUnit.SECONDS); + + if (duration <= 0) { + return; } + int waitTime = (new Random()).nextInt(upperBound - lowerBound) + lowerBound; + int finalTaskDeadlineSeconds = calculateFinalTaskDeadlineSeconds(duration); + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( + Threads.createDaemonThreadFactory("task-worker-restart")); + + executorService.scheduleWithFixedDelay(() -> { + // we restart once all ongoing requests finish, i.e. runningRequestCount is 0. + mustRestart.set(true); + LOG.debug( + "Task worker service is about to restart in {} seconds, no new tasks will be accepted.", + finalTaskDeadlineSeconds); + if (runningRequestCount.get() == 0) { + stopper.accept(""); + executorService.shutdown(); + return; + } + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(finalTaskDeadlineSeconds)); + } catch (InterruptedException e) { + LOG.warn( + "Interrupted while waiting for task completion. Stopping immediately", + e); + } + stopper.accept(""); + executorService.shutdown(); + }, waitTime, finalTaskDeadlineSeconds, TimeUnit.SECONDS); } /** @@ -205,7 +208,11 @@ private void enablePeriodicRestart(CConfiguration cConf, @POST @Path("/run") public void run(FullHttpRequest request, HttpResponder responder) { - if (runningRequestCount.incrementAndGet() > requestLimit) { + if (mustRestart.get()) { + responder.sendStatus(HttpResponseStatus.TOO_MANY_REQUESTS); + return; + } + if (runningRequestCount.incrementAndGet() > concurrentRequestLimit) { responder.sendStatus(HttpResponseStatus.TOO_MANY_REQUESTS); runningRequestCount.decrementAndGet(); return; @@ -223,16 +230,18 @@ public void run(FullHttpRequest request, HttpResponder responder) { if (runnableTaskRequest.getParam().getEmbeddedTaskRequest() != null) { // For system app tasks namespaceId = new NamespaceId( - runnableTaskRequest.getParam().getEmbeddedTaskRequest().getNamespace()); + runnableTaskRequest.getParam().getEmbeddedTaskRequest() + .getNamespace()); } else { namespaceId = new NamespaceId(runnableTaskRequest.getNamespace()); } // set the GcpMetadataTaskContext before running the task. - GcpMetadataTaskContextUtil.setGcpMetadataTaskContext(namespaceId, cConf); + GcpMetadataTaskContextUtil.setGcpMetadataTaskContext(namespaceId, + cConf); runnableTaskLauncher.launchRunnableTask(runnableTaskContext); TaskDetails taskDetails = new TaskDetails(metricsCollectionService, - startTime, - runnableTaskContext.isTerminateOnComplete(), runnableTaskRequest); + startTime, runnableTaskContext.isTerminateOnComplete(), + runnableTaskRequest); responder.sendContent(HttpResponseStatus.OK, new RunnableTaskBodyProducer(runnableTaskContext, taskCompletionConsumer, taskDetails), @@ -245,8 +254,8 @@ public void run(FullHttpRequest request, HttpResponder responder) { "application/json")); // Since the user class is not even loaded, no user code ran, hence it's ok to not terminate the runner taskCompletionConsumer.accept(false, - new TaskDetails(metricsCollectionService, - startTime, false, runnableTaskRequest)); + new TaskDetails(metricsCollectionService, startTime, false, + runnableTaskRequest)); } finally { // clear the GcpMetadataTaskContext after the task is completed. GcpMetadataTaskContextUtil.clearGcpMetadataTaskContext(cConf); @@ -267,7 +276,7 @@ public void run(FullHttpRequest request, HttpResponder responder) { /** * Returns a new token from metadata server. * - * @param request The {@link io.netty.handler.codec.http.HttpRequest}. + * @param request The {@link io.netty.handler.codec.http.HttpRequest}. * @param responder a {@link HttpResponder} for sending response. */ @GET @@ -284,12 +293,10 @@ public void token(io.netty.handler.codec.http.HttpRequest request, try { URL url = new URL(metadataServiceEndpoint); HttpRequest tokenRequest = HttpRequest.get(url) - .addHeader("Metadata-Flavor", "Google") - .build(); + .addHeader("Metadata-Flavor", "Google").build(); HttpResponse tokenResponse = HttpRequests.execute(tokenRequest); responder.sendByteArray(HttpResponseStatus.OK, - tokenResponse.getResponseBody(), - EmptyHttpHeaders.INSTANCE); + tokenResponse.getResponseBody(), EmptyHttpHeaders.INSTANCE); } catch (Exception ex) { LOG.warn("Failed to fetch token from metadata service", ex); responder.sendJson(HttpResponseStatus.INTERNAL_SERVER_ERROR, @@ -306,6 +313,28 @@ private String exceptionToJson(Exception ex) { return GSON.toJson(basicThrowable); } + /** + * Compute the final task Dead line in Seconds where if the config {@TaskWorker.TASK_EXECUTION_DEADLINE_SECOND} + * is less than 0 which is not valid then use the duration instead. + * + * @param duration + * @return + */ + private int calculateFinalTaskDeadlineSeconds(int duration) { + int taskDeadlineSeconds = cConf.getInt( + TaskWorker.TASK_EXECUTION_DEADLINE_SECOND, + 0); + + if (taskDeadlineSeconds < 0) { + LOG.info( + "Task deadline is {}, using {} value {} as the deadline instead.", + taskDeadlineSeconds, + Constants.TaskWorker.CONTAINER_KILL_AFTER_DURATION_SECOND, duration); + taskDeadlineSeconds = duration; + } + return taskDeadlineSeconds; + } + /** * By using BodyProducer instead of simply sending out response bytes, the * handler can get notified (through finished method) when sending the diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 99f5696a4830..1d1905c8495b 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -247,7 +247,10 @@ twill.jvm.gc.opts - -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M + -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails + -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 + -XX:GCLogFileSize=1M + (,9) @@ -2611,7 +2614,9 @@ messaging.system.topics - ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0 + + ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0 + A comma-separated list of topics that are always available in the system namespace. Multiple topics sharing the same prefix and @@ -3268,7 +3273,9 @@ app.program.runtime.monitor.topics.configs - audit.topic,data.event.topic,metadata.messaging.topic,metrics.topic.prefix:${metrics.messaging.topic.num},program.status.event.topic,program.status.event.topic:${program.status.event.topic.num.partitions},log.tms.topic.prefix:${log.publish.num.partitions} + + audit.topic,data.event.topic,metadata.messaging.topic,metrics.topic.prefix:${metrics.messaging.topic.num},program.status.event.topic,program.status.event.topic:${program.status.event.topic.num.partitions},log.tms.topic.prefix:${log.publish.num.partitions} + A comma-separated list of topic config to be monitored by runtime monitor @@ -3420,7 +3427,8 @@ app.program.runtime.monitor.metrics.aggregation.window.secs 5 - When metrics aggregation in runtime client service is enabled, this property controls the max time difference + When metrics aggregation in runtime client service is enabled, this property controls the max + time difference between two metrics which can be aggregated. @@ -3429,7 +3437,8 @@ app.program.runtime.monitor.metrics.aggregation.polltime.ms 5000 - Polling time in milliseconds to poll updates from a runtime when metrics aggregation is enabled. + Polling time in milliseconds to poll updates from a runtime when metrics aggregation is + enabled. This enables setting longer poll intervals to have better aggregation of metrics. @@ -3752,7 +3761,7 @@ preview.runner.internal.router.enabled false - Whether to route requests from preview runners through the internal router service. This + Whether to route requests from preview runners through the internal router service. This is only applicable for k8s environment presently. By default, it is disabled. @@ -5304,12 +5313,22 @@ + + task.worker.taskExecutionDeadline.second + 1200 + + Duration (in seconds) to wait for a task executing before stopping the task worker service. + If the value is less than or equal 0, ${task.worker.container.kill.after.duration.second} is + used instead. + + + task.worker.request.userCodeIsolation.enabled true Whether user code isolation is enabled in task worker. When enabled, task workers will ensure - multiple requests that run user code are not executed concurrently. + only 1 request is accepted at max and later after the task worker is restarted it can take next request. @@ -5357,7 +5376,7 @@ task.worker.internal.router.enabled false - Whether to route requests from task workers through the internal router service. This + Whether to route requests from task workers through the internal router service. This is only applicable for k8s environment presently. By default, it is disabled. @@ -5548,7 +5567,7 @@ system.worker.internal.router.enabled false - Whether to route requests from system workers through the internal router service. This + Whether to route requests from system workers through the internal router service. This is only applicable for k8s environment presently. By default, it is disabled. @@ -6164,7 +6183,7 @@ - + artifact.localizer.preload.list @@ -6208,7 +6227,9 @@ artifact.localizer.metadata.service.token.endpoint - http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token + + http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token + The GCE metadata server token endpoint. @@ -6284,7 +6305,8 @@ hsts.include.sub.domains true - Whether to include the includeSubDomains directive, which makes this policy extend to subdomains. + Whether to include the includeSubDomains directive, which makes this policy extend to + subdomains. @@ -6296,7 +6318,7 @@ - + operation.status.retry.policy.base.delay.ms @@ -6346,7 +6368,7 @@ Topic prefix for publishing status transitioning events of operation runs to the messaging system. - f + operation.status.event.topic.num.partitions