From 63ad1a515e081e22c9865dc9eef83674bc3f4b92 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Tue, 24 Sep 2019 17:03:53 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20set=20error=20to=20ERR=5FSESSION=5FRESET?= =?UTF-8?q?=20to=20trigger=20meta=20query=20while=20se=E2=80=A6=20(#54)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + scripts/travis.sh | 14 +++++++---- .../infra/pegasus/client/PException.java | 6 +++-- .../pegasus/rpc/ReplicationException.java | 2 +- .../pegasus/rpc/async/ReplicaSession.java | 23 +++++++++++-------- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 6c7cde0f..a70baad8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ log.txt rolling_log/ .vscode/ google-java-format-* +pegasus-* diff --git a/scripts/travis.sh b/scripts/travis.sh index 1c3798b3..bcbdc7ad 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,17 +26,23 @@ if [[ $(git status -s) ]]; then exit 1 fi +PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.6/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz" + # start pegasus onebox environment -wget https://github.com/XiaoMi/pegasus/releases/download/v1.11.3/pegasus-1.11.3-b45cb06-linux-x86_64-release.zip -unzip pegasus-1.11.3-b45cb06-linux-x86_64-release.zip -cd pegasus-1.11.3-b45cb06-linux-x86_64-release +if [ ! -f $PEGASUS_PKG.tar.gz ]; then + wget $PEGASUS_PKG_URL + tar xvf $PEGASUS_PKG.tar.gz +fi +cd $PEGASUS_PKG +sed -i "s#https://github.com/xiaomi/pegasus-common/raw/master/zookeeper-3.4.6.tar.gz#https://github.com/XiaoMi/pegasus-common/releases/download/deps/zookeeper-3.4.6.tar.gz#" scripts/start_zk.sh ./run.sh start_onebox -w cd ../ if ! mvn clean test then - cd pegasus-1.11.3-b45cb06-linux-x86_64-release + cd $PEGASUS_PKG ./run.sh list_onebox exit 1 fi diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java index b675dab8..feec9a93 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java @@ -4,8 +4,10 @@ package com.xiaomi.infra.pegasus.client; /** - * @author qinzuoyan - *

Pegasus exception. + * The generic type of exception thrown by all of the Pegasus APIs. + * + *

Common strategies of handling PException include retrying, or ignoring. We recommend you to + * log the exception for future debugging. */ public class PException extends Exception { private static final long serialVersionUID = 4436491238550521203L; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java index fc5f8ce0..b2000816 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java @@ -20,7 +20,7 @@ public ReplicationException(error_code.error_types t) { } public ReplicationException(error_code.error_types t, String message) { - super(t.name() + ": " + message); + super(t.name() + (message.isEmpty() ? "" : (": " + message))); err_type = t; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 6ce9757a..8675b405 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; -/** Created by weijiesun on 17-9-13. */ public class ReplicaSession { public static class RequestEntry { public int sequenceId; @@ -87,7 +86,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi entry.callback = callbackFunc; // NOTICE: must make sure the msg is put into the pendingResponse map BEFORE // the timer task is scheduled. - pendingResponse.put(new Integer(entry.sequenceId), entry); + pendingResponse.put(entry.sequenceId, entry); entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); entry.timeoutMs = timeoutInMilliseconds; @@ -136,7 +135,7 @@ public void closeSession() { } public RequestEntry getAndRemoveEntry(int seqID) { - return pendingResponse.remove(new Integer(seqID)); + return pendingResponse.remove(seqID); } public final String name() { @@ -179,7 +178,7 @@ private void markSessionConnected(Channel activeChannel) { synchronized (pendingSend) { while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - if (pendingResponse.get(new Integer(e.sequenceId)) != null) { + if (pendingResponse.get(e.sequenceId) != null) { write(e, newCache); } else { logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId); @@ -221,6 +220,7 @@ private void markSessionDisconnect() { } } + // Notify the RPC sender if failure occurred. private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", @@ -228,12 +228,9 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim seqID, errno.toString(), isTimeoutTask); - RequestEntry entry = pendingResponse.remove(new Integer(seqID)); + RequestEntry entry = pendingResponse.remove(seqID); if (entry != null) { if (!isTimeoutTask) entry.timeoutTask.cancel(true); - entry.op.rpc_error.errno = errno; - entry.callback.run(); - if (errno == error_types.ERR_TIMEOUT) { long firstTs = firstRecentTimedOutMs.get(); if (firstTs == 0) { @@ -246,12 +243,15 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim "{}: actively close the session because it's not responding for {} seconds", name(), sessionResetTimeWindowMs / 1000); - closeSession(); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; } } } else { firstRecentTimedOutMs.set(0); } + entry.op.rpc_error.errno = errno; + entry.callback.run(); } else { logger.warn( "{}: {} is removed by others, current error {}, isTimeoutTask {}", @@ -284,6 +284,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { }); } + // Notify the RPC caller when times out. If the RPC finishes in time, + // this task will be cancelled. + // TODO(wutao1): call it addTimeoutTicker private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) { return rpcGroup.schedule( new Runnable() { @@ -337,7 +340,7 @@ ConnState getState() { } interface MessageResponseFilter { - public boolean abandonIt(error_types err, TMessage header); + boolean abandonIt(error_types err, TMessage header); } MessageResponseFilter filter = null;