From d6b0d19237fc931805b45b8b32d33a527607d2b6 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 25 Aug 2020 16:35:51 +0800 Subject: [PATCH 01/32] init --- scripts/format-all.sh | 1 + .../pegasus/rpc/async/ClientRequestRound.java | 6 +- .../infra/pegasus/rpc/async/TableHandler.java | 47 +++++++------- .../interceptor/BackupRequestInterceptor.java | 64 +++++++++++++++++++ .../tools/interceptor/InterceptorManger.java | 23 +++++++ .../tools/interceptor/TableInterceptor.java | 11 ++++ 6 files changed, 126 insertions(+), 26 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 995bc3f4..f013bef0 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -10,6 +10,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java src/main/java/com/xiaomi/infra/pegasus/operator/*.java src/main/java/com/xiaomi/infra/pegasus/tools/*.java + src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java index 2818f511..9b2003f9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java @@ -15,13 +15,14 @@ public final class ClientRequestRound { client_operator operator; Table.ClientOPCallback callback; - long timeoutMs; + public long timeoutMs; boolean enableCounter; long createNanoTime; long expireNanoTime; boolean isCompleted; - ScheduledFuture backupRequestTask; + int tryId; + public ScheduledFuture backupRequestTask; /** * Constructor. @@ -42,6 +43,7 @@ public ClientRequestRound( this.createNanoTime = System.nanoTime(); this.expireNanoTime = createNanoTime + timeoutInMilliseconds; this.isCompleted = false; + this.tryId = 1; this.backupRequestTask = null; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 3c9605c9..779f1106 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -38,19 +38,19 @@ public static final class ReplicaConfiguration { public List secondarySessions = new ArrayList<>(); } - static final class TableConfiguration { - ArrayList replicas; - long updateVersion; + public static final class TableConfiguration { + public ArrayList replicas; + public long updateVersion; } private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TableHandler.class); ClusterManager manager_; - EventExecutor executor_; // should be only one thread in this service + public EventExecutor executor_; // should be only one thread in this service - AtomicReference tableConfig_; + public AtomicReference tableConfig_; AtomicBoolean inQuerying_; long lastQueryTime_; - int backupRequestDelayMs; + public int backupRequestDelayMs; public TableHandler(ClusterManager mgr, String name, TableOptions options) throws ReplicationException { @@ -237,8 +237,7 @@ public void run() { return true; } - void onRpcReply( - ClientRequestRound round, int tryId, long cachedConfigVersion, String serverAddr) { + public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, String serverAddr) { // judge if it is the first response if (round.isCompleted) { return; @@ -272,7 +271,7 @@ void onRpcReply( serverAddr, operator.get_gpid().toString(), operator, - tryId, + round.tryId, operator.rpc_error.errno.toString()); break; @@ -286,7 +285,7 @@ void onRpcReply( serverAddr, operator.get_gpid().toString(), operator, - tryId, + round.tryId, operator.rpc_error.errno.toString()); needQueryMeta = true; break; @@ -300,7 +299,7 @@ void onRpcReply( serverAddr, operator.get_gpid().toString(), operator, - tryId, + round.tryId, operator.rpc_error.errno.toString()); break; @@ -312,7 +311,7 @@ void onRpcReply( serverAddr, operator.get_gpid().toString(), operator, - tryId, + round.tryId, operator.rpc_error.errno.toString()); round.thisRoundCompletion(); return; @@ -332,17 +331,18 @@ void onRpcReply( round.enableCounter, round.expireNanoTime, round.timeoutMs); - tryDelayCall(delayRequestRound, tryId + 1); + tryDelayCall(delayRequestRound); } - void tryDelayCall(final ClientRequestRound round, final int tryId) { + void tryDelayCall(final ClientRequestRound round) { + round.tryId++; long nanoDelay = manager_.getRetryDelay(round.timeoutMs) * 1000000L; if (round.expireNanoTime - System.nanoTime() > nanoDelay) { executor_.schedule( new Runnable() { @Override public void run() { - call(round, tryId); + call(round); } }, nanoDelay, @@ -357,7 +357,7 @@ public void run() { } } - void call(final ClientRequestRound round, final int tryId) { + void call(final ClientRequestRound round) { // tableConfig & handle is initialized in constructor, so both shouldn't be null final TableConfiguration tableConfig = tableConfig_.get(); final ReplicaConfiguration handle = @@ -366,7 +366,7 @@ void call(final ClientRequestRound round, final int tryId) { if (handle.primarySession != null) { // if backup request is enabled, schedule to send to secondary if (round.operator.enableBackupRequest && isBackupRequestEnabled()) { - backupCall(round, tryId); + backupCall(round); } // send request to primary @@ -375,7 +375,7 @@ void call(final ClientRequestRound round, final int tryId) { new Runnable() { @Override public void run() { - onRpcReply(round, tryId, tableConfig.updateVersion, handle.primarySession.name()); + onRpcReply(round, tableConfig.updateVersion, handle.primarySession.name()); } }, round.timeoutMs, @@ -386,13 +386,13 @@ public void run() { tableName_, round.getOperator().get_gpid().toString(), round.getOperator(), - tryId); + round.tryId); tryQueryMeta(tableConfig.updateVersion); - tryDelayCall(round, tryId + 1); + tryDelayCall(round); } } - void backupCall(final ClientRequestRound round, final int tryId) { + void backupCall(final ClientRequestRound round) { final TableConfiguration tableConfig = tableConfig_.get(); final ReplicaConfiguration handle = tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); @@ -411,8 +411,7 @@ public void run() { new Runnable() { @Override public void run() { - onRpcReply( - round, tryId, tableConfig.updateVersion, secondarySession.name()); + onRpcReply(round, tableConfig.updateVersion, secondarySession.name()); } }, round.timeoutMs, @@ -483,7 +482,7 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time ClientRequestRound round = new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs); - call(round, 1); + call(round); } private void handleMetaException(error_types err_type, ClusterManager mgr, String name) diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java new file mode 100644 index 00000000..c9893211 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java @@ -0,0 +1,64 @@ +package com.xiaomi.infra.pegasus.tools.interceptor; + +import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler.TableConfiguration; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +public class BackupRequestInterceptor implements TableInterceptor { + private boolean isEnable; + + private TableHandler tableHandler; + private ClientRequestRound clientRequestRound; + + public BackupRequestInterceptor( + boolean isEnable, TableHandler tableHandler, ClientRequestRound clientRequestRound) { + this.isEnable = isEnable; + this.tableHandler = tableHandler; + this.clientRequestRound = clientRequestRound; + } + + @Override + public void interceptBefore() throws Exception { + backupCall(); + } + + @Override + public void interceptAfter() throws Exception {} + + @Override + public boolean isEnable() { + return isEnable; + } + + private void backupCall() { + final TableConfiguration tableConfig = tableHandler.tableConfig_.get(); + final ReplicaConfiguration handle = + tableConfig.replicas.get(clientRequestRound.getOperator().get_gpid().get_pidx()); + + clientRequestRound.backupRequestTask = + tableHandler.executor_.schedule( + () -> { + // pick a secondary at random + ReplicaSession secondarySession = + handle.secondarySessions.get( + new Random().nextInt(handle.secondarySessions.size())); + secondarySession.asyncSend( + clientRequestRound.getOperator(), + new Runnable() { + @Override + public void run() { + tableHandler.onRpcReply( + clientRequestRound, tableConfig.updateVersion, secondarySession.name()); + } + }, + clientRequestRound.timeoutMs, + true); + }, + tableHandler.backupRequestDelayMs, + TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java new file mode 100644 index 00000000..f21fa71c --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.tools.interceptor; + +import java.util.ArrayList; +import java.util.List; + +public class InterceptorManger { + + List interceptors = new ArrayList<>(); + + public InterceptorManger add(TableInterceptor interceptor) { + interceptors.add(interceptor); + return this; + } + + public void excuteBefore() { + for(TableInterceptor interceptor: interceptors) { + if(interceptor.isEnable()){ + interceptor. + } + } + } + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java new file mode 100644 index 00000000..9b8f88c8 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java @@ -0,0 +1,11 @@ +package com.xiaomi.infra.pegasus.tools.interceptor; + + +public interface TableInterceptor { + // The behavior before the ReplicaSession sends the RPC. + void interceptBefore() throws Exception; + // The behavior after the ReplicaSession get reply or failure of the RPC. + void interceptAfter() throws Exception; + + boolean isEnable(); +} From 9dae4cedbf2dcc657e9d739cc127f31b4759c724 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 25 Aug 2020 18:50:20 +0800 Subject: [PATCH 02/32] init the api --- .../infra/pegasus/rpc/TableOptions.java | 4 ++ .../infra/pegasus/rpc/async/TableHandler.java | 42 ++++--------------- .../interceptor/BackupRequestInterceptor.java | 37 +++++++--------- .../tools/interceptor/InterceptorManger.java | 29 ++++++++----- .../tools/interceptor/TableInterceptor.java | 10 +++-- 5 files changed, 51 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index 6b6d41ec..a780959a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -24,4 +24,8 @@ public TableOptions(KeyHasher h, int backupRequestDelay) { this.keyHasher = h; this.backupRequestDelayMs = backupRequestDelay; } + + public boolean enableBackupRequest() { + return backupRequestDelayMs > 0; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 779f1106..9e14fd54 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -16,12 +16,13 @@ import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.tools.interceptor.BackupRequestInterceptor; +import com.xiaomi.infra.pegasus.tools.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -51,6 +52,7 @@ public static final class TableConfiguration { AtomicBoolean inQuerying_; long lastQueryTime_; public int backupRequestDelayMs; + private InterceptorManger interceptorManger; public TableHandler(ClusterManager mgr, String name, TableOptions options) throws ReplicationException { @@ -107,6 +109,10 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) inQuerying_ = new AtomicBoolean(false); lastQueryTime_ = 0; + + this.interceptorManger = new InterceptorManger(); + + interceptorManger.add(new BackupRequestInterceptor(options.enableBackupRequest())); } public ReplicaConfiguration getReplicaConfig(int index) { @@ -365,9 +371,7 @@ void call(final ClientRequestRound round) { if (handle.primarySession != null) { // if backup request is enabled, schedule to send to secondary - if (round.operator.enableBackupRequest && isBackupRequestEnabled()) { - backupCall(round); - } + interceptorManger.executeBefore(round, this); // send request to primary handle.primarySession.asyncSend( @@ -392,36 +396,6 @@ public void run() { } } - void backupCall(final ClientRequestRound round) { - final TableConfiguration tableConfig = tableConfig_.get(); - final ReplicaConfiguration handle = - tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); - - round.backupRequestTask = - executor_.schedule( - new Runnable() { - @Override - public void run() { - // pick a secondary at random - ReplicaSession secondarySession = - handle.secondarySessions.get( - new Random().nextInt(handle.secondarySessions.size())); - secondarySession.asyncSend( - round.getOperator(), - new Runnable() { - @Override - public void run() { - onRpcReply(round, tableConfig.updateVersion, secondarySession.name()); - } - }, - round.timeoutMs, - true); - } - }, - backupRequestDelayMs, - TimeUnit.MILLISECONDS); - } - @Override public int getPartitionCount() { return tableConfig_.get().replicas.size(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java index c9893211..0c9dabde 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java @@ -1,5 +1,6 @@ package com.xiaomi.infra.pegasus.tools.interceptor; +import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; @@ -9,32 +10,28 @@ import java.util.concurrent.TimeUnit; public class BackupRequestInterceptor implements TableInterceptor { - private boolean isEnable; - private TableHandler tableHandler; - private ClientRequestRound clientRequestRound; + private boolean isOpen; - public BackupRequestInterceptor( - boolean isEnable, TableHandler tableHandler, ClientRequestRound clientRequestRound) { - this.isEnable = isEnable; - this.tableHandler = tableHandler; - this.clientRequestRound = clientRequestRound; + public BackupRequestInterceptor(boolean isOpen) { + this.isOpen = isOpen; } @Override - public void interceptBefore() throws Exception { - backupCall(); + public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + backupCall(clientRequestRound, tableHandler); } @Override - public void interceptAfter() throws Exception {} + public void interceptAfter( + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {} - @Override - public boolean isEnable() { - return isEnable; - } + private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + + if (!isOpen || !clientRequestRound.getOperator().enableBackupRequest) { + return; + } - private void backupCall() { final TableConfiguration tableConfig = tableHandler.tableConfig_.get(); final ReplicaConfiguration handle = tableConfig.replicas.get(clientRequestRound.getOperator().get_gpid().get_pidx()); @@ -48,13 +45,9 @@ private void backupCall() { new Random().nextInt(handle.secondarySessions.size())); secondarySession.asyncSend( clientRequestRound.getOperator(), - new Runnable() { - @Override - public void run() { + () -> tableHandler.onRpcReply( - clientRequestRound, tableConfig.updateVersion, secondarySession.name()); - } - }, + clientRequestRound, tableConfig.updateVersion, secondarySession.name()), clientRequestRound.timeoutMs, true); }, diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java index f21fa71c..8bb5af33 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java @@ -1,23 +1,30 @@ package com.xiaomi.infra.pegasus.tools.interceptor; +import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import java.util.ArrayList; import java.util.List; public class InterceptorManger { - List interceptors = new ArrayList<>(); + private List interceptors = new ArrayList<>(); - public InterceptorManger add(TableInterceptor interceptor) { - interceptors.add(interceptor); - return this; - } + public InterceptorManger add(TableInterceptor interceptor) { + interceptors.add(interceptor); + return this; + } - public void excuteBefore() { - for(TableInterceptor interceptor: interceptors) { - if(interceptor.isEnable()){ - interceptor. - } - } + public void executeBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + for (TableInterceptor interceptor : interceptors) { + interceptor.interceptBefore(clientRequestRound, tableHandler); } + } + public void executeAfter( + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { + for (TableInterceptor interceptor : interceptors) { + interceptor.interceptAfter(clientRequestRound, errno, tableHandler); + } + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java index 9b8f88c8..1f745014 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java @@ -1,11 +1,13 @@ package com.xiaomi.infra.pegasus.tools.interceptor; +import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; public interface TableInterceptor { // The behavior before the ReplicaSession sends the RPC. - void interceptBefore() throws Exception; + void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler); // The behavior after the ReplicaSession get reply or failure of the RPC. - void interceptAfter() throws Exception; - - boolean isEnable(); + void interceptAfter( + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); } From a3637aa42ee80e25cf9b7c388daa507bfb3ea5cc Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 26 Aug 2020 19:27:15 +0800 Subject: [PATCH 03/32] add compress interceptor --- .../rrdb_check_and_mutate_operator.java | 4 + .../operator/rrdb_check_and_set_operator.java | 4 + .../operator/rrdb_multi_put_operator.java | 4 + .../pegasus/operator/rrdb_put_operator.java | 4 + .../infra/pegasus/rpc/TableOptions.java | 10 +- .../infra/pegasus/rpc/async/TableHandler.java | 24 +++- .../infra/pegasus/tools/ZstdWrapper.java | 17 +++ .../interceptor/CompressInterceptor.java | 114 ++++++++++++++++++ .../tools/interceptor/InterceptorManger.java | 7 +- .../tools/interceptor/TableInterceptor.java | 7 +- 10 files changed, 185 insertions(+), 10 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java index ed8a61a1..2c724e21 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() { return resp; } + public check_and_mutate_request get_request() { + return request; + } + private check_and_mutate_request request; private check_and_mutate_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java index 4d167b9e..3e04f636 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java @@ -47,6 +47,10 @@ public check_and_set_response get_response() { return resp; } + public check_and_set_request get_request() { + return request; + } + private check_and_set_request request; private check_and_set_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java index b81e01db..c8399aaa 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java @@ -47,6 +47,10 @@ public update_response get_response() { return resp; } + public multi_put_request get_request() { + return request; + } + private multi_put_request request; private update_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java index 20532851..1b06d2fc 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java @@ -46,6 +46,10 @@ public update_response get_response() { return resp; } + public update_request get_request() { + return request; + } + private update_request request; private update_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index a780959a..dc442aaf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -7,6 +7,7 @@ public class TableOptions { private final KeyHasher keyHasher; private final int backupRequestDelayMs; + private final boolean enableCompress; public KeyHasher keyHasher() { return this.keyHasher; @@ -17,15 +18,20 @@ public int backupRequestDelayMs() { } public static TableOptions forTest() { - return new TableOptions(KeyHasher.DEFAULT, 0); + return new TableOptions(KeyHasher.DEFAULT, 0, false); } - public TableOptions(KeyHasher h, int backupRequestDelay) { + public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompress) { this.keyHasher = h; this.backupRequestDelayMs = backupRequestDelay; + this.enableCompress = enableCompress; } public boolean enableBackupRequest() { return backupRequestDelayMs > 0; } + + public boolean enableCompress() { + return enableCompress; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 9e14fd54..c35ecad2 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -17,6 +17,7 @@ import com.xiaomi.infra.pegasus.rpc.Table; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.tools.interceptor.BackupRequestInterceptor; +import com.xiaomi.infra.pegasus.tools.interceptor.CompressInterceptor; import com.xiaomi.infra.pegasus.tools.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; @@ -112,7 +113,9 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) this.interceptorManger = new InterceptorManger(); - interceptorManger.add(new BackupRequestInterceptor(options.enableBackupRequest())); + interceptorManger + .add(new BackupRequestInterceptor(options.enableBackupRequest())) + .add(new CompressInterceptor(options.enableCompress())); } public ReplicaConfiguration getReplicaConfig(int index) { @@ -263,6 +266,11 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin } client_operator operator = round.getOperator(); + try { + interceptorManger.executeAfter(round, operator.rpc_error.errno, this); + } catch (PException e) { + logger.warn("interceptorManger executeAfter failed!"); + } boolean needQueryMeta = false; switch (operator.rpc_error.errno) { case ERR_OK: @@ -348,7 +356,11 @@ void tryDelayCall(final ClientRequestRound round) { new Runnable() { @Override public void run() { - call(round); + try { + call(round); + } catch (PException e) { + logger.warn("try delay call failed"); + } } }, nanoDelay, @@ -363,7 +375,7 @@ public void run() { } } - void call(final ClientRequestRound round) { + void call(final ClientRequestRound round) throws PException { // tableConfig & handle is initialized in constructor, so both shouldn't be null final TableConfiguration tableConfig = tableConfig_.get(); final ReplicaConfiguration handle = @@ -456,7 +468,11 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time ClientRequestRound round = new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs); - call(round); + try { + call(round); + } catch (PException e) { + logger.warn("call failed"); + } } private void handleMetaException(error_types err_type, ClusterManager mgr, String name) diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java index eace0af8..a5d19b67 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java @@ -12,6 +12,23 @@ public class ZstdWrapper { private ZstdWrapper() {} + /** + * try decompress the `src`, return `src` directly if failed + * + * @param src + * @return + */ + public static byte[] tryDecompress(byte[] src) { + byte[] decompressedValue; + try { + decompressedValue = decompress(src); + } catch (PException e) { + // decompress fail + decompressedValue = src; + } + return decompressedValue; + } + /** * Compresses the `src` and returns the compressed. * diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java new file mode 100644 index 00000000..6442536c --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java @@ -0,0 +1,114 @@ +package com.xiaomi.infra.pegasus.tools.interceptor; + +import com.xiaomi.infra.pegasus.apps.key_value; +import com.xiaomi.infra.pegasus.apps.mutate; +import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_get_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_multi_put_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_scan_operator; +import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; +import com.xiaomi.infra.pegasus.tools.ZstdWrapper; +import java.util.List; + +public class CompressInterceptor implements TableInterceptor { + private boolean isOpen; + + public CompressInterceptor(boolean isOpen) { + this.isOpen = isOpen; + } + + @Override + public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) + throws PException { + tryCompress(clientRequestRound); + } + + @Override + public void interceptAfter( + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) + throws PException { + if (errno == error_types.ERR_OK) { + tryDecompress(clientRequestRound); + } + } + + private void tryCompress(ClientRequestRound clientRequestRound) throws PException { + String operatorName = clientRequestRound.getOperator().name(); + switch (operatorName) { + case "put": + { + rrdb_put_operator operator = (rrdb_put_operator) clientRequestRound.getOperator(); + operator.get_request().value.data = + ZstdWrapper.compress(operator.get_request().value.data); + } + case "multi_put": + { + rrdb_multi_put_operator operator = + (rrdb_multi_put_operator) clientRequestRound.getOperator(); + List kvs = operator.get_request().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.compress(kv.value.data); + } + break; + } + case "check_and_set": + { + rrdb_check_and_set_operator operator = + (rrdb_check_and_set_operator) clientRequestRound.getOperator(); + operator.get_request().set_value.data = + ZstdWrapper.compress(operator.get_request().set_value.data); + break; + } + case "check_and_mutate": + { + rrdb_check_and_mutate_operator operator = + (rrdb_check_and_mutate_operator) clientRequestRound.getOperator(); + List mutates = operator.get_request().mutate_list; + for (mutate mu : mutates) { + mu.value.data = ZstdWrapper.compress(mu.value.data); + } + } + default: + throw new PException("unsupported operator = " + operatorName); + } + } + + private void tryDecompress(ClientRequestRound clientRequestRound) throws PException { + String operatorName = clientRequestRound.getOperator().name(); + switch (operatorName) { + case "get": + { + ZstdWrapper.tryDecompress( + ((rrdb_get_operator) clientRequestRound.getOperator()).get_response().value.data); + break; + } + case "multi_get": + { + rrdb_multi_get_operator operator = + (rrdb_multi_get_operator) clientRequestRound.getOperator(); + List kvs = operator.get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + break; + } + case "scan": + { + rrdb_scan_operator operator = (rrdb_scan_operator) clientRequestRound.getOperator(); + List kvs = operator.get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + break; + } + default: + throw new PException("unsupported operator = " + operatorName); + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java index 8bb5af33..6f15ddea 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java @@ -1,6 +1,7 @@ package com.xiaomi.infra.pegasus.tools.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import java.util.ArrayList; @@ -15,14 +16,16 @@ public InterceptorManger add(TableInterceptor interceptor) { return this; } - public void executeBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + public void executeBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) + throws PException { for (TableInterceptor interceptor : interceptors) { interceptor.interceptBefore(clientRequestRound, tableHandler); } } public void executeAfter( - ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) + throws PException { for (TableInterceptor interceptor : interceptors) { interceptor.interceptAfter(clientRequestRound, errno, tableHandler); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java index 1f745014..bbdb789d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java @@ -1,13 +1,16 @@ package com.xiaomi.infra.pegasus.tools.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; public interface TableInterceptor { // The behavior before the ReplicaSession sends the RPC. - void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler); + void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) + throws PException; // The behavior after the ReplicaSession get reply or failure of the RPC. void interceptAfter( - ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) + throws PException; } From cd24eed0f0e0514ee2fadc83c4a2bd75a22a3363 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 27 Aug 2020 10:33:19 +0800 Subject: [PATCH 04/32] fix open --- .../pegasus/tools/interceptor/CompressInterceptor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java index 6442536c..3e8add4b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/CompressInterceptor.java @@ -26,6 +26,9 @@ public CompressInterceptor(boolean isOpen) { @Override public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) throws PException { + if (!isOpen) { + return; + } tryCompress(clientRequestRound); } @@ -33,9 +36,10 @@ public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler public void interceptAfter( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) throws PException { - if (errno == error_types.ERR_OK) { - tryDecompress(clientRequestRound); + if (errno != error_types.ERR_OK || !isOpen) { + return; } + tryDecompress(clientRequestRound); } private void tryCompress(ClientRequestRound clientRequestRound) throws PException { From 482e3359d900aaf1fa143d8dbd8f39336ee71e67 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 31 Aug 2020 12:05:17 +0800 Subject: [PATCH 05/32] move and after --- scripts/format-all.sh | 2 +- .../xiaomi/infra/pegasus/rpc/async/TableHandler.java | 11 ++++------- .../interceptor/BackupRequestInterceptor.java | 9 +++++++-- .../{tools => rpc}/interceptor/InterceptorManger.java | 2 +- .../{tools => rpc}/interceptor/TableInterceptor.java | 2 +- 5 files changed, 14 insertions(+), 12 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/{tools => rpc}/interceptor/BackupRequestInterceptor.java (89%) rename src/main/java/com/xiaomi/infra/pegasus/{tools => rpc}/interceptor/InterceptorManger.java (94%) rename src/main/java/com/xiaomi/infra/pegasus/{tools => rpc}/interceptor/TableInterceptor.java (91%) diff --git a/scripts/format-all.sh b/scripts/format-all.sh index f013bef0..78c95b1e 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -10,7 +10,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java src/main/java/com/xiaomi/infra/pegasus/operator/*.java src/main/java/com/xiaomi/infra/pegasus/tools/*.java - src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/*.java + src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 9e14fd54..cd98d25b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -16,8 +16,8 @@ import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; import com.xiaomi.infra.pegasus.rpc.TableOptions; -import com.xiaomi.infra.pegasus.tools.interceptor.BackupRequestInterceptor; -import com.xiaomi.infra.pegasus.tools.interceptor.InterceptorManger; +import com.xiaomi.infra.pegasus.rpc.interceptor.BackupRequestInterceptor; +import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; import java.util.ArrayList; @@ -248,7 +248,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin if (round.isCompleted) { return; } else { - synchronized (round) { + synchronized (TableHandler.class) { // the fastest response has been received if (round.isCompleted) { return; @@ -257,10 +257,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin } } - // cancel the backup request task - if (round.backupRequestTask != null) { - round.backupRequestTask.cancel(true); - } + interceptorManger.executeAfter(round, round.getOperator().rpc_error.errno, this); client_operator operator = round.getOperator(); boolean needQueryMeta = false; diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java similarity index 89% rename from src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java rename to src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 0c9dabde..171a6264 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -1,4 +1,4 @@ -package com.xiaomi.infra.pegasus.tools.interceptor; +package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; @@ -24,7 +24,12 @@ public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler @Override public void interceptAfter( - ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {} + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { + // cancel the backup request task + if (clientRequestRound.backupRequestTask != null) { + clientRequestRound.backupRequestTask.cancel(true); + } + } private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java similarity index 94% rename from src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java rename to src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 8bb5af33..8779403e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -1,4 +1,4 @@ -package com.xiaomi.infra.pegasus.tools.interceptor; +package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java similarity index 91% rename from src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java rename to src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java index 1f745014..10db465f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java @@ -1,4 +1,4 @@ -package com.xiaomi.infra.pegasus.tools.interceptor; +package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; From 2edb6a06407365ab5eb37da871404689c394dbf5 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 31 Aug 2020 14:01:45 +0800 Subject: [PATCH 06/32] move and after --- .../java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java | 4 ++-- .../infra/pegasus/rpc/interceptor/InterceptorManger.java | 4 ++-- .../infra/pegasus/rpc/interceptor/TableInterceptor.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index cd98d25b..32bb6d8d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -257,7 +257,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin } } - interceptorManger.executeAfter(round, round.getOperator().rpc_error.errno, this); + interceptorManger.interceptAfter(round, round.getOperator().rpc_error.errno, this); client_operator operator = round.getOperator(); boolean needQueryMeta = false; @@ -368,7 +368,7 @@ void call(final ClientRequestRound round) { if (handle.primarySession != null) { // if backup request is enabled, schedule to send to secondary - interceptorManger.executeBefore(round, this); + interceptorManger.interceptBefore(round, this); // send request to primary handle.primarySession.asyncSend( diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 8779403e..15a9f44a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -15,13 +15,13 @@ public InterceptorManger add(TableInterceptor interceptor) { return this; } - public void executeBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { interceptor.interceptBefore(clientRequestRound, tableHandler); } } - public void executeAfter( + public void interceptAfter( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { interceptor.interceptAfter(clientRequestRound, errno, tableHandler); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java index 10db465f..c2e290e8 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java @@ -5,9 +5,9 @@ import com.xiaomi.infra.pegasus.rpc.async.TableHandler; public interface TableInterceptor { - // The behavior before the ReplicaSession sends the RPC. + // The behavior before sending the RPC to a table. void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler); - // The behavior after the ReplicaSession get reply or failure of the RPC. + // The behavior after getting reply or failure of the RPC. void interceptAfter( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); } From e7ecefbcce70b4206675efe4a5b50dd957bf3ac3 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 31 Aug 2020 14:07:08 +0800 Subject: [PATCH 07/32] move and after --- .../java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java | 1 - .../infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java | 1 - .../xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 32bb6d8d..00cc8c5f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -367,7 +367,6 @@ void call(final ClientRequestRound round) { tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); if (handle.primarySession != null) { - // if backup request is enabled, schedule to send to secondary interceptorManger.interceptBefore(round, this); // send request to primary diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 171a6264..d6efa073 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -32,7 +32,6 @@ public void interceptAfter( } private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) { - if (!isOpen || !clientRequestRound.getOperator().enableBackupRequest) { return; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java index c2e290e8..fb35c45b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java @@ -7,7 +7,7 @@ public interface TableInterceptor { // The behavior before sending the RPC to a table. void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler); - // The behavior after getting reply or failure of the RPC. + // The behavior after getting reply or failure of the RPC. void interceptAfter( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); } From 49ac5fd3aef132f25a449020213e756431a99f60 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 31 Aug 2020 15:16:08 +0800 Subject: [PATCH 08/32] move and after --- .../java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 00cc8c5f..d20f2074 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -257,9 +257,8 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin } } - interceptorManger.interceptAfter(round, round.getOperator().rpc_error.errno, this); - client_operator operator = round.getOperator(); + interceptorManger.interceptAfter(round, operator.rpc_error.errno, this); boolean needQueryMeta = false; switch (operator.rpc_error.errno) { case ERR_OK: From 44eb64a91b9ea2e4eaa42eb34ca1440b3f346950 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 1 Sep 2020 13:56:23 +0800 Subject: [PATCH 09/32] fix --- .../pegasus/rpc/async/ClientRequestRound.java | 16 +++++- .../infra/pegasus/rpc/async/TableHandler.java | 26 ++++++++-- .../interceptor/BackupRequestInterceptor.java | 50 +++++++++++-------- 3 files changed, 63 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java index 9b2003f9..7765a3d3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java @@ -15,14 +15,14 @@ public final class ClientRequestRound { client_operator operator; Table.ClientOPCallback callback; - public long timeoutMs; + long timeoutMs; boolean enableCounter; long createNanoTime; long expireNanoTime; boolean isCompleted; int tryId; - public ScheduledFuture backupRequestTask; + ScheduledFuture backupRequestTask; /** * Constructor. @@ -57,6 +57,18 @@ public ClientRequestRound( this.expireNanoTime = expireNanoTime; } + public long timeoutMs() { + return timeoutMs; + } + + public ScheduledFuture backupRequestTask() { + return backupRequestTask; + } + + public void backupRequestTask(ScheduledFuture task) { + backupRequestTask = task; + } + public com.xiaomi.infra.pegasus.operator.client_operator getOperator() { return operator; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index d20f2074..fc95dc17 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -40,18 +40,26 @@ public static final class ReplicaConfiguration { } public static final class TableConfiguration { - public ArrayList replicas; - public long updateVersion; + ArrayList replicas; + long updateVersion; + + public ArrayList replicas() { + return replicas; + } + + public long updateVersion() { + return updateVersion; + } } private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TableHandler.class); ClusterManager manager_; - public EventExecutor executor_; // should be only one thread in this service + EventExecutor executor_; // should be only one thread in this service - public AtomicReference tableConfig_; + AtomicReference tableConfig_; AtomicBoolean inQuerying_; long lastQueryTime_; - public int backupRequestDelayMs; + int backupRequestDelayMs; private InterceptorManger interceptorManger; public TableHandler(ClusterManager mgr, String name, TableOptions options) @@ -433,6 +441,14 @@ public void onCompletion(client_operator op) throws Throwable { } } + public TableConfiguration tableConfiguration() { + return tableConfig_.get(); + } + + public int backupRequestDelayMs() { + return backupRequestDelayMs; + } + @Override public EventExecutor getExecutor() { return executor_; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index d6efa073..dd714663 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -7,6 +7,7 @@ import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.TableConfiguration; import java.util.Random; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class BackupRequestInterceptor implements TableInterceptor { @@ -26,8 +27,9 @@ public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler public void interceptAfter( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { // cancel the backup request task - if (clientRequestRound.backupRequestTask != null) { - clientRequestRound.backupRequestTask.cancel(true); + ScheduledFuture backupRequestTask = clientRequestRound.backupRequestTask(); + if (backupRequestTask != null) { + backupRequestTask.cancel(true); } } @@ -36,26 +38,30 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl return; } - final TableConfiguration tableConfig = tableHandler.tableConfig_.get(); + final TableConfiguration tableConfig = tableHandler.tableConfiguration(); final ReplicaConfiguration handle = - tableConfig.replicas.get(clientRequestRound.getOperator().get_gpid().get_pidx()); - - clientRequestRound.backupRequestTask = - tableHandler.executor_.schedule( - () -> { - // pick a secondary at random - ReplicaSession secondarySession = - handle.secondarySessions.get( - new Random().nextInt(handle.secondarySessions.size())); - secondarySession.asyncSend( - clientRequestRound.getOperator(), - () -> - tableHandler.onRpcReply( - clientRequestRound, tableConfig.updateVersion, secondarySession.name()), - clientRequestRound.timeoutMs, - true); - }, - tableHandler.backupRequestDelayMs, - TimeUnit.MILLISECONDS); + tableConfig.replicas().get(clientRequestRound.getOperator().get_gpid().get_pidx()); + + clientRequestRound.backupRequestTask( + tableHandler + .getExecutor() + .schedule( + () -> { + // pick a secondary at random + ReplicaSession secondarySession = + handle.secondarySessions.get( + new Random().nextInt(handle.secondarySessions.size())); + secondarySession.asyncSend( + clientRequestRound.getOperator(), + () -> + tableHandler.onRpcReply( + clientRequestRound, + tableConfig.updateVersion(), + secondarySession.name()), + clientRequestRound.timeoutMs(), + true); + }, + tableHandler.backupRequestDelayMs(), + TimeUnit.MILLISECONDS)); } } From e7c98c83f2f5e36f4331733771ec05445fda1947 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 1 Sep 2020 14:01:02 +0800 Subject: [PATCH 10/32] fix --- .../infra/pegasus/rpc/async/TableHandler.java | 18 +++++------------- .../interceptor/BackupRequestInterceptor.java | 6 ++---- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index fc95dc17..9f1c5a4c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -39,17 +39,9 @@ public static final class ReplicaConfiguration { public List secondarySessions = new ArrayList<>(); } - public static final class TableConfiguration { + static final class TableConfiguration { ArrayList replicas; long updateVersion; - - public ArrayList replicas() { - return replicas; - } - - public long updateVersion() { - return updateVersion; - } } private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TableHandler.class); @@ -441,14 +433,14 @@ public void onCompletion(client_operator op) throws Throwable { } } - public TableConfiguration tableConfiguration() { - return tableConfig_.get(); - } - public int backupRequestDelayMs() { return backupRequestDelayMs; } + public long updateVersion() { + return tableConfig_.get().updateVersion; + } + @Override public EventExecutor getExecutor() { return executor_; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index dd714663..2f1bed38 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -5,7 +5,6 @@ import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; -import com.xiaomi.infra.pegasus.rpc.async.TableHandler.TableConfiguration; import java.util.Random; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -38,9 +37,8 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl return; } - final TableConfiguration tableConfig = tableHandler.tableConfiguration(); final ReplicaConfiguration handle = - tableConfig.replicas().get(clientRequestRound.getOperator().get_gpid().get_pidx()); + tableHandler.getReplicaConfig(clientRequestRound.getOperator().get_gpid().get_pidx()); clientRequestRound.backupRequestTask( tableHandler @@ -56,7 +54,7 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl () -> tableHandler.onRpcReply( clientRequestRound, - tableConfig.updateVersion(), + tableHandler.updateVersion(), secondarySession.name()), clientRequestRound.timeoutMs(), true); From 151dd6aeb85be4b8220dea53a38f6571888e5bd2 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 1 Sep 2020 14:02:21 +0800 Subject: [PATCH 11/32] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 9f1c5a4c..23eb7437 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -111,7 +111,6 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) lastQueryTime_ = 0; this.interceptorManger = new InterceptorManger(); - interceptorManger.add(new BackupRequestInterceptor(options.enableBackupRequest())); } From 181ed6fd4f4ca008f46f762f0b3b99649c778585 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 10:34:00 +0800 Subject: [PATCH 12/32] merge --- .../xiaomi/infra/pegasus/client/PegasusTableInterface.java | 2 +- .../com/xiaomi/infra/pegasus/rpc/async/MetaSession.java | 6 +++--- .../com/xiaomi/infra/pegasus/rpc/async/TableHandler.java | 1 - .../infra/pegasus/rpc/interceptor/InterceptorManger.java | 4 ++-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index e78ea06a..bc51ea7d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -12,7 +12,7 @@ * This class provides sync and async interfaces to access data of a specified table. All the async * interfaces use Future mode. Notice that it's {@link io.netty.util.concurrent.Future}, but not * {@link java.util.concurrent.Future}. You can wait the future to complete in a synchronous manner, - * or regester completion callback in an asynchronous way. + * or add completion callback in an asynchronous way. * *

A synchronous example: * diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java index 87c4679a..2fb7b881 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java @@ -38,7 +38,7 @@ public MetaSession( for (String addr : addrList) { rpc_address rpcAddr = new rpc_address(); if (rpcAddr.fromString(addr)) { - logger.info("regester {} as meta server", addr); + logger.info("add {} as meta server", addr); metaList.add(clusterManager.getReplicaSession(rpcAddr)); } else { logger.error("invalid address {}", addr); @@ -181,7 +181,7 @@ void onFinishQueryMeta(final MetaRequestRound round) { } } if (!found) { - logger.info("regester forward address {} as meta server", forwardAddress); + logger.info("add forward address {} as meta server", forwardAddress); metaList.add(clusterManager.getReplicaSession(forwardAddress)); curLeader = metaList.size() - 1; } @@ -278,7 +278,7 @@ void resolveHost(String hostPort) throws IllegalArgumentException { added.removeAll(oldSet); for (rpc_address addr : added) { metaList.add(clusterManager.getReplicaSession(addr)); - logger.info("regester {} as meta server", addr); + logger.info("add {} as meta server", addr); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 9928d62c..d93eeceb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -16,7 +16,6 @@ import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; import com.xiaomi.infra.pegasus.rpc.TableOptions; -import com.xiaomi.infra.pegasus.rpc.interceptor.BackupRequestInterceptor; import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 78abf2b5..347467b1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -12,10 +12,10 @@ public class InterceptorManger { private List interceptors = new ArrayList<>(); public InterceptorManger(TableOptions options) { - regester(new BackupRequestInterceptor(options.enableBackupRequest())); + register(new BackupRequestInterceptor(options.enableBackupRequest())); } - public InterceptorManger regester(TableInterceptor interceptor) { + public InterceptorManger register(TableInterceptor interceptor) { interceptors.add(interceptor); return this; } From d8f1daad8ba75b65d9b545fbba2c5207b79cf9f0 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 10:53:28 +0800 Subject: [PATCH 13/32] merge --- .../xiaomi/infra/pegasus/rpc/async/TableHandler.java | 4 ++-- .../rpc/interceptor/BackupRequestInterceptor.java | 4 ++-- .../pegasus/rpc/interceptor/InterceptorManger.java | 10 +++++----- .../pegasus/rpc/interceptor/TableInterceptor.java | 5 ++--- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index d93eeceb..b2bff7a0 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -255,7 +255,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin } client_operator operator = round.getOperator(); - interceptorManger.interceptAfter(round, operator.rpc_error.errno, this); + interceptorManger.after(round, operator.rpc_error.errno, this); boolean needQueryMeta = false; switch (operator.rpc_error.errno) { case ERR_OK: @@ -363,7 +363,7 @@ void call(final ClientRequestRound round) { tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); if (handle.primarySession != null) { - interceptorManger.interceptBefore(round, this); + interceptorManger.before(round, this); // send request to primary handle.primarySession.asyncSend( round.getOperator(), diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 642cbcf6..272d5a5c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -18,12 +18,12 @@ public BackupRequestInterceptor(boolean isOpen) { } @Override - public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { backupCall(clientRequestRound, tableHandler); } @Override - public void interceptAfter( + public void after( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { // cancel the backup request task ScheduledFuture backupRequestTask = clientRequestRound.backupRequestTask(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 347467b1..8535330f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -15,21 +15,21 @@ public InterceptorManger(TableOptions options) { register(new BackupRequestInterceptor(options.enableBackupRequest())); } - public InterceptorManger register(TableInterceptor interceptor) { + private InterceptorManger register(TableInterceptor interceptor) { interceptors.add(interceptor); return this; } - public void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { - interceptor.interceptBefore(clientRequestRound, tableHandler); + interceptor.before(clientRequestRound, tableHandler); } } - public void interceptAfter( + public void after( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { - interceptor.interceptAfter(clientRequestRound, errno, tableHandler); + interceptor.after(clientRequestRound, errno, tableHandler); } } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java index fb35c45b..4f78b11f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java @@ -6,8 +6,7 @@ public interface TableInterceptor { // The behavior before sending the RPC to a table. - void interceptBefore(ClientRequestRound clientRequestRound, TableHandler tableHandler); + void before(ClientRequestRound clientRequestRound, TableHandler tableHandler); // The behavior after getting reply or failure of the RPC. - void interceptAfter( - ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); + void after(ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); } From 30e1567b8c5999b7be7582018499932260459fbd Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 14:20:53 +0800 Subject: [PATCH 14/32] fix --- .../infra/pegasus/client/PegasusClient.java | 14 +- .../client/PegasusClientInterface.java | 5 +- .../infra/pegasus/rpc/TableOptions.java | 10 +- .../interceptor/BackupRequestInterceptor.java | 8 +- .../rpc/interceptor/CompressInterceptor.java | 129 ++++++++---------- .../rpc/interceptor/InterceptorManger.java | 10 +- 6 files changed, 77 insertions(+), 99 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 73c56b7e..2738339c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -43,17 +43,19 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { - return getTable(tableName, 0); + return getTable(tableName, 0, false); } - private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException { + private PegasusTable getTable( + String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException { PegasusTable table = tableMap.get(tableName); if (table == null) { synchronized (tableMapLock) { table = tableMap.get(tableName); if (table == null) { try { - TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs); + TableOptions options = + new TableOptions(new PegasusHasher(), backupRequestDelayMs, enableCompression); Table internalTable = cluster.openTable(tableName, options); table = new PegasusTable(this, internalTable); } catch (Throwable e) { @@ -189,9 +191,9 @@ public PegasusTableInterface openTable(String tableName) throws PException { } @Override - public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) - throws PException { - return getTable(tableName, backupRequestDelayMs); + public PegasusTableInterface openTable( + String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException { + return getTable(tableName, backupRequestDelayMs, enableCompression); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index f28f9a26..b3afa0ac 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -58,11 +58,12 @@ public interface PegasusClientInterface { * administrator * @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <= * 0, The backup request is disabled. + * @param enableCompression whether to enable the table data open auto-compress * @return the table handler * @throws PException throws exception if any error occurs. */ - public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) - throws PException; + public PegasusTableInterface openTable( + String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException; /** * Check value exist by key from the cluster diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index dc442aaf..a9c0f419 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -7,7 +7,7 @@ public class TableOptions { private final KeyHasher keyHasher; private final int backupRequestDelayMs; - private final boolean enableCompress; + private final boolean enableCompression; public KeyHasher keyHasher() { return this.keyHasher; @@ -21,17 +21,17 @@ public static TableOptions forTest() { return new TableOptions(KeyHasher.DEFAULT, 0, false); } - public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompress) { + public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompression) { this.keyHasher = h; this.backupRequestDelayMs = backupRequestDelay; - this.enableCompress = enableCompress; + this.enableCompression = enableCompression; } public boolean enableBackupRequest() { return backupRequestDelayMs > 0; } - public boolean enableCompress() { - return enableCompress; + public boolean enableCompression() { + return enableCompression; } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 272d5a5c..8ed2d501 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -11,12 +11,6 @@ public class BackupRequestInterceptor implements TableInterceptor { - private boolean isOpen; - - public BackupRequestInterceptor(boolean isOpen) { - this.isOpen = isOpen; - } - @Override public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { backupCall(clientRequestRound, tableHandler); @@ -33,7 +27,7 @@ public void after( } private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) { - if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) { + if (!clientRequestRound.getOperator().supportBackupRequest()) { return; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java index 23117981..6cb1eae8 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java @@ -4,6 +4,7 @@ import com.xiaomi.infra.pegasus.apps.mutate; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator; import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator; import com.xiaomi.infra.pegasus.operator.rrdb_get_operator; @@ -17,18 +18,10 @@ import java.util.List; public class CompressInterceptor implements TableInterceptor { - private boolean isOpen; - - public CompressInterceptor(boolean isOpen) { - this.isOpen = isOpen; - } @Override public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) throws PException { - if (!isOpen) { - return; - } tryCompress(clientRequestRound); } @@ -36,83 +29,69 @@ public void before(ClientRequestRound clientRequestRound, TableHandler tableHand public void after( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) throws PException { - if (errno != error_types.ERR_OK || !isOpen) { + if (errno != error_types.ERR_OK) { return; } tryDecompress(clientRequestRound); } private void tryCompress(ClientRequestRound clientRequestRound) throws PException { - String operatorName = clientRequestRound.getOperator().name(); - switch (operatorName) { - case "put": - { - rrdb_put_operator operator = (rrdb_put_operator) clientRequestRound.getOperator(); - operator.get_request().value.data = - ZstdWrapper.compress(operator.get_request().value.data); - } - case "multi_put": - { - rrdb_multi_put_operator operator = - (rrdb_multi_put_operator) clientRequestRound.getOperator(); - List kvs = operator.get_request().kvs; - for (key_value kv : kvs) { - kv.value.data = ZstdWrapper.compress(kv.value.data); - } - break; - } - case "check_and_set": - { - rrdb_check_and_set_operator operator = - (rrdb_check_and_set_operator) clientRequestRound.getOperator(); - operator.get_request().set_value.data = - ZstdWrapper.compress(operator.get_request().set_value.data); - break; - } - case "check_and_mutate": - { - rrdb_check_and_mutate_operator operator = - (rrdb_check_and_mutate_operator) clientRequestRound.getOperator(); - List mutates = operator.get_request().mutate_list; - for (mutate mu : mutates) { - mu.value.data = ZstdWrapper.compress(mu.value.data); - } - } - default: - throw new PException("unsupported operator = " + operatorName); + client_operator operator = clientRequestRound.getOperator(); + if (operator instanceof rrdb_put_operator) { + rrdb_put_operator put = (rrdb_put_operator) operator; + put.get_request().value.data = ZstdWrapper.compress(put.get_request().value.data); + return; + } + + if (operator instanceof rrdb_multi_put_operator) { + List kvs = ((rrdb_multi_put_operator) operator).get_request().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.compress(kv.value.data); + } + return; + } + + if (operator instanceof rrdb_check_and_set_operator) { + rrdb_check_and_set_operator check_and_set = (rrdb_check_and_set_operator) operator; + check_and_set.get_request().set_value.data = + ZstdWrapper.compress(check_and_set.get_request().set_value.data); + return; } + + if (operator instanceof rrdb_check_and_mutate_operator) { + List mutates = ((rrdb_check_and_mutate_operator) operator).get_request().mutate_list; + for (mutate mu : mutates) { + mu.value.data = ZstdWrapper.compress(mu.value.data); + } + return; + } + + throw new PException("unsupported operator = " + operator.name()); } private void tryDecompress(ClientRequestRound clientRequestRound) throws PException { - String operatorName = clientRequestRound.getOperator().name(); - switch (operatorName) { - case "get": - { - ZstdWrapper.tryDecompress( - ((rrdb_get_operator) clientRequestRound.getOperator()).get_response().value.data); - break; - } - case "multi_get": - { - rrdb_multi_get_operator operator = - (rrdb_multi_get_operator) clientRequestRound.getOperator(); - List kvs = operator.get_response().kvs; - for (key_value kv : kvs) { - kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); - } - break; - } - case "scan": - { - rrdb_scan_operator operator = (rrdb_scan_operator) clientRequestRound.getOperator(); - List kvs = operator.get_response().kvs; - for (key_value kv : kvs) { - kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); - } - break; - } - default: - throw new PException("unsupported operator = " + operatorName); + client_operator operator = clientRequestRound.getOperator(); + + if (operator instanceof rrdb_get_operator) { + ZstdWrapper.tryDecompress(((rrdb_get_operator) operator).get_response().value.data); + return; + } + + if (operator instanceof rrdb_multi_get_operator) { + List kvs = ((rrdb_multi_get_operator) operator).get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + return; + } + + if (operator instanceof rrdb_scan_operator) { + List kvs = ((rrdb_scan_operator) operator).get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + return; } + throw new PException("unsupported operator = " + operator.name()); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index cdc61e0f..3f0ab2e8 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -15,12 +15,14 @@ public class InterceptorManger { private List interceptors = new ArrayList<>(); public InterceptorManger(TableOptions options) { - this.register(new BackupRequestInterceptor(options.enableBackupRequest())) - .register(new CompressInterceptor(options.enableCompress())); + this.register(new BackupRequestInterceptor(), options.enableBackupRequest()) + .register(new CompressInterceptor(), options.enableCompression()); } - private InterceptorManger register(TableInterceptor interceptor) { - interceptors.add(interceptor); + private InterceptorManger register(TableInterceptor interceptor, boolean enable) { + if (enable) { + interceptors.add(interceptor); + } return this; } From 501a1557f8a022fa71b504c5982153834272fab1 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 15:05:55 +0800 Subject: [PATCH 15/32] fix --- .../infra/pegasus/operator/client_operator.java | 1 + .../rpc/interceptor/InterceptorManger.java | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index ea928241..8388e4e3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -95,4 +95,5 @@ public abstract void send_data(org.apache.thrift.protocol.TProtocol oprot, int s public gpid pid; public String tableName; // only for metrics public error_code rpc_error; + public String error_message; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 3f0ab2e8..53c6c0f5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -1,7 +1,10 @@ package com.xiaomi.infra.pegasus.rpc.interceptor; +import static com.xiaomi.infra.pegasus.base.error_code.error_types.ERR_INCOMPLETE_DATA; + import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; @@ -31,7 +34,11 @@ public void before(ClientRequestRound clientRequestRound, TableHandler tableHand try { interceptor.before(clientRequestRound, tableHandler); } catch (PException e) { - logger.warn("interceptor-before execute failed!", e); + client_operator operator = clientRequestRound.getOperator(); + operator.rpc_error.errno = ERR_INCOMPLETE_DATA; + operator.error_message = "interceptor-before execute failed! error=" + e.getMessage(); + logger.error("interceptor-after execute failed!", e); + clientRequestRound.thisRoundCompletion(); } } } @@ -42,7 +49,11 @@ public void after( try { interceptor.after(clientRequestRound, errno, tableHandler); } catch (PException e) { - logger.warn("interceptor-after execute failed!", e); + client_operator operator = clientRequestRound.getOperator(); + operator.rpc_error.errno = ERR_INCOMPLETE_DATA; + operator.error_message = "interceptor-after execute failed! error=" + e.getMessage(); + logger.error("interceptor-after execute failed!", e); + clientRequestRound.thisRoundCompletion(); } } } From 5b4dcc039a19a54be557ce9ffe54ae3b94e46abf Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 15:06:53 +0800 Subject: [PATCH 16/32] fix --- .../{CompressInterceptor.java => CompressionInterceptor.java} | 2 +- .../xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/{CompressInterceptor.java => CompressionInterceptor.java} (98%) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java similarity index 98% rename from src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java rename to src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java index 6cb1eae8..d341157a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java @@ -17,7 +17,7 @@ import com.xiaomi.infra.pegasus.tools.ZstdWrapper; import java.util.List; -public class CompressInterceptor implements TableInterceptor { +public class CompressionInterceptor implements TableInterceptor { @Override public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 53c6c0f5..27514294 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -19,7 +19,7 @@ public class InterceptorManger { public InterceptorManger(TableOptions options) { this.register(new BackupRequestInterceptor(), options.enableBackupRequest()) - .register(new CompressInterceptor(), options.enableCompression()); + .register(new CompressionInterceptor(), options.enableCompression()); } private InterceptorManger register(TableInterceptor interceptor, boolean enable) { From 08fd240612be4ec9a1c89a3209095b8deda9c8d4 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 15:17:14 +0800 Subject: [PATCH 17/32] fix --- .../interceptor/CompressionInterceptor.java | 16 ++++--------- .../rpc/interceptor/InterceptorManger.java | 24 ++----------------- .../rpc/interceptor/TableInterceptor.java | 6 ++--- 3 files changed, 8 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java index d341157a..18bf16ac 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java @@ -3,7 +3,6 @@ import com.xiaomi.infra.pegasus.apps.key_value; import com.xiaomi.infra.pegasus.apps.mutate; import com.xiaomi.infra.pegasus.base.error_code.error_types; -import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator; import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator; @@ -20,22 +19,20 @@ public class CompressionInterceptor implements TableInterceptor { @Override - public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) - throws PException { + public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { tryCompress(clientRequestRound); } @Override public void after( - ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) - throws PException { + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { if (errno != error_types.ERR_OK) { return; } tryDecompress(clientRequestRound); } - private void tryCompress(ClientRequestRound clientRequestRound) throws PException { + private void tryCompress(ClientRequestRound clientRequestRound) { client_operator operator = clientRequestRound.getOperator(); if (operator instanceof rrdb_put_operator) { rrdb_put_operator put = (rrdb_put_operator) operator; @@ -63,13 +60,10 @@ private void tryCompress(ClientRequestRound clientRequestRound) throws PExceptio for (mutate mu : mutates) { mu.value.data = ZstdWrapper.compress(mu.value.data); } - return; } - - throw new PException("unsupported operator = " + operator.name()); } - private void tryDecompress(ClientRequestRound clientRequestRound) throws PException { + private void tryDecompress(ClientRequestRound clientRequestRound) { client_operator operator = clientRequestRound.getOperator(); if (operator instanceof rrdb_get_operator) { @@ -90,8 +84,6 @@ private void tryDecompress(ClientRequestRound clientRequestRound) throws PExcept for (key_value kv : kvs) { kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); } - return; } - throw new PException("unsupported operator = " + operator.name()); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 27514294..514ef864 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -1,10 +1,6 @@ package com.xiaomi.infra.pegasus.rpc.interceptor; -import static com.xiaomi.infra.pegasus.base.error_code.error_types.ERR_INCOMPLETE_DATA; - import com.xiaomi.infra.pegasus.base.error_code.error_types; -import com.xiaomi.infra.pegasus.client.PException; -import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; @@ -31,30 +27,14 @@ private InterceptorManger register(TableInterceptor interceptor, boolean enable) public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { - try { - interceptor.before(clientRequestRound, tableHandler); - } catch (PException e) { - client_operator operator = clientRequestRound.getOperator(); - operator.rpc_error.errno = ERR_INCOMPLETE_DATA; - operator.error_message = "interceptor-before execute failed! error=" + e.getMessage(); - logger.error("interceptor-after execute failed!", e); - clientRequestRound.thisRoundCompletion(); - } + interceptor.before(clientRequestRound, tableHandler); } } public void after( ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { for (TableInterceptor interceptor : interceptors) { - try { - interceptor.after(clientRequestRound, errno, tableHandler); - } catch (PException e) { - client_operator operator = clientRequestRound.getOperator(); - operator.rpc_error.errno = ERR_INCOMPLETE_DATA; - operator.error_message = "interceptor-after execute failed! error=" + e.getMessage(); - logger.error("interceptor-after execute failed!", e); - clientRequestRound.thisRoundCompletion(); - } + interceptor.after(clientRequestRound, errno, tableHandler); } } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java index 0a346caa..4f78b11f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/TableInterceptor.java @@ -1,14 +1,12 @@ package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; -import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; public interface TableInterceptor { // The behavior before sending the RPC to a table. - void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) throws PException; + void before(ClientRequestRound clientRequestRound, TableHandler tableHandler); // The behavior after getting reply or failure of the RPC. - void after(ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) - throws PException; + void after(ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler); } From 834888e208c2877e502171bfe4278e7732dd1760 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 15:17:52 +0800 Subject: [PATCH 18/32] fix --- .../java/com/xiaomi/infra/pegasus/operator/client_operator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 8388e4e3..ea928241 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -95,5 +95,4 @@ public abstract void send_data(org.apache.thrift.protocol.TProtocol oprot, int s public gpid pid; public String tableName; // only for metrics public error_code rpc_error; - public String error_message; } From 3ced11e9e0a191c79bfc3d38b3d364ecbb07522e Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 16:01:32 +0800 Subject: [PATCH 19/32] add test --- .../interceptor/CompressionInterceptor.java | 3 +- .../pegasus/rpc/async/InterceptorTest.java | 37 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java index 18bf16ac..fb47bd0d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java @@ -67,7 +67,8 @@ private void tryDecompress(ClientRequestRound clientRequestRound) { client_operator operator = clientRequestRound.getOperator(); if (operator instanceof rrdb_get_operator) { - ZstdWrapper.tryDecompress(((rrdb_get_operator) operator).get_response().value.data); + rrdb_get_operator get = (rrdb_get_operator) operator; + get.get_response().value.data = ZstdWrapper.tryDecompress(get.get_response().value.data); return; } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java new file mode 100644 index 00000000..8ab32642 --- /dev/null +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java @@ -0,0 +1,37 @@ +package com.xiaomi.infra.pegasus.rpc.async; + +import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.client.PegasusClientFactory; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class InterceptorTest { + @Test + public void testCompressionInterceptor() throws PException { + PegasusTableInterface commonTable = + PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, false); + PegasusTableInterface compressTable = + PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, true); + + byte[] hashKey = "hashKey".getBytes(); + byte[] sortKey = "sortKey".getBytes(); + byte[] commonValue = "commonValue".getBytes(); + byte[] compressionValue = "compressionValue".getBytes(); + + // if origin value was not compressed, both commonTable and compressTable can read origin value + commonTable.set(hashKey, sortKey, commonValue, 10000); + Assertions.assertEquals( + new String(commonTable.get(hashKey, sortKey, 10000)), new String(commonValue)); + Assertions.assertEquals( + new String(compressTable.get(hashKey, sortKey, 10000)), new String(commonValue)); + + // if origin value was compressed, only compressTable can read successfully + compressTable.set(hashKey, sortKey, compressionValue, 10000); + Assertions.assertNotEquals( + new String(commonTable.get(hashKey, sortKey, 10000)), new String(compressionValue)); + Assertions.assertEquals( + new String(compressTable.get(hashKey, sortKey, 10000)), new String(compressionValue)); + } +} From 148b584e5143a1a99fc19987dcf8dcffdf32e4a5 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 16:04:51 +0800 Subject: [PATCH 20/32] add test --- .../xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 514ef864..b38bb241 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -6,10 +6,8 @@ import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import java.util.ArrayList; import java.util.List; -import org.slf4j.Logger; public class InterceptorManger { - private static final Logger logger = org.slf4j.LoggerFactory.getLogger(InterceptorManger.class); private List interceptors = new ArrayList<>(); From 4c0998e73d23548578440ef4357864e0550dda3c Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 16:51:03 +0800 Subject: [PATCH 21/32] add interface --- .../infra/pegasus/client/PegasusClient.java | 23 +++++++------- .../client/PegasusClientInterface.java | 30 +++++++++++++++++-- .../infra/pegasus/rpc/TableOptions.java | 26 ++++++++++++---- .../pegasus/rpc/async/InterceptorTest.java | 6 ++-- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 2738339c..97bea60b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -29,7 +29,7 @@ public class PegasusClient implements PegasusClientInterface { private final Object tableMapLock; private Cluster cluster; - private static class PegasusHasher implements KeyHasher { + public static class PegasusHasher implements KeyHasher { @Override public long hash(byte[] key) { Validate.isTrue(key != null && key.length >= 2); @@ -43,20 +43,17 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { - return getTable(tableName, 0, false); + return getTable(tableName, new TableOptions()); } - private PegasusTable getTable( - String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException { + private PegasusTable getTable(String tableName, TableOptions tableOptions) throws PException { PegasusTable table = tableMap.get(tableName); if (table == null) { synchronized (tableMapLock) { table = tableMap.get(tableName); if (table == null) { try { - TableOptions options = - new TableOptions(new PegasusHasher(), backupRequestDelayMs, enableCompression); - Table internalTable = cluster.openTable(tableName, options); + Table internalTable = cluster.openTable(tableName, tableOptions); table = new PegasusTable(this, internalTable); } catch (Throwable e) { throw new PException(e); @@ -191,9 +188,15 @@ public PegasusTableInterface openTable(String tableName) throws PException { } @Override - public PegasusTableInterface openTable( - String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException { - return getTable(tableName, backupRequestDelayMs, enableCompression); + public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + throws PException { + return getTable(tableName, new TableOptions(backupRequestDelayMs, false)); + } + + @Override + public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) + throws PException { + return getTable(tableName, tableOptions); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index b3afa0ac..62c86f56 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.rpc.TableOptions; import java.util.*; import org.apache.commons.lang3.tuple.Pair; @@ -58,12 +59,35 @@ public interface PegasusClientInterface { * administrator * @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <= * 0, The backup request is disabled. - * @param enableCompression whether to enable the table data open auto-compress * @return the table handler * @throws PException throws exception if any error occurs. */ - public PegasusTableInterface openTable( - String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException; + @Deprecated + public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + throws PException; + + /** + * Open a table, and prepare the sessions and route-table to the replica-servers. + * + *

Please notice that pegasus support two kinds of API: 1. the client-interface way, which is + * provided in this class. 2. the table-interface way, which is provided by {@link + * PegasusTableInterface}. With the client-interface, you don't need to create + * PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently. + * However, the client-interface's api also has some restrictions: 1. we don't provide async + * methods in client-interface. 2. the timeout in client-interface isn't as accurate as the + * table-interface. 3. the client-interface may throw an exception when open table fails. It means + * that you may need to handle this exception in every data access operation, which is annoying. + * 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface. + * + * @param tableName the table should be exist on the server, which is created before by the system + * * administrator + * @param tableOptions control the table feature, such as open backup-request, compress and etc, + * see {@link TableOptions} + * @return + * @throws PException + */ + public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) + throws PException; /** * Check value exist by key from the cluster diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index a9c0f419..824c418d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -3,12 +3,32 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc; +import com.xiaomi.infra.pegasus.client.PegasusClient.PegasusHasher; + /** TableOptions is the internal options for opening a Pegasus table. */ public class TableOptions { private final KeyHasher keyHasher; private final int backupRequestDelayMs; private final boolean enableCompression; + public TableOptions() { + this.keyHasher = new PegasusHasher(); + this.backupRequestDelayMs = 0; + this.enableCompression = false; + } + + public TableOptions(int backupRequestDelay, boolean enableCompression) { + this.keyHasher = new PegasusHasher(); + this.backupRequestDelayMs = backupRequestDelay; + this.enableCompression = enableCompression; + } + + public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompression) { + this.keyHasher = h; + this.backupRequestDelayMs = backupRequestDelay; + this.enableCompression = enableCompression; + } + public KeyHasher keyHasher() { return this.keyHasher; } @@ -21,12 +41,6 @@ public static TableOptions forTest() { return new TableOptions(KeyHasher.DEFAULT, 0, false); } - public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompression) { - this.keyHasher = h; - this.backupRequestDelayMs = backupRequestDelay; - this.enableCompression = enableCompression; - } - public boolean enableBackupRequest() { return backupRequestDelayMs > 0; } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java index 8ab32642..854ed4c1 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java @@ -4,6 +4,7 @@ import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.client.PegasusClientFactory; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; +import com.xiaomi.infra.pegasus.rpc.TableOptions; import org.junit.Test; import org.junit.jupiter.api.Assertions; @@ -11,9 +12,10 @@ public class InterceptorTest { @Test public void testCompressionInterceptor() throws PException { PegasusTableInterface commonTable = - PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, false); + PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp"); PegasusTableInterface compressTable = - PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, true); + PegasusClientFactory.createClient(ClientOptions.create()) + .openTable("temp", new TableOptions(0, true)); byte[] hashKey = "hashKey".getBytes(); byte[] sortKey = "sortKey".getBytes(); From 9850a3736cb2ca94ea37bc97b0f1d2deaced93b3 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 16:52:10 +0800 Subject: [PATCH 22/32] add interface --- src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java index a5d19b67..004b24b6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java @@ -15,8 +15,8 @@ private ZstdWrapper() {} /** * try decompress the `src`, return `src` directly if failed * - * @param src - * @return + * @param src the origin sending value + * @return the decompressed value. */ public static byte[] tryDecompress(byte[] src) { byte[] decompressedValue; From 3aea6ca5ac57f69eb84d2c412762d6f7177c3799 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 17:00:17 +0800 Subject: [PATCH 23/32] add interface --- .../xiaomi/infra/pegasus/client/PegasusClientInterface.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 62c86f56..c2cada0e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -45,6 +45,9 @@ public interface PegasusClientInterface { /** * Open a table, and prepare the sessions and route-table to the replica-servers. * + *

Note: this interface is deprecated, retaining it only for compatibility, please see {@link + * PegasusClientInterface#openTable(String, TableOptions)} + * *

Please notice that pegasus support two kinds of API: 1. the client-interface way, which is * provided in this class. 2. the table-interface way, which is provided by {@link * PegasusTableInterface}. With the client-interface, you don't need to create From ad8140bdf02bf941d117b37791e3a3f6a5590d18 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 17:49:26 +0800 Subject: [PATCH 24/32] add interface --- .../infra/pegasus/client/PegasusClient.java | 2 +- .../infra/pegasus/rpc/TableOptions.java | 31 ++++++++++--------- .../pegasus/rpc/async/InterceptorTest.java | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 97bea60b..8b5680eb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -190,7 +190,7 @@ public PegasusTableInterface openTable(String tableName) throws PException { @Override public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException { - return getTable(tableName, new TableOptions(backupRequestDelayMs, false)); + return getTable(tableName, new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs)); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index 824c418d..da51d3cf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -7,9 +7,9 @@ /** TableOptions is the internal options for opening a Pegasus table. */ public class TableOptions { - private final KeyHasher keyHasher; - private final int backupRequestDelayMs; - private final boolean enableCompression; + private KeyHasher keyHasher; + private int backupRequestDelayMs; + private boolean enableCompression; public TableOptions() { this.keyHasher = new PegasusHasher(); @@ -17,16 +17,19 @@ public TableOptions() { this.enableCompression = false; } - public TableOptions(int backupRequestDelay, boolean enableCompression) { - this.keyHasher = new PegasusHasher(); - this.backupRequestDelayMs = backupRequestDelay; - this.enableCompression = enableCompression; + public TableOptions withKeyHasher(KeyHasher keyHasher) { + this.keyHasher = keyHasher; + return this; } - public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompression) { - this.keyHasher = h; - this.backupRequestDelayMs = backupRequestDelay; + public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) { + this.backupRequestDelayMs = backupRequestDelayMs; + return this; + } + + public TableOptions withCompression(boolean enableCompression) { this.enableCompression = enableCompression; + return this; } public KeyHasher keyHasher() { @@ -37,10 +40,6 @@ public int backupRequestDelayMs() { return this.backupRequestDelayMs; } - public static TableOptions forTest() { - return new TableOptions(KeyHasher.DEFAULT, 0, false); - } - public boolean enableBackupRequest() { return backupRequestDelayMs > 0; } @@ -48,4 +47,8 @@ public boolean enableBackupRequest() { public boolean enableCompression() { return enableCompression; } + + public static TableOptions forTest() { + return new TableOptions().withKeyHasher(KeyHasher.DEFAULT).withBackupRequestDelayMs(0); + } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java index 854ed4c1..276726e9 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java @@ -15,7 +15,7 @@ public void testCompressionInterceptor() throws PException { PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp"); PegasusTableInterface compressTable = PegasusClientFactory.createClient(ClientOptions.create()) - .openTable("temp", new TableOptions(0, true)); + .openTable("temp", new TableOptions().withCompression(true)); byte[] hashKey = "hashKey".getBytes(); byte[] sortKey = "sortKey".getBytes(); From 779a80f5dda4c198b662c8326772522bccec6552 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 2 Sep 2020 17:50:19 +0800 Subject: [PATCH 25/32] add interface --- src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java index da51d3cf..d7886185 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java @@ -49,6 +49,6 @@ public boolean enableCompression() { } public static TableOptions forTest() { - return new TableOptions().withKeyHasher(KeyHasher.DEFAULT).withBackupRequestDelayMs(0); + return new TableOptions().withKeyHasher(KeyHasher.DEFAULT); } } From 7fea1c69158f79686db814d384c538b2c4a16cfd Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 3 Sep 2020 12:20:22 +0800 Subject: [PATCH 26/32] fix options --- .../infra/pegasus/client/PegasusClient.java | 15 +++++++---- .../client/PegasusClientInterface.java | 1 - .../pegasus/{rpc => client}/TableOptions.java | 19 +------------ .../com/xiaomi/infra/pegasus/rpc/Cluster.java | 2 +- .../pegasus/rpc/InternalTableOptions.java | 27 +++++++++++++++++++ .../pegasus/rpc/async/ClusterManager.java | 7 ++--- .../infra/pegasus/rpc/async/TableHandler.java | 10 +++---- .../rpc/interceptor/InterceptorManger.java | 2 +- .../infra/pegasus/client/TestPException.java | 6 ++--- .../pegasus/rpc/async/ClusterManagerTest.java | 8 +++--- .../pegasus/rpc/async/InterceptorTest.java | 2 +- .../pegasus/rpc/async/TableHandlerTest.java | 8 +++--- .../pegasus/rpc/async/TimeoutBenchmark.java | 4 +-- 13 files changed, 63 insertions(+), 48 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/{rpc => client}/TableOptions.java (68%) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 8b5680eb..7351c740 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -43,17 +43,18 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { - return getTable(tableName, new TableOptions()); + return getTable(tableName, new InternalTableOptions()); } - private PegasusTable getTable(String tableName, TableOptions tableOptions) throws PException { + private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions) + throws PException { PegasusTable table = tableMap.get(tableName); if (table == null) { synchronized (tableMapLock) { table = tableMap.get(tableName); if (table == null) { try { - Table internalTable = cluster.openTable(tableName, tableOptions); + Table internalTable = cluster.openTable(tableName, internalTableOptions); table = new PegasusTable(this, internalTable); } catch (Throwable e) { throw new PException(e); @@ -190,13 +191,17 @@ public PegasusTableInterface openTable(String tableName) throws PException { @Override public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException { - return getTable(tableName, new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs)); + return getTable( + tableName, + new InternalTableOptions( + new PegasusHasher(), + new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs))); } @Override public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) throws PException { - return getTable(tableName, tableOptions); + return getTable(tableName, new InternalTableOptions(new PegasusHasher(), tableOptions)); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index c2cada0e..b38145be 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -3,7 +3,6 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import java.util.*; import org.apache.commons.lang3.tuple.Pair; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java similarity index 68% rename from src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java rename to src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java index d7886185..8fd59d17 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java @@ -1,27 +1,18 @@ // Copyright (c) 2017, Xiaomi, Inc. All rights reserved. // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. -package com.xiaomi.infra.pegasus.rpc; - -import com.xiaomi.infra.pegasus.client.PegasusClient.PegasusHasher; +package com.xiaomi.infra.pegasus.client; /** TableOptions is the internal options for opening a Pegasus table. */ public class TableOptions { - private KeyHasher keyHasher; private int backupRequestDelayMs; private boolean enableCompression; public TableOptions() { - this.keyHasher = new PegasusHasher(); this.backupRequestDelayMs = 0; this.enableCompression = false; } - public TableOptions withKeyHasher(KeyHasher keyHasher) { - this.keyHasher = keyHasher; - return this; - } - public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) { this.backupRequestDelayMs = backupRequestDelayMs; return this; @@ -32,10 +23,6 @@ public TableOptions withCompression(boolean enableCompression) { return this; } - public KeyHasher keyHasher() { - return this.keyHasher; - } - public int backupRequestDelayMs() { return this.backupRequestDelayMs; } @@ -47,8 +34,4 @@ public boolean enableBackupRequest() { public boolean enableCompression() { return enableCompression; } - - public static TableOptions forTest() { - return new TableOptions().withKeyHasher(KeyHasher.DEFAULT); - } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index 0ec9e59b..719883f1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -17,7 +17,7 @@ public static Cluster createCluster(ClientOptions clientOptions) public abstract String[] getMetaList(); - public abstract Table openTable(String name, TableOptions options) + public abstract Table openTable(String name, InternalTableOptions options) throws ReplicationException, TException; public abstract void close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java new file mode 100644 index 00000000..8be0ba22 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java @@ -0,0 +1,27 @@ +package com.xiaomi.infra.pegasus.rpc; + +import com.xiaomi.infra.pegasus.client.PegasusClient.PegasusHasher; +import com.xiaomi.infra.pegasus.client.TableOptions; + +public class InternalTableOptions { + private final KeyHasher keyHasher; + private final TableOptions tableOptions; + + public InternalTableOptions() { + this.keyHasher = new PegasusHasher(); + this.tableOptions = new TableOptions(); + } + + public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) { + this.keyHasher = keyHasher; + this.tableOptions = tableOptions; + } + + public KeyHasher keyHasher() { + return keyHasher; + } + + public TableOptions tableOptions() { + return tableOptions; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 79302b5d..3682d68b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -9,8 +9,8 @@ import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.metrics.MetricsManager; import com.xiaomi.infra.pegasus.rpc.Cluster; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -126,8 +126,9 @@ public String[] getMetaList() { } @Override - public TableHandler openTable(String name, TableOptions options) throws ReplicationException { - return new TableHandler(this, name, options); + public TableHandler openTable(String name, InternalTableOptions internalTableOptions) + throws ReplicationException { + return new TableHandler(this, name, internalTableOptions); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index b2bff7a0..21a24a29 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -13,9 +13,9 @@ import com.xiaomi.infra.pegasus.replication.partition_configuration; import com.xiaomi.infra.pegasus.replication.query_cfg_request; import com.xiaomi.infra.pegasus.replication.query_cfg_response; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; @@ -53,7 +53,7 @@ static final class TableConfiguration { int backupRequestDelayMs; private InterceptorManger interceptorManger; - public TableHandler(ClusterManager mgr, String name, TableOptions options) + public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions) throws ReplicationException { int i = 0; for (; i < name.length(); i++) { @@ -93,12 +93,12 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) // superclass members tableName_ = name; appID_ = resp.app_id; - hasher_ = options.keyHasher(); + hasher_ = internalTableOptions.keyHasher(); // members of this manager_ = mgr; executor_ = manager_.getExecutor(); - this.backupRequestDelayMs = options.backupRequestDelayMs(); + this.backupRequestDelayMs = internalTableOptions.tableOptions().backupRequestDelayMs(); if (backupRequestDelayMs > 0) { logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs); } @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) inQuerying_ = new AtomicBoolean(false); lastQueryTime_ = 0; - this.interceptorManger = new InterceptorManger(options); + this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions()); } public ReplicaConfiguration getReplicaConfig(int index) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index b38bb241..6a276c42 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -1,7 +1,7 @@ package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; -import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.client.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import java.util.ArrayList; diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index 3b529b00..e091f3b6 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -11,7 +11,7 @@ import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.client.PegasusTable.Request; import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; -import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import io.netty.util.concurrent.DefaultPromise; @@ -59,7 +59,7 @@ public void testHandleReplicationException() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", TableOptions.forTest()); + TableHandler table = manager.openTable("temp", new InternalTableOptions()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); @@ -97,7 +97,7 @@ public void testTimeOutIsZero() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602, 127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", TableOptions.forTest()); + TableHandler table = manager.openTable("temp", new InternalTableOptions()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index 8e9ec9ca..6d7d8ad5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -6,8 +6,8 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,7 +50,7 @@ public void testOpenTable() throws Exception { TableHandler result = null; try { - result = testManager.openTable("testName", TableOptions.forTest()); + result = testManager.openTable("testName", new InternalTableOptions()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_SESSION_RESET, e.getErrorType()); } finally { @@ -62,7 +62,7 @@ public void testOpenTable() throws Exception { String address_list2 = "127.0.0.1:123,127.0.0.1:34603,127.0.0.1:34601,127.0.0.1:34602"; testManager = new ClusterManager(ClientOptions.builder().metaServers(address_list2).build()); try { - result = testManager.openTable("hehe", TableOptions.forTest()); + result = testManager.openTable("hehe", new InternalTableOptions()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_OBJECT_NOT_FOUND, e.getErrorType()); } finally { @@ -71,7 +71,7 @@ public void testOpenTable() throws Exception { // test open an valid table try { - result = testManager.openTable("temp", TableOptions.forTest()); + result = testManager.openTable("temp", new InternalTableOptions()); } catch (ReplicationException e) { Assert.fail(); } finally { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java index 276726e9..e53fdeec 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java @@ -4,7 +4,7 @@ import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.client.PegasusClientFactory; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; -import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.client.TableOptions; import org.junit.Test; import org.junit.jupiter.api.Assertions; diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index 66a41311..e02087d4 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -8,8 +8,8 @@ import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; import com.xiaomi.infra.pegasus.tools.Toollet; import java.util.ArrayList; @@ -61,7 +61,7 @@ public void testOperateOp() throws Exception { System.out.println("TableHandlerTest#testOperateOp"); TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", new InternalTableOptions()); } catch (ReplicationException e) { Assert.fail(); } @@ -148,7 +148,7 @@ public void testTryQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", new InternalTableOptions()); } catch (ReplicationException e) { Assert.fail(); } @@ -192,7 +192,7 @@ public void testConnectAfterQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", new InternalTableOptions()); } catch (ReplicationException e) { Assert.fail(); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java index 656bb85d..b62cc928 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java @@ -6,8 +6,8 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.tools.Toollet; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -68,7 +68,7 @@ public void timeoutChecker() { TableHandler handle; try { - handle = manager.openTable("temp", TableOptions.forTest()); + handle = manager.openTable("temp", new InternalTableOptions()); } catch (ReplicationException e) { e.printStackTrace(); Assert.fail(); From e1602fee2d0e9015a5d0c657c123b4595964cd44 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 3 Sep 2020 14:06:03 +0800 Subject: [PATCH 27/32] fix options --- .../com/xiaomi/infra/pegasus/client/PegasusClient.java | 2 +- .../xiaomi/infra/pegasus/rpc/InternalTableOptions.java | 10 ++++------ .../xiaomi/infra/pegasus/client/TestPException.java | 4 ++-- .../infra/pegasus/rpc/async/ClusterManagerTest.java | 6 +++--- .../infra/pegasus/rpc/async/TableHandlerTest.java | 6 +++--- .../infra/pegasus/rpc/async/TimeoutBenchmark.java | 2 +- 6 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 7351c740..64e6fa82 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -43,7 +43,7 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { - return getTable(tableName, new InternalTableOptions()); + return getTable(tableName, new InternalTableOptions(new PegasusHasher(), new TableOptions())); } private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java index 8be0ba22..20107df9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java @@ -1,17 +1,11 @@ package com.xiaomi.infra.pegasus.rpc; -import com.xiaomi.infra.pegasus.client.PegasusClient.PegasusHasher; import com.xiaomi.infra.pegasus.client.TableOptions; public class InternalTableOptions { private final KeyHasher keyHasher; private final TableOptions tableOptions; - public InternalTableOptions() { - this.keyHasher = new PegasusHasher(); - this.tableOptions = new TableOptions(); - } - public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) { this.keyHasher = keyHasher; this.tableOptions = tableOptions; @@ -24,4 +18,8 @@ public KeyHasher keyHasher() { public TableOptions tableOptions() { return tableOptions; } + + public static InternalTableOptions forTest() { + return new InternalTableOptions(KeyHasher.DEFAULT, new TableOptions()); + } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index e091f3b6..dc5d0e51 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -59,7 +59,7 @@ public void testHandleReplicationException() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", new InternalTableOptions()); + TableHandler table = manager.openTable("temp", InternalTableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); @@ -97,7 +97,7 @@ public void testTimeOutIsZero() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602, 127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", new InternalTableOptions()); + TableHandler table = manager.openTable("temp", InternalTableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index 6d7d8ad5..3a232e21 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -50,7 +50,7 @@ public void testOpenTable() throws Exception { TableHandler result = null; try { - result = testManager.openTable("testName", new InternalTableOptions()); + result = testManager.openTable("testName", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_SESSION_RESET, e.getErrorType()); } finally { @@ -62,7 +62,7 @@ public void testOpenTable() throws Exception { String address_list2 = "127.0.0.1:123,127.0.0.1:34603,127.0.0.1:34601,127.0.0.1:34602"; testManager = new ClusterManager(ClientOptions.builder().metaServers(address_list2).build()); try { - result = testManager.openTable("hehe", new InternalTableOptions()); + result = testManager.openTable("hehe", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_OBJECT_NOT_FOUND, e.getErrorType()); } finally { @@ -71,7 +71,7 @@ public void testOpenTable() throws Exception { // test open an valid table try { - result = testManager.openTable("temp", new InternalTableOptions()); + result = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } finally { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index e02087d4..714c2053 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -61,7 +61,7 @@ public void testOperateOp() throws Exception { System.out.println("TableHandlerTest#testOperateOp"); TableHandler table = null; try { - table = testManager.openTable("temp", new InternalTableOptions()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } @@ -148,7 +148,7 @@ public void testTryQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", new InternalTableOptions()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } @@ -192,7 +192,7 @@ public void testConnectAfterQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", new InternalTableOptions()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java index b62cc928..9c1bbe7c 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java @@ -68,7 +68,7 @@ public void timeoutChecker() { TableHandler handle; try { - handle = manager.openTable("temp", new InternalTableOptions()); + handle = manager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { e.printStackTrace(); Assert.fail(); From fa943994190e5ded92f24d2c3c10baa0f5647610 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 3 Sep 2020 14:06:51 +0800 Subject: [PATCH 28/32] fix options --- .../java/com/xiaomi/infra/pegasus/client/PegasusClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 64e6fa82..56c5c373 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -29,7 +29,7 @@ public class PegasusClient implements PegasusClientInterface { private final Object tableMapLock; private Cluster cluster; - public static class PegasusHasher implements KeyHasher { + static class PegasusHasher implements KeyHasher { @Override public long hash(byte[] key) { Validate.isTrue(key != null && key.length >= 2); From 88b5e880ad4ae304151d96593b2bd27923d07680 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 3 Sep 2020 14:07:26 +0800 Subject: [PATCH 29/32] fix options --- .../java/com/xiaomi/infra/pegasus/client/PegasusClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 56c5c373..f80dc8b6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -29,7 +29,7 @@ public class PegasusClient implements PegasusClientInterface { private final Object tableMapLock; private Cluster cluster; - static class PegasusHasher implements KeyHasher { + private static class PegasusHasher implements KeyHasher { @Override public long hash(byte[] key) { Validate.isTrue(key != null && key.length >= 2); From a098625c378c2bcfa3525b3ba527844bdc0cf38b Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 4 Sep 2020 13:47:17 +0800 Subject: [PATCH 30/32] fix comment --- .../client/PegasusClientInterface.java | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index b38145be..7402d984 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -44,25 +44,9 @@ public interface PegasusClientInterface { /** * Open a table, and prepare the sessions and route-table to the replica-servers. * - *

Note: this interface is deprecated, retaining it only for compatibility, please see {@link - * PegasusClientInterface#openTable(String, TableOptions)} - * - *

Please notice that pegasus support two kinds of API: 1. the client-interface way, which is - * provided in this class. 2. the table-interface way, which is provided by {@link - * PegasusTableInterface}. With the client-interface, you don't need to create - * PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently. - * However, the client-interface's api also has some restrictions: 1. we don't provide async - * methods in client-interface. 2. the timeout in client-interface isn't as accurate as the - * table-interface. 3. the client-interface may throw an exception when open table fails. It means - * that you may need to handle this exception in every data access operation, which is annoying. - * 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface. - * - * @param tableName the table should be exist on the server, which is created before by the system - * administrator - * @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <= - * 0, The backup request is disabled. - * @return the table handler - * @throws PException throws exception if any error occurs. + * @deprecated retaining it only for compatibility, we will remove it later, don't use it any + * more. the latest interface please see {@link PegasusClientInterface#openTable(String, + * TableOptions)} */ @Deprecated public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) From 2a093691495e802c8e1971855762f71573900cfe Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 4 Sep 2020 13:52:08 +0800 Subject: [PATCH 31/32] fix comment --- .../rpc/interceptor/BackupRequestInterceptor.java | 7 ++++++- .../pegasus/rpc/interceptor/InterceptorManger.java | 12 +++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 8ed2d501..c4c39eac 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -10,6 +10,11 @@ import java.util.concurrent.TimeUnit; public class BackupRequestInterceptor implements TableInterceptor { + private final long backupRequestDelayMs; + + public BackupRequestInterceptor(long backupRequestDelayMs) { + this.backupRequestDelayMs = backupRequestDelayMs; + } @Override public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { @@ -53,7 +58,7 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl clientRequestRound.timeoutMs(), true); }, - tableHandler.backupRequestDelayMs(), + backupRequestDelayMs, TimeUnit.MILLISECONDS)); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 6a276c42..cc1a27e9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -12,15 +12,13 @@ public class InterceptorManger { private List interceptors = new ArrayList<>(); public InterceptorManger(TableOptions options) { - this.register(new BackupRequestInterceptor(), options.enableBackupRequest()) - .register(new CompressionInterceptor(), options.enableCompression()); - } + if (options.enableBackupRequest()) { + interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs())); + } - private InterceptorManger register(TableInterceptor interceptor, boolean enable) { - if (enable) { - interceptors.add(interceptor); + if (options.enableCompression()) { + interceptors.add(new CompressionInterceptor()); } - return this; } public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { From d9acc5adba084b0f38fbbaebbab9b16e00ece89b Mon Sep 17 00:00:00 2001 From: Shuo Date: Fri, 4 Sep 2020 14:49:00 +0800 Subject: [PATCH 32/32] Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java Co-authored-by: Wu Tao --- .../xiaomi/infra/pegasus/client/PegasusClientInterface.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 7402d984..7988638f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -44,8 +44,8 @@ public interface PegasusClientInterface { /** * Open a table, and prepare the sessions and route-table to the replica-servers. * - * @deprecated retaining it only for compatibility, we will remove it later, don't use it any - * more. the latest interface please see {@link PegasusClientInterface#openTable(String, + * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any + * more. The latest interface please see {@link PegasusClientInterface#openTable(String, * TableOptions)} */ @Deprecated