Skip to content

Commit

Permalink
Shutdown ClusterTopologyRefreshTask when RedisClusterClient is shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
thachlp committed Dec 31, 2024
1 parent ce92e6a commit 34d1ae6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public void suspendTopologyRefresh() {
}
}

/**
* Cancel any scheduled or running topology refresh tasks.
*/
public void cancelTopologyRefreshTask() {
if (clusterTopologyRefreshTask.get()) {
clusterTopologyRefreshTask.cancel();
}
}

public boolean isTopologyRefreshInProgress() {
return clusterTopologyRefreshTask.get();
}
Expand Down Expand Up @@ -317,12 +326,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements

private final Supplier<CompletionStage<?>> reloadTopologyAsync;

private final AtomicBoolean isCanceled = new AtomicBoolean(false);

ClusterTopologyRefreshTask(Supplier<CompletionStage<?>> reloadTopologyAsync) {
this.reloadTopologyAsync = reloadTopologyAsync;
}

public void run() {

if (isCanceled.get()) {
return;
}

if (compareAndSet(false, true)) {
doRun();
return;
Expand Down Expand Up @@ -352,6 +367,10 @@ void doRun() {
}
}

void cancel() {
isCanceled.set(true);
}

}

}
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,17 @@ public void suspendTopologyRefresh() {
topologyRefreshScheduler.suspendTopologyRefresh();
}

/**
* Cancel any running topology refresh tasks. This method calls
* {@link ClusterTopologyRefreshScheduler#cancelTopologyRefreshTask()} to ensure that any ongoing refresh tasks are properly
* stopped. It is typically called during the shutdown process to clean up resources and prevent any further topology
* refresh operations.
*
*/
public void cancelTopologyRefresh() {
topologyRefreshScheduler.cancelTopologyRefreshTask();
}

/**
* Return whether a scheduled or adaptive topology refresh is in progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,4 +715,21 @@ public void clear() {

}

@Test
void shouldCancelTopologyRefreshTaskOnShutdown() {
ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofSeconds(1)).build();
RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(),
RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build());
clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build());
clusterClient.connect().sync();
Delay.delay(Duration.ofMillis(1500));
assertThat(clusterClient.isTopologyRefreshInProgress()).isTrue();

clusterClient.shutdownAsync(0, 10, TimeUnit.SECONDS).join();

assertThat(clusterClient.isTopologyRefreshInProgress()).isFalse();
FastShutdown.shutdown(clusterClient);
}

}

0 comments on commit 34d1ae6

Please sign in to comment.