Skip to content

Commit

Permalink
Merge pull request #23855 from vespa-engine/hmusum/cleanup-20
Browse files Browse the repository at this point in the history
Cluster controller cleanup, part 7 [run-systemtest]
  • Loading branch information
Harald Musum authored Aug 30, 2022
2 parents 106374a + 7455d58 commit c817800
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
Expand Down Expand Up @@ -130,12 +131,9 @@ public FleetController(FleetControllerContext context,
this.systemStateBroadcaster = systemStateBroadcaster;
this.stateVersionTracker = new StateVersionTracker(options.minMergeCompletionRatio);
this.metricUpdater = metricUpdater;

this.statusPageServer = statusPage;
this.statusPageServer = Objects.requireNonNull(statusPage, "statusPage cannot be null");
this.rpcServer = server;

this.masterElectionHandler = masterElectionHandler;

this.statusRequestRouter.addHandler(
"^/node=([a-z]+)\\.(\\d+)$",
new LegacyNodePageRequestHandler(timer, eventLog, cluster));
Expand Down Expand Up @@ -277,9 +275,7 @@ public void shutdown() throws InterruptedException, java.io.IOException {
controllerThreadId = Thread.currentThread().getId();
database.shutdown(databaseContext);

if (statusPageServer != null) {
statusPageServer.shutdown();
}
statusPageServer.shutdown();
if (rpcServer != null) {
rpcServer.shutdown();
}
Expand Down Expand Up @@ -530,12 +526,10 @@ private void propagateOptions() {
}
}

if (statusPageServer != null) {
try{
statusPageServer.setPort(options.httpPort);
} catch (Exception e) {
context.log(logger, Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage());
}
try {
statusPageServer.setPort(options.httpPort);
} catch (Exception e) {
context.log(logger, Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage());
}

long currentTime = timer.getCurrentTimeInMillis();
Expand Down Expand Up @@ -679,12 +673,10 @@ private void switchToNewConfig() {
}

private boolean processAnyPendingStatusPageRequest() {
if (statusPageServer != null) {
StatusPageServer.HttpRequest statusRequest = statusPageServer.getCurrentHttpRequest();
if (statusRequest != null) {
statusPageServer.answerCurrentStatusRequest(fetchStatusPage(statusRequest));
return true;
}
StatusPageServer.HttpRequest statusRequest = statusPageServer.getCurrentHttpRequest();
if (statusRequest != null) {
statusPageServer.answerCurrentStatusRequest(fetchStatusPage(statusRequest));
return true;
}
return false;
}
Expand Down Expand Up @@ -1224,8 +1216,6 @@ public void waitForNodesInSlobrok(int distNodeCount, int storNodeCount, Duration

public ContentCluster getCluster() { return cluster; }

public List<NodeEvent> getNodeEvents(Node n) { return eventLog.getNodeEvents(n); }

public EventLog getEventLog() {
return eventLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.state.NodeType;

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* This class represents all the options that can be set in the fleetcontroller.
Expand Down Expand Up @@ -125,7 +120,7 @@ public class FleetControllerOptions implements Cloneable {
// TODO: Get rid of this by always getting nodes by distribution.getNodes()
public Set<ConfiguredNode> nodes;

private Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30);
public Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30);

public boolean clusterHasGlobalDocumentTypes = false;

Expand Down Expand Up @@ -175,82 +170,4 @@ public FleetControllerOptions clone() {
}
}

public static String splitZooKeeperAddress(String s) {
StringBuilder sb = new StringBuilder();
while (true) {
int index = s.indexOf(',');
if (index > 0) {
sb.append(s.substring(0, index + 1)).append(' ');
s = s.substring(index+1);
} else {
break;
}
}
sb.append(s);
return sb.toString();
}

static DecimalFormat DecimalDot2 = new DecimalFormat("0.00", new DecimalFormatSymbols(Locale.ENGLISH));

public void writeHtmlState(StringBuilder sb) {
String slobrokspecs = "";
for (int i=0; i<slobrokConnectionSpecs.length; ++i) {
if (i != 0) slobrokspecs += "<br>";
slobrokspecs += slobrokConnectionSpecs[i];
}
sb.append("<h1>Current config</h1>\n")
.append("<p>Fleet controller config id: ").append(fleetControllerConfigId == null ? null : fleetControllerConfigId.replaceAll("\n", "<br>\n")).append("</p>\n")
.append("<p>Slobrok config id: ").append(slobrokConfigId == null ? null : slobrokConfigId.replaceAll("\n", "<br>\n")).append("</p>\n")
.append("<table border=\"1\" cellspacing=\"0\"><tr><th>Property</th><th>Value</th></tr>\n");

sb.append("<tr><td><nobr>Cluster name</nobr></td><td align=\"right\">").append(clusterName).append("</td></tr>");
sb.append("<tr><td><nobr>Fleet controller index</nobr></td><td align=\"right\">").append(fleetControllerIndex).append("/").append(fleetControllerCount).append("</td></tr>");
sb.append("<tr><td><nobr>Number of fleetcontrollers gathering states from nodes</nobr></td><td align=\"right\">").append(stateGatherCount).append("</td></tr>");

sb.append("<tr><td><nobr>Slobrok connection spec</nobr></td><td align=\"right\">").append(slobrokspecs).append("</td></tr>");
sb.append("<tr><td><nobr>RPC port</nobr></td><td align=\"right\">").append(rpcPort == 0 ? "Pick random available" : rpcPort).append("</td></tr>");
sb.append("<tr><td><nobr>HTTP port</nobr></td><td align=\"right\">").append(httpPort == 0 ? "Pick random available" : httpPort).append("</td></tr>");
sb.append("<tr><td><nobr>Master cooldown period</nobr></td><td align=\"right\">").append(RealTimer.printDuration(masterZooKeeperCooldownPeriod)).append("</td></tr>");
String zooKeeperAddress = (zooKeeperServerAddress == null ? "Not using Zookeeper" : splitZooKeeperAddress(zooKeeperServerAddress));
sb.append("<tr><td><nobr>Zookeeper server address</nobr></td><td align=\"right\">").append(zooKeeperAddress).append("</td></tr>");
sb.append("<tr><td><nobr>Zookeeper session timeout</nobr></td><td align=\"right\">").append(RealTimer.printDuration(zooKeeperSessionTimeout)).append("</td></tr>");

sb.append("<tr><td><nobr>Cycle wait time</nobr></td><td align=\"right\">").append(cycleWaitTime).append(" ms</td></tr>");
sb.append("<tr><td><nobr>Minimum time before first clusterstate broadcast as master</nobr></td><td align=\"right\">").append(RealTimer.printDuration(minTimeBeforeFirstSystemStateBroadcast)).append("</td></tr>");
sb.append("<tr><td><nobr>Minimum time between official cluster states</nobr></td><td align=\"right\">").append(RealTimer.printDuration(minTimeBetweenNewSystemStates)).append("</td></tr>");
sb.append("<tr><td><nobr>Slobrok mirror backoff policy</nobr></td><td align=\"right\">").append(slobrokBackOffPolicy == null ? "default" : "overridden").append("</td></tr>");

sb.append("<tr><td><nobr>Node state request timeout</nobr></td><td align=\"right\">").append(RealTimer.printDuration(nodeStateRequestTimeoutMS)).append("</td></tr>");
sb.append("<tr><td><nobr>VDS 4.1 node state polling frequency</nobr></td><td align=\"right\">").append(RealTimer.printDuration(statePollingFrequency)).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum distributor transition time</nobr></td><td align=\"right\">").append(RealTimer.printDuration(maxTransitionTime.get(NodeType.DISTRIBUTOR))).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum storage transition time</nobr></td><td align=\"right\">").append(RealTimer.printDuration(maxTransitionTime.get(NodeType.STORAGE))).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum initialize without progress time</nobr></td><td align=\"right\">").append(RealTimer.printDuration(maxInitProgressTime)).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum premature crashes</nobr></td><td align=\"right\">").append(maxPrematureCrashes).append("</td></tr>");
sb.append("<tr><td><nobr>Stable state time period</nobr></td><td align=\"right\">").append(RealTimer.printDuration(stableStateTimePeriod)).append("</td></tr>");
sb.append("<tr><td><nobr>Slobrok disconnect grace period</nobr></td><td align=\"right\">").append(RealTimer.printDuration(maxSlobrokDisconnectGracePeriod)).append("</td></tr>");

