-
Notifications
You must be signed in to change notification settings - Fork 7
/
InterconnectClusterManager.java
241 lines (211 loc) · 9.53 KB
/
InterconnectClusterManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - [email protected]
*/
package sirius.biz.cluster;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import sirius.kernel.async.CallContext;
import sirius.kernel.commons.Json;
import sirius.kernel.commons.Strings;
import sirius.kernel.di.std.ConfigValue;
import sirius.kernel.di.std.Part;
import sirius.kernel.di.std.Register;
import sirius.kernel.health.Exceptions;
import sirius.kernel.health.metrics.Metric;
import sirius.kernel.health.metrics.MetricState;
import sirius.web.health.Cluster;
import sirius.web.health.ClusterManager;
import sirius.web.health.NodeInfo;
import sirius.web.http.WebServer;
import sirius.web.services.JSONCall;
import javax.annotation.Nonnull;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
/**
* Implements a {@link ClusterManager} by discovering and managing nodes via the {@link Interconnect}.
*/
@Register(classes = {InterconnectClusterManager.class, ClusterManager.class, InterconnectHandler.class})
public class InterconnectClusterManager implements ClusterManager, InterconnectHandler {

    private static final String MESSAGE_TYPE = "type";
    private static final String TYPE_PING = "PING";
    private static final String TYPE_PONG = "PONG";
    private static final String TYPE_KILL = "KILL";
    private static final String MESSAGE_NAME = "name";
    private static final String MESSAGE_ADDRESS = "address";

    private static final Duration PING_INTERVAL = Duration.ofMinutes(15);
    private static final int SHORT_CLUSTER_HTTP_TIMEOUT_MILLIS = 1000;

    /**
     * Key under which {@link #callEachNode(String)} stores the name of the node which produced a response.
     */
    public static final String RESPONSE_NODE_NAME = "node";

    /**
     * Key under which {@link #callEachNode(String)} stores whether calling the node failed.
     */
    public static final String RESPONSE_ERROR = "error";

    /**
     * Key under which {@link #callEachNode(String)} stores the error message if calling the node failed.
     * <p>
     * Note that the misspelled key value is intentionally kept, as it is part of the JSON produced by this class
     * and may be read by other consumers.
     */
    public static final String RESPONSE_ERROR_MESAGE = "errorMesage";

    private static final int HTTP_DEFAULT_PORT = 80;

    /**
     * Maps the names of discovered remote nodes to their base address (as announced in their PONG messages).
     */
    private final Map<String, String> members = new ConcurrentHashMap<>();

    /**
     * Records when a PING was last received. It is written by {@link #handleEvent(ObjectNode)} and read by
     * {@link #updateClusterState()} - these presumably run on different threads, hence <tt>volatile</tt>.
     */
    private volatile LocalDateTime lastPing = null;

    @Part
    private Interconnect interconnect;

    @ConfigValue("sirius.clusterToken")
    private String clusterAPIToken;

    @ConfigValue("sirius.nodeAddress")
    private String localNodeAddress;

    @Nonnull
    @Override
    public String getName() {
        return "cluster";
    }

    /**
     * Returns the cluster API token which is used to authenticate nodes against each other and also
     * maintenance workers (e.g. systems which start bleeding off nodes before a system update).
     * <p>
     * If no token is configured, a random one is generated once and kept. This is synchronized so that
     * concurrent callers can never observe two different generated tokens.
     *
     * @return the cluster token to use (never empty)
     */
    public synchronized String getClusterAPIToken() {
        if (Strings.isEmpty(clusterAPIToken)) {
            clusterAPIToken = Strings.generateCode(32);
        }
        return clusterAPIToken;
    }

