From 25601844b9f9d1d6f6d626041b10c9958ad60376 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 6 Jan 2020 17:35:40 +0800 Subject: [PATCH 01/11] fix: prevent session to be actively closed when it gets response from server --- .../pegasus/rpc/async/ReplicaSession.java | 45 +++++++------------ .../rpc/async/SessionFailureDetector.java | 40 +++++++++++++++++ .../pegasus/rpc/async/ReplicaSessionTest.java | 8 ++-- 3 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 2bab88f3..71fab7b1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -59,7 +59,7 @@ public void initChannel(SocketChannel ch) { } }); - this.firstRecentTimedOutMs = new AtomicLong(0); + failureDetector = new SessionFailureDetector(); } // You can specify a message response filter with constructor or with "setMessageResponseFilter" @@ -238,14 +238,14 @@ void markSessionDisconnect() { try { while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyFailureWithSeqID(e.sequenceId, error_types.ERR_SESSION_RESET, false); } List l = new LinkedList(); for (Map.Entry entry : pendingResponse.entrySet()) { l.add(entry.getValue()); } for (RequestEntry e : l) { - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyFailureWithSeqID(e.sequenceId, error_types.ERR_SESSION_RESET, false); } } catch (Exception e) { logger.error( @@ -267,7 +267,7 @@ void markSessionDisconnect() { } // Notify the RPC sender if failure occurred. - void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) + void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) throws Exception { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", @@ -275,29 +275,22 @@ void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask seqID, errno.toString(), isTimeoutTask); + assert errno == error_types.ERR_TIMEOUT || errno == error_types.ERR_SESSION_RESET; RequestEntry entry = pendingResponse.remove(seqID); if (entry != null) { if (!isTimeoutTask && entry.timeoutTask != null) { entry.timeoutTask.cancel(true); } + // The error must be ERR_TIMEOUT or ERR_SESSION_RESET if (errno == error_types.ERR_TIMEOUT) { - long firstTs = firstRecentTimedOutMs.get(); - if (firstTs == 0) { - // it is the first timeout in the window. - firstRecentTimedOutMs.set(System.currentTimeMillis()); - } else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) { - // ensure that closeSession() will be invoked only once. - if (firstRecentTimedOutMs.compareAndSet(firstTs, 0)) { - logger.warn( - "{}: actively close the session because it's not responding for {} seconds", - name(), - sessionResetTimeWindowMs / 1000); - closeSession(); // maybe fail when the session is already disconnected. - errno = error_types.ERR_SESSION_RESET; - } + if (failureDetector.markTimeout()) { + logger.warn( + "{}: actively close the session because it's not responding for {} seconds", + name(), + SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; } - } else { - firstRecentTimedOutMs.set(0); } entry.op.rpc_error.errno = errno; entry.callback.run(); @@ -327,7 +320,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { name(), entry.sequenceId, channelFuture.cause()); - tryNotifyWithSequenceID(entry.sequenceId, error_types.ERR_TIMEOUT, false); + tryNotifyFailureWithSeqID(entry.sequenceId, error_types.ERR_TIMEOUT, false); } } }); @@ -342,7 +335,7 @@ private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) @Override public void run() { try { - tryNotifyWithSequenceID(seqID, error_types.ERR_TIMEOUT, true); + tryNotifyFailureWithSeqID(seqID, error_types.ERR_TIMEOUT, true); } catch (Exception e) { logger.warn("try notify with sequenceID {} exception!", seqID, e); } @@ -368,6 +361,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead0(ChannelHandlerContext ctx, final RequestEntry msg) { logger.debug("{}: handle response with seqid({})", name(), msg.sequenceId); + failureDetector.markOK(); // This session is currently healthy. if (msg.callback != null) { msg.callback.run(); } else { @@ -415,12 +409,7 @@ static final class VolatileFields { private Bootstrap boot; private EventLoopGroup rpcGroup; - // Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs` - // are timed out, in that case we suspect that the server is unavailable. - - // Timestamp of the first timed out rpc. - private AtomicLong firstRecentTimedOutMs; - private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s + private SessionFailureDetector failureDetector; private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java new file mode 100644 index 00000000..b40c2307 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java @@ -0,0 +1,40 @@ +package com.xiaomi.infra.pegasus.rpc.async; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * SessionFailureDetector detects whether the session is half-closed by the remote host, in which + * case we need to actively close the session and reconnect. + */ +public class SessionFailureDetector { + + // Timestamp of the first timed out rpc. + private AtomicLong firstRecentTimedOutMs; + + // Session is marked failure if all the RPCs across + // `sessionResetTimeWindowMs` are timed out. + public static final long FAILURE_DETECT_WINDOW_MS = 10 * 1000; // 10s + + public SessionFailureDetector() { + this.firstRecentTimedOutMs = new AtomicLong(0); + } + + /** @return true if session is confirmed to be failed. */ + public boolean markTimeout() { + // The error must be ERR_TIMEOUT or ERR_SESSION_RESET + long firstTs = firstRecentTimedOutMs.get(); + if (firstTs == 0) { + // it is the first timeout in the window. + firstRecentTimedOutMs.set(System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstTs >= FAILURE_DETECT_WINDOW_MS) { + // ensure that session will be closed only once. + return firstRecentTimedOutMs.compareAndSet(firstTs, 0); + } + return false; + } + + /** Mark this session to be healthy. */ + public void markOK() { + firstRecentTimedOutMs.set(0); + } +} diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 5cd8ccfa..9f8d41d4 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -220,7 +220,7 @@ public void testTryNotifyWithSequenceID() throws Exception { // no pending RequestEntry, ensure no NPE thrown Assert.assertTrue(rs.pendingResponse.isEmpty()); try { - rs.tryNotifyWithSequenceID(100, error_code.error_types.ERR_TIMEOUT, false); + rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false); } catch (Exception e) { Assert.assertNull(e); } @@ -234,12 +234,12 @@ public void testTryNotifyWithSequenceID() throws Exception { entry.timeoutTask = null; // simulate the timeoutTask has been null entry.op = new rrdb_put_operator(new gpid(1, 1), null, null, 0); rs.pendingResponse.put(100, entry); - rs.tryNotifyWithSequenceID(100, error_code.error_types.ERR_TIMEOUT, false); + rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false); Assert.assertTrue(passed.get()); // simulate the entry has been removed, ensure no NPE thrown rs.getAndRemoveEntry(entry.sequenceId); - rs.tryNotifyWithSequenceID(entry.sequenceId, entry.op.rpc_error.errno, true); + rs.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, true); // ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception ReplicaSession mockRs = Mockito.spy(rs); @@ -247,7 +247,7 @@ public void testTryNotifyWithSequenceID() throws Exception { mockRs.fields.state = ConnState.CONNECTED; Mockito.doThrow(new Exception()) .when(mockRs) - .tryNotifyWithSequenceID(entry.sequenceId, entry.op.rpc_error.errno, false); + .tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, false); mockRs.markSessionDisconnect(); Assert.assertEquals(mockRs.getState(), ConnState.DISCONNECTED); } From 31ea60d0f83ab00589a4fd6b95e4025444e617ed Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 6 Jan 2020 17:40:56 +0800 Subject: [PATCH 02/11] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 71fab7b1..598fd73b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; public class ReplicaSession { From d1c66d69bb59df59b3cca7f0ee7c1009263ec51e Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 6 Jan 2020 17:42:17 +0800 Subject: [PATCH 03/11] fix --- .../infra/pegasus/rpc/async/ReplicaSession.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 598fd73b..578c69ce 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -281,15 +281,13 @@ void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTa entry.timeoutTask.cancel(true); } // The error must be ERR_TIMEOUT or ERR_SESSION_RESET - if (errno == error_types.ERR_TIMEOUT) { - if (failureDetector.markTimeout()) { - logger.warn( - "{}: actively close the session because it's not responding for {} seconds", - name(), - SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); - closeSession(); // maybe fail when the session is already disconnected. - errno = error_types.ERR_SESSION_RESET; - } + if (errno == error_types.ERR_TIMEOUT && failureDetector.markTimeout()) { + logger.warn( + "{}: actively close the session because it's not responding for {} seconds", + name(), + SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; } entry.op.rpc_error.errno = errno; entry.callback.run(); From f0e9ddeedc256ef5d77edf6ea8a49099c280dbd0 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Thu, 21 May 2020 11:56:52 +0800 Subject: [PATCH 04/11] Update .travis.yml --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index f4e2c41b..9bee03a1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,8 @@ language: java jdk: - openjdk8 + - openjdk11 + - oraclejdk11 cache: directories: From 43e992d160d411bc3ca128cec19865cf7ca43891 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 21 May 2020 12:10:00 +0800 Subject: [PATCH 05/11] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 70d657c6..ebb7aac4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -18,7 +18,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.thrift.protocol.TMessage; import org.slf4j.Logger; From 3516b0662099f94437e6727c2fc1962fd3563ad7 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 15 Sep 2020 19:30:56 +0800 Subject: [PATCH 06/11] fix --- .travis.yml | 1 + .../java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 1 - .../xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f4e2c41b..9791013f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ language: java jdk: - openjdk8 + - openjdk11 cache: directories: diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index ee704f07..1d56f071 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -293,7 +293,6 @@ void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTa seqID, errno.toString(), isTimeoutTask); - assert errno == error_types.ERR_TIMEOUT || errno == error_types.ERR_SESSION_RESET; RequestEntry entry = pendingResponse.remove(seqID); if (entry != null) { if (!isTimeoutTask && entry.timeoutTask != null) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java index b40c2307..1cd7ec11 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java @@ -12,7 +12,7 @@ public class SessionFailureDetector { private AtomicLong firstRecentTimedOutMs; // Session is marked failure if all the RPCs across - // `sessionResetTimeWindowMs` are timed out. + // `FAILURE_DETECT_WINDOW_MS` are timed out. public static final long FAILURE_DETECT_WINDOW_MS = 10 * 1000; // 10s public SessionFailureDetector() { From 6832bc2ffc93c8d79eec4095cd9e15c6fce5010e Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 17 Sep 2020 14:39:55 +0800 Subject: [PATCH 07/11] fix --- .../pegasus/rpc/async/ReplicaSession.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 1d56f071..c4539ee5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -285,8 +285,7 @@ void markSessionDisconnect() { } // Notify the RPC sender if failure occurred. - void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) - throws Exception { + void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", name(), @@ -299,13 +298,17 @@ void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTa entry.timeoutTask.cancel(true); } // The error must be ERR_TIMEOUT or ERR_SESSION_RESET - if (errno == error_types.ERR_TIMEOUT && failureDetector.markTimeout()) { - logger.warn( - "{}: actively close the session because it's not responding for {} seconds", - name(), - SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); - closeSession(); // maybe fail when the session is already disconnected. - errno = error_types.ERR_SESSION_RESET; + if (errno == error_types.ERR_TIMEOUT) { + if (failureDetector.markTimeout()) { + logger.warn( + "{}: actively close the session because it's not responding for {} seconds", + name(), + SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; + } + } else { + failureDetector.markOK(); } entry.op.rpc_error.errno = errno; entry.callback.run(); From dfe1a6cc509a48dafb11fe0227fbfd271505fd7d Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 17 Sep 2020 14:55:56 +0800 Subject: [PATCH 08/11] fix --- .../com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index c4539ee5..50fc2066 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -285,7 +285,8 @@ void markSessionDisconnect() { } // Notify the RPC sender if failure occurred. - void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) { + void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) + throws Exception { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", name(), From e0201cd32827c5ec413fe0815b0be699de78a6f2 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Fri, 25 Sep 2020 10:54:31 +0800 Subject: [PATCH 09/11] add license --- .../rpc/async/SessionFailureDetector.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java index 1cd7ec11..cbfff205 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + package com.xiaomi.infra.pegasus.rpc.async; import java.util.concurrent.atomic.AtomicLong; From d13bc51f08fd2baf9a6740b7e5d59911ff048f45 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 25 Sep 2020 13:31:04 +0800 Subject: [PATCH 10/11] fix --- .../com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 50fc2066..c4539ee5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -285,8 +285,7 @@ void markSessionDisconnect() { } // Notify the RPC sender if failure occurred. - void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) - throws Exception { + void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", name(), From 4fd1d3032dbf4b18b001dbed6a126f722fb67e83 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 25 Sep 2020 13:34:44 +0800 Subject: [PATCH 11/11] format --- .../infra/pegasus/rpc/async/SessionFailureDetector.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java index cbfff205..a31fef87 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/SessionFailureDetector.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package com.xiaomi.infra.pegasus.rpc.async;