Skip to content

Commit

Permalink
[ISSUE-3768][Improve]Thread pool instances are created by implementat…
Browse files Browse the repository at this point in the history
…ions in the JDK (#3775)

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
zzzk1 and wolfboys authored Jun 18, 2024
1 parent 0eaef6a commit bbfbdab
Showing 1 changed file with 46 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand All @@ -34,41 +33,21 @@
@Configuration
public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {

/**
* Create a standard asynchronous task performer.
*
* @return Executor
*/
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("asyncTaskExecutor-");

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

/**
* Create a ThreadPoolTaskExecutor for SavePointService.
*
* @return Executor
*/
@Bean("triggerSavepointExecutor")
public Executor savepointExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("trigger-savepoint-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("trigger-savepoint-executor-"),
new ThreadPoolExecutor.AbortPolicy());
}

/**
Expand All @@ -78,14 +57,13 @@ public Executor savepointExecutor() {
*/
@Bean("flinkRestAPIWatchingExecutor")
public Executor restAPIWatchingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("flink-restapi-watching-executor-");
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("flink-restapi-watching-executor-"));
}

/**
Expand All @@ -95,14 +73,13 @@ public Executor restAPIWatchingExecutor() {
*/
@Bean("flinkClusterWatchingExecutor")
public Executor clusterWatchingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("flink-cluster-watching-executor-");
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("flink-cluster-watching-executor-"));
}

/**
Expand Down Expand Up @@ -146,15 +123,14 @@ public ExecutorService clusterExecutor() {
*/
@Bean("streamparkNotifyExecutor")
public Executor notifyExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(20);
executor.setThreadNamePrefix("streampark-notify-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
20L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-notify-executor-"),
new ThreadPoolExecutor.AbortPolicy());
}

/**
Expand All @@ -164,15 +140,14 @@ public Executor notifyExecutor() {
*/
@Bean("streamparkDeployExecutor")
public Executor deployExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("streampark-deploy-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-deploy-executor-"),
new ThreadPoolExecutor.AbortPolicy());
}

/**
Expand All @@ -182,14 +157,13 @@ public Executor deployExecutor() {
*/
@Bean("streamparkBuildExecutor")
public Executor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("streampark-build-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-build-executor-"),
new ThreadPoolExecutor.AbortPolicy());
}
}

0 comments on commit bbfbdab

Please sign in to comment.