diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java index d60a58dc3..ca5421179 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java @@ -43,6 +43,7 @@ public class HConnectionManager { private final CassandraHostConfigurator cassandraHostConfigurator; private final HClientFactory clientFactory; private HostTimeoutTracker hostTimeoutTracker; + private SocketTimeoutTracker socketTimeoutTracker; private final ClockResolution clock; final ExceptionsTranslator exceptionsTranslator; @@ -78,6 +79,11 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr if ( cassandraHostConfigurator.getUseHostTimeoutTracker() ) { hostTimeoutTracker = new HostTimeoutTracker(this, cassandraHostConfigurator); } + + if ( cassandraHostConfigurator.getUseSocketTimeoutTracker() ) { + socketTimeoutTracker = new SocketTimeoutTracker(this, cassandraHostConfigurator); + } + monitor = JmxMonitor.getInstance().getCassandraMonitor(this); exceptionsTranslator = new ExceptionsTranslatorImpl(); this.cassandraHostConfigurator = cassandraHostConfigurator; @@ -270,16 +276,13 @@ public void operateWithFailover(Operation op) throws HectorException { throw he; } else if (he instanceof HectorTransportException) { closeClient(client); - markHostAsDown(pool.getCassandraHost()); - excludeHosts.add(pool.getCassandraHost()); + doSocketTimeoutCheck(pool.getCassandraHost(), excludeHosts); retryable = true; - monitor.incCounter(Counter.RECOVERABLE_TRANSPORT_EXCEPTIONS); - } else if (he instanceof HTimedOutException ) { // DO NOT drecrement retries, we will be keep retrying on timeouts until it comes back // if HLT.checkTimeout(cassandraHost): suspendHost(cassandraHost); - doTimeoutCheck(pool.getCassandraHost()); + doHostTimeoutCheck(pool.getCassandraHost()); retryable = true; @@ -362,14 +365,31 @@ public void removeAllListeners(){ * we are configured for such AND there is more than one operating host pool * @param cassandraHost */ - private void doTimeoutCheck(CassandraHost cassandraHost) { + private void doHostTimeoutCheck(final CassandraHost cassandraHost) { if ( hostTimeoutTracker != null && hostPools.size() > 1) { - if (hostTimeoutTracker.checkTimeout(cassandraHost) ) { + if (hostTimeoutTracker.penalizeTimeout(cassandraHost) ) { suspendCassandraHost(cassandraHost); } } } + /** + * If useHostTimeoutTracker is enabled, initiate a pool shutdown only if + * socketTimeoutTracker consider the Cassandra host is down. + * + * If SocketTimeoutCheck is disabled, initiate a pool shutdown immediately. + * + * @param cassandraHost + * @param excludeHosts + * + */ + private void doSocketTimeoutCheck(final CassandraHost cassandraHost, final Set excludeHosts) { + if (socketTimeoutTracker == null || socketTimeoutTracker.penalizeTimeout(cassandraHost) ) { + markHostAsDown(cassandraHost); + excludeHosts.add(cassandraHost); + } + } + /** * Sleeps for the specified time as determined by sleepBetweenHostsMilli. * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java b/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java index 7bdf0fa0e..e863a0795 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HostTimeoutTracker.java @@ -27,10 +27,10 @@ public class HostTimeoutTracker extends BackgroundCassandraHostService { private static final Logger log = LoggerFactory.getLogger(HostTimeoutTracker.class); - private ConcurrentHashMap> timeouts; + private ConcurrentHashMap> hostTimeouts; private ConcurrentHashMap suspended; - private int timeoutCounter; - private int timeoutWindow; + private int hostTimeoutCounter; + private int hostTimeoutWindow; private int nodeSuspensionDurationInSeconds; public static final int DEF_TIMEOUT_COUNTER = 10; @@ -43,29 +43,40 @@ public HostTimeoutTracker(HConnectionManager connectionManager, CassandraHostConfigurator cassandraHostConfigurator) { super(connectionManager, cassandraHostConfigurator); retryDelayInSeconds = cassandraHostConfigurator.getHostTimeoutUnsuspendCheckDelay(); - timeouts = new ConcurrentHashMap>(); + hostTimeouts = new ConcurrentHashMap>(); suspended = new ConcurrentHashMap(); sf = executor.scheduleWithFixedDelay(new Unsuspender(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS); - timeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter(); - timeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow(); + hostTimeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter(); + hostTimeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow(); nodeSuspensionDurationInSeconds = cassandraHostConfigurator.getHostTimeoutSuspensionDurationInSeconds(); } - public boolean checkTimeout(CassandraHost cassandraHost) { - timeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue()); - long currentTimeMillis = System.currentTimeMillis(); - timeouts.get(cassandraHost).add(currentTimeMillis); - boolean timeout = false; - // if there are 3 timeouts within 500ms, return false - if ( timeouts.get(cassandraHost).size() > timeoutCounter) { - Long last = timeouts.get(cassandraHost).remove(); - if (last.longValue() < (currentTimeMillis - timeoutWindow)) { - timeout = true; + public boolean penalizeTimeout(CassandraHost cassandraHost) { + final long currentTimeMillis = System.currentTimeMillis(); + if (hostTimeoutCounter <= 1 ) + { connectionManager.suspendCassandraHost(cassandraHost); - suspended.putIfAbsent(cassandraHost, currentTimeMillis); + suspended.putIfAbsent(cassandraHost, currentTimeMillis); + return true; + } + + hostTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue(hostTimeoutCounter - 1 )); + + if(hostTimeouts.get(cassandraHost).offer(currentTimeMillis)) { + return false; + } else { + final long oldestTimeoutMillis = hostTimeouts.get(cassandraHost).peek(); + hostTimeouts.get(cassandraHost).poll(); + hostTimeouts.get(cassandraHost).offer(currentTimeMillis); + if (currentTimeMillis - oldestTimeoutMillis < hostTimeoutWindow) { + connectionManager.suspendCassandraHost(cassandraHost); + suspended.putIfAbsent(cassandraHost, currentTimeMillis); + return true; + } else { + return false; } - } - return timeout; + } + } class Unsuspender implements Runnable { diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java new file mode 100644 index 000000000..da47a5969 --- /dev/null +++ b/core/src/main/java/me/prettyprint/cassandra/connection/SocketTimeoutTracker.java @@ -0,0 +1,61 @@ +package me.prettyprint.cassandra.connection; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import me.prettyprint.cassandra.service.CassandraHost; +import me.prettyprint.cassandra.service.CassandraHostConfigurator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Keep track of how often a node replies with a HectorTransportException. If we go + * past the threshold of [socketTimeoutCounter] timeouts within [socketTimeoutWindow] + * milliseconds, then penalizeTimeout method will return true. (10 timeouts within 500ms + * by default) + * + * @author Sheng Cheng + */ +public class SocketTimeoutTracker { + private static final Logger log = LoggerFactory.getLogger(SocketTimeoutTracker.class); + private final ConcurrentHashMap> socketTimeouts; + + private final int socketTimeoutCounter; + private final int socketTimeoutWindow; + + public static final int DEF_SOCKET_TIMEOUT_COUNTER = 10; + public static final int DEF_SOCKET_TIMEOUT_WINDOW = 500; + + public SocketTimeoutTracker(HConnectionManager connectionManager, + CassandraHostConfigurator cassandraHostConfigurator) { + socketTimeouts = new ConcurrentHashMap>(); + socketTimeoutCounter = cassandraHostConfigurator.getSocketTimeoutCounter(); + socketTimeoutWindow = cassandraHostConfigurator.getSocketTimeoutWindow(); + } + + public boolean penalizeTimeout(CassandraHost cassandraHost) { + if ( socketTimeoutCounter <= 1 ) + { + return true; + } + + socketTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue(socketTimeoutCounter - 1 )); + final long currentTimeMillis = System.currentTimeMillis(); + + if(socketTimeouts.get(cassandraHost).offer(currentTimeMillis)) { + return false; + } + else { + final long oldestTimeoutMillis = socketTimeouts.get(cassandraHost).peek(); + socketTimeouts.get(cassandraHost).poll(); + socketTimeouts.get(cassandraHost).offer(currentTimeMillis); + if (currentTimeMillis - oldestTimeoutMillis < socketTimeoutWindow) { + return true; + } else { + return false; + } + } + } + +} diff --git a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java index d8679cec5..1520a840f 100644 --- a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java +++ b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java @@ -11,6 +11,7 @@ import me.prettyprint.cassandra.connection.NodeAutoDiscoverService; import me.prettyprint.cassandra.connection.NullOpTimer; import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy; +import me.prettyprint.cassandra.connection.SocketTimeoutTracker; import me.prettyprint.hector.api.ClockResolution; import me.prettyprint.hector.api.factory.HFactory; @@ -41,9 +42,12 @@ public final class CassandraHostConfigurator implements Serializable { private LoadBalancingPolicy loadBalancingPolicy = new RoundRobinBalancingPolicy(); private int hostTimeoutCounter = HostTimeoutTracker.DEF_TIMEOUT_COUNTER; private int hostTimeoutWindow = HostTimeoutTracker.DEF_TIMEOUT_WINDOW; + private int socketTimeoutCounter = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_COUNTER; + private int socketTimeoutWindow = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_WINDOW; private int hostTimeoutSuspensionDurationInSeconds = HostTimeoutTracker.DEF_NODE_SUSPENSION_DURATION_IN_SECONDS; private int hostTimeoutUnsuspendCheckDelay = HostTimeoutTracker.DEF_NODE_UNSUSPEND_CHECK_DELAY_IN_SECONDS; private boolean useHostTimeoutTracker = false; + private boolean useSocketTimeoutTracker = false; private boolean runAutoDiscoveryAtStartup = false; private boolean useSocketKeepalive = false; private HOpTimer opTimer = new NullOpTimer(); @@ -272,6 +276,22 @@ public void setHostTimeoutWindow(int hostTimeoutWindow) { this.hostTimeoutWindow = hostTimeoutWindow; } + public int getSocketTimeoutCounter() { + return socketTimeoutCounter; + } + + public void setSocketTimeoutCounter(int socketTimeoutCounter) { + this.socketTimeoutCounter = socketTimeoutCounter; + } + + public int getSocketTimeoutWindow() { + return socketTimeoutWindow; + } + + public void setSocketTimeoutWindow(int socketTimeoutWindow) { + this.socketTimeoutWindow = socketTimeoutWindow; + } + public int getHostTimeoutSuspensionDurationInSeconds() { return hostTimeoutSuspensionDurationInSeconds; } @@ -296,6 +316,14 @@ public void setUseHostTimeoutTracker(boolean useHostTimeoutTracker) { this.useHostTimeoutTracker = useHostTimeoutTracker; } + public boolean getUseSocketTimeoutTracker() { + return useSocketTimeoutTracker; + } + + public void setUseSocketTimeoutTracker(boolean useSocketTimeoutTracker) { + this.useSocketTimeoutTracker = useSocketTimeoutTracker; + } + public boolean getRunAutoDiscoveryAtStartup() { return runAutoDiscoveryAtStartup; } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java index 833a34ed5..310526fa7 100644 --- a/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java +++ b/core/src/test/java/me/prettyprint/cassandra/connection/HostTimeoutTrackerTest.java @@ -10,38 +10,100 @@ public class HostTimeoutTrackerTest { - private HostTimeoutTracker hostTimeoutTracker; - @Before - public void setup() { - // map cassandraHost --> arrayBlockingQueue of timestamp Longs - // - if abq.size > size, && peekLast.get > time, add host to excludedMap for ms - // - background thread to sweep for exclusion time expiration every seconds - // - getExcludedHosts calls excludedMap.keySet - // - // HTL with a three timeout trigger durring 500ms intervals - CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); - cassandraHostConfigurator.setHostTimeoutCounter(3); - HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); - hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); - } + @Test + public void testRegularLimit() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(3); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testNegtiveTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(-1); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testZeroTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(0); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + } - @Test - public void testTrackHostLatency() { - CassandraHost cassandraHost = new CassandraHost("localhost:9170"); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost)); - try { - Thread.currentThread().sleep(501); - } catch (InterruptedException e) { + @Test + public void testOneTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setHostTimeoutCounter(1); + cassandraHostConfigurator.setHostTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + Thread.sleep(450); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost)); } - assertTrue(hostTimeoutTracker.checkTimeout(cassandraHost)); - // ... - // in HConnectionManager: - // - if ( hostLatencyTracker.checkTimeout(cassandraHost) ) - // markHostAsDown(cassandraHost); - // excludeHosts.add(cassandraHost); - } } diff --git a/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java new file mode 100644 index 000000000..ad5c83a93 --- /dev/null +++ b/core/src/test/java/me/prettyprint/cassandra/connection/SocketTimeoutTrackerTest.java @@ -0,0 +1,113 @@ +package me.prettyprint.cassandra.connection; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.Test; + +import me.prettyprint.cassandra.service.CassandraHost; +import me.prettyprint.cassandra.service.CassandraHostConfigurator; + +/** + * @author Sheng Cheng + */ +public class SocketTimeoutTrackerTest { + + @Test + public void testRegularLimit() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(3); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertFalse(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testNegtiveTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(-1); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + + Thread.sleep(550); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testZeroTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(0); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + + @Test + public void testOneTimeoutCounter() throws Exception { + CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170"); + cassandraHostConfigurator.setSocketTimeoutCounter(1); + cassandraHostConfigurator.setSocketTimeoutWindow(500); + HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator); + SocketTimeoutTracker socketTimeoutTracker = new SocketTimeoutTracker(connectionManager, cassandraHostConfigurator); + CassandraHost cassandraHost = new CassandraHost("localhost:9170"); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(450); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + + Thread.sleep(550); + + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + assertTrue(socketTimeoutTracker.penalizeTimeout(cassandraHost)); + } + +}