diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 8683d09d555..a8b805bc6f7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -116,7 +116,7 @@ import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; import io.cdap.cdap.internal.app.services.RunRecordCorrectorService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService; import io.cdap.cdap.internal.app.store.DefaultStore; import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules; @@ -420,7 +420,7 @@ protected void configure() { bind(ArtifactStore.class).in(Scopes.SINGLETON); bind(ProfileService.class).in(Scopes.SINGLETON); - bind(RunRecordMonitorService.class).in(Scopes.SINGLETON); + bind(FlowControlService.class).in(Scopes.SINGLETON); bind(ProgramLifecycleService.class).in(Scopes.SINGLETON); bind(SystemAppManagementService.class).in(Scopes.SINGLETON); bind(OwnerAdmin.class).to(DefaultOwnerAdmin.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java index d2a2d891db6..7a046b0ced2 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import javax.annotation.Nullable; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.DiscoveryService; import org.slf4j.Logger; @@ -75,7 +74,7 @@ public class AppFabricProcessorService extends AbstractIdleService { private final RunRecordCorrectorService runRecordCorrectorService; private final RunDataTimeToLiveService runDataTimeToLiveService; private final ProgramRunStatusMonitorService programRunStatusMonitorService; - private final RunRecordMonitorService runRecordCounterService; + private final FlowControlService runRecordCounterService; private final CoreSchedulerService coreSchedulerService; private final ProvisioningService provisioningService; private final BootstrapService bootstrapService; @@ -111,7 +110,7 @@ public AppFabricProcessorService(CConfiguration cConf, ProvisioningService provisioningService, BootstrapService bootstrapService, SystemAppManagementService systemAppManagementService, - RunRecordMonitorService runRecordCounterService, + FlowControlService runRecordCounterService, RunDataTimeToLiveService runDataTimeToLiveService, OperationNotificationSubscriberService operationNotificationSubscriberService, ScheduleNotificationSubscriberService scheduleNotificationSubscriberService) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java new file mode 100644 index 00000000000..f2dc1e4e351 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java @@ -0,0 +1,189 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.app.program.ProgramDescriptor; +import io.cdap.cdap.app.runtime.ProgramOptions; +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintain and provides total number of launching and running run-records. This class is used by + * flow-control mechanism for launch requests. + */ +public class FlowControlService extends AbstractIdleService { + + private static final Logger LOG = LoggerFactory.getLogger(FlowControlService.class); + + // Program types controlled by flow-control mechanism. + private static final Set CONTROL_FLOW_PROGRAM_TYPES = EnumSet.of(ProgramType.MAPREDUCE, + ProgramType.WORKFLOW, + ProgramType.SPARK, + ProgramType.WORKER); + + private final MetricsCollectionService metricsCollectionService; + private final TransactionRunner transactionRunner; + + /** + * Monitors the program flow control. + * + * @param metricsCollectionService collect metrics + */ + @Inject + public FlowControlService( + MetricsCollectionService metricsCollectionService, + TransactionRunner transactionRunner) { + this.metricsCollectionService = metricsCollectionService; + this.transactionRunner = transactionRunner; + } + + @Override + protected void startUp() throws Exception { + LOG.info("FlowControlService started."); + } + + @Override + protected void shutDown() throws Exception { + LOG.info("FlowControlService successfully shut down."); + } + + /** + * Add a new in-flight launch request and return total number of launching and running programs. + * + * @param programRunId run id associated with the launch request + * @return total number of launching and running program runs. + */ + public Counter addRequestAndGetCount(ProgramRunId programRunId, ProgramOptions programOptions, + ProgramDescriptor programDescriptor) throws Exception { + if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) { + throw new Exception("None time-based UUIDs are not supported"); + } + + int launchingCount = addRequest(programRunId, programOptions, programDescriptor); + int runningCount = getFlowControlCounter(false, true).getRunningCount(); + + LOG.info( + "Counter has {} concurrent launching and {} running programs.", + launchingCount, + runningCount); + return new Counter(launchingCount, runningCount); + } + + /** + * Get total number of launching and running programs. + * + * @return Counter with total number of launching and running program runs. + */ + public Counter getCount() { + return getFlowControlCounter(true, true); + } + + /** + * Add a new in-flight launch request. + * + * @param programRunId run id associated with the launch request + * + * @return Returns the count of launching programs. + */ + public int addRequest(ProgramRunId programRunId, ProgramOptions programOptions, + ProgramDescriptor programDescriptor) { + int launchingCount = TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + store.recordProgramPending(programRunId, + programOptions.getArguments().asMap(), + programOptions.getUserArguments().asMap(), + programDescriptor.getArtifactId().toApiArtifactId()); + return store.getLaunchingCount(CONTROL_FLOW_PROGRAM_TYPES, null); + }); + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount); + LOG.info("Added request with runId {}.", programRunId); + return launchingCount; + } + + private Counter getFlowControlCounter(boolean getLaunching, boolean getRunning) { + return TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + return new Counter( + getLaunching ? store.getLaunchingCount(CONTROL_FLOW_PROGRAM_TYPES, null) : null, + getRunning ? store.getRunningCount(CONTROL_FLOW_PROGRAM_TYPES, null) : null + ); + }); + } + + public void emitFlowControlMetrics(boolean emitLaunching, boolean emitRunning) { + Counter counter = getFlowControlCounter(emitLaunching, emitRunning); + if (emitLaunching) { + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount()); + } + if (emitRunning) { + emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, counter.getRunningCount()); + } + } + + private void emitMetrics(String metricName, long value) { + LOG.trace("Setting metric {} to value {}", metricName, value); + metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); + } + + /** + * Counts the concurrent program runs. + */ + public class Counter { + + /** + * Total number of launch requests that have been accepted but still missing in metadata store + + * * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of + * run records with {@link ProgramRunStatus#STARTING} status. + */ + private final Integer launchingCount; + + /** + * Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run + * records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with + * {@link ProgramRunStatus#RESUMING} status. + */ + private final Integer runningCount; + + Counter(Integer launchingCount, Integer runningCount) { + this.launchingCount = launchingCount; + this.runningCount = runningCount; + } + + public Integer getLaunchingCount() { + return launchingCount; + } + + public Integer getRunningCount() { + return runningCount; + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java index 8bd33edccd1..e1c2e26f0bf 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java @@ -149,7 +149,7 @@ public class ProgramLifecycleService { private final int defaultStopTimeoutSecs; private final int batchSize; private final ArtifactRepository artifactRepository; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private final boolean userProgramLaunchDisabled; @Inject @@ -161,7 +161,7 @@ public class ProgramLifecycleService { ProvisionerNotifier provisionerNotifier, ProvisioningService provisioningService, ProgramStateWriter programStateWriter, CapabilityReader capabilityReader, ArtifactRepository artifactRepository, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); this.maxConcurrentLaunching = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_LAUNCHING); this.defaultStopTimeoutSecs = cConf.getInt(Constants.AppFabric.PROGRAM_MAX_STOP_SECONDS); @@ -180,7 +180,7 @@ public class ProgramLifecycleService { this.programStateWriter = programStateWriter; this.capabilityReader = capabilityReader; this.artifactRepository = artifactRepository; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; } /** @@ -730,8 +730,8 @@ public RunId runInternal(ProgramId programId, Map userArgs, checkCapability(programDescriptor); ProgramRunId programRunId = programId.run(runId); - RunRecordMonitorService.Counter counter = runRecordMonitorService.addRequestAndGetCount( - programRunId); + FlowControlService.Counter counter = flowControlService.addRequestAndGetCount( + programRunId, programOptions, programDescriptor); boolean done = false; try { @@ -765,7 +765,7 @@ public RunId runInternal(ProgramId programId, Map userArgs, done = true; } finally { if (!done) { - runRecordMonitorService.removeRequest(programRunId, false); + flowControlService.emitFlowControlMetrics(true, false); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java index 88fe9dc5ff0..f8de7b9cac6 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java @@ -112,7 +112,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { private final ProgramStateWriter programStateWriter; private final TransactionRunner transactionRunner; private final Store store; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private Service delegate; private Set programCompletionNotifiers; @@ -127,7 +127,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { ProgramStateWriter programStateWriter, TransactionRunner transactionRunner, Store store, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.messagingService = messagingService; this.cConf = cConf; @@ -138,7 +138,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { this.programStateWriter = programStateWriter; this.transactionRunner = transactionRunner; this.store = store; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; this.programCompletionNotifiers = Collections.emptySet(); } @@ -163,8 +163,7 @@ protected void startUp() throws Exception { } private void emitFlowControlMetrics() { - runRecordMonitorService.emitLaunchingMetrics(); - runRecordMonitorService.emitRunningMetrics(); + flowControlService.emitFlowControlMetrics(true, true); } private void restoreActiveRuns() { @@ -184,23 +183,26 @@ private void restoreActiveRuns() { } try { LOG.info("Found active run: {}", runRecordDetail.getProgramRunId()); + ProgramOptions programOptions = + new SimpleProgramOptions( + runRecordDetail.getProgramRunId().getParent(), + new BasicArguments(runRecordDetail.getSystemArgs()), + new BasicArguments(runRecordDetail.getUserArgs())); + ProgramDescriptor programDescriptor = this.store.loadProgram( + runRecordDetail.getProgramRunId().getParent()); if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) { - runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); + flowControlService.emitFlowControlMetrics(true, false); } else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) { - runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); + flowControlService.addRequest(runRecordDetail.getProgramRunId(), + programOptions, programDescriptor); // It is unknown what is the state of program runs in STARTING state. // A STARTING message is published again to retry STARTING logic. - ProgramOptions programOptions = - new SimpleProgramOptions( - runRecordDetail.getProgramRunId().getParent(), - new BasicArguments(runRecordDetail.getSystemArgs()), - new BasicArguments(runRecordDetail.getUserArgs())); LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId()); programStateWriter.start( runRecordDetail.getProgramRunId(), programOptions, null, - this.store.loadProgram(runRecordDetail.getProgramRunId().getParent())); + programDescriptor); } } catch (Exception e) { ProgramRunId programRunId = runRecordDetail.getProgramRunId(); @@ -234,7 +236,7 @@ private ProgramNotificationSingleTopicSubscriberService createChildService( provisioningService, programStateWriter, transactionRunner, - runRecordMonitorService, + flowControlService, name, topicName, programCompletionNotifiers); @@ -275,7 +277,7 @@ class ProgramNotificationSingleTopicSubscriberService private final Queue tasks; private final MetricsCollectionService metricsCollectionService; private Set programCompletionNotifiers; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private final boolean checkTxSeparation; ProgramNotificationSingleTopicSubscriberService( @@ -287,7 +289,7 @@ class ProgramNotificationSingleTopicSubscriberService ProvisioningService provisioningService, ProgramStateWriter programStateWriter, TransactionRunner transactionRunner, - RunRecordMonitorService runRecordMonitorService, + FlowControlService flowControlService, String name, String topicName, Set programCompletionNotifiers) { @@ -310,7 +312,7 @@ class ProgramNotificationSingleTopicSubscriberService this.tasks = new LinkedList<>(); this.metricsCollectionService = metricsCollectionService; this.programCompletionNotifiers = programCompletionNotifiers; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; // If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent // modifications to @@ -582,7 +584,7 @@ private void handleProgramEvent( appMetadataStore.recordProgramRunning( programRunId, logicalStartTimeSecs, twillRunId, messageIdBytes); writeToHeartBeatTable(recordedRunRecord, logicalStartTimeSecs, programHeartbeatTable); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(true, true); long startDelayTime = logicalStartTimeSecs - RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); emitStartingTimeMetric(programRunId, startDelayTime, recordedRunRecord); @@ -660,7 +662,7 @@ private void handleProgramEvent( Constants.Metrics.Program.PROGRAM_REJECTED_RUNS, null) .ifPresent(runnables::add); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(true, true); break; default: // This should not happen @@ -774,7 +776,7 @@ private RunRecordDetail handleProgramCompletion( programCompletionNotifiers.forEach( notifier -> notifier.onProgramCompleted(programRunId, recordedRunRecord.getStatus())); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(true, true); }); } return recordedRunRecord; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java deleted file mode 100644 index 89c2fb0c8a9..00000000000 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright © 2022 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.app.services; - -import com.google.common.util.concurrent.AbstractScheduledService; -import com.google.inject.Inject; -import io.cdap.cdap.api.metrics.MetricsCollectionService; -import io.cdap.cdap.app.runtime.ProgramRuntimeService; -import io.cdap.cdap.common.app.RunIds; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl; -import io.cdap.cdap.proto.ProgramRunStatus; -import io.cdap.cdap.proto.ProgramType; -import io.cdap.cdap.proto.id.ProgramRunId; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.twill.common.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maintain and return total number of launching and running run-records. This class is used by - * flow-control mechanism for launch requests. It also has a cleanup mechanism to automatically - * remove old (i.e., configurable) entries from the counter as a safe-guard mechanism. - */ -public class RunRecordMonitorService extends AbstractScheduledService { - - private static final Logger LOG = LoggerFactory.getLogger(RunRecordMonitorService.class); - - /** - * Contains ProgramRunIds of runs that have been accepted, but have not been added to metadata - * store plus all run records with {@link ProgramRunStatus#PENDING} or {@link - * ProgramRunStatus#STARTING} status. - */ - private final BlockingQueue launchingQueue; - - private final ProgramRuntimeService runtimeService; - private final long ageThresholdSec; - private final CConfiguration cConf; - private final MetricsCollectionService metricsCollectionService; - private final int maxConcurrentRuns; - private ScheduledExecutorService executor; - - /** - * Tracks the program runs. - * - * @param cConf configuration - * @param runtimeService service to get info on programs - * @param metricsCollectionService collect metrics - */ - @Inject - public RunRecordMonitorService( - CConfiguration cConf, - ProgramRuntimeService runtimeService, - MetricsCollectionService metricsCollectionService) { - this.cConf = cConf; - this.runtimeService = runtimeService; - this.metricsCollectionService = metricsCollectionService; - - this.launchingQueue = - new PriorityBlockingQueue<>( - 128, Comparator.comparingLong(o -> RunIds.getTime(o.getRun(), TimeUnit.MILLISECONDS))); - this.ageThresholdSec = cConf.getLong(Constants.AppFabric.MONITOR_RECORD_AGE_THRESHOLD_SECONDS); - this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); - } - - @Override - protected void startUp() throws Exception { - LOG.info("RunRecordMonitorService started."); - } - - @Override - protected void shutDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } - LOG.info("RunRecordMonitorService successfully shut down."); - } - - @Override - protected void runOneIteration() throws Exception { - cleanupQueue(); - } - - @Override - protected Scheduler scheduler() { - return Scheduler.newFixedRateSchedule( - 0, cConf.getInt(Constants.AppFabric.MONITOR_CLEANUP_INTERVAL_SECONDS), TimeUnit.SECONDS); - } - - @Override - protected final ScheduledExecutorService executor() { - executor = - Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory("run-record-monitor-service-cleanup-scheduler")); - return executor; - } - - /** - * Add a new in-flight launch request and return total number of launching and running programs. - * - * @param programRunId run id associated with the launch request - * @return total number of launching and running program runs. - */ - public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception { - if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) { - throw new Exception("None time-based UUIDs are not supported"); - } - - int launchingCount = addRequest(programRunId); - int runningCount = getProgramsRunningCount(); - - LOG.info( - "Counter has {} concurrent launching and {} running programs.", - launchingCount, - runningCount); - return new Counter(launchingCount, runningCount); - } - - /** - * Get imprecise (due to data races) total number of launching and running programs. - * - * @return total number of launching and running program runs. - */ - public Counter getCount() { - int launchingCount = launchingQueue.size(); - int runningCount = getProgramsRunningCount(); - - return new Counter(launchingCount, runningCount); - } - - /** - * Add a new in-flight launch request. - * - * @param programRunId run id associated with the launch request - */ - public int addRequest(ProgramRunId programRunId) { - int result; - synchronized (launchingQueue) { - launchingQueue.add(programRunId); - result = launchingQueue.size(); - } - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, result); - LOG.info("Added request with runId {}.", programRunId); - return result; - } - - /** - * Remove the request with the provided programRunId when the request is no longer launching. - * I.e., not in-flight, not in {@link ProgramRunStatus#PENDING} and not in {@link - * ProgramRunStatus#STARTING} - * - * @param programRunId of the request to be removed from launching queue. - * @param emitRunningChange if true, also updates {@link - * Constants.Metrics.FlowControl#RUNNING_COUNT} - */ - public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) { - if (launchingQueue.remove(programRunId)) { - LOG.info( - "Removed request with runId {}. Counter has {} concurrent launching requests.", - programRunId, - launchingQueue.size()); - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); - } - - if (emitRunningChange) { - emitRunningMetrics(); - } - } - - public void emitLaunchingMetrics(long value) { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value); - } - - /** - * Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs. - */ - public void emitLaunchingMetrics() { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); - } - - - /** - * Emit the {@link Constants.Metrics.FlowControl#RUNNING_COUNT} metric for runs. - */ - public void emitRunningMetrics() { - emitMetrics(FlowControl.RUNNING_COUNT, getProgramsRunningCount()); - } - - private void emitMetrics(String metricName, long value) { - LOG.trace("Setting metric {} to value {}", metricName, value); - metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); - } - - private void cleanupQueue() { - while (true) { - ProgramRunId programRunId = launchingQueue.peek(); - if (programRunId == null - || RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) + (ageThresholdSec * 1000) - >= System.currentTimeMillis()) { - // Queue is empty or queue head has not expired yet. - break; - } - // Queue head might have already been removed. So instead of calling poll, we call remove. - if (launchingQueue.remove(programRunId)) { - LOG.info("Removing request with runId {} due to expired retention time.", programRunId); - } - } - // Always emit both metrics after cleanup. - emitLaunchingMetrics(); - emitRunningMetrics(); - } - - /** - * Returns the total number of programs in running state. The count includes batch (i.e., {@link - * ProgramType#WORKFLOW}), streaming (i.e., {@link ProgramType#SPARK}) with no parent and - * replication (i.e., {@link ProgramType#WORKER}) jobs. - */ - private int getProgramsRunningCount() { - List list = - runtimeService.listAll( - ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE); - - int launchingCount = launchingQueue.size(); - - // We use program controllers (instead of querying metadata store) to count the total number of - // programs in running state. - // A program controller is created when a launch request is in the middle of starting state. - // Therefore, the returning running count is NOT precise. - int impreciseRunningCount = - (int) list.stream() - .filter(r -> isRunning(r.getController().getState().getRunStatus())) - .count(); - - if (maxConcurrentRuns < 0 || (launchingCount + impreciseRunningCount < maxConcurrentRuns)) { - // It is safe to return the imprecise value since either flow control for runs is disabled - // (i.e., -1) or flow control will not reject an incoming request yet. - return impreciseRunningCount; - } - - // Flow control is at the threshold. We return the precise count. - return (int) list.stream() - .filter( - r -> - isRunning(r.getController().getState().getRunStatus()) - && !launchingQueue.contains(r.getController().getProgramRunId())) - .count(); - } - - private boolean isRunning(ProgramRunStatus status) { - if (status == ProgramRunStatus.RUNNING - || status == ProgramRunStatus.SUSPENDED - || status == ProgramRunStatus.RESUMING) { - return true; - } - - return false; - } - - /** - * Counts the concurrent program runs. - */ - public class Counter { - - /** - * Total number of launch requests that have been accepted but still missing in metadata store + - * * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of - * run records with {@link ProgramRunStatus#STARTING} status. - */ - private final int launchingCount; - - /** - * Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run - * records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with - * {@link ProgramRunStatus#RESUMING} status. - */ - private final int runningCount; - - Counter(int launchingCount, int runningCount) { - this.launchingCount = launchingCount; - this.runningCount = runningCount; - } - - public int getLaunchingCount() { - return launchingCount; - } - - public int getRunningCount() { - return runningCount; - } - } -} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java index c8d5e1def03..ddef7c1007b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java @@ -905,6 +905,54 @@ private void addWorkflowNodeState(ProgramRunId programRunId, Map } } + /** + * Record that the program run is pending run. + * + * @param programRunId program run + * + * @return {@link RunRecordDetail} with status Pending. + */ + @Nullable + public RunRecordDetail recordProgramPending(ProgramRunId programRunId, Map runtimeArgs, + Map systemArgs, @Nullable ArtifactId artifactId) + throws IOException { + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + throw new IllegalArgumentException(String.format( + "Provisioning state for program run '%s' does not have a timestamp in the run id", programRunId)); + } + + Optional profileId = SystemArguments.getProfileIdFromArgs( + programRunId.getNamespaceId(), systemArgs); + RunRecordDetail existing = getRun(programRunId); + + // If for some reason, there is an existing run record then return null. + if (existing != null) { + LOG.error( + "Ignoring unexpected request to record pending state for program run {} that has an " + + "existing run record in run state {} and cluster state {}.", + programRunId, existing.getStatus()); + return null; + } + + ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null, null); + RunRecordDetail meta = RunRecordDetail.builder() + .setProgramRunId(programRunId) + .setStartTime(startTs) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setStatus(ProgramRunStatus.PENDING) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .setProfileId(profileId.orElse(null)) + .build(); + writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + LOG.trace("Recorded {} for program {}", ProgramRunStatus.PENDING, programRunId); + return meta; + } + /** * Record that the program run is provisioning compute resources for the run. If the current * status has a higher source id, this call will be ignored. @@ -927,25 +975,6 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, Map systemArgs, byte[] sourceId, @Nullable ArtifactId artifactId) throws IOException { - long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); - if (startTs == -1L) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that does not have " - - + "a timestamp in the run id.", programRunId); - return null; - } - - RunRecordDetail existing = getRun(programRunId); - // for some reason, there is an existing run record. - if (existing != null) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that has an existing " - + "run record in run state {} and cluster state {}.", - programRunId, existing.getStatus(), existing.getCluster().getStatus()); - return null; - } - Optional profileId = SystemArguments.getProfileIdFromArgs( programRunId.getNamespaceId(), systemArgs); if (!profileId.isPresent()) { @@ -957,20 +986,59 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null, null); - RunRecordDetail meta = RunRecordDetail.builder() - .setProgramRunId(programRunId) - .setStartTime(startTs) - .setStatus(ProgramRunStatus.PENDING) - .setProperties(getRecordProperties(systemArgs, runtimeArgs)) - .setSystemArgs(systemArgs) - .setCluster(cluster) - .setProfileId(profileId.get()) - .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) - .setSourceId(sourceId) - .setArtifactId(artifactId) - .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) - .build(); - writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + + RunRecordDetail existing = getRun(programRunId); + RunRecordDetail meta; + if (existing == null) { + // Create a new run record if it doesn't exist. + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that does not have " + + + "a timestamp in the run id.", programRunId); + return null; + } + meta = RunRecordDetail.builder() + .setProgramRunId(programRunId) + .setStartTime(startTs) + .setStatus(ProgramRunStatus.PENDING) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setProfileId(profileId.get()) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setSourceId(sourceId) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .build(); + writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + } else { + if (existing.getStatus() != ProgramRunStatus.PENDING && existing.getStatus() != ProgramRunStatus.SUSPENDED) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that has " + + "a status in end state {}.", programRunId, existing.getStatus()); + return null; + } + delete(existing); + meta = RunRecordDetail.builder(existing) + .setStatus(ProgramRunStatus.PENDING) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setProfileId(profileId.get()) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setSourceId(sourceId) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .build(); + List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_ACTIVE, + existing.getProgramRunId(), + existing.getStartTs()); + writeToStructuredTableWithPrimaryKeys( + key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + } + LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONING, programRunId); return meta; } @@ -1179,31 +1247,21 @@ public RunRecordDetail recordProgramRejected(ProgramRunId programRunId, Map runtimeArgs, Map systemArgs, byte[] sourceId, @Nullable ArtifactId artifactId) throws IOException { - long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); - if (startTs == -1L) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that does not have " - - + "a timestamp in the run id.", programRunId); - return null; - } - RunRecordDetail existing = getRun(programRunId); - // for some reason, there is an existing run record? - if (existing != null) { + + // Return null if for some reason the run record doesn't exist. + if (existing == null) { LOG.error( - "Ignoring unexpected request to record rejected state for program run {} that has an existing " - + "run record in run state {} and cluster state {}.", - programRunId, existing.getStatus(), existing.getCluster().getStatus()); + "Ignoring unexpected request to record rejected state for program run {} that has no existing run record.", + programRunId); return null; } + delete(existing); Optional profileId = SystemArguments.getProfileIdFromArgs( programRunId.getNamespaceId(), systemArgs); - RunRecordDetail meta = RunRecordDetail.builder() - .setProgramRunId(programRunId) - .setStartTime(startTs) - .setStopTime(startTs) // rejected: stop time == start time + RunRecordDetail meta = RunRecordDetail.builder(existing) + .setStopTime(existing.getStartTs()) .setStatus(ProgramRunStatus.REJECTED) .setProperties(getRecordProperties(systemArgs, runtimeArgs)) .setSystemArgs(systemArgs) @@ -1214,7 +1272,11 @@ public RunRecordDetail recordProgramRejected(ProgramRunId programRunId, .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) .build(); - writeNewRunRecord(meta, TYPE_RUN_RECORD_COMPLETED); + List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_COMPLETED, + meta.getProgramRunId(), + meta.getStartTs()); + writeToStructuredTableWithPrimaryKeys( + key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); LOG.trace("Recorded {} for program {}", ProgramRunStatus.REJECTED, programRunId); return meta; } @@ -1643,6 +1705,51 @@ public int countActiveRuns(@Nullable Integer limit) throws IOException { return count.get(); } + /** + * Count all records in launching state. + * + * @param programTypes set of program types to filter. + * @param limit count at most that many runs, stop if there are more. + */ + public int getLaunchingCount(Set programTypes, @Nullable Integer limit) throws IOException { + AtomicInteger count = new AtomicInteger(0); + try (CloseableIterator iterator = queryProgramRuns( + Range.singleton(getRunRecordNamespacePrefix(TYPE_RUN_RECORD_ACTIVE, null)), + key -> !NamespaceId.SYSTEM.getNamespace() + .equals(key.getString(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD)), + runRecordDetail -> { + return (runRecordDetail.getStatus().equals(ProgramRunStatus.PENDING) + || runRecordDetail.getStatus().equals(ProgramRunStatus.STARTING)) + && !runRecordDetail.getSystemArgs().containsKey(ProgramOptionConstants.WORKFLOW_NAME) + && programTypes.contains(runRecordDetail.getProgramRunId().getType()); + }, + limit != null ? limit : Integer.MAX_VALUE)) { + iterator.forEachRemaining(m -> count.getAndIncrement()); + } + return count.get(); + } + + /** + * Count all records in running state. + * + * @param programTypes set of program types to filter. + * @param limit count at most that many runs, stop if there are more. + */ + public int getRunningCount(Set programTypes, @Nullable Integer limit) throws IOException { + AtomicInteger count = new AtomicInteger(0); + try (CloseableIterator iterator = queryProgramRuns( + Range.singleton(getRunRecordNamespacePrefix(TYPE_RUN_RECORD_ACTIVE, null)), + key -> !NamespaceId.SYSTEM.getNamespace() + .equals(key.getString(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD)), + runRecordDetail -> runRecordDetail.getStatus().equals(ProgramRunStatus.RUNNING) + && !runRecordDetail.getSystemArgs().containsKey(ProgramOptionConstants.WORKFLOW_NAME) + && programTypes.contains(runRecordDetail.getProgramRunId().getType()), + limit != null ? limit : Integer.MAX_VALUE)) { + iterator.forEachRemaining(m -> count.getAndIncrement()); + } + return count.get(); + } + /** * Scans active runs, starting from the given cursor. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java index e8850209518..47627c540f2 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java @@ -21,7 +21,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.id.ProgramRunId; @@ -54,7 +54,7 @@ public class StartProgramEventSubscriber extends EventSubscriber { private final CConfiguration cConf; private final EventReaderProvider extensionProvider; private final ProgramLifecycleService lifecycleService; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private ScheduledExecutorService executor; private Collection> readers; private ExecutorService threadPoolExecutor; @@ -66,17 +66,17 @@ public class StartProgramEventSubscriber extends EventSubscriber { * @param cConf CDAP configuration * @param extensionProvider eventReaderProvider for StartProgramEvent Readers * @param lifecycleService to publish start programs to TMS - * @param runRecordMonitorService basic flow-control + * @param flowControlService basic flow-control */ @Inject StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider extensionProvider, ProgramLifecycleService lifecycleService, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.cConf = cConf; this.extensionProvider = extensionProvider; this.lifecycleService = lifecycleService; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; maxConcurrentRuns = -1; } @@ -132,14 +132,14 @@ protected void runOneIteration() throws Exception { if (threadPoolExecutor != null) { for (EventReader reader : readers) { threadPoolExecutor.execute(() -> { - if (runRecordMonitorService.isRunning()) { + if (flowControlService.isRunning()) { // Only attempt to process event if there is no max or the current count is less than max if (hasNominalCapacity()) { processEvents(reader); } } else { - LOG.warn("RunRecordMonitorService not yet running, currently in state: {}." - + " Status will be checked again in next attempt.", runRecordMonitorService.state()); + LOG.warn("FlowControlService not yet running, currently in state: {}." + + " Status will be checked again in next attempt.", flowControlService.state()); } }); } @@ -153,7 +153,7 @@ protected void runOneIteration() throws Exception { */ @VisibleForTesting boolean hasNominalCapacity() { - RunRecordMonitorService.Counter counter = runRecordMonitorService.getCount(); + FlowControlService.Counter counter = flowControlService.getCount(); // no limit if (maxConcurrentRuns <= 0) { return true; diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java index 6c2a9cfd6f5..51a004cf884 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java @@ -361,7 +361,7 @@ public void testLaunchingCountMetricsOnRestart() throws Exception { // terminate the main service. notificationService.shutDown(); notificationService.startUp(); - // Running counts are not based on metadata store in RunRecordMonitorService so not asserting it + // Running counts are not based on metadata store in FlowControlService so not asserting it // here. Tasks.waitFor(0L, () -> queryMetrics(metricStore, SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java index ab80eccd8eb..452243c759b 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java @@ -861,7 +861,7 @@ public void testDuplicateWritesIgnored() throws Exception { byte[] sourceId = new byte[] { 0 }; TransactionRunners.run(transactionRunner, context -> { AppMetadataStore store = AppMetadataStore.create(context); - assertSecondCallIsNull(() -> store.recordProgramProvisioning(runId, null, SINGLETON_PROFILE_MAP, + Assert.assertNotNull(store.recordProgramProvisioning(runId, null, SINGLETON_PROFILE_MAP, sourceId, ARTIFACT_ID)); assertSecondCallIsNull(() -> store.recordProgramProvisioned(runId, 0, sourceId)); assertSecondCallIsNull(() -> store.recordProgramStart(runId, null, Collections.emptyMap(), sourceId)); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java index 83f59b7e16f..90b2653a755 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java @@ -27,7 +27,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; import io.cdap.cdap.internal.events.dummy.DummyEventReader; import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider; @@ -38,7 +38,6 @@ import java.util.Collection; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +48,8 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class); private ProgramLifecycleService lifecycleService; - private RunRecordMonitorService runRecordMonitorService; - private RunRecordMonitorService.Counter mockCounter; + private FlowControlService flowControlService; + private FlowControlService.Counter mockCounter; private CConfiguration cConf; private DummyEventReader eventReader; private Injector injector; @@ -59,16 +58,16 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { @Before public void setup() { lifecycleService = Mockito.mock(ProgramLifecycleService.class); - runRecordMonitorService = Mockito.mock(RunRecordMonitorService.class); - mockCounter = Mockito.mock(RunRecordMonitorService.Counter.class); - Mockito.doReturn(mockCounter).when(runRecordMonitorService).getCount(); + flowControlService = Mockito.mock(FlowControlService.class); + mockCounter = Mockito.mock(FlowControlService.Counter.class); + Mockito.doReturn(mockCounter).when(flowControlService).getCount(); cConf = CConfiguration.create(); eventReader = new DummyEventReader<>(mockedEvents()); injector = Guice.createInjector(new AbstractModule() { @Override protected void configure() { bind(ProgramLifecycleService.class).toInstance(lifecycleService); - bind(RunRecordMonitorService.class).toInstance(runRecordMonitorService); + bind(FlowControlService.class).toInstance(flowControlService); bind(CConfiguration.class).toInstance(cConf); bind(new TypeLiteral>() { }) diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/app/RunIds.java b/cdap-common/src/main/java/io/cdap/cdap/common/app/RunIds.java index 3d4e107d09d..f2c661beba9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/app/RunIds.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/app/RunIds.java @@ -49,7 +49,6 @@ * */ public final class RunIds { - private static final Random RANDOM = new Random(); // Number of 100ns intervals since 15 October 1582 00:00:000000000 until UNIX epoch diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 5cf9f9c2dc9..0997c442c55 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -284,10 +284,6 @@ public static final class AppFabric { public static final String PROGRAM_TRANSACTION_CONTROL = "app.program.transaction.control"; public static final String MAX_CONCURRENT_RUNS = "app.max.concurrent.runs"; public static final String MAX_CONCURRENT_LAUNCHING = "app.max.concurrent.launching"; - public static final String MONITOR_RECORD_AGE_THRESHOLD_SECONDS = - "run.record.monitor.record.age.threshold.seconds"; - public static final String MONITOR_CLEANUP_INTERVAL_SECONDS = - "run.record.monitor.cleanup.interval.seconds"; public static final String PROGRAM_LAUNCH_THREADS = "app.program.launch.threads"; public static final String PROGRAM_KILL_THREADS = "app.program.kill.threads"; public static final String RUN_DATA_CLEANUP_TTL_DAYS = "app.run.records.ttl.days"; diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java index 4d70f44d2b5..c7ab594e859 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java @@ -26,6 +26,7 @@ import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.runtime.spi.provisioner.Cluster; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -291,7 +292,7 @@ public RunRecordDetail build() { if (programRunId == null) { throw new IllegalArgumentException("Run record run id must be specified."); } - if (sourceId == null) { + if (status != ProgramRunStatus.PENDING && sourceId == null) { throw new IllegalArgumentException("Run record source id must be specified."); } // we are not validating artifactId for null, diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index ecd3b4cb884..ef7bf8e81a4 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -575,31 +575,6 @@ - - run.record.monitor.record.age.threshold.seconds - 3600 - - Maximum amount of time (in seconds) in which a run record is retained in - run record monitor. - This is to safe guard launch requests flow-control such that if a request - is somehow stuck in PENDING/STARTING - state, it will be dropped from after the threshold. - Note that run.record.monitor.cleanup.interval.seconds might be needed to - changed if this config changes. - - - - - run.record.monitor.cleanup.interval.seconds - 60 - - Cleanup interval (in seconds) in which run record monitor service cleanup - logic runs to delete old entries. - Note that run.record.monitor.record.age.threshold.seconds might be needed - to changed if this config changes. - - - app.program.launch.threads 20