Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New feature for socket timeout and the fix for issue #494 #504

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class HConnectionManager {
private final CassandraHostConfigurator cassandraHostConfigurator;
private final HClientFactory clientFactory;
private HostTimeoutTracker hostTimeoutTracker;
private SocketTimeoutTracker socketTimeoutTracker;
private final ClockResolution clock;

final ExceptionsTranslator exceptionsTranslator;
Expand Down Expand Up @@ -78,6 +79,11 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr
if ( cassandraHostConfigurator.getUseHostTimeoutTracker() ) {
hostTimeoutTracker = new HostTimeoutTracker(this, cassandraHostConfigurator);
}

if ( cassandraHostConfigurator.getUseSocketTimeoutTracker() ) {
socketTimeoutTracker = new SocketTimeoutTracker(this, cassandraHostConfigurator);
}

monitor = JmxMonitor.getInstance().getCassandraMonitor(this);
exceptionsTranslator = new ExceptionsTranslatorImpl();
this.cassandraHostConfigurator = cassandraHostConfigurator;
Expand Down Expand Up @@ -270,16 +276,13 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
throw he;
} else if (he instanceof HectorTransportException) {
closeClient(client);
markHostAsDown(pool.getCassandraHost());
excludeHosts.add(pool.getCassandraHost());
doSocketTimeoutCheck(pool.getCassandraHost(), excludeHosts);
retryable = true;

monitor.incCounter(Counter.RECOVERABLE_TRANSPORT_EXCEPTIONS);

} else if (he instanceof HTimedOutException ) {
// DO NOT drecrement retries, we will be keep retrying on timeouts until it comes back
// if HLT.checkTimeout(cassandraHost): suspendHost(cassandraHost);
doTimeoutCheck(pool.getCassandraHost());
doHostTimeoutCheck(pool.getCassandraHost());

retryable = true;

Expand Down Expand Up @@ -362,14 +365,31 @@ public void removeAllListeners(){
* we are configured for such AND there is more than one operating host pool
* @param cassandraHost
*/
private void doTimeoutCheck(CassandraHost cassandraHost) {
private void doHostTimeoutCheck(final CassandraHost cassandraHost) {
if ( hostTimeoutTracker != null && hostPools.size() > 1) {
if (hostTimeoutTracker.checkTimeout(cassandraHost) ) {
if (hostTimeoutTracker.penalizeTimeout(cassandraHost) ) {
suspendCassandraHost(cassandraHost);
}
}
}

/**
* If useHostTimeoutTracker is enabled, initiate a pool shutdown only if
* socketTimeoutTracker consider the Cassandra host is down.
*
* If SocketTimeoutCheck is disabled, initiate a pool shutdown immediately.
*
* @param cassandraHost
* @param excludeHosts
*
*/
private void doSocketTimeoutCheck(final CassandraHost cassandraHost, final Set<CassandraHost> excludeHosts) {
if (socketTimeoutTracker == null || socketTimeoutTracker.penalizeTimeout(cassandraHost) ) {
markHostAsDown(cassandraHost);
excludeHosts.add(cassandraHost);
}
}

/**
* Sleeps for the specified time as determined by sleepBetweenHostsMilli.
* In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
public class HostTimeoutTracker extends BackgroundCassandraHostService {
private static final Logger log = LoggerFactory.getLogger(HostTimeoutTracker.class);

private ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>> timeouts;
private ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>> hostTimeouts;
private ConcurrentHashMap<CassandraHost, Long> suspended;
private int timeoutCounter;
private int timeoutWindow;
private int hostTimeoutCounter;
private int hostTimeoutWindow;
private int nodeSuspensionDurationInSeconds;

public static final int DEF_TIMEOUT_COUNTER = 10;
Expand All @@ -43,29 +43,40 @@ public HostTimeoutTracker(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
retryDelayInSeconds = cassandraHostConfigurator.getHostTimeoutUnsuspendCheckDelay();
timeouts = new ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>>();
hostTimeouts = new ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>>();
suspended = new ConcurrentHashMap<CassandraHost, Long>();
sf = executor.scheduleWithFixedDelay(new Unsuspender(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS);
timeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter();
timeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow();
hostTimeoutCounter = cassandraHostConfigurator.getHostTimeoutCounter();
hostTimeoutWindow = cassandraHostConfigurator.getHostTimeoutWindow();
nodeSuspensionDurationInSeconds = cassandraHostConfigurator.getHostTimeoutSuspensionDurationInSeconds();
}

public boolean checkTimeout(CassandraHost cassandraHost) {
timeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue<Long>());
long currentTimeMillis = System.currentTimeMillis();
timeouts.get(cassandraHost).add(currentTimeMillis);
boolean timeout = false;
// if there are 3 timeouts within 500ms, return false
if ( timeouts.get(cassandraHost).size() > timeoutCounter) {
Long last = timeouts.get(cassandraHost).remove();
if (last.longValue() < (currentTimeMillis - timeoutWindow)) {
timeout = true;
public boolean penalizeTimeout(CassandraHost cassandraHost) {
final long currentTimeMillis = System.currentTimeMillis();
if (hostTimeoutCounter <= 1 )
{
connectionManager.suspendCassandraHost(cassandraHost);
suspended.putIfAbsent(cassandraHost, currentTimeMillis);
suspended.putIfAbsent(cassandraHost, currentTimeMillis);
return true;
}

hostTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue<Long>(hostTimeoutCounter - 1 ));

if(hostTimeouts.get(cassandraHost).offer(currentTimeMillis)) {
return false;
} else {
final long oldestTimeoutMillis = hostTimeouts.get(cassandraHost).peek();
hostTimeouts.get(cassandraHost).poll();
hostTimeouts.get(cassandraHost).offer(currentTimeMillis);
if (currentTimeMillis - oldestTimeoutMillis < hostTimeoutWindow) {
connectionManager.suspendCassandraHost(cassandraHost);
suspended.putIfAbsent(cassandraHost, currentTimeMillis);
return true;
} else {
return false;
}
}
return timeout;
}

}

class Unsuspender implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package me.prettyprint.cassandra.connection;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Keep track of how often a node replies with a HectorTransportException. If we go
* past the threshold of [socketTimeoutCounter] timeouts within [socketTimeoutWindow]
* milliseconds, then penalizeTimeout method will return true. (10 timeouts within 500ms
* by default)
*
* @author <a href="mailto:[email protected]">Sheng Cheng</a>
*/
public class SocketTimeoutTracker {
private static final Logger log = LoggerFactory.getLogger(SocketTimeoutTracker.class);
private final ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>> socketTimeouts;

private final int socketTimeoutCounter;
private final int socketTimeoutWindow;

public static final int DEF_SOCKET_TIMEOUT_COUNTER = 10;
public static final int DEF_SOCKET_TIMEOUT_WINDOW = 500;

public SocketTimeoutTracker(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
socketTimeouts = new ConcurrentHashMap<CassandraHost, LinkedBlockingQueue<Long>>();
socketTimeoutCounter = cassandraHostConfigurator.getSocketTimeoutCounter();
socketTimeoutWindow = cassandraHostConfigurator.getSocketTimeoutWindow();
}

public boolean penalizeTimeout(CassandraHost cassandraHost) {
if ( socketTimeoutCounter <= 1 )
{
return true;
}

socketTimeouts.putIfAbsent(cassandraHost, new LinkedBlockingQueue<Long>(socketTimeoutCounter - 1 ));
final long currentTimeMillis = System.currentTimeMillis();

if(socketTimeouts.get(cassandraHost).offer(currentTimeMillis)) {
return false;
}
else {
final long oldestTimeoutMillis = socketTimeouts.get(cassandraHost).peek();
socketTimeouts.get(cassandraHost).poll();
socketTimeouts.get(cassandraHost).offer(currentTimeMillis);
if (currentTimeMillis - oldestTimeoutMillis < socketTimeoutWindow) {
return true;
} else {
return false;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import me.prettyprint.cassandra.connection.NodeAutoDiscoverService;
import me.prettyprint.cassandra.connection.NullOpTimer;
import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy;
import me.prettyprint.cassandra.connection.SocketTimeoutTracker;
import me.prettyprint.hector.api.ClockResolution;
import me.prettyprint.hector.api.factory.HFactory;

Expand Down Expand Up @@ -41,9 +42,12 @@ public final class CassandraHostConfigurator implements Serializable {
private LoadBalancingPolicy loadBalancingPolicy = new RoundRobinBalancingPolicy();
private int hostTimeoutCounter = HostTimeoutTracker.DEF_TIMEOUT_COUNTER;
private int hostTimeoutWindow = HostTimeoutTracker.DEF_TIMEOUT_WINDOW;
private int socketTimeoutCounter = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_COUNTER;
private int socketTimeoutWindow = SocketTimeoutTracker.DEF_SOCKET_TIMEOUT_WINDOW;
private int hostTimeoutSuspensionDurationInSeconds = HostTimeoutTracker.DEF_NODE_SUSPENSION_DURATION_IN_SECONDS;
private int hostTimeoutUnsuspendCheckDelay = HostTimeoutTracker.DEF_NODE_UNSUSPEND_CHECK_DELAY_IN_SECONDS;
private boolean useHostTimeoutTracker = false;
private boolean useSocketTimeoutTracker = false;
private boolean runAutoDiscoveryAtStartup = false;
private boolean useSocketKeepalive = false;
private HOpTimer opTimer = new NullOpTimer();
Expand Down Expand Up @@ -272,6 +276,22 @@ public void setHostTimeoutWindow(int hostTimeoutWindow) {
this.hostTimeoutWindow = hostTimeoutWindow;
}

public int getSocketTimeoutCounter() {
return socketTimeoutCounter;
}

public void setSocketTimeoutCounter(int socketTimeoutCounter) {
this.socketTimeoutCounter = socketTimeoutCounter;
}

public int getSocketTimeoutWindow() {
return socketTimeoutWindow;
}

public void setSocketTimeoutWindow(int socketTimeoutWindow) {
this.socketTimeoutWindow = socketTimeoutWindow;
}

public int getHostTimeoutSuspensionDurationInSeconds() {
return hostTimeoutSuspensionDurationInSeconds;
}
Expand All @@ -296,6 +316,14 @@ public void setUseHostTimeoutTracker(boolean useHostTimeoutTracker) {
this.useHostTimeoutTracker = useHostTimeoutTracker;
}

public boolean getUseSocketTimeoutTracker() {
return useSocketTimeoutTracker;
}

public void setUseSocketTimeoutTracker(boolean useSocketTimeoutTracker) {
this.useSocketTimeoutTracker = useSocketTimeoutTracker;
}

public boolean getRunAutoDiscoveryAtStartup() {
return runAutoDiscoveryAtStartup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,100 @@

public class HostTimeoutTrackerTest {

private HostTimeoutTracker hostTimeoutTracker;
@Before
public void setup() {
// map cassandraHost --> arrayBlockingQueue of timestamp Longs
// - if abq.size > size, && peekLast.get > time, add host to excludedMap<cassandraHost,timestampOfExclusion> for <suspendTime> ms
// - background thread to sweep for exclusion time expiration every <sweepInterval> seconds
// - getExcludedHosts calls excludedMap.keySet
//
// HTL with a three timeout trigger durring 500ms intervals
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170");
cassandraHostConfigurator.setHostTimeoutCounter(3);
HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator);
hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator);
}
@Test
public void testRegularLimit() throws Exception {
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170");
cassandraHostConfigurator.setHostTimeoutCounter(3);
cassandraHostConfigurator.setHostTimeoutWindow(500);
HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator);
HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator);
CassandraHost cassandraHost = new CassandraHost("localhost:9170");

assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(450);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(550);

assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertFalse(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
}

@Test
public void testNegtiveTimeoutCounter() throws Exception {
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170");
cassandraHostConfigurator.setHostTimeoutCounter(-1);
cassandraHostConfigurator.setHostTimeoutWindow(500);
HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator);
HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator);
CassandraHost cassandraHost = new CassandraHost("localhost:9170");

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(450);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(550);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
}

@Test
public void testZeroTimeoutCounter() throws Exception {
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170");
cassandraHostConfigurator.setHostTimeoutCounter(0);
cassandraHostConfigurator.setHostTimeoutWindow(500);
HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator);
HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator);
CassandraHost cassandraHost = new CassandraHost("localhost:9170");

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(450);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(550);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
}

@Test
public void testTrackHostLatency() {
CassandraHost cassandraHost = new CassandraHost("localhost:9170");
assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost));
assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost));
assertFalse(hostTimeoutTracker.checkTimeout(cassandraHost));
try {
Thread.currentThread().sleep(501);
} catch (InterruptedException e) {
@Test
public void testOneTimeoutCounter() throws Exception {
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170");
cassandraHostConfigurator.setHostTimeoutCounter(1);
cassandraHostConfigurator.setHostTimeoutWindow(500);
HConnectionManager connectionManager = new HConnectionManager("TestCluster", cassandraHostConfigurator);
HostTimeoutTracker hostTimeoutTracker = new HostTimeoutTracker(connectionManager, cassandraHostConfigurator);
CassandraHost cassandraHost = new CassandraHost("localhost:9170");

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(450);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));

Thread.sleep(550);

assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
assertTrue(hostTimeoutTracker.penalizeTimeout(cassandraHost));
}
assertTrue(hostTimeoutTracker.checkTimeout(cassandraHost));
// ...
// in HConnectionManager:
// - if ( hostLatencyTracker.checkTimeout(cassandraHost) )
// markHostAsDown(cassandraHost);
// excludeHosts.add(cassandraHost);

}
}
Loading