From 845ebd2e338022a4e4c2135eadbf3034039d66b7 Mon Sep 17 00:00:00 2001 From: Sheng Cheng Date: Mon, 13 Aug 2012 13:11:07 -0700 Subject: [PATCH 1/4] This patch fix issue#494 and add a new feature for tracking socket timeouts. - The trackers are disabled by default. --- .../connection/HConnectionManager.java | 37 ++++- .../connection/HostTimeoutTracker.java | 49 +++--- .../connection/SocketTimeoutTracker.java | 61 +++++++ .../service/CassandraHostConfigurator.java | 28 ++++ .../connection/HostTimeoutTrackerTest.java | 150 ++++++++++++++---- .../connection/SocketTimeoutTrackerTest.java | 140 ++++++++++++++++ 6 files changed, 409 insertions(+), 56 deletions(-) create mode 100644 core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java create mode 100644 core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java index d60a58dc3..c5f20545c 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java @@ -43,6 +43,7 @@ public class HConnectionManager { private final CassandraHostConfigurator cassandraHostConfigurator; private final HClientFactory clientFactory; private HostTimeoutTracker hostTimeoutTracker; + private SocketTimeoutTracker socketTimeoutTracker; private final ClockResolution clock; final ExceptionsTranslator exceptionsTranslator; @@ -78,6 +79,11 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr if ( cassandraHostConfigurator.getUseHostTimeoutTracker() ) { hostTimeoutTracker = new HostTimeoutTracker(this, cassandraHostConfigurator); } + + if ( cassandraHostConfigurator.getUseSocketTimeoutTracker() ) { + socketTimeoutTracker = new SocketTimeoutTracker(this, cassandraHostConfigurator); + } + monitor = JmxMonitor.getInstance().getCassandraMonitor(this); exceptionsTranslator = new ExceptionsTranslatorImpl(); this.cassandraHostConfigurator = cassandraHostConfigurator; @@ -270,16 +276,13 @@ public void operateWithFailover(Operation op) throws HectorException { throw he; } else if (he instanceof HectorTransportException) { closeClient(client); - markHostAsDown(pool.getCassandraHost()); - excludeHosts.add(pool.getCassandraHost()); + doSocketTimeoutCheck(pool.getCassandraHost(), excludeHosts); retryable = true; - monitor.incCounter(Counter.RECOVERABLE_TRANSPORT_EXCEPTIONS); - } else if (he instanceof HTimedOutException ) { // DO NOT drecrement retries, we will be keep retrying on timeouts until it comes back // if HLT.checkTimeout(cassandraHost): suspendHost(cassandraHost); - doTimeoutCheck(pool.getCassandraHost()); + doHostTimeoutCheck(pool.getCassandraHost()); retryable = true; @@ -362,14 +365,34 @@ public void removeAllListeners(){ * we are configured for such AND there is more than one operating host pool * @param cassandraHost */ - private void doTimeoutCheck(CassandraHost cassandraHost) { + private void doHostTimeoutCheck(final CassandraHost cassandraHost) { if ( hostTimeoutTracker != null && hostPools.size() > 1) { - if (hostTimeoutTracker.checkTimeout(cassandraHost) ) { + if (hostTimeoutTracker.penalizeTimeout(cassandraHost) ) { suspendCassandraHost(cassandraHost); } } } + /** + * If useHostTimeoutTracker is enabled, initiate a pool shutdown only if + * socketTimeoutTracker consider the Cassandra host is down. + * + * If SocketTimeoutCheck is disabled, initiate a pool shutdown immediately. + * + * @param cassandraHost + */ + private void doSocketTimeoutCheck(final CassandraHost cassandraHost, final Set excludeHosts) { + if ( socketTimeoutTracker != null ) { + if (socketTimeoutTracker.penalizeTimeout(cassandraHost) ) { + markHostAsDown(cassandraHost); + excludeHosts.add(cassandraHost); + } + } else { + markHostAsDown(cassandraHost); + excludeHosts.add(cassandraHost); + } + } + /** * Sleeps for the specified time as determined by sleepBetweenHostsMilli. * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java b/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java index 7bdf0fa0e..e863a0795 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java @@ -27,10 +27,10 @@ public class HostTimeoutTracker extends BackgroundCassandraHostService { private static final Logger log = LoggerFactory.getLogger(HostTimeoutTracker.class); - private ConcurrentHashMap> timeouts; + private ConcurrentHashMap> hostTimeouts; private ConcurrentHashMap suspended; - private int timeoutCounter; - private int timeoutWindow; + private int hostTimeoutCounter; + private int hostTimeoutWindow; private int nodeSuspensionDurationInSeconds; public static final int DEF_TIMEOUT_COUNTER = 10; @@ -43,29 +43,40 @@ public HostTimeoutTracker(HConnectionManager connectionManager, CassandraHostConfigurator cassandraHostConfigurator) { super(connectionManager, cassandraHostConfigurator); retryDelayInSeconds = cassandraHostConfigurator.getHostTimeoutUnsuspendCheckDelay(); - timeouts = new ConcurrentHashMap>(); + hostTimeouts = new ConcurrentHashMap>(); suspended = new ConcurrentHashMap(); sf = executor.scheduleWithFixedDelay(new Unsuspender(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS); - timeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter(); - timeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow(); + hostTimeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter(); + hostTimeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow(); nodeSuspensionDurationInSeconds = cassandraHostConfigurator.getHostTimeoutSuspensionDurationInSeconds(); } - public boolean checkTimeout(CassandraHost cassandraHost) { - timeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue()); - long currentTimeMillis = System.currentTimeMillis(); - timeouts.get(cassandraHost).add(currentTimeMillis); - boolean timeout = false; - // if there are 3 timeouts within 500ms, return false - if ( timeouts.get(cassandraHost).size() > timeoutCounter) { - Long last = timeouts.get(cassandraHost).remove(); - if (last.longValue() < (currentTimeMillis - timeoutWindow)) { - timeout = true; + public boolean penalizeTimeout(CassandraHost cassandraHost) { + final long currentTimeMillis = System.currentTimeMillis(); + if (hostTimeoutCounter <= 1 ) + { connectionManager.suspendCassandraHost(cassandraHost); - suspended.putIfAbsent(cassandraHost, currentTimeMillis); + suspended.putIfAbsent(cassandraHost, currentTimeMillis); + return true; + } + + hostTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue(hostTimeoutCounter - 1 )); + + if(hostTimeouts.get(cassandraHost).offer(currentTimeMillis)) { + return false; + } else { + final long oldestTimeoutMillis = hostTimeouts.get(cassandraHost).peek(); + hostTimeouts.get(cassandraHost).poll(); + hostTimeouts.get(cassandraHost).offer(currentTimeMillis); + if (currentTimeMillis - oldestTimeoutMillis < hostTimeoutWindow) { + connectionManager.suspendCassandraHost(cassandraHost); + suspended.putIfAbsent(cassandraHost, currentTimeMillis); + return true; + } else { + return false; } - } - return timeout; + } + } class Unsuspender implements Runnable { diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java new file mode 100644 index 000000000..1a19a5527 --- /dev/null +++ b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java @@ -0,0 +1,61 @@ +package me.prettyprint.cassandra.connection; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import me.prettyprint.cassandra.service.CassandraHost; +import me.prettyprint.cassandra.service.CassandraHostConfigurator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Keep track of how often a node replies with a HectorTransportException. If we go + * past the threshold of [socketTimeoutCounter] timeouts within [socketTimeoutWindow] + * milliseconds, then penalizeTimeout method will return true. (10 timeouts within 500ms + * by default) + * + * @author Sheng Cheng + */ +public class SocketTimeoutTracker { + private static final Logger log = LoggerFactory.getLogger(SocketTimeoutTracker.class); + private final ConcurrentHashMap> socketTimeouts; + + private final int socketTimeoutCounter; + private final int socketTimeoutWindow; + + public static final int DEF_SOCKET_TIMEOUT_COUNTER = 10; + public static final int DEF_SOCKET_TIMEOUT_WINDOW = 500; + + public SocketTimeoutTracker(HConnectionManager connectionManager, + CassandraHostConfigurator cassandraHostConfigurator) { + socketTimeouts = new ConcurrentHashMap>(); + socketTimeoutCounter = cassandraHostConfigurator.getSocketTimeoutCounter(); + socketTimeoutWindow = cassandraHostConfigurator.getSocketTimeoutWindow(); + } + + public boolean penalizeTimeout(CassandraHost cassandraHost) { + if (socketTimeoutCounter <= 1 ) + { + return true; + } + + socketTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue(socketTimeoutCounter - 1 )); + final long currentTimeMillis = System.currentTimeMillis(); + + if(socketTimeouts.get(cassandraHost).offer(currentTimeMillis)) { + return false; + } + else { + final long oldestTimeoutMillis = socketTimeouts.get(cassandraHost).peek(); + socketTimeouts.get(cassandraHost).poll(); + socketTimeouts.get(cassandraHost).offer(currentTimeMillis); + if (currentTimeMillis - oldestTimeoutMillis < socketTimeoutWindow) { + return true; + } else { + return false; + } + } + } + +} diff --git a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java index d8679cec5..1520a840f 100644 --- a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java +++ b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java @@ -11,6 +11,7 @@ import me.prettyprint.cassandra.connection.NodeAutoDiscoverService; import me.prettyprint.cassandra.connection.NullOpTimer; import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy; +import me.prettyprint.cassandra.connection.SocketTimeoutTracker; import me.prettyprint.hector.api.ClockResolution; import me.prettyprint.hector.api.factory.HFactory; @@ -41,9 +42,12 @@ public final class CassandraHostConfigurator implements Serializable { private LoadBalancingPolicy loadBalancingPolicy = new RoundRobinBalancingPolicy(); private int hostTimeoutCounter = HostTimeoutTracker.DEF_TIMEOUT_COUNTER; private int hostTimeoutWindow = HostTimeoutTracker.DEF_TIMEOUT_WINDOW; + private int socketTimeoutCounter = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_COUNTER; + private int socketTimeoutWindow = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_WINDOW; private int hostTimeoutSuspensionDurationInSeconds = HostTimeoutTracker.DEF_NODE_SUSPENSION_DURATION_IN_SECONDS; private int hostTimeoutUnsuspendCheckDelay = HostTimeoutTracker.DEF_NODE_UNSUSPEND_CHECK_DELAY_IN_SECONDS; private boolean useHostTimeoutTracker = false; + private boolean useSocketTimeoutTracker = false; private boolean runAutoDiscoveryAtStartup = false; private boolean useSocketKeepalive = false; private HOpTimer opTimer = new NullOpTimer(); @@ -272,6 +276,22 @@ public void setHostTimeoutWindow(int hostTimeoutWindow) { this.hostTimeoutWindow = hostTimeoutWindow; } + public int getSocketTimeoutCounter() { + return socketTimeoutCounter; + } + + public void setSocketTimeoutCounter(int socketTimeoutCounter) { + this.socketTimeoutCounter = socketTimeoutCounter; + } + + public int getSocketTimeoutWindow() { + return socketTimeoutWindow; + } + + public void setSocketTimeoutWindow(int socketTimeoutWindow) { + this.socketTimeoutWindow = socketTimeoutWindow; + } + public int getHostTimeoutSuspensionDurationInSeconds() { return hostTimeoutSuspensionDurationInSeconds; } @@ -296,6 +316,14 @@ public void setUseHostTimeoutTracker(boolean useHostTimeoutTracker) { this.useHostTimeoutTracker = useHostTimeoutTracker; } + public boolean getUseSocketTimeoutTracker() { + return useSocketTimeoutTracker; + } + + public void setUseSocketTimeoutTracker(boolean useSocketTimeoutTracker) { + this.useSocketTimeoutTracker = useSocketTimeoutTracker; + } + public boolean getRunAutoDiscoveryAtStartup() { return runAutoDiscoveryAtStartup; } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java index 833a34ed5..9204c5186 100644 --- a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java +++ b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java @@ -10,38 +10,128 @@ public class HostTimeoutTrackerTest { - private HostTimeoutTracker hostTimeoutTracker; - @Before - public void setup() { - // map cassandraHost --> arrayBlockingQueue of timestamp Longs - // - if abq.size > size, && peekLast.get > time, add host to excludedMap for ms - // - background thread to sweep for exclusion time expiration every seconds - // - getExcludedHosts calls excludedMap.keySet - // - // HTL with a three timeout trigger durring 500ms intervals - CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); - cassandraHostConfigurator.setHostTimeoutCounter(3); - HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); - hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); - } + @Test + public void testRegularLimit() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(3); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testNegtiveTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(-1); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testZeroTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(0); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } - @Test - public void testTrackHostLatency() { - CassandraHost cassandraHost = new CassandraHost("localhost:9170"); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(501); - } catch (InterruptedException e) { + @Test + public void testOneTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(1); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); } - assertTrue(hostTimeoutTracker.checkTimeout(cassandraHost)); - // ... - // in HConnectionManager: - // - if ( hostLatencyTracker.checkTimeout(cassandraHost) ) - // markHostAsDown(cassandraHost); - // excludeHosts.add(cassandraHost); - } } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java new file mode 100644 index 000000000..447b822c7 --- /dev/null +++ b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java @@ -0,0 +1,140 @@ +package me.prettyprint.cassandra.connection; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.Test; + +import me.prettyprint.cassandra.service.CassandraHost; +import me.prettyprint.cassandra.service.CassandraHostConfigurator; + +/** + * @author Sheng Cheng + */ +public class SocketTimeoutTrackerTest { + + @Test + public void testRegularLimit() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(3); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testNegtiveTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(-1); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testZeroTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(0); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testOneTimeoutCounter() { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(1); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(450); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + try { + Thread.currentThread().sleep(550); + } catch (InterruptedException e) { + + } + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + +} From cb67cdbaadfb55f8b775b1f5492ca109f3557f97 Mon Sep 17 00:00:00 2001 From: Sheng Cheng Date: Tue, 14 Aug 2012 10:58:10 -0700 Subject: [PATCH 2/4] Minor formatting change. --- .../connection/HConnectionManager.java | 9 +-- .../connection/SocketTimeoutTracker.java | 2 +- .../connection/HostTimeoutTrackerTest.java | 64 +++++------------ .../connection/SocketTimeoutTrackerTest.java | 71 ++++++------------- 4 files changed, 44 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java index c5f20545c..ca5421179 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java @@ -380,14 +380,11 @@ private void doHostTimeoutCheck(final CassandraHost cassandraHost) { * If SocketTimeoutCheck is disabled, initiate a pool shutdown immediately. * * @param cassandraHost + * @param excludeHosts + * */ private void doSocketTimeoutCheck(final CassandraHost cassandraHost, final Set excludeHosts) { - if ( socketTimeoutTracker != null ) { - if (socketTimeoutTracker.penalizeTimeout(cassandraHost) ) { - markHostAsDown(cassandraHost); - excludeHosts.add(cassandraHost); - } - } else { + if (socketTimeoutTracker == null || socketTimeoutTracker.penalizeTimeout(cassandraHost) ) { markHostAsDown(cassandraHost); excludeHosts.add(cassandraHost); } diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java index 1a19a5527..da47a5969 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java @@ -35,7 +35,7 @@ public SocketTimeoutTracker(HConnectionManager connectionManager, } public boolean penalizeTimeout(CassandraHost cassandraHost) { - if (socketTimeoutCounter <= 1 ) + if ( socketTimeoutCounter <= 1 ) { return true; } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java index 9204c5186..310526fa7 100644 --- a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java +++ b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java @@ -11,7 +11,7 @@ public class HostTimeoutTrackerTest { @Test - public void testRegularLimit() { + public void testRegularLimit() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setHostTimeoutCounter(3); cassandraHostConfigurator.setHostTimeoutWindow(500); @@ -23,18 +23,14 @@ public void testRegularLimit() { assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - } + Thread.sleep(450); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } + + Thread.sleep(550); + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); @@ -42,7 +38,7 @@ public void testRegularLimit() { } @Test - public void testNegtiveTimeoutCounter() { + public void testNegtiveTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setHostTimeoutCounter(-1); cassandraHostConfigurator.setHostTimeoutWindow(500); @@ -53,27 +49,19 @@ public void testNegtiveTimeoutCounter() { assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { + Thread.sleep(450); - } - assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } - + Thread.sleep(550); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); } @Test - public void testZeroTimeoutCounter() { + public void testZeroTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setHostTimeoutCounter(0); cassandraHostConfigurator.setHostTimeoutWindow(500); @@ -83,28 +71,20 @@ public void testZeroTimeoutCounter() { assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } - assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - } + Thread.sleep(550); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); } @Test - public void testOneTimeoutCounter() { + public void testOneTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setHostTimeoutCounter(1); cassandraHostConfigurator.setHostTimeoutWindow(500); @@ -115,21 +95,13 @@ public void testOneTimeoutCounter() { assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } + Thread.sleep(450); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } - + Thread.sleep(550); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java index 447b822c7..ad5c83a93 100644 --- a/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java +++ b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java @@ -14,7 +14,7 @@ public class SocketTimeoutTrackerTest { @Test - public void testRegularLimit() { + public void testRegularLimit() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setSocketTimeoutCounter(3); cassandraHostConfigurator.setSocketTimeoutWindow(500); @@ -26,18 +26,14 @@ public void testRegularLimit() { assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } + + Thread.sleep(450); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } + + Thread.sleep(550); + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); @@ -45,7 +41,7 @@ public void testRegularLimit() { } @Test - public void testNegtiveTimeoutCounter() { + public void testNegtiveTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setSocketTimeoutCounter(-1); cassandraHostConfigurator.setSocketTimeoutWindow(500); @@ -56,27 +52,20 @@ public void testNegtiveTimeoutCounter() { assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } - + Thread.sleep(450); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } - + + Thread.sleep(550); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); } @Test - public void testZeroTimeoutCounter() { + public void testZeroTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setSocketTimeoutCounter(0); cassandraHostConfigurator.setSocketTimeoutWindow(500); @@ -87,27 +76,19 @@ public void testZeroTimeoutCounter() { assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } - + Thread.sleep(450); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - } + Thread.sleep(550); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); } @Test - public void testOneTimeoutCounter() { + public void testOneTimeoutCounter() throws Exception { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); cassandraHostConfigurator.setSocketTimeoutCounter(1); cassandraHostConfigurator.setSocketTimeoutWindow(500); @@ -118,21 +99,13 @@ public void testOneTimeoutCounter() { assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(450); - } catch (InterruptedException e) { - - } - + Thread.sleep(450); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); - try { - Thread.currentThread().sleep(550); - } catch (InterruptedException e) { - - } - assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); } From 3653fd6a3576c3532f9ea7db40c725e49f0364df Mon Sep 17 00:00:00 2001 From: Sheng Cheng Date: Tue, 14 Aug 2012 12:32:36 -0700 Subject: [PATCH 3/4] Prepare release for 1.1-2-AMG. --- core/pom.xml | 2 +- object-mapper/pom.xml | 8 ++++---- pom.xml | 2 +- test/pom.xml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index fee1ea896..6c30a249b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -3,7 +3,7 @@ org.hectorclient hector - 1.1-2-SNAPSHOT + 1.1-2-AMG hector-core diff --git a/object-mapper/pom.xml b/object-mapper/pom.xml index ea96df687..d1b431936 100644 --- a/object-mapper/pom.xml +++ b/object-mapper/pom.xml @@ -4,11 +4,11 @@ org.hectorclient hector - 1.1-2-SNAPSHOT + 1.1-2-AMG hector-object-mapper hector-object-mapper - 3.1-07-SNAPSHOT + 3.1-07-AMG @@ -50,7 +50,7 @@ org.hectorclient hector-core - 1.1-2-SNAPSHOT + 1.1-2-AMG log4j @@ -61,7 +61,7 @@ org.hectorclient hector-test - 1.1-2-SNAPSHOT + 1.1-2-AMG test diff --git a/pom.xml b/pom.xml index b0243640b..7c7ea2e35 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.hectorclient hector pom - 1.1-2-SNAPSHOT + 1.1-2-AMG hector Cassandra Java Client Library http://github.com/hector-client/hector diff --git a/test/pom.xml b/test/pom.xml index 08f9c6278..94ce15ae0 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -3,7 +3,7 @@ org.hectorclient hector - 1.1-2-SNAPSHOT + 1.1-2-AMG hector-test jar From 0bfcf52117afec7f2c4ced67dc5a0b4cd66de788 Mon Sep 17 00:00:00 2001 From: Sheng Cheng Date: Mon, 20 Aug 2012 11:08:59 -0700 Subject: [PATCH 4/4] prepare for next development iteration --- core/pom.xml | 2 +- object-mapper/pom.xml | 8 ++++---- pom.xml | 2 +- test/pom.xml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 6c30a249b..fee1ea896 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -3,7 +3,7 @@ org.hectorclient hector - 1.1-2-AMG + 1.1-2-SNAPSHOT hector-core diff --git a/object-mapper/pom.xml b/object-mapper/pom.xml index d1b431936..ea96df687 100644 --- a/object-mapper/pom.xml +++ b/object-mapper/pom.xml @@ -4,11 +4,11 @@ org.hectorclient hector - 1.1-2-AMG + 1.1-2-SNAPSHOT hector-object-mapper hector-object-mapper - 3.1-07-AMG + 3.1-07-SNAPSHOT @@ -50,7 +50,7 @@ org.hectorclient hector-core - 1.1-2-AMG + 1.1-2-SNAPSHOT log4j @@ -61,7 +61,7 @@ org.hectorclient hector-test - 1.1-2-AMG + 1.1-2-SNAPSHOT test diff --git a/pom.xml b/pom.xml index 7c7ea2e35..b0243640b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.hectorclient hector pom - 1.1-2-AMG + 1.1-2-SNAPSHOT hector Cassandra Java Client Library http://github.com/hector-client/hector diff --git a/test/pom.xml b/test/pom.xml index 94ce15ae0..08f9c6278 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -3,7 +3,7 @@ org.hectorclient hector - 1.1-2-AMG + 1.1-2-SNAPSHOT hector-test jar