Skip to content

Commit

Permalink
Remove in-memory launching queue in RunRecordMonitorService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 16, 2025
1 parent 9dc64ce commit 343101b
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 446 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
Expand Down Expand Up @@ -420,7 +420,7 @@ protected void configure() {

bind(ArtifactStore.class).in(Scopes.SINGLETON);
bind(ProfileService.class).in(Scopes.SINGLETON);
bind(RunRecordMonitorService.class).in(Scopes.SINGLETON);
bind(FlowControlService.class).in(Scopes.SINGLETON);
bind(ProgramLifecycleService.class).in(Scopes.SINGLETON);
bind(SystemAppManagementService.class).in(Scopes.SINGLETON);
bind(OwnerAdmin.class).to(DefaultOwnerAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryService;
import org.slf4j.Logger;
Expand All @@ -75,7 +74,7 @@ public class AppFabricProcessorService extends AbstractIdleService {
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final FlowControlService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
private final ProvisioningService provisioningService;
private final BootstrapService bootstrapService;
Expand Down Expand Up @@ -111,7 +110,7 @@ public AppFabricProcessorService(CConfiguration cConf,
ProvisioningService provisioningService,
BootstrapService bootstrapService,
SystemAppManagementService systemAppManagementService,
RunRecordMonitorService runRecordCounterService,
FlowControlService runRecordCounterService,
RunDataTimeToLiveService runDataTimeToLiveService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
ScheduleNotificationSubscriberService scheduleNotificationSubscriberService) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.app.runtime.ProgramOptions;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Maintain and provides total number of launching and running run-records. This class is used by
* flow-control mechanism for launch requests.
*/
public class FlowControlService extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(FlowControlService.class);

// Program types controlled by flow-control mechanism.
private static final Set<ProgramType> CONTROL_FLOW_PROGRAM_TYPES = EnumSet.of(ProgramType.MAPREDUCE,
ProgramType.WORKFLOW,
ProgramType.SPARK,
ProgramType.WORKER);

private final MetricsCollectionService metricsCollectionService;
private final TransactionRunner transactionRunner;

/**
* Monitors the program flow control.
*
* @param metricsCollectionService collect metrics
*/
@Inject
public FlowControlService(
MetricsCollectionService metricsCollectionService,
TransactionRunner transactionRunner) {
this.metricsCollectionService = metricsCollectionService;
this.transactionRunner = transactionRunner;
}

@Override
protected void startUp() throws Exception {
LOG.info("FlowControlService started.");
}

@Override
protected void shutDown() throws Exception {
LOG.info("FlowControlService successfully shut down.");
}

/**
* Add a new in-flight launch request and return total number of launching and running programs.
*
* @param programRunId run id associated with the launch request
* @return total number of launching and running program runs.
*/
public Counter addRequestAndGetCount(ProgramRunId programRunId, ProgramOptions programOptions,
ProgramDescriptor programDescriptor) throws Exception {
if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) {
throw new Exception("None time-based UUIDs are not supported");
}

int launchingCount = addRequest(programRunId, programOptions, programDescriptor);
int runningCount = getFlowControlCounter(false, true).getRunningCount();

LOG.info(
"Counter has {} concurrent launching and {} running programs.",
launchingCount,
runningCount);
return new Counter(launchingCount, runningCount);
}

/**
* Get total number of launching and running programs.
*
* @return Counter with total number of launching and running program runs.
*/
public Counter getCount() {
return getFlowControlCounter(true, true);
}

/**
* Add a new in-flight launch request.
*
* @param programRunId run id associated with the launch request
*
* @return Returns the count of launching programs.
*/
public int addRequest(ProgramRunId programRunId, ProgramOptions programOptions,
ProgramDescriptor programDescriptor) {
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
store.recordProgramPending(programRunId,
programOptions.getArguments().asMap(),
programOptions.getUserArguments().asMap(),
programDescriptor.getArtifactId().toApiArtifactId());
return store.getLaunchingCount(CONTROL_FLOW_PROGRAM_TYPES, null);
});
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount);
LOG.info("Added request with runId {}.", programRunId);
return launchingCount;
}

private Counter getFlowControlCounter(boolean getLaunching, boolean getRunning) {
return TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
return new Counter(
getLaunching ? store.getLaunchingCount(CONTROL_FLOW_PROGRAM_TYPES, null) : null,
getRunning ? store.getRunningCount(CONTROL_FLOW_PROGRAM_TYPES, null) : null
);
});
}

public void emitFlowControlMetrics(boolean emitLaunching, boolean emitRunning) {
Counter counter = getFlowControlCounter(emitLaunching, emitRunning);
if (emitLaunching) {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount());
}
if (emitRunning) {
emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, counter.getRunningCount());
}
}

private void emitMetrics(String metricName, long value) {
LOG.trace("Setting metric {} to value {}", metricName, value);
metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

/**
* Counts the concurrent program runs.
*/
public class Counter {

/**
* Total number of launch requests that have been accepted but still missing in metadata store +
* * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of
* run records with {@link ProgramRunStatus#STARTING} status.
*/
private final Integer launchingCount;

/**
* Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run
* records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with
* {@link ProgramRunStatus#RESUMING} status.
*/
private final Integer runningCount;

Counter(Integer launchingCount, Integer runningCount) {
this.launchingCount = launchingCount;
this.runningCount = runningCount;
}

public Integer getLaunchingCount() {
return launchingCount;
}

public Integer getRunningCount() {
return runningCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public class ProgramLifecycleService {
private final int defaultStopTimeoutSecs;
private final int batchSize;
private final ArtifactRepository artifactRepository;
private final RunRecordMonitorService runRecordMonitorService;
private final FlowControlService flowControlService;
private final boolean userProgramLaunchDisabled;

@Inject
Expand All @@ -161,7 +161,7 @@ public class ProgramLifecycleService {
ProvisionerNotifier provisionerNotifier, ProvisioningService provisioningService,
ProgramStateWriter programStateWriter, CapabilityReader capabilityReader,
ArtifactRepository artifactRepository,
RunRecordMonitorService runRecordMonitorService) {
FlowControlService flowControlService) {
this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS);
this.maxConcurrentLaunching = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_LAUNCHING);
this.defaultStopTimeoutSecs = cConf.getInt(Constants.AppFabric.PROGRAM_MAX_STOP_SECONDS);
Expand All @@ -180,7 +180,7 @@ public class ProgramLifecycleService {
this.programStateWriter = programStateWriter;
this.capabilityReader = capabilityReader;
this.artifactRepository = artifactRepository;
this.runRecordMonitorService = runRecordMonitorService;
this.flowControlService = flowControlService;
}

/**
Expand Down Expand Up @@ -730,8 +730,8 @@ public RunId runInternal(ProgramId programId, Map<String, String> userArgs,
checkCapability(programDescriptor);

ProgramRunId programRunId = programId.run(runId);
RunRecordMonitorService.Counter counter = runRecordMonitorService.addRequestAndGetCount(
programRunId);
FlowControlService.Counter counter = flowControlService.addRequestAndGetCount(
programRunId, programOptions, programDescriptor);

boolean done = false;
try {
Expand Down Expand Up @@ -765,7 +765,7 @@ public RunId runInternal(ProgramId programId, Map<String, String> userArgs,
done = true;
} finally {
if (!done) {
runRecordMonitorService.removeRequest(programRunId, false);
flowControlService.emitFlowControlMetrics(true, false);
}
}

Expand Down
Loading

0 comments on commit 343101b

Please sign in to comment.