Skip to content

Commit

Permalink
feat: add replicated maps
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Oct 9, 2024
1 parent 1567b39 commit 51eefe0
Show file tree
Hide file tree
Showing 13 changed files with 794 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setTrackingDataHandlingStrategy((String) member.getValue());
}
break;
case "useReplicatedMaps":
if (member.getValue() instanceof Boolean) {
obj.setUseReplicatedMaps((Boolean) member.getValue());
}
break;
case "verticleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue());
Expand Down Expand Up @@ -155,6 +160,7 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
if (obj.getTrackingDataHandlingStrategy() != null) {
json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy());
}
json.put("useReplicatedMaps", obj.isUseReplicatedMaps());
if (obj.getVerticleDeploymentTimeout() != null) {
json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout());
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/neonbee/NeonBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.neonbee.internal.Registry;
import io.neonbee.internal.ReplyInboundInterceptor;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.internal.buffer.ImmutableBuffer;
import io.neonbee.internal.cluster.ClusterHelper;
Expand Down Expand Up @@ -449,7 +450,8 @@ private Future<Void> registerHooks() {
*/
@VisibleForTesting
Future<Void> initializeSharedMaps() {
SharedDataAccessor sharedData = new SharedDataAccessor(vertx, NeonBee.class);
SharedDataAccessor sharedData = new SharedDataAccessorFactory(this)
.getSharedDataAccessor(NeonBee.class);
sharedLocalMap = sharedData.getLocalMap(SHARED_MAP_NAME);
return sharedData.<String, Object>getAsyncMap(SHARED_MAP_NAME).onSuccess(asyncMap -> sharedAsyncMap = asyncMap)
.mapEmpty();
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/neonbee/cache/CachingDataVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.neonbee.data.DataRequest;
import io.neonbee.data.DataVerticle;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -147,7 +148,8 @@ public void init(Vertx vertx, Context context) {

// we will only need to retrieve locks if we coalesce requests
if (coalescingTimeout > 0) {
sharedDataAccessor = new SharedDataAccessor(vertx, getClass());
sharedDataAccessor = new SharedDataAccessorFactory(vertx)
.getSharedDataAccessor(getClass());
}
}

Expand Down
38 changes: 37 additions & 1 deletion src/main/java/io/neonbee/config/NeonBeeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

/**
* In contrast to the {@link NeonBeeOptions} the {@link NeonBeeConfig} is persistent configuration in a file.
*
* <p>
* Whilst the {@link NeonBeeOptions} contain information which is to specify when NeonBee starts, such as the port of
* the server to start on and the cluster to connect to, which potentially could be different across cluster nodes, the
* {@link NeonBeeConfig} contains information which is mostly shared across different cluster nodes or you would like to
Expand Down Expand Up @@ -100,6 +100,8 @@ public class NeonBeeConfig {

private int jsonMaxStringSize;

private boolean useReplicatedMaps;

/**
* Are the metrics enabled?
*
Expand Down Expand Up @@ -535,4 +537,38 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) {
public int getJsonMaxStringSize() {
return jsonMaxStringSize;
}

/**
* Set the value to enable, disable replicated maps.
* <p>
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
* <p>
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
* don't need to partition the data.
* <p>
*
* @return true if the replicated maps should be used, false otherwise
*/
public boolean isUseReplicatedMaps() {
return useReplicatedMaps;
}

/**
* Set the value to enable, disable replicated maps.
* <p>
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
* <p>
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
* don't need to partition the data.
* <p>
*
* @param useReplicatedMaps true if the replicated maps should be enabled, false otherwise
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
public NeonBeeConfig setUseReplicatedMaps(boolean useReplicatedMaps) {
this.useReplicatedMaps = useReplicatedMaps;
return this;
}
}
52 changes: 30 additions & 22 deletions src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.neonbee.endpoint.odatav4.internal.olingo.OlingoEndpointHandler;
import io.neonbee.entity.EntityModel;
import io.neonbee.internal.RegexBlockList;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -201,27 +201,35 @@ public Future<Router> createEndpointRouter(Vertx vertx, String basePath, JsonObj
// when NeonBee is started and / or in case the endpoint is not used.
Route initialRoute = router.route();
initialRoute.handler(
routingContext -> new SharedDataAccessor(vertx, ODataV4Endpoint.class).getLocalLock(asyncLock ->
// immediately initialize the router, this will also "arm" the event bus listener
(!initialized.getAndSet(true)
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
: succeededFuture()).onComplete(handler -> {
// wait for the refresh to finish (the result doesn't matter), remove the initial route, as
// this will redirect all requests to the registered service endpoint handlers (if non have
// been registered, e.g. due to a failure in model loading, it'll result in an 404). Could
// have been removed already by refreshRouter, we don't care!
initialRoute.remove();
if (asyncLock.succeeded()) {
// releasing the lock will cause other requests unblock and not call the initial route
asyncLock.result().release();
}

// let the router again handle the context again, now with either all service endpoints
// registered, or none in case there have been a failure while loading the models.
// NOTE: Re-route is the only elegant way I found to restart the current router to take
// the new routes! Might consider checking again with the Vert.x 4.0 release.
routingContext.reroute(routingContext.request().uri());
})));
routingContext -> new SharedDataAccessorFactory(vertx)
.getSharedDataAccessor(ODataV4Endpoint.class)
.getLocalLock(asyncLock ->
// immediately initialize the router, this will also "arm" the event bus listener
(!initialized.getAndSet(true)
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
: succeededFuture()).onComplete(handler -> {
// wait for the refresh to finish (the result doesn't matter), remove the initial
// route, as
// this will redirect all requests to the registered service endpoint handlers (if
// non have
// been registered, e.g. due to a failure in model loading, it'll result in an 404).
// Could
// have been removed already by refreshRouter, we don't care!
initialRoute.remove();
if (asyncLock.succeeded()) {
// releasing the lock will cause other requests unblock and not call the initial
// route
asyncLock.result().release();
}

// let the router again handle the context again, now with either all service
// endpoints
// registered, or none in case there have been a failure while loading the models.
// NOTE: Re-route is the only elegant way I found to restart the current router to
// take
// the new routes! Might consider checking again with the Vert.x 4.0 release.
routingContext.reroute(routingContext.request().uri());
})));

return succeededFuture(router);
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/neonbee/entity/EntityModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.base.Functions;

import io.neonbee.NeonBee;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.eventbus.DeliveryOptions;
Expand Down Expand Up @@ -147,7 +147,10 @@ public Future<Map<String, EntityModel>> getSharedModels() {
}

// if not try to reload the models and return the loaded data model
return new SharedDataAccessor(neonBee.getVertx(), EntityModelManager.class).getLocalLock()

return new SharedDataAccessorFactory(neonBee)
.getSharedDataAccessor(EntityModelManager.class)
.getLocalLock()
.transform(asyncLocalLock -> {
Map<String, EntityModel> retryModels = getBufferedModels();
if (retryModels != null) {
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.neonbee.internal;

import io.neonbee.NeonBee;
import io.neonbee.config.NeonBeeConfig;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.hazelcast.ReplicatedDataAccessor;
import io.vertx.core.Vertx;

public class SharedDataAccessorFactory {
private final NeonBee neonBee;

public SharedDataAccessorFactory() {
this.neonBee = NeonBee.get();
}

public SharedDataAccessorFactory(Vertx vertx) {
this.neonBee = NeonBee.get(vertx);
}

public SharedDataAccessorFactory(NeonBee neonBee) {
this.neonBee = neonBee;
}

private boolean useHazelcastReplicatedMaps() {
NeonBeeConfig config = neonBee.getConfig();
return config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(neonBee.getVertx()).isPresent();
}

public SharedDataAccessor getSharedDataAccessor(Class<?> accessClass) {
if (useHazelcastReplicatedMaps()) {
return new ReplicatedDataAccessor(neonBee.getVertx(), accessClass);
} else {
return new SharedDataAccessor(neonBee.getVertx(), accessClass);
}
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class WriteSafeRegistry<T> implements Registry<T> {
*/
public WriteSafeRegistry(Vertx vertx, String registryName) {
this.registryName = registryName;
this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
this.sharedDataAccessor = new SharedDataAccessorFactory(vertx)
.getSharedDataAccessor(this.getClass());
}

/**
Expand Down
Loading

0 comments on commit 51eefe0

Please sign in to comment.