sb.append("<tr><td><nobr>Number of distributor nodes</nobr></td><td align=\"right\">").append(nodes == null ? "Autodetect" : nodes.size()).append("</td></tr>");
sb.append("<tr><td><nobr>Number of storage nodes</nobr></td><td align=\"right\">").append(nodes == null ? "Autodetect" : nodes.size()).append("</td></tr>");
sb.append("<tr><td><nobr>Minimum distributor nodes being up for cluster to be up</nobr></td><td align=\"right\">").append(minDistributorNodesUp).append("</td></tr>");
sb.append("<tr><td><nobr>Minimum storage nodes being up for cluster to be up</nobr></td><td align=\"right\">").append(minStorageNodesUp).append("</td></tr>");
sb.append("<tr><td><nobr>Minimum percentage of distributor nodes being up for cluster to be up</nobr></td><td align=\"right\">").append(DecimalDot2.format(100 * minRatioOfDistributorNodesUp)).append(" %</td></tr>");
sb.append("<tr><td><nobr>Minimum percentage of storage nodes being up for cluster to be up</nobr></td><td align=\"right\">").append(DecimalDot2.format(100 * minRatioOfStorageNodesUp)).append(" %</td></tr>");

sb.append("<tr><td><nobr>Show local cluster state changes</nobr></td><td align=\"right\">").append(showLocalSystemStatesInEventLog).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum event log size</nobr></td><td align=\"right\">").append(eventLogMaxSize).append("</td></tr>");
sb.append("<tr><td><nobr>Maximum node event log size</nobr></td><td align=\"right\">").append(eventNodeLogMaxSize).append("</td></tr>");
sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>");
sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>");
sb.append("<tr><td><nobr>Cluster has global document types configured</nobr></td><td align=\"right\">").append(clusterHasGlobalDocumentTypes).append("</td></tr>");
sb.append("<tr><td><nobr>Enable 2-phase cluster state activation protocol</nobr></td><td align=\"right\">").append(enableTwoPhaseClusterStateActivation).append("</td></tr>");
sb.append("<tr><td><nobr>Cluster auto feed block on resource exhaustion enabled</nobr></td><td align=\"right\">")
.append(clusterFeedBlockEnabled).append("</td></tr>");
sb.append("<tr><td><nobr>Feed block limits</nobr></td><td align=\"right\">")
.append(clusterFeedBlockLimit.entrySet().stream()
.map(kv -> String.format("%s: %.2f%%", kv.getKey(), kv.getValue() * 100.0))
.collect(Collectors.joining("<br/>"))).append("</td></tr>");

