diff --git a/pom.xml b/pom.xml
index 5d08e1cd..7d1815d7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -13,6 +13,17 @@
4.8.1
test
+
+ commons-validator
+ commons-validator
+ 1.6
+
+
+ org.mockito
+ mockito-core
+ 2.24.5
+ test
+
javax.servlet
javax.servlet-api
diff --git a/scripts/format-all.sh b/scripts/format-all.sh
index 1c3153fc..6bea994b 100755
--- a/scripts/format-all.sh
+++ b/scripts/format-all.sh
@@ -15,6 +15,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/test/java/com/xiaomi/infra/pegasus/metrics/*.java
src/test/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/test/java/com/xiaomi/infra/pegasus/tools/*.java
+ src/test/java/com/xiaomi/infra/pegasus/base/*.java
)
if [ ! -f "${PROJECT_DIR}"/google-java-format-1.7-all-deps.jar ]; then
diff --git a/src/main/java/com/xiaomi/infra/pegasus/base/rpc_address.java b/src/main/java/com/xiaomi/infra/pegasus/base/rpc_address.java
index fa5e3e3f..eeb6dfa2 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/base/rpc_address.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/base/rpc_address.java
@@ -8,20 +8,19 @@
*/
package com.xiaomi.infra.pegasus.base;
-import com.xiaomi.infra.pegasus.thrift.*;
-import com.xiaomi.infra.pegasus.thrift.async.*;
-import com.xiaomi.infra.pegasus.thrift.meta_data.*;
-import com.xiaomi.infra.pegasus.thrift.protocol.*;
-import com.xiaomi.infra.pegasus.thrift.transport.*;
+import com.xiaomi.infra.pegasus.thrift.TBase;
+import com.xiaomi.infra.pegasus.thrift.TException;
+import com.xiaomi.infra.pegasus.thrift.TFieldIdEnum;
+import com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData;
+import com.xiaomi.infra.pegasus.thrift.protocol.TProtocol;
+import com.xiaomi.infra.pegasus.thrift.protocol.TStruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
-public class rpc_address
+public final class rpc_address
implements TBase, java.io.Serializable, Cloneable {
private static final TStruct STRUCT_DESC = new TStruct("rpc_address");
@@ -121,12 +120,9 @@ public boolean fromString(String ipPort) {
}
try {
+ // TODO(wutao1): getByName will query DNS if the given address is not valid ip:port.
byte[] byteArray = InetAddress.getByName(pairs[0]).getAddress();
- ip =
- ((int) (byteArray[0] & 0xff) << 24)
- | ((int) (byteArray[1] & 0xff) << 16)
- | ((int) (byteArray[2] & 0xff) << 8)
- | ((int) (byteArray[3] & 0xff));
+ ip = ByteBuffer.wrap(byteArray).order(ByteOrder.BIG_ENDIAN).getInt();
} catch (UnknownHostException e) {
return false;
}
@@ -135,6 +131,12 @@ public boolean fromString(String ipPort) {
address = ((long) ip << 32) + ((long) port << 16) + 1;
return true;
}
+
+ public static rpc_address fromIpPort(String ipPort) {
+ rpc_address addr = new rpc_address();
+ return addr.fromString(ipPort) ? addr : null;
+ }
+
/** Performs a deep copy on other. */
public rpc_address(rpc_address other) {
this.address = other.address;
@@ -192,10 +194,8 @@ public int hashCode() {
}
public int compareTo(rpc_address other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
+ if (address < other.address) return -1;
+ if (address > other.address) return 1;
return 0;
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/HostNameResolver.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/HostNameResolver.java
new file mode 100644
index 00000000..3e8ce2f3
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/HostNameResolver.java
@@ -0,0 +1,40 @@
+// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+package com.xiaomi.infra.pegasus.rpc.async;
+
+import com.xiaomi.infra.pegasus.base.rpc_address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/*
+ * Resolves host:port into a set of ip addresses.
+ * The intention of this class is to mock DNS.
+ */
+public class HostNameResolver {
+
+ public rpc_address[] resolve(String hostPort) throws IllegalArgumentException {
+ String[] pairs = hostPort.split(":");
+ if (pairs.length != 2) {
+ throw new IllegalArgumentException("Meta server host name format error!");
+ }
+
+ try {
+ Integer port = Integer.valueOf(pairs[1]);
+ InetAddress[] resolvedAddresses = InetAddress.getAllByName(pairs[0]);
+ rpc_address[] results = new rpc_address[resolvedAddresses.length];
+ int size = 0;
+ for (InetAddress addr : resolvedAddresses) {
+ rpc_address rpcAddr = new rpc_address();
+ int ip = ByteBuffer.wrap(addr.getAddress()).order(ByteOrder.BIG_ENDIAN).getInt();
+ rpcAddr.address = ((long) ip << 32) + ((long) port << 16) + 1;
+ results[size++] = rpcAddr;
+ }
+ return results;
+ } catch (UnknownHostException e) {
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java
index 992798e8..86c3d931 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java
@@ -9,31 +9,40 @@
import com.xiaomi.infra.pegasus.operator.query_cfg_operator;
import com.xiaomi.infra.pegasus.replication.partition_configuration;
import io.netty.channel.EventLoopGroup;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.validator.routines.InetAddressValidator;
-/** Created by weijiesun on 17-9-13. */
-public class MetaSession {
+public class MetaSession extends HostNameResolver {
public MetaSession(
ClusterManager manager,
- String addrList[],
+ String[] addrList,
int eachQueryTimeoutInMills,
int defaultMaxQueryCount,
EventLoopGroup g)
throws IllegalArgumentException {
clusterManager = manager;
metaList = new ArrayList();
- for (String addr : addrList) {
- rpc_address rpc_addr = new rpc_address();
- if (rpc_addr.fromString(addr)) {
- logger.info("add {} as meta server", addr);
- metaList.add(clusterManager.getReplicaSession(rpc_addr));
- } else {
- logger.error("invalid address {}", addr);
+
+ if (addrList.length == 1 && !InetAddressValidator.getInstance().isValid(addrList[0])) {
+ // if the given string is not a valid ip address,
+ // then take it as a hostname for a try.
+ resolveHost(addrList[0]);
+ if (!metaList.isEmpty()) {
+ hostPort = addrList[0];
+ }
+ } else {
+ for (String addr : addrList) {
+ rpc_address rpcAddr = new rpc_address();
+ if (rpcAddr.fromString(addr)) {
+ logger.info("add {} as meta server", addr);
+ metaList.add(clusterManager.getReplicaSession(rpcAddr));
+ } else {
+ logger.error("invalid address {}", addr);
+ }
}
}
if (metaList.isEmpty()) {
@@ -46,13 +55,13 @@ public MetaSession(
this.group = g;
}
- public static final error_types getMetaServiceError(client_operator metaQueryOp) {
+ public static error_types getMetaServiceError(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_types.ERR_OK) return metaQueryOp.rpc_error.errno;
query_cfg_operator op = (query_cfg_operator) metaQueryOp;
return op.get_response().getErr().errno;
}
- public static final rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) {
+ public static rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_types.ERR_OK) return null;
query_cfg_operator op = (query_cfg_operator) metaQueryOp;
if (op.get_response().getErr().errno != error_types.ERR_FORWARD_TO_OTHERS) return null;
@@ -104,7 +113,7 @@ public final void closeSession() {
}
}
- private final void asyncCall(final MetaRequestRound round) {
+ private void asyncCall(final MetaRequestRound round) {
round.lastSession.asyncSend(
round.op,
new Runnable() {
@@ -116,7 +125,7 @@ public void run() {
eachQueryTimeoutInMills);
}
- private final void onFinishQueryMeta(final MetaRequestRound round) {
+ void onFinishQueryMeta(final MetaRequestRound round) {
client_operator op = round.op;
boolean needDelay = false;
@@ -177,6 +186,17 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
}
} else if (metaList.get(curLeader) == round.lastSession) {
curLeader = (curLeader + 1) % metaList.size();
+ // try refresh the meta list from DNS
+ // maxResolveCount and "maxQueryCount refresh" is necessary:
+ // for example, maxQueryCount=5, the first error metalist size = 3, when trigger dns
+ // refresh, the "maxQueryCount" may change to 2, the client may can't choose the right
+ // leader when the new metaList size > 2 after retry 2 time. but if the "maxQueryCount"
+ // refresh, the retry will not stop if no maxResolveCount when the meta is error.
+ if (curLeader == 0 && hostPort != null && round.maxResolveCount != 0) {
+ resolveHost(hostPort);
+ round.maxResolveCount--;
+ round.maxQueryCount = metaList.size();
+ }
}
}
round.lastSession = metaList.get(curLeader);
@@ -187,6 +207,10 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
return;
}
+ retryQueryMeta(round, needDelay);
+ }
+
+ void retryQueryMeta(final MetaRequestRound round, boolean needDelay) {
group.schedule(
new Runnable() {
@Override
@@ -198,7 +222,9 @@ public void run() {
TimeUnit.SECONDS);
}
- private static final class MetaRequestRound {
+ static final class MetaRequestRound {
+ public int maxResolveCount = 2;
+
public client_operator op;
public Runnable callbackFunc;
public int maxQueryCount;
@@ -212,12 +238,61 @@ public MetaRequestRound(client_operator o, Runnable r, int q, ReplicaSession l)
}
}
+ /*
+ * Resolves hostname:port into a set of ip addresses.
+ */
+ void resolveHost(String hostPort) throws IllegalArgumentException {
+ rpc_address[] addrs = resolve(hostPort);
+ if (addrs == null) {
+ logger.error("failed to resolve address \"{}\" into ip addresses", hostPort);
+ return;
+ }
+
+ Set newSet = new TreeSet(Arrays.asList(addrs));
+ Set oldSet = new TreeSet();
+ for (ReplicaSession meta : metaList) {
+ oldSet.add(meta.getAddress());
+ }
+
+ // fast path: do nothing if meta list is unchanged.
+ if (newSet.equals(oldSet)) {
+ return;
+ }
+
+ // removed metas
+ Set removed = new HashSet(oldSet);
+ removed.removeAll(newSet);
+ for (rpc_address addr : removed) {
+ logger.info("meta server {} was removed", addr);
+ for (int i = 0; i < metaList.size(); i++) {
+ if (metaList.get(i).getAddress().equals(addr)) {
+ ReplicaSession session = metaList.remove(i);
+ session.closeSession();
+ }
+ }
+ }
+
+ // newly added metas
+ Set added = new HashSet(newSet);
+ added.removeAll(oldSet);
+ for (rpc_address addr : added) {
+ metaList.add(clusterManager.getReplicaSession(addr));
+ logger.info("add {} as meta server", addr);
+ }
+ }
+
+ // Only for test.
+ List getMetaList() {
+ return metaList;
+ }
+
private ClusterManager clusterManager;
private List metaList;
private int curLeader;
private int eachQueryTimeoutInMills;
private int defaultMaxQueryCount;
private EventLoopGroup group;
+ private String hostPort;
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(MetaSession.class);
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java
index 8675b405..1cc2ddcc 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java
@@ -124,6 +124,9 @@ public void closeSession() {
VolatileFields f = fields;
if (f.state == ConnState.CONNECTED && f.nettyChannel != null) {
try {
+ // close().sync() means calling system API `close()` synchronously,
+ // but the connection may not be completely closed then, that is,
+ // the state may not be marked as DISCONNECTED immediately.
f.nettyChannel.close().sync();
logger.info("channel to {} closed", address.toString());
} catch (Exception ex) {
@@ -146,7 +149,7 @@ public final rpc_address getAddress() {
return address;
}
- private void doConnect() {
+ void doConnect() {
try {
// we will receive the channel connect event in DefaultHandler.ChannelActive
boot.connect(address.get_ip(), address.get_port())
diff --git a/src/test/java/com/xiaomi/infra/pegasus/base/TestRpcAddress.java b/src/test/java/com/xiaomi/infra/pegasus/base/TestRpcAddress.java
new file mode 100644
index 00000000..6ac95a9a
--- /dev/null
+++ b/src/test/java/com/xiaomi/infra/pegasus/base/TestRpcAddress.java
@@ -0,0 +1,44 @@
+// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+
+package com.xiaomi.infra.pegasus.base;
+
+import com.xiaomi.infra.pegasus.rpc.async.HostNameResolver;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRpcAddress {
+ @Test
+ public void testResolveFromHostPort() throws Exception {
+ HostNameResolver hostNameResolver = new HostNameResolver();
+ rpc_address[] addrs = hostNameResolver.resolve("127.0.0.1:34601");
+
+ Assert.assertNotNull(addrs);
+ Assert.assertEquals(addrs.length, 1);
+ Assert.assertEquals(addrs[0].get_ip(), "127.0.0.1");
+ Assert.assertEquals(addrs[0].get_port(), 34601);
+
+ addrs = hostNameResolver.resolve("www.baidu.com:80");
+ Assert.assertNotNull(addrs);
+ Assert.assertTrue(addrs.length >= 1);
+
+ addrs = hostNameResolver.resolve("abcabcabcabc:34601");
+ Assert.assertNull(addrs);
+
+ try {
+ addrs = hostNameResolver.resolve("localhost");
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ Assert.assertNull(addrs);
+ }
+ }
+
+ @Test
+ public void testFromString() throws Exception {
+ rpc_address addr = new rpc_address();
+ Assert.assertTrue(addr.fromString("127.0.0.1:34601"));
+ Assert.assertEquals(addr.get_ip(), "127.0.0.1");
+ Assert.assertEquals(addr.get_port(), 34601);
+ }
+}
diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java
index 011cd9f5..5b973833 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java
@@ -3,28 +3,35 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc.async;
-import com.xiaomi.infra.pegasus.base.*;
-import com.xiaomi.infra.pegasus.operator.*;
+import com.xiaomi.infra.pegasus.base.error_code;
+import com.xiaomi.infra.pegasus.base.error_code.error_types;
+import com.xiaomi.infra.pegasus.base.gpid;
+import com.xiaomi.infra.pegasus.base.rpc_address;
+import com.xiaomi.infra.pegasus.operator.client_operator;
+import com.xiaomi.infra.pegasus.operator.query_cfg_operator;
+import com.xiaomi.infra.pegasus.replication.partition_configuration;
import com.xiaomi.infra.pegasus.replication.query_cfg_request;
+import com.xiaomi.infra.pegasus.replication.query_cfg_response;
import com.xiaomi.infra.pegasus.tools.Toollet;
import com.xiaomi.infra.pegasus.tools.Tools;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
-/**
- * MetaSession Tester.
- *
- * @author sunweijie@xiaomi.com
- * @version 1.0
- */
public class MetaSessionTest {
+ // "Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs)" is for simulating DNS
+ // resolution: ->
+
@Before
public void before() throws Exception {}
@@ -52,11 +59,10 @@ private static void ensureNotLeader(rpc_address addr) {
/** Method: connect() */
@Test
- public void testConnect() throws Exception {
+ public void testMetaConnect() throws Exception {
// test: first connect to a wrong server
// then it forward to the right server
// then the wrong server crashed
-
String[] addr_list = {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"};
ClusterManager manager = new ClusterManager(1000, 4, false, null, 60, addr_list);
MetaSession session = manager.getMetaSession();
@@ -94,4 +100,229 @@ public Void call() throws Exception {
manager.close();
}
+
+ private rpc_address[] getAddressFromSession(List sessions) {
+ rpc_address[] results = new rpc_address[sessions.size()];
+ for (int i = 0; i < results.length; i++) {
+ results[i] = sessions.get(i).getAddress();
+ }
+ return results;
+ }
+
+ @Test
+ public void testDNSResolveHost() throws Exception {
+ // ensure meta list keeps consistent with dns.
+ ClusterManager manager =
+ new ClusterManager(
+ 1000,
+ 4,
+ false,
+ null,
+ 60,
+ new String[] {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"});
+ MetaSession session = manager.getMetaSession();
+ MetaSession meta = Mockito.spy(session);
+ ReplicaSession meta2 = meta.getMetaList().get(0); // 127.0.0.1:34602
+ meta2.doConnect();
+ while (meta2.getState() != ReplicaSession.ConnState.CONNECTED) {
+ Thread.sleep(1);
+ }
+ Assert.assertEquals(meta2.getState(), ReplicaSession.ConnState.CONNECTED);
+
+ // DNS refreshed
+ rpc_address[] addrs = new rpc_address[2];
+ addrs[0] = rpc_address.fromIpPort("172.0.0.1:34601");
+ addrs[1] = rpc_address.fromIpPort("172.0.0.2:34601");
+ // simulating DNS resolution:localhost:34601->{172.0.0.1:34601,172.0.0.2:34601}
+ Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs);
+ Assert.assertArrayEquals(meta.resolve("localhost:34601"), addrs);
+ meta.resolveHost("localhost:34601"); // update local meta list
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs);
+ while (meta2.getState() != ReplicaSession.ConnState.DISCONNECTED) {
+ Thread.sleep(1);
+ }
+ // ensure MetaSession#resolveHost will close unused sessions.
+ Assert.assertEquals(meta2.getState(), ReplicaSession.ConnState.DISCONNECTED);
+
+ // DNS refreshed again
+ addrs = new rpc_address[2];
+ addrs[0] = rpc_address.fromIpPort("172.0.0.1:34601");
+ addrs[1] = rpc_address.fromIpPort("172.0.0.3:34601");
+ // simulating DNS resolution:localhost:34601->{172.0.0.1:34601,172.0.0.3:34601}
+ Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs);
+ meta.resolveHost("localhost:34601");
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs);
+
+ manager.close();
+ }
+
+ @Test
+ public void testDNSMetaAllChanged() throws Exception {
+ ClusterManager manager =
+ new ClusterManager(1000, 4, false, null, 60, new String[] {"localhost:34601"});
+ MetaSession session = manager.getMetaSession();
+ MetaSession meta = Mockito.spy(session);
+ // curLeader=0, hostPort="localhost:34601"
+
+ // metaList = 172.0.0.1:34601, 172.0.0.2:34601
+ rpc_address[] addrs = new rpc_address[2];
+ addrs[0] = rpc_address.fromIpPort("172.0.0.1:34601");
+ addrs[1] = rpc_address.fromIpPort("172.0.0.2:34601");
+ // simulating DNS resolution:localhost:34601->{172.0.0.1:34601,172.0.0.2:34601}
+ Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs);
+ meta.resolveHost("localhost:34601");
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs);
+
+ query_cfg_request req = new query_cfg_request("temp", new ArrayList());
+ client_operator op = new query_cfg_operator(new gpid(-1, -1), req);
+ op.rpc_error.errno = error_code.error_types.ERR_SESSION_RESET;
+ MetaSession.MetaRequestRound round =
+ new MetaSession.MetaRequestRound(
+ op,
+ new Runnable() {
+ @Override
+ public void run() {}
+ },
+ 10,
+ meta.getMetaList().get(0));
+
+ // simulate a failed query meta, but ensure it will not retry after a failure.
+ Mockito.doNothing().when(meta).retryQueryMeta(round, false);
+
+ // DNS updated.
+ rpc_address[] addrs2 = new rpc_address[2];
+ addrs2[0] = rpc_address.fromIpPort("172.0.0.3:34601");
+ addrs2[1] = rpc_address.fromIpPort("172.0.0.4:34601");
+ // simulating DNS resolution:localhost:34601->{172.0.0.3:34601,172.0.0.4:34601}
+ Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs2);
+
+ // meta all dead, query failed.
+ meta.onFinishQueryMeta(round);
+ // switch curLeader to 1, meta list unchanged.
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs);
+ Integer curLeader = (Integer) FieldUtils.readField(meta, "curLeader", true);
+ Assert.assertEquals(curLeader.intValue(), 1);
+
+ // failed again
+ meta.onFinishQueryMeta(round);
+ // switch curLeader to 0, meta list updated
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs2);
+ curLeader = (Integer) FieldUtils.readField(meta, "curLeader", true);
+ Assert.assertEquals(curLeader.intValue(), 0);
+
+ // retry
+ meta.onFinishQueryMeta(round);
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs2);
+ }
+
+ @Test
+ public void testMetaForwardUnknownPrimary() throws Exception {
+ // ensures that client will accept the forwarded meta
+ // into local meta list, and set it to current leader.
+
+ ClusterManager manager =
+ new ClusterManager(1000, 4, false, null, 60, new String[] {"localhost:34601"});
+ MetaSession session = manager.getMetaSession();
+ MetaSession meta = Mockito.spy(session);
+ // curLeader=0, hostPort="localhost:34601"
+
+ // metaList = 172.0.0.1:34601, 172.0.0.2:34601
+ rpc_address[] addrs = new rpc_address[2];
+ addrs[0] = rpc_address.fromIpPort("172.0.0.1:34601");
+ addrs[1] = rpc_address.fromIpPort("172.0.0.2:34601");
+ Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs);
+ meta.resolveHost("localhost:34601");
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs);
+
+ query_cfg_request req = new query_cfg_request("temp", new ArrayList());
+ query_cfg_operator op = new query_cfg_operator(new gpid(-1, -1), req);
+ op.rpc_error.errno = error_code.error_types.ERR_OK;
+ FieldUtils.writeField(op, "response", new query_cfg_response(), true);
+ op.get_response().err = new error_code();
+ op.get_response().err.errno = error_code.error_types.ERR_FORWARD_TO_OTHERS;
+ op.get_response().partitions = Arrays.asList(new partition_configuration[1]);
+ op.get_response().partitions.set(0, new partition_configuration());
+ op.get_response().partitions.get(0).primary = rpc_address.fromIpPort("172.0.0.3:34601");
+ MetaSession.MetaRequestRound round =
+ new MetaSession.MetaRequestRound(
+ op,
+ new Runnable() {
+ @Override
+ public void run() {}
+ },
+ 10,
+ meta.getMetaList().get(0));
+
+ // do not retry after a failed QueryMeta.
+ Mockito.doNothing().when(meta).retryQueryMeta(round, false);
+
+ // failed to query meta
+ meta.onFinishQueryMeta(round);
+
+ rpc_address[] addrs2 = Arrays.copyOf(addrs, 3);
+ addrs2[2] = rpc_address.fromIpPort("172.0.0.3:34601");
+
+ // forward to 172.0.0.3:34601
+ Assert.assertArrayEquals(getAddressFromSession(meta.getMetaList()), addrs2);
+ Integer curLeader = (Integer) FieldUtils.readField(meta, "curLeader", true);
+ Assert.assertEquals(curLeader.intValue(), 2);
+ }
+
+ @Test
+ public void testDNSResetMetaMaxQueryCount() {
+ ClusterManager manager =
+ new ClusterManager(1000, 4, false, null, 60, new String[] {"localhost:34601"});
+ MetaSession metaMock = Mockito.spy(manager.getMetaSession());
+
+ List metaList = metaMock.getMetaList();
+ metaList.remove(0); // del the "localhost:34601"
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34602")));
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34603")));
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34601")));
+
+ rpc_address[] newAddrs = new rpc_address[5];
+ newAddrs[0] = rpc_address.fromIpPort("137.0.0.1:34602");
+ newAddrs[1] = rpc_address.fromIpPort("137.0.0.1:34603");
+ // one of the followings is the real primary.
+ newAddrs[2] = rpc_address.fromIpPort("127.0.0.1:34602");
+ newAddrs[3] = rpc_address.fromIpPort("127.0.0.1:34603");
+ newAddrs[4] = rpc_address.fromIpPort("127.0.0.1:34601");
+
+ // DNS refreshed
+ Mockito.when(metaMock.resolve("localhost:34601")).thenReturn(newAddrs);
+
+ query_cfg_request req = new query_cfg_request("temp", new ArrayList());
+ client_operator op = new query_cfg_operator(new gpid(-1, -1), req);
+
+ // `MetaSession#query` will first query the 3 old addresses (and failed), then resolve the DNS
+ // and find the 5 new addresses.
+ // Even though the given maxQueryCount is given 3, the total query count is at least 6.
+ metaMock.query(op, metaList.size());
+ error_types err = MetaSession.getMetaServiceError(op);
+ Assert.assertEquals(error_code.error_types.ERR_OK, err);
+ }
+
+ @Test
+ public void testDNSMetaUnavailable() {
+ // Ensures when the DNS returns meta all unavailable, finally the query will timeout.
+ ClusterManager manager =
+ new ClusterManager(1000, 4, false, null, 60, new String[] {"localhost:34601"});
+ MetaSession metaMock = Mockito.spy(manager.getMetaSession());
+ List metaList = metaMock.getMetaList();
+ metaList.remove(0); // del the "localhost:34601"
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34602")));
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34603")));
+ metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34601")));
+ rpc_address[] newAddrs =
+ new rpc_address[] {
+ rpc_address.fromIpPort("137.0.0.1:34602"),
+ rpc_address.fromIpPort("137.0.0.1:34603"),
+ rpc_address.fromIpPort("137.0.0.1:34601")
+ };
+ Mockito.when(metaMock.resolve("localhost:34601")).thenReturn(newAddrs);
+ query_cfg_request req = new query_cfg_request("temp", new ArrayList());
+ client_operator op = new query_cfg_operator(new gpid(-1, -1), req);
+ metaMock.query(op, metaList.size());
+ Assert.assertEquals(error_types.ERR_TIMEOUT, MetaSession.getMetaServiceError(op));
+ }
}