Skip to content

Commit

Permalink
Remove in-memory launching queue in RunRecordMonitorService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 13, 2025
1 parent 9dc64ce commit ef61989
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,23 @@

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,23 +41,13 @@
* flow-control mechanism for launch requests. It also has a cleanup mechanism to automatically
* remove old (i.e., configurable) entries from the counter as a safe-guard mechanism.
*/
public class RunRecordMonitorService extends AbstractScheduledService {
public class RunRecordMonitorService extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(RunRecordMonitorService.class);

/**
* Contains ProgramRunIds of runs that have been accepted, but have not been added to metadata
* store plus all run records with {@link ProgramRunStatus#PENDING} or {@link
* ProgramRunStatus#STARTING} status.
*/
private final BlockingQueue<ProgramRunId> launchingQueue;

private final ProgramRuntimeService runtimeService;
private final long ageThresholdSec;
private final CConfiguration cConf;
private final MetricsCollectionService metricsCollectionService;
private final int maxConcurrentRuns;
private ScheduledExecutorService executor;
private final TransactionRunner transactionRunner;

/**
* Tracks the program runs.
Expand All @@ -73,16 +60,12 @@ public class RunRecordMonitorService extends AbstractScheduledService {
public RunRecordMonitorService(
CConfiguration cConf,
ProgramRuntimeService runtimeService,
MetricsCollectionService metricsCollectionService) {
this.cConf = cConf;
MetricsCollectionService metricsCollectionService,
TransactionRunner transactionRunner) {
this.runtimeService = runtimeService;
this.metricsCollectionService = metricsCollectionService;

this.launchingQueue =
new PriorityBlockingQueue<>(
128, Comparator.comparingLong(o -> RunIds.getTime(o.getRun(), TimeUnit.MILLISECONDS)));
this.ageThresholdSec = cConf.getLong(Constants.AppFabric.MONITOR_RECORD_AGE_THRESHOLD_SECONDS);
this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS);
this.transactionRunner = transactionRunner;
}

@Override
Expand All @@ -92,31 +75,9 @@ protected void startUp() throws Exception {

@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
LOG.info("RunRecordMonitorService successfully shut down.");
}

@Override
protected void runOneIteration() throws Exception {
cleanupQueue();
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(
0, cConf.getInt(Constants.AppFabric.MONITOR_CLEANUP_INTERVAL_SECONDS), TimeUnit.SECONDS);
}

@Override
protected final ScheduledExecutorService executor() {
executor =
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("run-record-monitor-service-cleanup-scheduler"));
return executor;
}

/**
* Add a new in-flight launch request and return total number of launching and running programs.
*
Expand Down Expand Up @@ -144,26 +105,30 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception
* @return total number of launching and running program runs.
*/
public Counter getCount() {
int launchingCount = launchingQueue.size();
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
return store.countLaunchingRuns(null);
});
int runningCount = getProgramsRunningCount();

return new Counter(launchingCount, runningCount);
}

/**
* Add a new in-flight launch request.
*
* @param programRunId run id associated with the launch request
*
* @return Returns the count of launching programs.
*/
public int addRequest(ProgramRunId programRunId) {
int result;
synchronized (launchingQueue) {
launchingQueue.add(programRunId);
result = launchingQueue.size();
}
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, result);
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
store.recordProgramPending(programRunId);
return store.countLaunchingRuns(null);
});
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount);
LOG.info("Added request with runId {}.", programRunId);
return result;
return launchingCount;
}

/**
Expand All @@ -176,28 +141,22 @@ public int addRequest(ProgramRunId programRunId) {
* Constants.Metrics.FlowControl#RUNNING_COUNT}
*/
public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) {
if (launchingQueue.remove(programRunId)) {
LOG.info(
"Removed request with runId {}. Counter has {} concurrent launching requests.",
programRunId,
launchingQueue.size());
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}

// TODO: See if this func can be refactored as it only emits metrics. Merge it with other
// functions if needed.
emitLaunchingMetrics();
if (emitRunningChange) {
emitRunningMetrics();
}
}

public void emitLaunchingMetrics(long value) {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value);
}

/**
* Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs.
*/
public void emitLaunchingMetrics() {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
return AppMetadataStore.create(context).countLaunchingRuns(null);
});
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount);
}


Expand All @@ -213,25 +172,6 @@ private void emitMetrics(String metricName, long value) {
metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

private void cleanupQueue() {
while (true) {
ProgramRunId programRunId = launchingQueue.peek();
if (programRunId == null
|| RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) + (ageThresholdSec * 1000)
>= System.currentTimeMillis()) {
// Queue is empty or queue head has not expired yet.
break;
}
// Queue head might have already been removed. So instead of calling poll, we call remove.
if (launchingQueue.remove(programRunId)) {
LOG.info("Removing request with runId {} due to expired retention time.", programRunId);
}
}
// Always emit both metrics after cleanup.
emitLaunchingMetrics();
emitRunningMetrics();
}

