Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the maximum blocking time config for SyncPutQueuePolicy #1592

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,50 @@

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Synchronous put queue policy.
*/
@Slf4j
public class SyncPutQueuePolicy implements RejectedExecutionHandler {

// The timeout value for the offer method (ms).
private int timeout;

private final boolean enableTimeout;

public SyncPutQueuePolicy(int timeout){
if (timeout < 0){
throw new IllegalArgumentException("timeout must be greater than 0");
}
this.timeout = timeout;
this.enableTimeout = true;
}

public SyncPutQueuePolicy (){
this.enableTimeout = false;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
return;
}
try {
executor.getQueue().put(r);
if (enableTimeout) {
if (!executor.getQueue().offer(r, timeout, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
executor.toString() + " with timeout " + timeout + "ms.");
}
}
else {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
log.error("Adding Queue task to thread pool failed.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.junit.Assert.fail;

/**
* Synchronous placement queue policy implementation test
*/
public class SyncPutQueuePolicyTest {

/**
* test thread pool rejected execution
* test thread pool rejected execution without timeout
*/
@Test
public void testRejectedExecution() {
public void testRejectedExecutionWithoutTimeout() {
SyncPutQueuePolicy syncPutQueuePolicy = new SyncPutQueuePolicy();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), syncPutQueuePolicy);
Expand All @@ -50,4 +53,34 @@ public void testRejectedExecution() {
}
Assert.assertEquals(4, threadPoolExecutor.getCompletedTaskCount());
}

/**
* test thread pool rejected execution with timeout
*/
@Test
public void testRejectedExecutionWithTimeout() {
SyncPutQueuePolicy syncPutQueuePolicy = new SyncPutQueuePolicy(300);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), syncPutQueuePolicy);
threadPoolExecutor.prestartAllCoreThreads();

Assert.assertSame(syncPutQueuePolicy, threadPoolExecutor.getRejectedExecutionHandler());
IntStream.range(0, 4).forEach(s -> {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(200L));
});
IntStream.range(0, 2).forEach(s -> {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(500L));
});
try {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(100L));
ThreadUtil.sleep(1000L);
fail("should throw RejectedExecutionException");
} catch (Exception e) {
Assert.assertTrue(e instanceof RejectedExecutionException);
}
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
Assert.assertEquals(6, threadPoolExecutor.getCompletedTaskCount());
}
}