From fd63842d5e1b46b17914af203adbf807245d431e Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 2 Apr 2024 15:22:06 +0530 Subject: [PATCH] Throw an exception in DefaultRuntimeJob when program fails --- .../runtimejob/DefaultRuntimeJob.java | 370 +++++++++++------- .../monitor/ProgramRunCompletionDetails.java | 61 +++ .../runtime/monitor/RuntimeClientService.java | 65 +-- .../monitor/RuntimeClientServiceTest.java | 9 +- .../spi/runtimejob/DataprocJobMain.java | 26 +- .../ProgramRunFailureException.java | 29 ++ 6 files changed, 367 insertions(+), 193 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/ProgramRunCompletionDetails.java create mode 100644 cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ProgramRunFailureException.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java index 979b452a8523..fec843399686 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java @@ -47,6 +47,7 @@ import io.cdap.cdap.app.program.Programs; import io.cdap.cdap.app.runtime.Arguments; import io.cdap.cdap.app.runtime.ProgramController; +import io.cdap.cdap.app.runtime.ProgramController.State; import io.cdap.cdap.app.runtime.ProgramOptions; import io.cdap.cdap.app.runtime.ProgramRunner; import io.cdap.cdap.app.runtime.ProgramRunnerFactory; @@ -64,6 +65,7 @@ import io.cdap.cdap.common.logging.LoggingContextAccessor; import io.cdap.cdap.common.logging.common.UncaughtExceptionHandler; import io.cdap.cdap.common.utils.DirUtils; +import io.cdap.cdap.common.utils.ImmutablePair; import io.cdap.cdap.common.utils.Networks; import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; import io.cdap.cdap.internal.app.deploy.ConfiguratorFactory; @@ -90,6 +92,7 @@ import io.cdap.cdap.internal.app.runtime.distributed.DistributedProgramRunner; import io.cdap.cdap.internal.app.runtime.distributed.DistributedWorkerProgramRunner; import io.cdap.cdap.internal.app.runtime.distributed.DistributedWorkflowProgramRunner; +import io.cdap.cdap.internal.app.runtime.monitor.ProgramRunCompletionDetails; import io.cdap.cdap.internal.app.runtime.monitor.RuntimeClientService; import io.cdap.cdap.internal.app.runtime.monitor.RuntimeMonitors; import io.cdap.cdap.internal.app.runtime.monitor.ServiceSocksProxyInfo; @@ -99,9 +102,9 @@ import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.appender.loader.LogAppenderLoaderService; import io.cdap.cdap.logging.context.LoggingContextHelper; -import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; import io.cdap.cdap.messaging.server.MessagingHttpService; +import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramId; @@ -109,6 +112,7 @@ import io.cdap.cdap.runtime.spi.RuntimeMonitorType; import io.cdap.cdap.runtime.spi.provisioner.Cluster; import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode; +import io.cdap.cdap.runtime.spi.runtimejob.ProgramRunFailureException; import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJob; import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobEnvironment; import java.io.BufferedReader; @@ -203,7 +207,7 @@ public void run(RuntimeJobEnvironment runtimeJobEnv) throws Exception { LoggingContextHelper.getLoggingContextWithRunId(programRunId, systemArgs.asMap())); // Get the cluster launch type - Cluster cluster = GSON.fromJson(systemArgs.getOption(ProgramOptionConstants.CLUSTER), + final Cluster cluster = GSON.fromJson(systemArgs.getOption(ProgramOptionConstants.CLUSTER), Cluster.class); // Get App spec @@ -214,7 +218,7 @@ public void run(RuntimeJobEnvironment runtimeJobEnv) throws Exception { // Create injector and get program runner Injector injector = Guice.createInjector( - createModules(runtimeJobEnv, createCConf(runtimeJobEnv, programOpts), + createModules(runtimeJobEnv, createCconf(runtimeJobEnv, programOpts), programRunId, programOpts)); CConfiguration cConf = injector.getInstance(CConfiguration.class); @@ -230,55 +234,13 @@ public void run(RuntimeJobEnvironment runtimeJobEnv) throws Exception { Deque coreServices = createCoreServices(injector, systemArgs, cluster); startCoreServices(coreServices); - // regenerate app spec - ConfiguratorFactory configuratorFactory = injector.getInstance(ConfiguratorFactory.class); - try { - Map systemArguments = new HashMap<>(programOpts.getArguments().asMap()); - File pluginDir = new File( - programOpts.getArguments().getOption(ProgramOptionConstants.PLUGIN_DIR, - ProgramRunners.PLUGIN_DIR)); - // create a directory to store plugin artifacts for the regeneration of app spec to fetch plugin artifacts - DirUtils.mkdirs(pluginDir); - - if (!programOpts.getArguments().hasOption(ProgramOptionConstants.PLUGIN_DIR)) { - systemArguments.put(ProgramOptionConstants.PLUGIN_DIR, ProgramRunners.PLUGIN_DIR); - } - - // remember the file names in the artifact folder before app regeneration - List pluginFiles = DirUtils.listFiles(pluginDir, File::isFile).stream() - .map(File::getName) - .collect(Collectors.toList()); - - ApplicationSpecification generatedAppSpec = - regenerateAppSpec(systemArguments, programOpts.getUserArguments().asMap(), programId, - appSpec, - programDescriptor, configuratorFactory); - appSpec = generatedAppSpec != null ? generatedAppSpec : appSpec; - programDescriptor = new ProgramDescriptor(programDescriptor.getProgramId(), appSpec); - - List pluginFilesAfter = DirUtils.listFiles(pluginDir, File::isFile).stream() - .map(File::getName) - .collect(Collectors.toList()); - - if (pluginFilesAfter.isEmpty()) { - systemArguments.remove(ProgramOptionConstants.PLUGIN_DIR); - } - - // if different, create an updated plugin archive - if (!pluginFiles.equals(pluginFilesAfter)) { - File tempDir = DirUtils.createTempDir(new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR), - cConf.get(Constants.AppFabric.TEMP_DIR)).getAbsoluteFile()); - File tempArchiveDir = DirUtils.createTempDir(tempDir); - File rebuiltPluginArchive = ProgramRunners.createPluginArchive(programOpts, tempArchiveDir); - systemArguments.put(ProgramOptionConstants.PLUGIN_ARCHIVE, - rebuiltPluginArchive.getAbsolutePath()); - } - - // update program options - programOpts = new SimpleProgramOptions(programOpts.getProgramId(), - new BasicArguments(systemArguments), - programOpts.getUserArguments(), programOpts.isDebug()); + ImmutablePair regeneratedValues = regeneratePluginArtifacts( + programOpts, programDescriptor, injector); + programOpts = regeneratedValues.getSecond(); + appSpec = regeneratedValues.getFirst(); + programDescriptor = new ProgramDescriptor( + programDescriptor.getProgramId(), appSpec); } catch (Exception e) { LOG.warn("Failed to regenerate the app spec for program {}, using the existing app spec", programId, e); @@ -295,70 +257,30 @@ public void run(RuntimeJobEnvironment runtimeJobEnv) throws Exception { ProgramController controller = programRunner.run(program, programOpts); controllerFuture.complete(controller); - // Failure of any core service can leave the program in an orphaned state - // One example is RuntimeClientService failure when CDAP instance is deleted - // In such situations runtime job should be stopped to free up resources (CDAP-20216) - for (Service service : coreServices) { - service.addListener(new ServiceListenerAdapter() { - @Override - public void failed(Service.State from, Throwable failure) { - LOG.error("Core service {} failed, prev state {}, terminating program run", - service, from, failure); - try { - LOG.error("Forcefully terminating program run {}", programRunId); - controller.kill(); - } catch (Exception e) { - LOG.error("Error in terminating program run", e); - // Fallback in case controller could not be stopped - try { - programStateWriter.error(programRunId, failure); - } catch (Exception ex) { - LOG.error("Error in updating program state to error", ex); - } - programCompletion.completeExceptionally(failure); - } - } - }, Threads.SAME_THREAD_EXECUTOR); - } + monitorServicesHealth(programRunId, coreServices, programStateWriter, + programCompletion, controller); runtimeClientService.onProgramStopRequested(terminateTs -> { - long timeout = TimeUnit.SECONDS.toMillis(terminateTs - STOP_PROPAGATION_DELAY_SECS) - - System.currentTimeMillis(); + long timeout = TimeUnit.SECONDS.toMillis( + terminateTs - STOP_PROPAGATION_DELAY_SECS) + - System.currentTimeMillis(); if (timeout < 0) { // If the timeout is smaller than the propagation delay, use the propagation delay as timeout - // to give the remote process some time to shutdown - LOG.debug("Terminating program run {} short timeout {} seconds", programRunId, - STOP_PROPAGATION_DELAY_SECS); + // to give the remote process some time to shut down. + LOG.debug("Terminating program run {} short timeout {} seconds", + programRunId, STOP_PROPAGATION_DELAY_SECS); controller.stop(STOP_PROPAGATION_DELAY_SECS, TimeUnit.SECONDS); } else { - LOG.debug("Terminating program run {} with timeout {} ms", programRunId, timeout); + LOG.debug("Terminating program run {} with timeout {} ms", + programRunId, timeout); controller.stop(timeout, TimeUnit.MILLISECONDS); } }); - controller.addListener(new AbstractListener() { - @Override - public void completed() { - programCompletion.complete(ProgramController.State.COMPLETED); - } - - @Override - public void killed() { - // Write an extra state to make sure there is always a terminal state even - // if the program application run failed to write out the state. - programStateWriter.killed(programRunId); - programCompletion.complete(ProgramController.State.KILLED); - } - - @Override - public void error(Throwable cause) { - // Write an extra state to make sure there is always a terminal state even - // if the program application run failed to write out the state. - programStateWriter.error(programRunId, cause); - programCompletion.completeExceptionally(cause); - } - }, Threads.SAME_THREAD_EXECUTOR); + controller.addListener( + new ProgramStateListener(programCompletion, programStateWriter, + programRunId), Threads.SAME_THREAD_EXECUTOR); if (stopRequested) { controller.stop(); @@ -377,7 +299,7 @@ public void error(Throwable cause) { if (!programCompletion.isDone()) { // We log here so that the logs would still send back to the program logs collection. // Only log if the program completion is not done. - // Otherwise the program runner itself should have logged the error. + // Otherwise, the program runner itself should have logged the error. LOG.error("Failed to execute program {}", programRunId, t); // If the program completion is not done, then this exception // is due to systematic failure in which fail to run the program. @@ -391,46 +313,16 @@ public void error(Throwable cause) { Authenticator.setDefault(null); runCompletedLatch.countDown(); } - } - - @Nullable - private ApplicationSpecification regenerateAppSpec( - Map systemArguments, Map userArguments, ProgramId programId, - ApplicationSpecification existingAppSpec, ProgramDescriptor programDescriptor, - ConfiguratorFactory configuratorFactory) - throws InterruptedException, ExecutionException, TimeoutException { - - String appClassName = systemArguments.get(ProgramOptionConstants.APPLICATION_CLASS); - Location programJarLocation = Locations.toLocation( - new File(systemArguments.get(ProgramOptionConstants.PROGRAM_JAR))); - - userArguments = SystemArguments.skipNormalMacroEvaluation(userArguments) - ? Collections.emptyMap() : userArguments; - AppDeploymentInfo deploymentInfo = AppDeploymentInfo.builder() - .setArtifactId(programDescriptor.getArtifactId()) - .setArtifactLocation(programJarLocation) - .setApplicationClass(new ApplicationClass(appClassName, "", null)) - .setApplicationId(programId.getParent()) - .setConfigString(existingAppSpec.getConfiguration()) - .setOwnerPrincipal(null) - .setUpdateSchedules(false) - .setRuntimeInfo( - new AppDeploymentRuntimeInfo(existingAppSpec, userArguments, systemArguments)) - .setDeployedApplicationSpec(existingAppSpec) - .build(); - - Configurator configurator = configuratorFactory.create(deploymentInfo); - ListenableFuture future = configurator.config(); - ConfigResponse response = future.get(120, TimeUnit.SECONDS); - - if (response.getExitCode() == 0) { - AppSpecInfo appSpecInfo = response.getAppSpecInfo(); - if (appSpecInfo != null && appSpecInfo.getAppSpec() != null) { - return appSpecInfo.getAppSpec(); - } + final ProgramRunCompletionDetails completionInfo = runtimeClientService.getProgramCompletionDetails(); + if (completionInfo == null) { + LOG.warn( + "RuntimeClientService returned null ProgramRunCompletionDetails even after program completion."); + } else if (completionInfo.getEndStatus().isUnsuccessful()) { + throw new ProgramRunFailureException( + String.format("Program %s finished with unsuccessful status %s", + programId, completionInfo.getEndStatus().name())); } - return null; } @Override @@ -454,7 +346,7 @@ public void requestStop() { * by the {@link RuntimeJobEnvironment#getProperties()} will be set into the returned {@link * CConfiguration} instance. */ - private CConfiguration createCConf(RuntimeJobEnvironment runtimeJobEnv, + private CConfiguration createCconf(RuntimeJobEnvironment runtimeJobEnv, ProgramOptions programOpts) throws IOException { CConfiguration cConf = CConfiguration.create(); cConf.clear(); @@ -657,15 +549,155 @@ private void stopCoreServices(Deque coreServices, try { service.stopAndWait(); } catch (Exception e) { - LOG.warn("Exception raised when stopping service {} during program termination.", service, - e); + LOG.warn( + "Exception raised when stopping service {} during program termination.", + service, e); + } + } + } + + /** + * Regenerates the plugin artifacts and returns the new + * {@link ApplicationSpecification} and {@link ProgramOptions}. + */ + private ImmutablePair regeneratePluginArtifacts( + final ProgramOptions originalProgramOpts, + final ProgramDescriptor programDescriptor, final Injector injector) + throws Exception { + ConfiguratorFactory configuratorFactory = injector.getInstance( + ConfiguratorFactory.class); + CConfiguration cConf = injector.getInstance(CConfiguration.class); + + ApplicationSpecification appSpec = programDescriptor.getApplicationSpecification(); + ProgramId programId = programDescriptor.getProgramId(); + Map systemArguments = new HashMap<>( + originalProgramOpts.getArguments().asMap()); + File pluginDir = new File(originalProgramOpts.getArguments() + .getOption(ProgramOptionConstants.PLUGIN_DIR, + ProgramRunners.PLUGIN_DIR)); + // create a directory to store plugin artifacts for the regeneration of app spec to fetch plugin artifacts + DirUtils.mkdirs(pluginDir); + + if (!originalProgramOpts.getArguments() + .hasOption(ProgramOptionConstants.PLUGIN_DIR)) { + systemArguments.put(ProgramOptionConstants.PLUGIN_DIR, + ProgramRunners.PLUGIN_DIR); + } + + // remember the file names in the artifact folder before app regeneration + final List pluginFiles = DirUtils.listFiles(pluginDir, File::isFile) + .stream().map(File::getName).collect(Collectors.toList()); + + ApplicationSpecification newAppSpec = regenerateAppSpec(systemArguments, + originalProgramOpts.getUserArguments().asMap(), programId, appSpec, + programDescriptor, configuratorFactory); + appSpec = newAppSpec != null ? newAppSpec : appSpec; + + List pluginFilesAfter = DirUtils.listFiles(pluginDir, File::isFile) + .stream().map(File::getName).collect(Collectors.toList()); + + if (pluginFilesAfter.isEmpty()) { + systemArguments.remove(ProgramOptionConstants.PLUGIN_DIR); + } + + // if different, create an updated plugin archive + if (!pluginFiles.equals(pluginFilesAfter)) { + File tempDir = DirUtils.createTempDir( + new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR), + cConf.get(Constants.AppFabric.TEMP_DIR)).getAbsoluteFile()); + File tempArchiveDir = DirUtils.createTempDir(tempDir); + File rebuiltPluginArchive = ProgramRunners.createPluginArchive( + originalProgramOpts, tempArchiveDir); + systemArguments.put(ProgramOptionConstants.PLUGIN_ARCHIVE, + rebuiltPluginArchive.getAbsolutePath()); + } + + // update program options + ProgramOptions newProgramOpts = new SimpleProgramOptions( + originalProgramOpts.getProgramId(), new BasicArguments(systemArguments), + originalProgramOpts.getUserArguments(), originalProgramOpts.isDebug()); + return ImmutablePair.of(appSpec, newProgramOpts); + } + + /** + * Failure of any core service can leave the program in an orphaned state One + * example is RuntimeClientService failure when CDAP instance is deleted In + * such situations runtime job should be stopped to free up resources + * (CDAP-20216) + */ + private void monitorServicesHealth(ProgramRunId programRunId, + Deque services, ProgramStateWriter programStateWriter, + CompletableFuture programCompletion, + ProgramController controller) { + + for (Service service : services) { + service.addListener(new ServiceListenerAdapter() { + @Override + public void failed(Service.State from, Throwable failure) { + LOG.error( + "Core service {} failed, prev state {}, terminating program run", + service, from, failure); + try { + LOG.error("Forcefully terminating program run {}", programRunId); + controller.kill(); + } catch (Exception e) { + LOG.error("Error in terminating program run", e); + // Fallback in case controller could not be stopped + try { + programStateWriter.error(programRunId, failure); + } catch (Exception ex) { + LOG.error("Error in updating program state to error", ex); + } + programCompletion.completeExceptionally(failure); + } + } + }, Threads.SAME_THREAD_EXECUTOR); + } + } + + @Nullable + private ApplicationSpecification regenerateAppSpec( + Map systemArguments, Map userArguments, + ProgramId programId, ApplicationSpecification existingAppSpec, + ProgramDescriptor programDescriptor, + ConfiguratorFactory configuratorFactory) + throws InterruptedException, ExecutionException, TimeoutException { + + String appClassName = systemArguments.get( + ProgramOptionConstants.APPLICATION_CLASS); + Location programJarLocation = Locations.toLocation( + new File(systemArguments.get(ProgramOptionConstants.PROGRAM_JAR))); + + userArguments = SystemArguments.skipNormalMacroEvaluation(userArguments) + ? Collections.emptyMap() : userArguments; + + AppDeploymentInfo deploymentInfo = AppDeploymentInfo.builder() + .setArtifactId(programDescriptor.getArtifactId()) + .setArtifactLocation(programJarLocation) + .setApplicationClass(new ApplicationClass(appClassName, "", null)) + .setApplicationId(programId.getParent()) + .setConfigString(existingAppSpec.getConfiguration()) + .setOwnerPrincipal(null).setUpdateSchedules(false).setRuntimeInfo( + new AppDeploymentRuntimeInfo(existingAppSpec, userArguments, + systemArguments)).setDeployedApplicationSpec(existingAppSpec) + .build(); + + Configurator configurator = configuratorFactory.create(deploymentInfo); + ListenableFuture future = configurator.config(); + ConfigResponse response = future.get(120, TimeUnit.SECONDS); + + if (response.getExitCode() == 0) { + AppSpecInfo appSpecInfo = response.getAppSpecInfo(); + if (appSpecInfo != null && appSpecInfo.getAppSpec() != null) { + return appSpecInfo.getAppSpec(); } } + return null; } /** - * A service wrapper around {@link TrafficRelayServer} for setting address configurations after - * starting the relay server. + * A service wrapper around {@link TrafficRelayServer} for setting address + * configurations after starting the relay server. */ private static final class TrafficRelayService extends AbstractIdleService { @@ -706,14 +738,50 @@ private InetSocketAddress getTrafficRelayTarget() { int port = GSON.fromJson(reader, ServiceSocksProxyInfo.class).getPort(); return port == 0 ? null : new InetSocketAddress(InetAddress.getLoopbackAddress(), port); } catch (Exception e) { - OUTAGE_LOG.warn("Failed to open service proxy file {}", serviceProxyFile, e); + OUTAGE_LOG.warn("Failed to open service proxy file {}", + serviceProxyFile, e); return null; } } private File getServiceProxyFile() { - return new File("/tmp", - Constants.RuntimeMonitor.SERVICE_PROXY_FILE + "-" + programRunId.getRun() + ".json"); + return new File("/tmp", Constants.RuntimeMonitor.SERVICE_PROXY_FILE + "-" + + programRunId.getRun() + ".json"); + } + } + + private class ProgramStateListener extends AbstractListener { + + private final CompletableFuture programCompletion; + private final ProgramStateWriter programStateWriter; + private final ProgramRunId programRunId; + + public ProgramStateListener(CompletableFuture programCompletion, + ProgramStateWriter programStateWriter, ProgramRunId programRunId) { + this.programCompletion = programCompletion; + this.programStateWriter = programStateWriter; + this.programRunId = programRunId; + } + + @Override + public void completed() { + programCompletion.complete(State.COMPLETED); + } + + @Override + public void killed() { + // Write an extra state to make sure there is always a terminal state even + // if the program application run failed to write out the state. + programStateWriter.killed(programRunId); + programCompletion.complete(State.KILLED); + } + + @Override + public void error(Throwable cause) { + // Write an extra state to make sure there is always a terminal state even + // if the program application run failed to write out the state. + programStateWriter.error(programRunId, cause); + programCompletion.completeExceptionally(cause); } } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/ProgramRunCompletionDetails.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/ProgramRunCompletionDetails.java new file mode 100644 index 000000000000..7316c9589e85 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/ProgramRunCompletionDetails.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.runtime.monitor; + +import io.cdap.cdap.proto.ProgramRunStatus; +import java.util.Objects; + +/** + * Information about the completion of a + * {@link io.cdap.cdap.app.program.Program} run managed by a + * {@link io.cdap.cdap.runtime.spi.runtimejob.RuntimeJob}. + */ +public class ProgramRunCompletionDetails { + + private final long endTimestamp; + private final ProgramRunStatus endStatus; + + ProgramRunCompletionDetails(long endTimestamp, ProgramRunStatus endStatus) { + this.endTimestamp = endTimestamp; + this.endStatus = endStatus; + } + + public long getEndTimestamp() { + return endTimestamp; + } + + public ProgramRunStatus getEndStatus() { + return endStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProgramRunCompletionDetails)) { + return false; + } + ProgramRunCompletionDetails that = (ProgramRunCompletionDetails) o; + return endTimestamp == that.endTimestamp && endStatus == that.endStatus; + } + + @Override + public int hashCode() { + return Objects.hash(endTimestamp, endStatus); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientService.java index 2bc1915b1c9d..c7d3a349fefe 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientService.java @@ -42,9 +42,9 @@ import io.cdap.cdap.internal.io.DatumReaderFactory; import io.cdap.cdap.internal.io.DatumWriterFactory; import io.cdap.cdap.internal.io.SchemaGenerator; -import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.context.MultiThreadMessagingContext; import io.cdap.cdap.messaging.data.MessageId; +import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.proto.Notification; import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.proto.id.NamespaceId; @@ -61,7 +61,7 @@ import java.util.Spliterators; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -90,7 +90,7 @@ public class RuntimeClientService extends AbstractRetryableScheduledService { private final ProgramRunId programRunId; private final RuntimeClient runtimeClient; private final int fetchLimit; - private final AtomicLong programFinishTime; + private final AtomicReference completionDetails; @Inject RuntimeClientService(CConfiguration cConf, @@ -109,7 +109,7 @@ public class RuntimeClientService extends AbstractRetryableScheduledService { this.programRunId = programRunId; this.runtimeClient = runtimeClient; this.fetchLimit = cConf.getInt(Constants.RuntimeMonitor.BATCH_SIZE); - this.programFinishTime = new AtomicLong(-1L); + this.completionDetails = new AtomicReference<>(null); this.topicRelayers = RuntimeMonitors.createTopicNameList(cConf) .stream() .map(name -> createTopicRelayer(cConf, name, schemaGenerator, @@ -124,8 +124,8 @@ protected long runTask() throws Exception { nextPollDelay = Math.min(nextPollDelay, topicRelayer.publishMessages()); } - // If we got the program finished state, determine when to shutdown - if (getProgramFinishTime() > 0) { + // If we got the program finished state, determine when to shut down. + if (getProgramCompletionDetails() != null) { // Gives half the time of the graceful shutdown time to allow empty fetches // Essentially is the wait time for any unpublished events on the remote runtime to publish // E.g. Metrics from the remote runtime process might have some delay after the program state changed, @@ -134,8 +134,10 @@ protected long runTask() throws Exception { // that means all of them fetched till the end of the corresponding topic in the latest fetch. long now = System.currentTimeMillis(); if ((nextPollDelay == pollTimeMillis - && now - (gracefulShutdownMillis >> 1) > getProgramFinishTime()) - || (now - gracefulShutdownMillis > getProgramFinishTime())) { + && now - (gracefulShutdownMillis >> 1) + > getProgramCompletionDetails().getEndTimestamp()) + || (now - gracefulShutdownMillis + > getProgramCompletionDetails().getEndTimestamp())) { LOG.debug( "Program {} terminated. Shutting down runtime client service.", programRunId); @@ -163,7 +165,7 @@ protected void doShutdown() throws Exception { for (TopicRelayer topicRelayer : topicRelayers) { topicRelayer.prepareClose(); } - if (getProgramFinishTime() < 0) { + if (getProgramCompletionDetails() == null) { throw new RetryableException("Program completion is not yet observed"); } }, retryStrategy, @@ -179,9 +181,15 @@ protected void doShutdown() throws Exception { } } - @VisibleForTesting - long getProgramFinishTime() { - return programFinishTime.get(); + /** + * Returns information about the program completion. + * + * @return {@link ProgramRunCompletionDetails} if the program run completion is + * observed, null otherwise. + */ + @Nullable + public ProgramRunCompletionDetails getProgramCompletionDetails() { + return completionDetails.get(); } @VisibleForTesting @@ -192,7 +200,7 @@ Collection getTopicNames() { } /** - * Accepts a Runnable and passes it to RuntimeClient + * Accepts a Runnable and passes it to RuntimeClient. * * @param stopper a {@link LongConsumer} with the termination timestamp in * seconds as the argument @@ -327,7 +335,7 @@ protected Message computeNext() { } /** - * Processes the give list of {@link Message}. By default it sends them + * Processes the give list of {@link Message}. By default, it sends them * through the {@link RuntimeClient}. */ protected void processMessages(Iterator iterator) @@ -411,15 +419,15 @@ protected void processMessages(Iterator iterator) false) .collect(Collectors.toList()); - if (programFinishTime.get() == -1L) { - long finishTime = findProgramFinishTime(message); - if (finishTime >= 0) { + if (completionDetails.get() == null) { + ProgramRunCompletionDetails finishInfo = findProgramFinishInfo(message); + if (finishInfo != null) { detectedProgramFinish = true; LOG.trace("Detected program {} finish time {} in topic {}", - programRunId, finishTime, + programRunId, finishInfo.getEndTimestamp(), topicId.getTopic()); } - programFinishTime.compareAndSet(-1L, finishTime); + completionDetails.compareAndSet(null, finishInfo); } if (detectedProgramFinish) { // Buffer the program state messages and don't publish them until the end @@ -474,11 +482,13 @@ public void close() throws IOException { } /** - * Returns the time where the program finished, meaning it reaches one of - * the terminal states. If the given list of {@link Message} doesn't contain - * such information, {@code -1L} is returned. + * Returns the completion status and time when the program finished, meaning + * it reaches one of the terminal states. If the given list of + * {@link Message} doesn't contain such information, {@code null} is + * returned. */ - private long findProgramFinishTime(List messages) { + @Nullable + private ProgramRunCompletionDetails findProgramFinishInfo(List messages) { for (Message message : messages) { Notification notification = message.decodePayload( r -> GSON.fromJson(r, Notification.class)); @@ -506,18 +516,21 @@ private long findProgramFinishTime(List messages) { } if (ProgramRunStatus.isEndState(programStatus)) { + long endTimeStamp; try { - return Long.parseLong( + endTimeStamp = Long.parseLong( properties.get(ProgramOptionConstants.END_TIME)); } catch (Exception e) { // END_TIME should be a valid long. In case there is any problem, use the timestamp in the message ID - return new MessageId( + endTimeStamp = new MessageId( Bytes.fromHexString(message.getId())).getPublishTimestamp(); } + return new ProgramRunCompletionDetails(endTimeStamp, + ProgramRunStatus.valueOf(programStatus)); } } - return -1L; + return null; } } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientServiceTest.java index 318e9e461c75..c204ac2ecf8f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/monitor/RuntimeClientServiceTest.java @@ -56,9 +56,9 @@ import io.cdap.cdap.internal.io.DatumReaderFactory; import io.cdap.cdap.internal.io.DatumWriterFactory; import io.cdap.cdap.internal.io.SchemaGenerator; -import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.context.MultiThreadMessagingContext; import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; +import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.proto.Notification; import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.proto.id.NamespaceId; @@ -440,7 +440,8 @@ public void testProgramTerminate() throws Exception { // Send a terminate program state first, wait for the service sees the state change, // then publish messages to other topics. programStateWriter.completed(PROGRAM_RUN_ID); - Tasks.waitFor(true, () -> runtimeClientService.getProgramFinishTime() >= 0, + Tasks.waitFor(true, + () -> runtimeClientService.getProgramCompletionDetails() != null, 2, TimeUnit.SECONDS); for (String topic : nonStatusTopicNames) { @@ -465,6 +466,10 @@ public void testProgramTerminate() throws Exception { .collect(Collectors.toList()), 5, TimeUnit.SECONDS); } + + Assert.assertEquals( + runtimeClientService.getProgramCompletionDetails().getEndStatus(), + ProgramRunStatus.COMPLETED); waitForStatus(serverMessagingContext, ProgramRunStatus.COMPLETED); } diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java index c3fe26b1792e..f18d4d151d64 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java @@ -55,7 +55,7 @@ public class DataprocJobMain { private static final Logger LOG = LoggerFactory.getLogger(DataprocJobMain.class); /** - * Main method to setup classpath and call the RuntimeJob.run() method. + * Main method to set up classpath and call the RuntimeJob.run() method. * * @param args the name of implementation of RuntimeJob class * @throws Exception any exception while running the job @@ -111,14 +111,14 @@ public static void main(String[] args) throws Exception { // Don't close the classloader since this is the main classloader, // which can be used for shutdown hook execution. // Closing it too early can result in NoClassDefFoundError in shutdown hook execution. - ClassLoader newCL = createContainerClassLoader(urls); + ClassLoader newCl = createContainerClassLoader(urls); CompletableFuture completion = new CompletableFuture<>(); try { - Thread.currentThread().setContextClassLoader(newCL); + Thread.currentThread().setContextClassLoader(newCl); // load environment class and create instance of it String dataprocEnvClassName = DataprocRuntimeEnvironment.class.getName(); - Class dataprocEnvClass = newCL.loadClass(dataprocEnvClassName); + Class dataprocEnvClass = newCl.loadClass(dataprocEnvClassName); Object newDataprocEnvInstance = dataprocEnvClass.newInstance(); try { @@ -130,15 +130,14 @@ public static void main(String[] args) throws Exception { initializeMethod.invoke(newDataprocEnvInstance, sparkCompat, launchMode); // call run() method on runtimeJobClass - Class runEnvCls = newCL.loadClass(RuntimeJobEnvironment.class.getName()); - Class runnerCls = newCL.loadClass(runtimeJobClassName); + Class runEnvCls = newCl.loadClass(RuntimeJobEnvironment.class.getName()); + Class runnerCls = newCl.loadClass(runtimeJobClassName); Method runMethod = runnerCls.getMethod("run", runEnvCls); Method stopMethod = runnerCls.getMethod("requestStop"); - Object runner = runnerCls.newInstance(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Request the runtime job to stop if it it hasn't been completed + // Request the runtime job to stop if it hasn't been completed. if (completion.isDone()) { return; } @@ -171,7 +170,6 @@ public static void main(String[] args) throws Exception { /** * This method will generate class path by adding following to urls to front of default * classpath: - * * expanded.resource.jar expanded.application.jar expanded.application.jar/lib/*.jar * expanded.application.jar/classes expanded.twill.jar expanded.twill.jar/lib/*.jar * expanded.twill.jar/classes @@ -185,7 +183,7 @@ private static URL[] getClasspath(List jarFiles) throws IOException { if (file.equals(Constants.Files.RESOURCES_JAR)) { continue; } - urls.addAll(createClassPathURLs(jarDir)); + urls.addAll(createClassPathUrls(jarDir)); } // Add the system class path to the URL list @@ -200,16 +198,16 @@ private static URL[] getClasspath(List jarFiles) throws IOException { return urls.toArray(new URL[0]); } - private static List createClassPathURLs(File dir) throws MalformedURLException { + private static List createClassPathUrls(File dir) throws MalformedURLException { List urls = new ArrayList<>(); // add jar urls from lib under dir - addJarURLs(new File(dir, "lib"), urls); + addJarUrls(new File(dir, "lib"), urls); // add classes under dir urls.add(new File(dir, "classes").toURI().toURL()); return urls; } - private static void addJarURLs(File dir, List result) throws MalformedURLException { + private static void addJarUrls(File dir, List result) throws MalformedURLException { File[] files = dir.listFiles(f -> f.getName().endsWith(".jar")); if (files == null) { return; @@ -285,7 +283,7 @@ private static Map> fromPosixArray(String[] args) { } /** - * Creates a {@link ClassLoader} for the the job execution. + * Creates a {@link ClassLoader} for the job execution. */ private static ClassLoader createContainerClassLoader(URL[] classpath) { String containerClassLoaderName = System.getProperty(Constants.TWILL_CONTAINER_CLASSLOADER); diff --git a/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ProgramRunFailureException.java b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ProgramRunFailureException.java new file mode 100644 index 000000000000..bfcd47e8195b --- /dev/null +++ b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ProgramRunFailureException.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.runtime.spi.runtimejob; + +/** + * An exception thrown to indicate that a Program launched by the + * {@link io.cdap.cdap.runtime.spi.runtimejob.RuntimeJob} completed gracefully, + * but finished with an unsuccessful status. + */ +public class ProgramRunFailureException extends RuntimeException { + + public ProgramRunFailureException(String message) { + super(message); + } +}