    /**
     * Determines if the given token matches the cluster token.
     * <p>
     * Empty tokens are always rejected. The comparison is performed against {@link #getClusterAPIToken()} so
     * that an unconfigured token can never be matched by an empty request parameter.
     *
     * @param token the token to check
     * @return <tt>true</tt> if the given token is the cluster token, <tt>false</tt> otherwise
     */
    public boolean isClusterAPIToken(String token) {
        if (Strings.isEmpty(token)) {
            return false;
        }

        // Compare in constant time so that the token cannot be guessed via response timing differences.
        return MessageDigest.isEqual(token.getBytes(StandardCharsets.UTF_8),
                                     getClusterAPIToken().getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Broadcasts a PING via the interconnect, which is answered by a PONG (see {@link #handleEvent(ObjectNode)}).
     */
    protected void sendPing() {
        interconnect.dispatch(getName(), Json.createObject().put(MESSAGE_TYPE, TYPE_PING));
    }

    @Override
    public void handleEvent(ObjectNode event) {
        String messageType = event.path(MESSAGE_TYPE).asText(null);
        if (Strings.areEqual(messageType, TYPE_PING)) {
            handlePing();
        } else if (Strings.areEqual(messageType, TYPE_PONG)) {
            handlePong(event);
        } else if (Strings.areEqual(messageType, TYPE_KILL)) {
            handleKill(event);
        }
    }

    /**
     * Answers a PING by announcing this node's name and address via a PONG.
     */
    private void handlePing() {
        lastPing = LocalDateTime.now();
        interconnect.dispatch(getName(),
                              Json.createObject()
                                  .put(MESSAGE_TYPE, TYPE_PONG)
                                  .put(MESSAGE_NAME, CallContext.getNodeName())
                                  .put(MESSAGE_ADDRESS, getLocalAddress()));
    }

    /**
     * Records the node announced by a PONG as cluster member (ignoring our own PONG and malformed messages).
     *
     * @param event the received PONG message
     */
    private void handlePong(ObjectNode event) {
        String address = event.path(MESSAGE_ADDRESS).asText(null);
        String nodeName = event.path(MESSAGE_NAME).asText(null);

        // A missing name must be skipped, as ConcurrentHashMap rejects null keys with a NullPointerException.
        if (Strings.isEmpty(address) || Strings.isEmpty(nodeName) || Strings.areEqual(address, getLocalAddress())) {
            return;
        }

        String previousAddress = members.put(nodeName, address);
        if (!Strings.areEqual(previousAddress, address)) {
            Cluster.LOG.INFO("Discovered a new node: %s - %s", nodeName, address);
        }
    }

    /**
     * Removes the node named in a KILL message from the known members.
     *
     * @param event the received KILL message
     */
    private void handleKill(ObjectNode event) {
        String nodeName = event.path(MESSAGE_NAME).asText(null);
        // ConcurrentHashMap.remove(null) would throw - a KILL without a name is simply ignored.
        if (Strings.isFilled(nodeName)) {
            members.remove(nodeName);
        }
    }

    /**
     * Returns the base address under which this node is reachable.
     * <p>
     * Uses the configured <tt>sirius.nodeAddress</tt> if present, otherwise derives (and caches) it from the
     * local host address and the web server port (omitting the port if it is 80).
     *
     * @return the local base address or an empty string if the local host address cannot be determined
     */
    private String getLocalAddress() {
        if (Strings.isFilled(localNodeAddress)) {
            return localNodeAddress;
        }

        try {
            int port = WebServer.getPort();
            String host = InetAddress.getLocalHost().getHostAddress();
            localNodeAddress = (port == HTTP_DEFAULT_PORT) ? "http://" + host : "http://" + host + ":" + port;
            return localNodeAddress;
        } catch (UnknownHostException ignored) {
            // Deliberately best-effort: without a resolvable local host we cannot announce an address. The empty
            // result is not cached, so the lookup is retried on the next call.
            return "";
        }
    }

    /**
     * Removes a node as known cluster member.
     * <p>
     * Note that if the node is still alive, it will be re-discovered. Therefore this should only be used to
     * remove nodes which are permanently shut down.
     *
     * @param name the name of the node to remove as member
     */
    public void killNode(String name) {
        interconnect.dispatch(getName(), Json.createObject().put(MESSAGE_TYPE, TYPE_KILL).put(MESSAGE_NAME, name));
    }

    /**
     * Invokes the URI on each cluster member and returns the received JSON.
     * <p>
     * Note that this will not include the local node.
     *
     * @param uri the uri to invoke
     * @return the JSON per node as stream
     */
    public Stream<ObjectNode> callEachNode(String uri) {
        return members.entrySet().stream().map(e -> callNode(e.getKey(), e.getValue(), uri));
    }

    /**
     * Calls the given URI on a single node.
     *
     * @param nodeName the name of the node (stored in {@link #RESPONSE_NODE_NAME})
     * @param endpoint the base address of the node
     * @param uri      the uri to append to the endpoint
     * @return the received JSON, or a JSON object with {@link #RESPONSE_ERROR} set to <tt>true</tt> if the call
     * failed
     */
    private ObjectNode callNode(String nodeName, String endpoint, String uri) {
        try {
            JSONCall call = JSONCall.to(new URI(endpoint + uri));

            // Set short-lived timeouts as we do not want to block a cluster wide query if one node is down...
            call.getOutcall().setConnectTimeout(SHORT_CLUSTER_HTTP_TIMEOUT_MILLIS);
            call.getOutcall().setReadTimeout(SHORT_CLUSTER_HTTP_TIMEOUT_MILLIS);

            ObjectNode result = call.getInput();
            result.put(RESPONSE_NODE_NAME, nodeName);
            if (!result.has(RESPONSE_ERROR)) {
                result.put(RESPONSE_ERROR, false);
            }
            return result;
        } catch (Exception exception) {
            // Some exceptions carry no message - report at least their type so the failure stays diagnosable.
            String message = Strings.isFilled(exception.getMessage()) ?
                             exception.getMessage() :
                             exception.getClass().getName();
            return Json.createObject()
                       .put(RESPONSE_NODE_NAME, nodeName)
                       .put(RESPONSE_ERROR, true)
                       .put(RESPONSE_ERROR_MESAGE, message);
        }
    }

    @Override
    public List<NodeInfo> updateClusterState() {
        LocalDateTime lastPingSnapshot = lastPing;
        if (lastPingSnapshot == null) {
            Cluster.LOG.INFO("Starting node discovery - I am %s - %s", CallContext.getNodeName(), getLocalAddress());
        }

        // Periodically re-broadcast a PING so that other nodes answer with a PONG and are (re-)discovered.
        if (lastPingSnapshot == null
            || Duration.between(lastPingSnapshot, LocalDateTime.now()).compareTo(PING_INTERVAL) >= 0) {
            sendPing();
        }

        return callEachNode("/system/cluster/state/" + getClusterAPIToken()).map(this::parseNodeState)
                                                                            .sorted(Comparator.comparing(NodeInfo::getName))
                                                                            .toList();
    }

    /**
     * Converts the JSON state response of a node into a {@link NodeInfo}.
     *
     * @param response the response as produced by {@link #callNode(String, String, String)}
     * @return the parsed node info; nodes which could not be called or report no valid state are marked RED
     */
    private NodeInfo parseNodeState(ObjectNode response) {
        NodeInfo result = new NodeInfo();
        Json.tryValueString(response, RESPONSE_NODE_NAME).ifPresent(result::setName);
        if (response.path(RESPONSE_ERROR).asBoolean()) {
            result.setNodeState(MetricState.RED);
            return result;
        }

        // MetricState.valueOf would throw on a missing/unknown state and thereby abort the whole stream in
        // updateClusterState - a single malformed response must not hide all other nodes.
        result.setNodeState(Json.tryValueString(response, ClusterController.RESPONSE_NODE_STATE)
                                .flatMap(InterconnectClusterManager::parseMetricState)
                                .orElse(MetricState.RED));
        Json.tryValueString(response, ClusterController.RESPONSE_UPTIME).ifPresent(result::setUptime);

        ArrayNode nodeMetrics = Json.getArray(response, ClusterController.RESPONSE_METRICS);
        for (JsonNode metricNode : nodeMetrics) {
            // Each metric is parsed best-effort: a malformed entry is logged and skipped.
            try {
                ObjectNode jsonMetric = (ObjectNode) metricNode;
                Metric metric =
                        new Metric(Json.tryValueString(jsonMetric, ClusterController.RESPONSE_CODE).orElse(null),
                                   Json.tryValueString(jsonMetric, ClusterController.RESPONSE_LABEL).orElse(null),
                                   Json.tryGet(jsonMetric, ClusterController.RESPONSE_VALUE)
                                       .map(JsonNode::asDouble)
                                       .orElse(null),
                                   MetricState.valueOf(Json.tryValueString(jsonMetric,
                                                                           ClusterController.RESPONSE_STATE)
                                                           .orElse(null)),
                                   Json.tryValueString(jsonMetric, ClusterController.RESPONSE_UNIT).orElse(null));
                result.getMetrics().add(metric);
            } catch (Exception exception) {
                Exceptions.handle(Cluster.LOG, exception);
            }
        }

        return result;
    }

    /**
     * Parses the given name into a {@link MetricState}.
     *
     * @param value the name of the state
     * @return the matching state or an empty optional if the name is unknown
     */
    private static Optional<MetricState> parseMetricState(String value) {
        try {
            return Optional.of(MetricState.valueOf(value));
        } catch (IllegalArgumentException ignored) {
            // Unknown state names are treated as "no state" - the caller decides on the fallback.
            return Optional.empty();
        }
    }
}