Commit
Merge pull request #26403 from vespa-engine/bratseth/autoscaling-completion

Bratseth/autoscaling completion
freva authored Mar 10, 2023
2 parents de3b08f + 7ddc151 commit b06d77b
Showing 10 changed files with 57 additions and 24 deletions.
@@ -122,6 +122,11 @@ public Optional<ScalingEvent> lastScalingEvent() {
return Optional.of(scalingEvents.get(scalingEvents.size() - 1));
}

+ /** Returns whether the last scaling event in this has yet to complete. */
+ public boolean scalingInProgress() {
+ return lastScalingEvent().isPresent() && lastScalingEvent().get().completion().isEmpty();
+ }
+
public Cluster withConfiguration(boolean exclusive, Capacity capacity) {
return new Cluster(id, exclusive,
capacity.minResources(), capacity.maxResources(), capacity.groupSize(), capacity.isRequired(),
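Illustration (not part of the commit): the new scalingInProgress() check above treats the latest scaling event as ongoing as long as it has no completion timestamp. A minimal, self-contained sketch of the same decision, using a hypothetical Event record in place of ScalingEvent:

import java.time.Instant;
import java.util.List;
import java.util.Optional;

class ScalingInProgressSketch {

    // Hypothetical stand-in for ScalingEvent: only the fields the check needs.
    record Event(Instant at, Optional<Instant> completion) {}

    // Mirrors the new check: a last event exists and its completion is empty.
    static boolean scalingInProgress(List<Event> scalingEvents) {
        if (scalingEvents.isEmpty()) return false;
        Event last = scalingEvents.get(scalingEvents.size() - 1);
        return last.completion().isEmpty();
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2023-03-10T00:00:00Z");
        System.out.println(scalingInProgress(List.of(new Event(start, Optional.empty()))));                    // true
        System.out.println(scalingInProgress(List.of(new Event(start, Optional.of(start.plusSeconds(600)))))); // false
    }
}
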
@@ -64,8 +64,7 @@ public ScalingEvent withCompletion(Instant completion) {
@Override
public boolean equals(Object o) {
if (o == this) return true;
- if ( ! (o instanceof ScalingEvent)) return true;
- ScalingEvent other = (ScalingEvent)o;
+ if ( ! (o instanceof ScalingEvent other)) return true;
if ( other.generation != this.generation) return false;
if ( ! other.at.equals(this.at)) return false;
if ( ! other.from.equals(this.from)) return false;
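Several hunks in this commit replace the instanceof-then-cast pair with Java 16+ pattern matching for instanceof, which binds a typed variable directly in the condition. A minimal sketch of the idiom on a hypothetical Point class (not from this diff):

final class Point {

    private final int x;
    private final int y;

    Point(int x, int y) { this.x = x; this.y = y; }

    @Override
    public boolean equals(Object o) {
        if (o == this) return true;
        // The pattern variable 'other' replaces a separate cast line.
        if ( ! (o instanceof Point other)) return false;
        return other.x == x && other.y == y;
    }

    @Override
    public int hashCode() { return 31 * x + y; }

}
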
@@ -155,7 +155,7 @@ public enum Status {
/** The cluster should be rescaled further, but no better configuration is allowed by the current limits */
insufficient,

- /** Rescaling of this cluster has been scheduled */
+ /** This cluster should be rescaled */
rescaling

}
@@ -243,8 +243,10 @@ private Load adjustQueryDependentIdealLoadByBcpGroupInfo(Load ideal) {
}

private boolean hasScaledIn(Duration period) {
- return cluster.lastScalingEvent().map(event -> event.at()).orElse(Instant.MIN)
- .isAfter(clock.instant().minus(period));
+ if (cluster.lastScalingEvent().isEmpty()) return false;
+ var lastCompletion = cluster.lastScalingEvent().get().completion();
+ if (lastCompletion.isEmpty()) return true; // Ongoing
+ return lastCompletion.get().isAfter(clock.instant().minus(period));
}

private ClusterNodesTimeseries nodeTimeseries() { return nodeTimeseries; }
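The rewritten hasScaledIn above measures the cool-down period from the completion of the last scaling event rather than from its start, and treats an event with no completion as still ongoing. A standalone sketch of that logic, assuming a hypothetical Event type in place of the real ScalingEvent:

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

class HasScaledInSketch {

    record Event(Instant at, Optional<Instant> completion) {}

    // True if the last scaling is still ongoing or completed within 'period'.
    static boolean hasScaledIn(Optional<Event> lastScalingEvent, Duration period, Clock clock) {
        if (lastScalingEvent.isEmpty()) return false;
        Optional<Instant> completion = lastScalingEvent.get().completion();
        if (completion.isEmpty()) return true; // ongoing
        return completion.get().isAfter(clock.instant().minus(period));
    }

    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        Event ongoing = new Event(clock.instant().minus(Duration.ofHours(5)), Optional.empty());
        Event longDone = new Event(clock.instant().minus(Duration.ofDays(3)),
                                   Optional.of(clock.instant().minus(Duration.ofDays(2))));
        System.out.println(hasScaledIn(Optional.of(ongoing), Duration.ofHours(48), clock));  // true
        System.out.println(hasScaledIn(Optional.of(longDone), Duration.ofHours(12), clock)); // false
    }
}
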
@@ -74,6 +74,7 @@ private boolean autoscale(ApplicationId applicationId, ClusterSpec.Id clusterId)
if (application.isEmpty()) return true;
if (application.get().cluster(clusterId).isEmpty()) return true;
Cluster cluster = application.get().cluster(clusterId).get();
+ Cluster unchangedCluster = cluster;

NodeList clusterNodes = nodeRepository().nodes().list(Node.State.active).owner(applicationId).cluster(clusterId);
cluster = updateCompletion(cluster, clusterNodes);
@@ -82,14 +83,15 @@ private boolean autoscale(ApplicationId applicationId, ClusterSpec.Id clusterId)

// Autoscale unless an autoscaling is already in progress
Autoscaling autoscaling = null;
- if (cluster.target().resources().isEmpty() || current.equals(cluster.target().resources().get())) {
+ if (cluster.target().resources().isEmpty() && !cluster.scalingInProgress()) {
autoscaling = autoscaler.autoscale(application.get(), cluster, clusterNodes);
- if ( autoscaling.isPresent() || cluster.target().isEmpty()) // Ignore empty from recently started servers
+ if (autoscaling.isPresent() || cluster.target().isEmpty()) // Ignore empty from recently started servers
cluster = cluster.withTarget(autoscaling);
}

- // Always store updates
- applications().put(application.get().with(cluster), lock);
+ // Always store any updates
+ if (cluster != unchangedCluster)
+ applications().put(application.get().with(cluster), lock);

// Attempt to perform the autoscaling immediately, and log it regardless
if (autoscaling != null && autoscaling.resources().isPresent() && !current.equals(autoscaling.resources().get())) {
@@ -127,7 +129,7 @@ private Cluster updateCompletion(Cluster cluster, NodeList clusterNodes) {
if (clusterNodes.retired().stream()
.anyMatch(node -> node.history().hasEventAt(History.Event.Type.retired, event.at())))
return cluster;
- // - 2. all nodes have switched to the right config generation (currently only measured on containers)
+ // - 2. all nodes have switched to the right config generation
for (var nodeTimeseries : nodeRepository().metricsDb().getNodeTimeseries(Duration.between(event.at(), clock().instant()),
clusterNodes)) {
Optional<NodeMetricSnapshot> onNewGeneration =
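Two ideas drive the maintainer changes above: autoscaling is skipped while a previous scaling has not completed, and the application is only written back when the cluster actually changed. Because cluster updates such as withTarget return a new instance, comparing references against the saved unchangedCluster is sufficient. A schematic sketch of that flow with hypothetical stand-in types (not the real node repository API):

import java.util.Optional;

class MaintainerSketch {

    // Hypothetical immutable cluster: every change returns a new instance.
    record Cluster(boolean scalingInProgress, Optional<String> target) {
        Cluster withTarget(String newTarget) { return new Cluster(scalingInProgress, Optional.of(newTarget)); }
    }

    interface Store { void put(Cluster cluster); }

    // Mirrors the maintainer's shape: decide only when idle, store only on change.
    static void maintain(Cluster cluster, Store store) {
        Cluster unchangedCluster = cluster;

        if (cluster.target().isEmpty() && !cluster.scalingInProgress())
            cluster = cluster.withTarget("autoscaler-chosen-resources"); // placeholder decision

        if (cluster != unchangedCluster) // reference check; new-instance-on-update makes this sufficient
            store.put(cluster);
    }

    public static void main(String[] args) {
        Store store = c -> System.out.println("stored " + c);
        maintain(new Cluster(false, Optional.empty()), store); // stored: a new target was set
        maintain(new Cluster(true, Optional.empty()), store);  // not stored: scaling still in progress
    }
}
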
@@ -77,6 +77,11 @@ public void test_autoscaling_single_content_group() {

fixture.tester().clock().advance(Duration.ofDays(2));
fixture.loader().applyCpuLoad(0.1f, 10);
assertTrue("Last scaling not completed", fixture.autoscale().resources().isEmpty());

fixture.completeLastScaling();
fixture.tester().clock().advance(Duration.ofDays(7));
fixture.loader().applyCpuLoad(0.1f, 10);
fixture.tester().assertResources("Scaling cpu down since usage has gone down significantly",
8, 1, 1.0, 7.3, 22.1,
fixture.autoscale());
@@ -243,14 +248,15 @@ public void test_autoscaling_single_container_group() {
var fixture = DynamicProvisioningTester.fixture().awsProdSetup(true).clusterType(ClusterSpec.Type.container).build();

fixture.loader().applyCpuLoad(0.25f, 120);
- ClusterResources scaledResources = fixture.tester().assertResources("Scaling cpu up",
- 4, 1, 4, 16.0, 40.8,
- fixture.autoscale());
+ var scaledResources = fixture.tester().assertResources("Scaling cpu up",
+ 3, 1, 4, 16.0, 40.8,
+ fixture.autoscale());
fixture.deploy(Capacity.from(scaledResources));
fixture.deactivateRetired(Capacity.from(scaledResources));
+ fixture.completeLastScaling();
fixture.loader().applyCpuLoad(0.1f, 120);
fixture.tester().assertResources("Scaling down since cpu usage has gone down",
- 3, 1, 4, 16, 30.6,
+ 3, 1, 2, 16, 27.2,
fixture.autoscale());
}

@@ -585,7 +591,7 @@ public void scaling_down_only_after_delay() {
var fixture = DynamicProvisioningTester.fixture().awsProdSetup(true).build();
fixture.loader().applyCpuLoad(0.02, 120);
assertTrue("Too soon after initial deployment", fixture.autoscale().resources().isEmpty());
- fixture.tester().clock().advance(Duration.ofDays(2));
+ fixture.tester().clock().advance(Duration.ofHours(12 * 3 + 1));
fixture.loader().applyCpuLoad(0.02, 120);
fixture.tester().assertResources("Scaling down since enough time has passed",
3, 1, 1.0, 24.6, 101.4,
@@ -55,6 +55,7 @@ public Fixture(Fixture.Builder builder, Optional<ClusterResources> initialResour
var deployCapacity = initialResources.isPresent() ? Capacity.from(initialResources.get()) : capacity;
tester.deploy(builder.application, builder.cluster, deployCapacity);
this.loader = new Loader(this);
+ store(cluster().with(cluster().lastScalingEvent().get().withCompletion(tester.clock().instant())));
}

public DynamicProvisioningTester tester() { return tester; }
@@ -141,6 +142,16 @@ public void store(BcpGroupInfo bcpGroupInfo) {
tester.nodeRepository().applications().put(application, tester.nodeRepository().applications().lock(applicationId));
}

+ public void store(Cluster cluster) {
+ var application = application();
+ application = application.with(cluster);
+ tester.nodeRepository().applications().put(application, tester.nodeRepository().applications().lock(applicationId));
+ }
+
+ public void completeLastScaling() {
+ store(cluster().with(cluster().lastScalingEvent().get().withCompletion(tester().clock().instant())));
+ }
+
public static class Builder {

ApplicationId application = DynamicProvisioningTester.applicationId("application1");
@@ -162,7 +173,7 @@ public Fixture.Builder zone(Zone zone) {
}

/** Set to true to behave as if hosts are provisioned dynamically. */
- public Fixture. Builder dynamicProvisioning(boolean dynamicProvisioning) {
+ public Fixture.Builder dynamicProvisioning(boolean dynamicProvisioning) {
this.zone = new Zone(Cloud.builder()
.dynamicProvisioning(dynamicProvisioning)
.allowHostSharing(zone.cloud().allowHostSharing())
@@ -174,7 +185,7 @@ public Fixture. Builder dynamicProvisioning(boolean dynamicProvisioning) {
}

/** Set to true to allow multiple nodes be provisioned on the same host. */
- public Fixture. Builder allowHostSharing(boolean allowHostSharing) {
+ public Fixture.Builder allowHostSharing(boolean allowHostSharing) {
this.zone = new Zone(Cloud.builder()
.dynamicProvisioning(zone.cloud().dynamicProvisioning())
.allowHostSharing(allowHostSharing)
@@ -58,10 +58,11 @@ public void testScalingSuggestionsMaintainer() {
tester.deploy(app1, cluster1, Capacity.from(new ClusterResources(5, 1, new NodeResources(4, 4, 10, 0.1)),
new ClusterResources(5, 1, new NodeResources(4, 4, 10, 0.1)),
IntRange.empty(), false, true, Optional.empty(), ClusterInfo.empty()));
+ storeCompletion(app1, cluster1.id(), tester.nodeRepository());
tester.deploy(app2, cluster2, Capacity.from(new ClusterResources(5, 1, new NodeResources(4, 4, 10, 0.1)),
new ClusterResources(10, 1, new NodeResources(6.5, 5, 15, 0.1)),
IntRange.empty(), false, true, Optional.empty(), ClusterInfo.empty()));
-
+ storeCompletion(app2, cluster2.id(), tester.nodeRepository());
tester.clock().advance(Duration.ofHours(13));
Duration timeAdded = addMeasurements(0.90f, 0.90f, 0.90f, 0, 500, app1, tester.nodeRepository());
tester.clock().advance(timeAdded.negated());
@@ -109,6 +110,16 @@ public void testScalingSuggestionsMaintainer() {
assertFalse("Suggestion is not made as it matches what we have", shouldSuggest(app1, cluster1, tester));
}

+ private void storeCompletion(ApplicationId appId, ClusterSpec.Id clusterId, NodeRepository nodeRepository) {
+ try (var lock = nodeRepository.applications().lock(appId)) {
+ var app = nodeRepository.applications().require(appId);
+ var cluster = app.cluster(clusterId).get();
+ cluster = cluster.with(cluster.lastScalingEvent().get().withCompletion(nodeRepository.clock().instant()));
+ app = app.with(cluster);
+ nodeRepository.applications().put(app, lock);
+ }
+ }
+
private Autoscaling suggestionOf(ApplicationId app, ClusterSpec cluster, ProvisioningTester tester) {
return tester.nodeRepository().applications().get(app).get().cluster(cluster.id()).get().suggested();
}
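The storeCompletion helper above follows a read-modify-write pattern under an application lock: acquire the lock with try-with-resources, re-read the current state, apply the change, and write back while still holding the lock. A minimal standalone sketch of that pattern with a hypothetical in-memory store (not the real NodeRepository API):

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

class UpdateUnderLockSketch {

    // Hypothetical store guarding its value with an explicit lock.
    static class Store<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicReference<T> value;

        Store(T initial) { value = new AtomicReference<>(initial); }

        // AutoCloseable handle so the lock fits a try-with-resources block.
        AutoCloseable acquire() { lock.lock(); return lock::unlock; }

        T read() { return value.get(); }
        void put(T newValue) { value.set(newValue); }
    }

    // Read, modify, and write back while holding the lock, as storeCompletion does.
    static <T> void update(Store<T> store, UnaryOperator<T> change) throws Exception {
        try (var lock = store.acquire()) {
            store.put(change.apply(store.read()));
        }
    }

    public static void main(String[] args) throws Exception {
        Store<String> store = new Store<>("scaling: ongoing");
        update(store, s -> s.replace("ongoing", "completed"));
        System.out.println(store.read()); // scaling: completed
    }
}
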
@@ -48,9 +48,8 @@ public void optimize(RankingExpression expression, ContextIndex context, Optimiz
*/
private ExpressionNode findAndOptimize(ExpressionNode node) {
ExpressionNode newNode = optimize(node);
- if ( ! (newNode instanceof CompositeNode)) return newNode; //
+ if ( ! (newNode instanceof CompositeNode newComposite)) return newNode;

- CompositeNode newComposite = (CompositeNode)newNode;
List<ExpressionNode> newChildren = new ArrayList<>();
for (ExpressionNode child : newComposite.children()) {
newChildren.add(findAndOptimize(child));
@@ -84,10 +83,9 @@ private boolean optimize(ExpressionNode node, List<Double> forest) {
currentTreesOptimized++;
return true;
}
- if (!(node instanceof OperationNode)) {
+ if (!(node instanceof OperationNode aNode)) {
return false;
}
- OperationNode aNode = (OperationNode)node;
for (Operator op : aNode.operators()) {
if (op != Operator.plus) {
return false;
@@ -95,8 +95,7 @@ private ExpressionNode createGBDTNode(IfNode cNode,ContextIndex context) {
*/
private int consumeNode(ExpressionNode node, List<Double> values, ContextIndex context) {
int beforeIndex = values.size();
- if ( node instanceof IfNode) {
- IfNode ifNode = (IfNode)node;
+ if (node instanceof IfNode ifNode) {
int jumpValueIndex = consumeIfCondition(ifNode.getCondition(), values, context);
values.add(0d); // jumpValue goes here after the next line
int jumpValue = consumeNode(ifNode.getTrueExpression(), values, context) + 1;
