Skip to content

Commit

Permalink
feat: support placement constraint in app level (#682)
Browse files Browse the repository at this point in the history
* feat: support placement constraint in app level

* fix ci
  • Loading branch information
zuston authored Oct 14, 2023
1 parent 52bcaa3 commit 7ceaaca
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.UTCClock;

import static com.linkedin.tony.TonyConfigurationKeys.APPLICATION_PLACEMENT_SPEC;

public class ApplicationMaster {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
Expand Down Expand Up @@ -490,7 +491,14 @@ private boolean prepare() throws IOException {
String dashboardHttpUrl = dashboardHttpServer.start();
this.dashboardHttpServer = dashboardHttpServer;

response = amRMClient.registerApplicationMaster(amHostname, amPort, dashboardHttpUrl);
String appLevelPlacementConstraintSpec = tonyConf.get(APPLICATION_PLACEMENT_SPEC);
if (StringUtils.isNotEmpty(appLevelPlacementConstraintSpec)) {
response = HadoopCompatibleAdapter.registerAppMaster(amRMClient, amHostname, amPort,
dashboardHttpUrl, appLevelPlacementConstraintSpec);
} else {
response = amRMClient.registerApplicationMaster(amHostname, amPort, dashboardHttpUrl);
}

amHostPort = hostNameOrIpFromTokenConf + ":" + amPort;
} catch (Exception e) {
LOG.error("Exception while preparing AM", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
Expand Down Expand Up @@ -174,6 +175,30 @@ public static boolean existGPUResource() {
}
}

public static RegisterApplicationMasterResponse registerAppMaster(
AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient,
String appHostName,
int appHostPort,
String appTrackingUrl,
String appLevelPlacementConstraintSpec) {
try {
Map<Set<String>, Object> placementConstraints = new HashMap<>();

placementConstraints.put(
Collections.singleton(""),
parsePlacementConstraintSpec(appLevelPlacementConstraintSpec)
);

Method method = Arrays.stream(amRMClient.getClass().getMethods())
.filter(x -> x.getName().equals("registerApplicationMaster") && x.getParameterCount() == 4)
.findFirst().get();
return (RegisterApplicationMasterResponse) method.invoke(amRMClient, appHostName, appHostPort,
appTrackingUrl, placementConstraints);
} catch (Exception e) {
throw new RuntimeException("Errors on registering app master.", e);
}
}

public static void constructAndAddSchedulingRequest(AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient,
JobContainerRequest containerRequest) {
try {
Expand All @@ -192,6 +217,23 @@ public static void constructAndAddSchedulingRequest(AMRMClientAsync<AMRMClient.C
}
}

private static Object parsePlacementConstraintSpec(String spec) throws Exception {
if (StringUtils.isEmpty(spec)) {
return null;
}

Class<?> placementConstraintCls =
Class.forName("org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser");
Method parseMethod = placementConstraintCls.getMethod("parseExpression", String.class);

Object parsedObj = parseMethod.invoke(placementConstraintCls, spec);
Class<?> abstractConstraintCls =
Class.forName("org.apache.hadoop.yarn.api.resource.PlacementConstraint$AbstractConstraint");

Object placementConstraintObj = abstractConstraintCls.getMethod("build").invoke(parsedObj);
return placementConstraintObj;
}

private static Object constructSchedulingRequest(JobContainerRequest containerRequest) {
try {
Priority priority = Priority.newInstance(containerRequest.getPriority());
Expand All @@ -202,15 +244,18 @@ private static Object constructSchedulingRequest(JobContainerRequest containerRe
Set<String> allocationTags = CollectionUtils.isEmpty(containerRequest.getAllocationTags())
? Collections.singleton("") : new HashSet<>(containerRequest.getAllocationTags());

Class<?> placementConstraintCls =
Class.forName("org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser");
Method parseMethod = placementConstraintCls.getMethod("parseExpression", String.class);
Object placementConstraintObj = parsePlacementConstraintSpec(containerRequest.getPlacementSpec());
if (StringUtils.isNotEmpty(containerRequest.getPlacementSpec())) {
Class<?> placementConstraintCls =
Class.forName("org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser");
Method parseMethod = placementConstraintCls.getMethod("parseExpression", String.class);

Object parsedObj = parseMethod.invoke(placementConstraintCls, containerRequest.getPlacementSpec());
Class<?> abstractConstraintCls =
Class.forName("org.apache.hadoop.yarn.api.resource.PlacementConstraint$AbstractConstraint");
Object parsedObj = parseMethod.invoke(placementConstraintCls, containerRequest.getPlacementSpec());
Class<?> abstractConstraintCls =
Class.forName("org.apache.hadoop.yarn.api.resource.PlacementConstraint$AbstractConstraint");

Object placementConstraintObj = abstractConstraintCls.getMethod("build").invoke(parsedObj);
placementConstraintObj = abstractConstraintCls.getMethod("build").invoke(parsedObj);
}

Class<?> resourceSizingCls = Class.forName("org.apache.hadoop.yarn.api.records.ResourceSizing");
Method resourceSizingMethod = Arrays.stream(resourceSizingCls.getMethods())
Expand All @@ -226,7 +271,7 @@ private static Object constructSchedulingRequest(JobContainerRequest containerRe
resourceSizingObj, placementConstraintObj);

return schedReq;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
} catch (Exception e) {
throw new RuntimeException("Errors on constructing scheduling requests of Yarn.", e);
}
}
Expand Down
8 changes: 7 additions & 1 deletion tony-core/src/main/java/com/linkedin/tony/TaskScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -25,6 +27,7 @@
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

import static com.linkedin.tony.TonyConfigurationKeys.APPLICATION_PLACEMENT_SPEC;

public class TaskScheduler {
private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
Expand Down Expand Up @@ -89,9 +92,12 @@ boolean checkDependencySatisfied(JobContainerRequest request) {
}

private void scheduleJob(JobContainerRequest request) {
if (request.getPlacementSpec() != null) {
if (request.getPlacementSpec() != null || StringUtils.isNotEmpty(tonyConf.get(APPLICATION_PLACEMENT_SPEC))) {
// this should use newer api of Yarn with this placement constraint feature,
// only be supported in hadoop 3.2.x
//
// Tips: the app level placement constraint spec must be together with scheduling
// request api, otherwise, it is invalid.
HadoopCompatibleAdapter.constructAndAddSchedulingRequest(amRMClient, request);
} else {
AMRMClient.ContainerRequest containerAsk = Utils.setupContainerRequestForRM(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private TonyConfigurationKeys() {

public static final String APPLICATION_NODE_LABEL = TONY_APPLICATION_PREFIX + "node-label";

public static final String APPLICATION_PLACEMENT_SPEC = TONY_APPLICATION_PREFIX + "placement-spec";

public static final String ENABLE_PREPROCESSING_JOB = TONY_APPLICATION_PREFIX + "enable-preprocess";
public static final boolean DEFAULT_ENABLE_PREPROCESSING_JOB = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void initializeMemberVariables() {
configurationPropsToSkipCompare.add(TonyConfigurationKeys.TB_MEMORY);
configurationPropsToSkipCompare.add(TonyConfigurationKeys.TB_INSTANCES);
configurationPropsToSkipCompare.add(TonyConfigurationKeys.TB_GPUS);
configurationPropsToSkipCompare.add(TonyConfigurationKeys.APPLICATION_PLACEMENT_SPEC);
}

@BeforeTest
Expand Down

0 comments on commit 7ceaaca

Please sign in to comment.