/**
* Returns the total number of programs in running state. The count includes batch (i.e., {@link
* ProgramType#WORKFLOW}), streaming (i.e., {@link ProgramType#SPARK}) with no parent and
Expand All @@ -242,16 +182,19 @@ private int getProgramsRunningCount() {
runtimeService.listAll(
ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE);

int launchingCount = launchingQueue.size();
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
return store.countLaunchingRuns(null);
});

// We use program controllers (instead of querying metadata store) to count the total number of
// programs in running state.
// A program controller is created when a launch request is in the middle of starting state.
// Therefore, the returning running count is NOT precise.
int impreciseRunningCount =
(int) list.stream()
.filter(r -> isRunning(r.getController().getState().getRunStatus()))
.count();
.filter(r -> isRunning(r.getController().getState().getRunStatus()))
.count();

if (maxConcurrentRuns < 0 || (launchingCount + impreciseRunningCount < maxConcurrentRuns)) {
// It is safe to return the imprecise value since either flow control for runs is disabled
Expand All @@ -261,11 +204,8 @@ private int getProgramsRunningCount() {

// Flow control is at the threshold. We return the precise count.
return (int) list.stream()
.filter(
r ->
isRunning(r.getController().getState().getRunStatus())
&& !launchingQueue.contains(r.getController().getProgramRunId()))
.count();
.filter( r -> isRunning(r.getController().getState().getRunStatus()))
.count();
}

private boolean isRunning(ProgramRunStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,13 +938,13 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId,

RunRecordDetail existing = getRun(programRunId);
// for some reason, there is an existing run record.
if (existing != null) {
LOG.error(
"Ignoring unexpected request to record provisioning state for program run {} that has an existing "
+ "run record in run state {} and cluster state {}.",
programRunId, existing.getStatus(), existing.getCluster().getStatus());
return null;
}
// if (existing != null) {
// LOG.error(
// "Ignoring unexpected request to record provisioning state for program run {} that has an existing "
// + "run record in run state {} and cluster state {}.",
// programRunId, existing.getStatus(), existing.getCluster().getStatus());
// return null;
// }

Optional<ProfileId> profileId = SystemArguments.getProfileIdFromArgs(
programRunId.getNamespaceId(), systemArgs);
Expand All @@ -970,11 +970,58 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId,
.setArtifactId(artifactId)
.setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL))
.build();
writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE);
// writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE);
List<Field<?>> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_ACTIVE,
existing.getProgramRunId(),
existing.getStartTs());
writeToStructuredTableWithPrimaryKeys(
key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA);
LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONING, programRunId);
return meta;
}

/**
* Record that the program run is provisioning compute resources for the run. If the current
* status has a higher source id, this call will be ignored.
*
* @param programRunId program run
*
* @return {@link RunRecordDetail} with status Pending.
*/
@Nullable
public RunRecordDetail recordProgramPending(ProgramRunId programRunId)
throws IOException {
long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS);
if (startTs == -1L) {
LOG.error(
"Ignoring unexpected request to record provisioning state for program run {} that does not have "
+ "a timestamp in the run id.", programRunId);
return null;
}

RunRecordDetail existing = getRun(programRunId);
// If for some reason, there is an existing run record then return null.
if (existing != null) {
LOG.error(
"Ignoring unexpected request to record pending state for program run {} that has an "
+ "existing run record in run state {} and cluster state {}.",
programRunId, existing.getStatus());
return null;
}

ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null,
null);
RunRecordDetail meta = RunRecordDetail.builder()
.setProgramRunId(programRunId)
.setStartTime(startTs)
.setCluster(cluster)
.setStatus(ProgramRunStatus.PENDING)
.build();
writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE);
LOG.trace("Recorded {} for program {}", ProgramRunStatus.PENDING, programRunId);
return meta;
}

// return the property map to set in the RunRecordDetail
private Map<String, String> getRecordProperties(Map<String, String> systemArgs,
Map<String, String> runtimeArgs) {
Expand Down Expand Up @@ -1643,6 +1690,24 @@ public int countActiveRuns(@Nullable Integer limit) throws IOException {
return count.get();
}

/**
* Count all active runs.
*
* @param limit count at most that many runs, stop if there are more.
*/
public int countLaunchingRuns(@Nullable Integer limit) throws IOException {
AtomicInteger count = new AtomicInteger(0);
try (CloseableIterator<RunRecordDetail> iterator = queryProgramRuns(
Range.singleton(getRunRecordNamespacePrefix(TYPE_RUN_RECORD_ACTIVE, null)),
key -> !NamespaceId.SYSTEM.getNamespace()
.equals(key.getString(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD)),
runRecordDetail -> runRecordDetail.getStatus().equals(ProgramRunStatus.PENDING),
limit != null ? limit : Integer.MAX_VALUE)) {
iterator.forEachRemaining(m -> count.getAndIncrement());
}
return count.get();
}

/**
* Scans active runs, starting from the given cursor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,6 @@ public static final class AppFabric {
public static final String PROGRAM_TRANSACTION_CONTROL = "app.program.transaction.control";
public static final String MAX_CONCURRENT_RUNS = "app.max.concurrent.runs";
public static final String MAX_CONCURRENT_LAUNCHING = "app.max.concurrent.launching";
public static final String MONITOR_RECORD_AGE_THRESHOLD_SECONDS =
"run.record.monitor.record.age.threshold.seconds";
public static final String MONITOR_CLEANUP_INTERVAL_SECONDS =
"run.record.monitor.cleanup.interval.seconds";
public static final String PROGRAM_LAUNCH_THREADS = "app.program.launch.threads";
public static final String PROGRAM_KILL_THREADS = "app.program.kill.threads";
public static final String RUN_DATA_CLEANUP_TTL_DAYS = "app.run.records.ttl.days";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ public RunRecordDetail build() {
if (programRunId == null) {
throw new IllegalArgumentException("Run record run id must be specified.");
}
if (sourceId == null) {
throw new IllegalArgumentException("Run record source id must be specified.");
}
// if (sourceId == null) {
// throw new IllegalArgumentException("Run record source id must be specified.");
// }
// we are not validating artifactId for null,
// artifactId could be null for program starts that were recorded pre 5.0 but weren't processed
// we don't want to throw exception while processing them
Expand Down
Loading

0 comments on commit ef61989

Please sign in to comment.