sb.append("</table>");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ public MasterElectionHandler(FleetControllerContext context, int index, int tota
this.index = index;
this.totalCount = totalCount;
this.nextInLineCount = Integer.MAX_VALUE;
// Only a given set of nodes can ever become master
if (index > (totalCount - 1) / 2) {
if (cannotBecomeMaster())
context.log(logger, Level.FINE, () -> "We can never become master and will always stay a follower.");
}
// Tag current time as when we have not seen any other master. Make sure we're not taking over at once for master that is on the way down
// Tag current time as when we have not seen any other master. Make sure we're not taking over at once for master that is on the way down
masterGoneFromZooKeeperTime = timer.getCurrentTimeInMillis();
}

Expand Down Expand Up @@ -77,16 +75,15 @@ public boolean inMasterMoratorium() {

@Override
public Integer getMaster() {
// If too few followers there can be no master
if (2 * followers <= totalCount) {
if (tooFewFollowersToHaveAMaster()) {
return null;
}
// If all are following master candidate, it is master if it exists.
// If all are following master candidate, it is master if it exists.
if (followers == totalCount) {
return masterCandidate;
}
// If not all are following we only accept master candidate if old master
// disappeared sufficient time ago
// If not all are following we only accept master candidate if old master
// disappeared sufficient time ago
if (masterGoneFromZooKeeperTime + masterZooKeeperCooldownPeriod > timer.getCurrentTimeInMillis()) {
return null;
}
Expand All @@ -97,8 +94,7 @@ public String getMasterReason() {
if (masterCandidate == null) {
return "There is currently no master candidate.";
}
// If too few followers there can be no master
if (2 * followers <= totalCount) {
if (tooFewFollowersToHaveAMaster()) {
return "More than half of the nodes must agree for there to be a master. Only " + followers + " of "
+ totalCount + " nodes agree on current master candidate (" + masterCandidate + ").";
}
Expand All @@ -118,6 +114,10 @@ public String getMasterReason() {
return followers + " of " + totalCount + " nodes agree " + masterCandidate + " is master.";
}

private boolean tooFewFollowersToHaveAMaster() {
return 2 * followers <= totalCount;
}

public boolean isAmongNthFirst(int first) { return (nextInLineCount < first); }

public boolean watchMasterElection(DatabaseHandler database,
Expand All @@ -131,8 +131,8 @@ public boolean watchMasterElection(DatabaseHandler database,
}
return false; // Nothing have happened since last time.
}
// Move next data to temporary, such that we don't need to keep lock, and such that we don't retry
// if we happen to fail processing the data.
// Move next data to temporary, such that we don't need to keep lock, and such that we don't retry
// if we happen to fail processing the data.
Map<Integer, Integer> state;
context.log(logger, Level.INFO, "Handling new master election, as we have received " + nextMasterData.size() + " entries");
synchronized (monitor) {
Expand Down Expand Up @@ -184,8 +184,7 @@ public boolean watchMasterElection(DatabaseHandler database,
database.setMasterVote(dbContext, first.getKey());
}
}
// Only a given set of nodes can ever become master
if (index <= (totalCount - 1) / 2) {
if (canBecomeMaster()) {
int ourPosition = 0;
for (Map.Entry<Integer, Integer> entry : state.entrySet()) {
if (entry.getKey() != index) {
Expand All @@ -205,6 +204,11 @@ public boolean watchMasterElection(DatabaseHandler database,
return true;
}

// Only a given set of nodes can ever become master
private boolean canBecomeMaster() {return index <= (totalCount - 1) / 2;}

private boolean cannotBecomeMaster() {return ! canBecomeMaster();}

private static String toString(Map<Integer, Integer> data) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<Integer, Integer> entry : data.entrySet()) {
Expand Down Expand Up @@ -253,10 +257,10 @@ public void writeHtmlState(StringBuilder sb, int stateGatherCount) {
Integer master = getMaster();
if (master != null) {
sb.append("<p>Current cluster controller master is node " + master + ".");
if (master.intValue() == index) sb.append(" (This node)");
if (master == index) sb.append(" (This node)");
sb.append("</p>");
} else {
if (2 * followers <= totalCount) {
if (tooFewFollowersToHaveAMaster()) {
sb.append("<p>There is currently no master. Less than half the fleet controllers (")
.append(followers).append(") are following master candidate ").append(masterCandidate)
.append(".</p>");
Expand All @@ -267,19 +271,19 @@ public void writeHtmlState(StringBuilder sb, int stateGatherCount) {
.append(" before electing new master unless all possible master candidates are online.</p>");
}
}
if ((master == null || master.intValue() != index) && nextInLineCount < stateGatherCount) {
if ((master == null || master != index) && nextInLineCount < stateGatherCount) {
sb.append("<p>As we are number ").append(nextInLineCount)
.append(" in line for taking over as master, we're gathering state from nodes.</p>");
sb.append("<p><font color=\"red\">As we are not the master, we don't know about nodes current system state"
+ " or wanted states, so some statistics below may be stale. Look at status page on master "
+ "for updated data.</font></p>");
}
if (index * 2 > totalCount) {
if (cannotBecomeMaster()) {
sb.append("<p>As lowest index fleet controller is prioritized to become master, and more than half "
+ "of the fleet controllers need to be available to select a master, we can never become master.</p>");
}

// Debug data
// Debug data
sb.append("<p><font size=\"-1\" color=\"grey\">Master election handler internal state:")
.append("<br>Index: " + index)
.append("<br>Fleet controller count: " + totalCount)
Expand Down
Loading

0 comments on commit c817800

Please sign in to comment.