From d53d17235a46a4c74612fd9d50b31a5ccbf953b0 Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Wed, 11 Jul 2012 17:24:08 +0100 Subject: [PATCH 1/2] NodeAutoDiscoverService discovering listen_address instead of rpc_address Issue #490 --- .../connection/NodeAutoDiscoverService.java | 87 ++++++++++--------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java b/core/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java index 240cf4e65..6f9bf99a1 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java @@ -1,5 +1,6 @@ package me.prettyprint.cassandra.connection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -28,7 +29,7 @@ public class NodeAutoDiscoverService extends BackgroundCassandraHostService { public NodeAutoDiscoverService(HConnectionManager connectionManager, - CassandraHostConfigurator cassandraHostConfigurator) { + CassandraHostConfigurator cassandraHostConfigurator) { super(connectionManager, cassandraHostConfigurator); this.retryDelayInSeconds = cassandraHostConfigurator.getAutoDiscoveryDelayInSeconds(); this.dataCenterValidator = new DataCenterValidator(cassandraHostConfigurator.getAutoDiscoveryDataCenters()); @@ -38,10 +39,10 @@ public NodeAutoDiscoverService(HConnectionManager connectionManager, @Override void shutdown() { log.error("Auto Discovery retry shutdown hook called"); - if ( sf != null ) { + if (sf != null) { sf.cancel(true); } - if ( executor != null ) { + if (executor != null) { executor.shutdownNow(); } log.error("AutoDiscovery retry shutdown complete"); @@ -60,13 +61,13 @@ public void run() { } } - + public void doAddNodes() { - if ( log.isDebugEnabled() ) { + if (log.isDebugEnabled()) { log.debug("Auto discovery service running..."); } Set foundHosts = discoverNodes(); - if ( foundHosts != null && foundHosts.size() > 0 ) { + if (foundHosts != null && foundHosts.size() > 0) { log.info("Found {} new host(s) in Ring", foundHosts.size()); for (CassandraHost cassandraHost : foundHosts) { log.info("Addding found host {} to pool", cassandraHost); @@ -74,60 +75,68 @@ public void doAddNodes() { connectionManager.addCassandraHost(cassandraHost); } } - if ( log.isDebugEnabled() ) { + if (log.isDebugEnabled()) { log.debug("Auto discovery service run complete."); } } public Set discoverNodes() { Set existingHosts = connectionManager.getHosts(); - Set foundHosts = new HashSet(); - - if (log.isDebugEnabled()) { + if (log.isDebugEnabled()) log.debug("using existing hosts {}", existingHosts); - } + return discoverNodesFromCluster(existingHosts); + } + private Set discoverNodesFromCluster(Set existingHosts) { try { - - String clusterName = connectionManager.getClusterName(); - - //this could be suspect, but we need this - ThriftCluster cluster = (ThriftCluster) HFactory.getCluster(clusterName); - - for(KeyspaceDefinition keyspaceDefinition: cluster.describeKeyspaces()) { - if (!keyspaceDefinition.getName().equals(Keyspace.KEYSPACE_SYSTEM)) { - List tokenRanges = cluster.describeRing(keyspaceDefinition.getName()); - for (TokenRange tokenRange : tokenRanges) { - - for (EndpointDetails endPointDetail : tokenRange.getEndpoint_details()) { - // Check if we are allowed to include this Data Center. - if (dataCenterValidator.validate(endPointDetail.getDatacenter())) { - // Maybe add this host if it's a new host. - CassandraHost foundHost = new CassandraHost(endPointDetail.getHost(), cassandraHostConfigurator.getPort()); - if ( !existingHosts.contains(foundHost) ) { - log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange); - foundHosts.add(foundHost); - } - } - } - - } - break; + //this could be suspect, but we need this + ThriftCluster cluster = (ThriftCluster) HFactory.getCluster(connectionManager.getClusterName()); + for (KeyspaceDefinition keyspaceDefinition : cluster.describeKeyspaces()){ + if (!keyspaceDefinition.getName().equals(Keyspace.KEYSPACE_SYSTEM)){ + return processSystemTokenRangesForNewNodes(existingHosts, cluster.describeRing(keyspaceDefinition.getName())); } } } catch (Exception e) { log.error("Discovery Service failed attempt to connect CassandraHost", e); } + return Collections.emptySet(); + } + private Set processSystemTokenRangesForNewNodes(Set existingHosts, List tokenRanges) { + Set foundHosts = new HashSet(); + for (TokenRange tokenRange : tokenRanges) + processTokenRangeForNewNodes(existingHosts, foundHosts, tokenRange); return foundHosts; } + private void processTokenRangeForNewNodes(Set existingHosts, Set foundHosts, TokenRange tokenRange) { + List endpointDetails = tokenRange.getEndpoint_details(); + List rpcEndpoints = tokenRange.getRpc_endpoints(); + + //Sanity check. Assumes that endpointDetails and rpcEndpoints are in the same order and therefor the same length. + if (endpointDetails.size() == rpcEndpoints.size()){ + processTokenRangeForNewNodesWithinValidDCs(existingHosts, foundHosts, tokenRange, endpointDetails, rpcEndpoints); + } + } + + private void processTokenRangeForNewNodesWithinValidDCs(Set existingHosts, Set foundHosts, TokenRange tokenRange, List endpointDetails, List rpcEndpoints) { + for (int i = 0; i < endpointDetails.size(); i++) { + // Check if we are allowed to include this Data Center. + if (dataCenterValidator.validate(endpointDetails.get(i).getDatacenter())){ + CassandraHost foundHost = new CassandraHost(rpcEndpoints.get(i), cassandraHostConfigurator.getPort()); + // Maybe add this host if it's a new host. + if (!existingHosts.contains(foundHost)) { + log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange); + foundHosts.add(foundHost); + } + } + } + } /** * Abstraction to validate that the discovered nodes belong to a specific datacenters. - * - * @author patricioe (Patricio Echague - patricio@datastax.com) * + * @author patricioe (Patricio Echague - patricio@datastax.com) */ class DataCenterValidator { @@ -146,6 +155,6 @@ public boolean validate(String dcName) { return dataCenters.contains(dcName); } } - + } From 134581ad656acff0bde7bc811c6f40fc46cf146c Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Wed, 11 Jul 2012 17:32:38 +0100 Subject: [PATCH 2/2] NodeAutoDiscoverService discovering listen_address instead of rpc_address Issue #490 --- CHANGELOG | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG b/CHANGELOG index 7e255c2f8..f62a931b8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -10,6 +10,7 @@ See object-mapper/CHANGELOG for HOM specifics ===== set the ClockResolution for all clusters in a static way #479 changed buildRingInfo to uses rpc_endpoint to build ringInfo instead of listen_address #486 +changed NodeAutoDiscoverService to rpc_address instead of listen_address to discover 1.1-0 =====