diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java index 89c2fb0c8a9..862fe9b0547 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java @@ -16,7 +16,7 @@ package io.cdap.cdap.internal.app.services; -import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.Inject; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.app.runtime.ProgramRuntimeService; @@ -24,18 +24,15 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl; +import io.cdap.cdap.internal.app.store.AppMetadataStore; import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.twill.common.Threads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,23 +41,13 @@ * flow-control mechanism for launch requests. It also has a cleanup mechanism to automatically * remove old (i.e., configurable) entries from the counter as a safe-guard mechanism. */ -public class RunRecordMonitorService extends AbstractScheduledService { +public class RunRecordMonitorService extends AbstractIdleService { private static final Logger LOG = LoggerFactory.getLogger(RunRecordMonitorService.class); - - /** - * Contains ProgramRunIds of runs that have been accepted, but have not been added to metadata - * store plus all run records with {@link ProgramRunStatus#PENDING} or {@link - * ProgramRunStatus#STARTING} status. - */ - private final BlockingQueue launchingQueue; - private final ProgramRuntimeService runtimeService; - private final long ageThresholdSec; - private final CConfiguration cConf; private final MetricsCollectionService metricsCollectionService; private final int maxConcurrentRuns; - private ScheduledExecutorService executor; + private final TransactionRunner transactionRunner; /** * Tracks the program runs. @@ -73,16 +60,12 @@ public class RunRecordMonitorService extends AbstractScheduledService { public RunRecordMonitorService( CConfiguration cConf, ProgramRuntimeService runtimeService, - MetricsCollectionService metricsCollectionService) { - this.cConf = cConf; + MetricsCollectionService metricsCollectionService, + TransactionRunner transactionRunner) { this.runtimeService = runtimeService; this.metricsCollectionService = metricsCollectionService; - - this.launchingQueue = - new PriorityBlockingQueue<>( - 128, Comparator.comparingLong(o -> RunIds.getTime(o.getRun(), TimeUnit.MILLISECONDS))); - this.ageThresholdSec = cConf.getLong(Constants.AppFabric.MONITOR_RECORD_AGE_THRESHOLD_SECONDS); this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); + this.transactionRunner = transactionRunner; } @Override @@ -92,31 +75,9 @@ protected void startUp() throws Exception { @Override protected void shutDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } LOG.info("RunRecordMonitorService successfully shut down."); } - @Override - protected void runOneIteration() throws Exception { - cleanupQueue(); - } - - @Override - protected Scheduler scheduler() { - return Scheduler.newFixedRateSchedule( - 0, cConf.getInt(Constants.AppFabric.MONITOR_CLEANUP_INTERVAL_SECONDS), TimeUnit.SECONDS); - } - - @Override - protected final ScheduledExecutorService executor() { - executor = - Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory("run-record-monitor-service-cleanup-scheduler")); - return executor; - } - /** * Add a new in-flight launch request and return total number of launching and running programs. * @@ -144,9 +105,11 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception * @return total number of launching and running program runs. */ public Counter getCount() { - int launchingCount = launchingQueue.size(); + int launchingCount = TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + return store.countLaunchingRuns(null); + }); int runningCount = getProgramsRunningCount(); - return new Counter(launchingCount, runningCount); } @@ -154,16 +117,18 @@ public Counter getCount() { * Add a new in-flight launch request. * * @param programRunId run id associated with the launch request + * + * @return Returns the count of launching programs. */ public int addRequest(ProgramRunId programRunId) { - int result; - synchronized (launchingQueue) { - launchingQueue.add(programRunId); - result = launchingQueue.size(); - } - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, result); + int launchingCount = TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + store.recordProgramPending(programRunId); + return store.countLaunchingRuns(null); + }); + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount); LOG.info("Added request with runId {}.", programRunId); - return result; + return launchingCount; } /** @@ -176,28 +141,22 @@ public int addRequest(ProgramRunId programRunId) { * Constants.Metrics.FlowControl#RUNNING_COUNT} */ public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) { - if (launchingQueue.remove(programRunId)) { - LOG.info( - "Removed request with runId {}. Counter has {} concurrent launching requests.", - programRunId, - launchingQueue.size()); - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); - } - + // TODO: See if this func can be refactored as it only emits metrics. Merge it with other + // functions if needed. + emitLaunchingMetrics(); if (emitRunningChange) { emitRunningMetrics(); } } - public void emitLaunchingMetrics(long value) { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value); - } - /** * Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs. */ public void emitLaunchingMetrics() { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); + int launchingCount = TransactionRunners.run(transactionRunner, context -> { + return AppMetadataStore.create(context).countLaunchingRuns(null); + }); + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount); } @@ -213,25 +172,6 @@ private void emitMetrics(String metricName, long value) { metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); } - private void cleanupQueue() { - while (true) { - ProgramRunId programRunId = launchingQueue.peek(); - if (programRunId == null - || RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) + (ageThresholdSec * 1000) - >= System.currentTimeMillis()) { - // Queue is empty or queue head has not expired yet. - break; - } - // Queue head might have already been removed. So instead of calling poll, we call remove. - if (launchingQueue.remove(programRunId)) { - LOG.info("Removing request with runId {} due to expired retention time.", programRunId); - } - } - // Always emit both metrics after cleanup. - emitLaunchingMetrics(); - emitRunningMetrics(); - } - /** * Returns the total number of programs in running state. The count includes batch (i.e., {@link * ProgramType#WORKFLOW}), streaming (i.e., {@link ProgramType#SPARK}) with no parent and @@ -242,7 +182,10 @@ private int getProgramsRunningCount() { runtimeService.listAll( ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE); - int launchingCount = launchingQueue.size(); + int launchingCount = TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + return store.countLaunchingRuns(null); + }); // We use program controllers (instead of querying metadata store) to count the total number of // programs in running state. @@ -250,8 +193,8 @@ private int getProgramsRunningCount() { // Therefore, the returning running count is NOT precise. int impreciseRunningCount = (int) list.stream() - .filter(r -> isRunning(r.getController().getState().getRunStatus())) - .count(); + .filter(r -> isRunning(r.getController().getState().getRunStatus())) + .count(); if (maxConcurrentRuns < 0 || (launchingCount + impreciseRunningCount < maxConcurrentRuns)) { // It is safe to return the imprecise value since either flow control for runs is disabled @@ -261,11 +204,8 @@ private int getProgramsRunningCount() { // Flow control is at the threshold. We return the precise count. return (int) list.stream() - .filter( - r -> - isRunning(r.getController().getState().getRunStatus()) - && !launchingQueue.contains(r.getController().getProgramRunId())) - .count(); + .filter( r -> isRunning(r.getController().getState().getRunStatus())) + .count(); } private boolean isRunning(ProgramRunStatus status) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java index c8d5e1def03..bf10a91fe22 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java @@ -938,13 +938,13 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, RunRecordDetail existing = getRun(programRunId); // for some reason, there is an existing run record. - if (existing != null) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that has an existing " - + "run record in run state {} and cluster state {}.", - programRunId, existing.getStatus(), existing.getCluster().getStatus()); - return null; - } + // if (existing != null) { + // LOG.error( + // "Ignoring unexpected request to record provisioning state for program run {} that has an existing " + // + "run record in run state {} and cluster state {}.", + // programRunId, existing.getStatus(), existing.getCluster().getStatus()); + // return null; + // } Optional profileId = SystemArguments.getProfileIdFromArgs( programRunId.getNamespaceId(), systemArgs); @@ -970,11 +970,58 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, .setArtifactId(artifactId) .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) .build(); - writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + // writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_ACTIVE, + existing.getProgramRunId(), + existing.getStartTs()); + writeToStructuredTableWithPrimaryKeys( + key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONING, programRunId); return meta; } + /** + * Record that the program run is provisioning compute resources for the run. If the current + * status has a higher source id, this call will be ignored. + * + * @param programRunId program run + * + * @return {@link RunRecordDetail} with status Pending. + */ + @Nullable + public RunRecordDetail recordProgramPending(ProgramRunId programRunId) + throws IOException { + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that does not have " + + "a timestamp in the run id.", programRunId); + return null; + } + + RunRecordDetail existing = getRun(programRunId); + // If for some reason, there is an existing run record then return null. + if (existing != null) { + LOG.error( + "Ignoring unexpected request to record pending state for program run {} that has an " + + "existing run record in run state {} and cluster state {}.", + programRunId, existing.getStatus()); + return null; + } + + ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null, + null); + RunRecordDetail meta = RunRecordDetail.builder() + .setProgramRunId(programRunId) + .setStartTime(startTs) + .setCluster(cluster) + .setStatus(ProgramRunStatus.PENDING) + .build(); + writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + LOG.trace("Recorded {} for program {}", ProgramRunStatus.PENDING, programRunId); + return meta; + } + // return the property map to set in the RunRecordDetail private Map getRecordProperties(Map systemArgs, Map runtimeArgs) { @@ -1643,6 +1690,24 @@ public int countActiveRuns(@Nullable Integer limit) throws IOException { return count.get(); } + /** + * Count all active runs. + * + * @param limit count at most that many runs, stop if there are more. + */ + public int countLaunchingRuns(@Nullable Integer limit) throws IOException { + AtomicInteger count = new AtomicInteger(0); + try (CloseableIterator iterator = queryProgramRuns( + Range.singleton(getRunRecordNamespacePrefix(TYPE_RUN_RECORD_ACTIVE, null)), + key -> !NamespaceId.SYSTEM.getNamespace() + .equals(key.getString(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD)), + runRecordDetail -> runRecordDetail.getStatus().equals(ProgramRunStatus.PENDING), + limit != null ? limit : Integer.MAX_VALUE)) { + iterator.forEachRemaining(m -> count.getAndIncrement()); + } + return count.get(); + } + /** * Scans active runs, starting from the given cursor. * diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 5cf9f9c2dc9..0997c442c55 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -284,10 +284,6 @@ public static final class AppFabric { public static final String PROGRAM_TRANSACTION_CONTROL = "app.program.transaction.control"; public static final String MAX_CONCURRENT_RUNS = "app.max.concurrent.runs"; public static final String MAX_CONCURRENT_LAUNCHING = "app.max.concurrent.launching"; - public static final String MONITOR_RECORD_AGE_THRESHOLD_SECONDS = - "run.record.monitor.record.age.threshold.seconds"; - public static final String MONITOR_CLEANUP_INTERVAL_SECONDS = - "run.record.monitor.cleanup.interval.seconds"; public static final String PROGRAM_LAUNCH_THREADS = "app.program.launch.threads"; public static final String PROGRAM_KILL_THREADS = "app.program.kill.threads"; public static final String RUN_DATA_CLEANUP_TTL_DAYS = "app.run.records.ttl.days"; diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java index 4d70f44d2b5..cc5ae7e2e4c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java @@ -291,9 +291,9 @@ public RunRecordDetail build() { if (programRunId == null) { throw new IllegalArgumentException("Run record run id must be specified."); } - if (sourceId == null) { - throw new IllegalArgumentException("Run record source id must be specified."); - } + // if (sourceId == null) { + // throw new IllegalArgumentException("Run record source id must be specified."); + // } // we are not validating artifactId for null, // artifactId could be null for program starts that were recorded pre 5.0 but weren't processed // we don't want to throw exception while processing them diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index ecd3b4cb884..ef7bf8e81a4 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -575,31 +575,6 @@ - - run.record.monitor.record.age.threshold.seconds - 3600 - - Maximum amount of time (in seconds) in which a run record is retained in - run record monitor. - This is to safe guard launch requests flow-control such that if a request - is somehow stuck in PENDING/STARTING - state, it will be dropped from after the threshold. - Note that run.record.monitor.cleanup.interval.seconds might be needed to - changed if this config changes. - - - - - run.record.monitor.cleanup.interval.seconds - 60 - - Cleanup interval (in seconds) in which run record monitor service cleanup - logic runs to delete old entries. - Note that run.record.monitor.record.age.threshold.seconds might be needed - to changed if this config changes. - - - app.program.launch.threads 20