Skip to content

Commit

Permalink
fix: entity unregister node
Browse files Browse the repository at this point in the history
When the `unregisterNode` method is called for another NeonBee node,
and the executing node has an `EntityVerticle` that was also registered
by the other node, the `EntityVerticle` was incorrectly unregistered
for the executing node. This fix ensures that the `EntityVerticle`
remains registered for the executing node.
  • Loading branch information
halber committed Oct 23, 2024
1 parent 64d2ac4 commit 6897374
Show file tree
Hide file tree
Showing 6 changed files with 511 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class ClusterEntityRegistry implements Registry<String> {
@VisibleForTesting
final WriteSafeRegistry<JsonObject> clusteringInformation;

private final Vertx vertx;
final WriteSafeRegistry<Object> entityRegistry;

private final WriteSafeRegistry<Object> entityRegistry;
private final Vertx vertx;

/**
* Create a new instance of {@link ClusterEntityRegistry}.
Expand Down Expand Up @@ -76,8 +76,11 @@ public Future<Void> register(String sharedMapKey, String value) {

@Override
public Future<Void> unregister(String sharedMapKey, String value) {
return Future.all(entityRegistry.unregister(sharedMapKey, value), clusteringInformation
.unregister(getClusterNodeId(), clusterRegistrationInformation(sharedMapKey, value)))
return Future.all(
entityRegistry.unregister(sharedMapKey, value),
clusteringInformation.unregister(
getClusterNodeId(),
clusterRegistrationInformation(sharedMapKey, value)))
.mapEmpty();
}

Expand Down Expand Up @@ -123,28 +126,37 @@ Future<Void> removeClusteringInformation(String clusterNodeId) {
* @return the future
*/
public Future<Void> unregisterNode(String clusterNodeId) {
return clusteringInformation.getSharedMap().compose(AsyncMap::entries).compose(map -> {
JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId);

if (registeredEntities == null) {
// If no entities are registered, return a completed future
return Future.succeededFuture();
}
registeredEntities = registeredEntities.copy();
List<Future<?>> futureList = new ArrayList<>(registeredEntities.size());
for (Object o : registeredEntities) {
if (remove(map, o)) {
JsonObject jo = (JsonObject) o;
String entityName = jo.getString(ENTITY_NAME_KEY);
String qualifiedName = jo.getString(QUALIFIED_NAME_KEY);
futureList.add(unregister(entityName, qualifiedName));
}
}
return Future.join(futureList).mapEmpty();
}).compose(cf -> removeClusteringInformation(clusterNodeId));
return clusteringInformation.getSharedMap()
.compose(AsyncMap::entries)
.compose(map -> {
JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId);

if (registeredEntities == null) {
// If no entities are registered, return a completed future
return Future.succeededFuture();
}
registeredEntities = registeredEntities.copy();
List<Future<?>> futureList = new ArrayList<>(registeredEntities.size());
for (Object o : registeredEntities) {
if (shouldRemove(map, o)) {
JsonObject jo = (JsonObject) o;
String entityName = jo.getString(ENTITY_NAME_KEY);
String qualifiedName = jo.getString(QUALIFIED_NAME_KEY);
futureList.add(entityRegistry.unregister(entityName, qualifiedName));
}
}
return Future.join(futureList).mapEmpty();
}).compose(cf -> removeClusteringInformation(clusterNodeId));
}

private boolean remove(Map<String, Object> map, Object o) {
/**
* Check if the provided object should be removed from the map.
*
* @param map the map
* @param o the object
* @return true if the object should be removed
*/
private boolean shouldRemove(Map<String, Object> map, Object o) {
for (Map.Entry<String, Object> node : map.entrySet()) {
JsonArray ja = (JsonArray) node.getValue();
if (ja.contains(o)) {
Expand Down
Loading

0 comments on commit 6897374

Please sign in to comment.