From b6bff02fb6a9de4ee8bac6ac5fe0249fdde3f231 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 19 Aug 2020 19:30:58 +0800 Subject: [PATCH 1/5] init the api --- .../infra/pegasus/client/PegasusClient.java | 103 +++++++++++ .../client/PegasusClientInterface.java | 167 ++++++++++++++++++ .../pegasus/client/PegasusTableInterface.java | 162 +++++++++++++++++ .../pegasus/client/request/BatchDelete.java | 5 + .../pegasus/client/request/BatchGet.java | 23 +++ .../pegasus/client/request/BatchSet.java | 18 ++ .../pegasus/client/request/DelRange.java | 12 ++ .../infra/pegasus/client/request/Delete.java | 12 ++ .../infra/pegasus/client/request/Get.java | 12 ++ .../pegasus/client/request/GetRange.java | 14 ++ .../pegasus/client/request/Increment.java | 16 ++ .../infra/pegasus/client/request/Key.java | 15 ++ .../pegasus/client/request/MultiDelete.java | 5 + .../pegasus/client/request/MultiGet.java | 12 ++ .../pegasus/client/request/MultiSet.java | 30 ++++ .../infra/pegasus/client/request/Set.java | 29 +++ .../client/response/BatchDelResult.java | 5 + .../client/response/BatchGetResult.java | 13 ++ .../client/response/BatchSetResult.java | 19 ++ 19 files changed, 672 insertions(+) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 73c56b7e..dd4238c6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -3,6 +3,22 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.DelRange; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.GetRange; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.Set; +import com.xiaomi.infra.pegasus.client.response.BatchDelResult; +import com.xiaomi.infra.pegasus.client.response.BatchGetResult; +import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import com.xiaomi.infra.pegasus.rpc.*; import com.xiaomi.infra.pegasus.tools.Tools; import java.nio.ByteBuffer; @@ -199,6 +215,93 @@ public ClientOptions getConfiguration() { return clientOptions; } + @Override + public boolean exist(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.exist(get, 0); + } + + @Override + public byte[] get(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.get(get, 0); + } + + @Override + public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGetResult) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchGet(batchGet, batchGetResult, 0); + } + + @Override + public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException { + PegasusTable tb = getTable(tableName); + return tb.multiGet(multiGet, 0); + } + + @Override + public MultiGetResult getRange(String tableName, GetRange getRange) throws PException { + PegasusTable tb = getTable(tableName); + return tb.getRange(getRange, 0); + } + + @Override + public void set(String tableName, Set set) throws PException { + PegasusTable tb = getTable(tableName); + tb.set(set, 0); + } + + @Override + public void batchSet(String tableName, BatchSet batchSet, BatchSetResult batchSetResult) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchSet(batchSet, batchSetResult, 0); + } + + @Override + public void multiSet(String tableName, MultiSet multiSet) throws PException { + PegasusTable tb = getTable(tableName); + tb.multiSet(multiSet, 0); + } + + @Override + public void del(String tableName, Delete delete) throws PException { + PegasusTable tb = getTable(tableName); + tb.del(delete, 0); + } + + @Override + public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult batchDelResult) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchDel(batchDelete, batchDelResult, 0); + } + + @Override + public void multiDel(String tableName, MultiDelete multiDelete) throws PException { + PegasusTable tb = getTable(tableName); + tb.multiDel(multiDelete, 0); + } + + @Override + public void delRange(String tableName, DelRange delRange) throws PException { + PegasusTable tb = getTable(tableName); + tb.delRange(delRange, 0); + } + + @Override + public int ttl(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.ttl(get, 0); + } + + @Override + public long incr(String tableName, Increment increment) throws PException { + PegasusTable tb = getTable(tableName); + return tb.incr(increment, 0); + } + @Override public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException { PegasusTable tb = getTable(tableName); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index f28f9a26..76e1847e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -3,6 +3,22 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.DelRange; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.GetRange; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.Set; +import com.xiaomi.infra.pegasus.client.response.BatchDelResult; +import com.xiaomi.infra.pegasus.client.response.BatchGetResult; +import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import java.util.*; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +80,127 @@ public interface PegasusClientInterface { public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException; + /** + * Check value exist by key from the cluster + * + * @param tableName TableHandler name + * @return true if exist, false if not exist + * @throws PException throws exception if any error occurs. + */ + public boolean exist(String tableName, Get get) throws PException; + + /** + * Get value. + * + * @param tableName TableHandler name + * @return value; null if not found + * @throws PException throws exception if any error occurs. + */ + public byte[] get(String tableName, Get get) throws PException; + + /** + * Batch get values of different keys. Will terminate immediately if any error occurs. + * + * @param tableName table name + * @throws PException throws exception if any error occurs. + *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGetResult) + throws PException; + + /** + * Get multiple values under the same hash key. + * + * @param tableName table name + * @return true if all data is fetched; false if only partial data is fetched. + * @throws PException throws exception if any error occurs. + */ + public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException; + + /** + * Get multiple key-values under the same hashKey with sortKey range limited. + * + * @param tableName table name + * @return true if all data is fetched; false if only partial data is fetched. + * @throws PException throws exception if any error occurs. + */ + public MultiGetResult getRange(String tableName, GetRange getRange) throws PException; + + /** + * Set value. + * + * @param tableName TableHandler name + * @throws PException throws exception if any error occurs. + */ + public void set(String tableName, Set set) throws PException; + + /** + * Batch set lots of values. Will terminate immediately if any error occurs. + * + * @param tableName TableHandler name + * @throws PException throws exception if any error occurs. + *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchSet(String tableName, BatchSet batchSet, BatchSetResult batchSetResult) + throws PException; + + /** + * Set multiple value under the same hash key. + * + * @param tableName table name + * @throws PException throws exception if any error occurs. + */ + public void multiSet(String tableName, MultiSet multiSet) throws PException; + + /** + * Delete value. + * + * @param tableName TableHandler name + * @throws PException throws exception if any error occurs. + */ + public void del(String tableName, Delete delete) throws PException; + + /** + * Batch delete values of different keys. Will terminate immediately if any error occurs. + * + * @param tableName table name + * @throws PException throws exception if any error occurs. + *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult batchDelResult) + throws PException; + + public void multiDel(String tableName, MultiDelete multiDelete) throws PException; + + /** + * Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate + * immediately if any error occurs. + * + * @param tableName table name + * @throws PException throws exception if any error occurs. + */ + public void delRange(String tableName, DelRange delRange) throws PException; + + /** + * Get ttl time. + * + * @param tableName TableHandler name + * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. + * @throws PException throws exception if any error occurs. + */ + public int ttl(String tableName, Get get) throws PException; + + /** + * Atomically increment value. + * + * @return the new value. + * @throws PException throws exception if any error occurs. + */ + public long incr(String tableName, Increment increment) throws PException; + /** * Check value exist by key from the cluster * @@ -73,6 +210,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM * @return true if exist, false if not exist * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -94,6 +232,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM * @return value; null if not found * @throws PException throws exception if any error occurs. */ + @Deprecated public byte[] get(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -108,6 +247,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchGet(String tableName, List> keys, List values) throws PException; @@ -125,6 +265,7 @@ public void batchGet(String tableName, List> keys, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchGet2( String tableName, List> keys, List> results) throws PException; @@ -146,6 +287,7 @@ public int batchGet2( * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -155,6 +297,7 @@ public boolean multiGet( List> values) throws PException; + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, List sortKeys, List> values) throws PException; @@ -175,6 +318,7 @@ public boolean multiGet( * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -186,6 +330,7 @@ public boolean multiGet( List> values) throws PException; + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -208,6 +353,7 @@ public boolean multiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiGet( String tableName, List>> keys, List values) throws PException; @@ -228,6 +374,7 @@ public void batchMultiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiGet2( String tableName, List>> keys, @@ -266,9 +413,11 @@ public boolean multiGetSortKeys(String tableName, byte[] hashKey, List s * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. * @throws PException throws exception if any error occurs. */ + @Deprecated public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) throws PException; + @Deprecated public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value) throws PException; /** @@ -280,6 +429,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchSet(String tableName, List items) throws PException; /** @@ -296,6 +446,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchSet2(String tableName, List items, List results) throws PException; @@ -308,10 +459,12 @@ public int batchSet2(String tableName, List items, List res * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. * @throws PException throws exception if any error occurs. */ + @Deprecated public void multiSet( String tableName, byte[] hashKey, List> values, int ttlSeconds) throws PException; + @Deprecated public void multiSet(String tableName, byte[] hashKey, List> values) throws PException; @@ -326,9 +479,11 @@ public void multiSet(String tableName, byte[] hashKey, List *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiSet(String tableName, List items, int ttlSeconds) throws PException; + @Deprecated public void batchMultiSet(String tableName, List items) throws PException; /** @@ -347,10 +502,12 @@ public void batchMultiSet(String tableName, List items, int ttlSeco *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiSet2( String tableName, List items, int ttlSeconds, List results) throws PException; + @Deprecated public int batchMultiSet2(String tableName, List items, List results) throws PException; @@ -364,6 +521,7 @@ public int batchMultiSet2(String tableName, List items, List items, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchDel(String tableName, List> keys) throws PException; /** @@ -392,6 +551,7 @@ public int batchMultiSet2(String tableName, List items, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchDel2(String tableName, List> keys, List results) throws PException; @@ -403,6 +563,7 @@ public int batchDel2(String tableName, List> keys, List sortKeys) throws PException; /** @@ -416,6 +577,7 @@ public int batchDel2(String tableName, List> keys, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiDel(String tableName, List>> keys) throws PException; @@ -452,6 +615,7 @@ public void batchMultiDel(String tableName, List>> key *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiDel2( String tableName, List>> keys, List results) throws PException; @@ -467,6 +631,7 @@ public int batchMultiDel2( * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. * @throws PException throws exception if any error occurs. */ + @Deprecated public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -485,9 +650,11 @@ public int batchMultiDel2( * @return the new value. * @throws PException throws exception if any error occurs. */ + @Deprecated public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds) throws PException; + @Deprecated public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment) throws PException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index bc51ea7d..897ece10 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -3,6 +3,21 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.DelRange; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.GetRange; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.Set; +import com.xiaomi.infra.pegasus.client.response.BatchDelResult; +import com.xiaomi.infra.pegasus.client.response.BatchGetResult; +import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.List; @@ -55,6 +70,7 @@ * *

Please refer to the netty document for the usage of Future. */ +// TODO(jiashuo1): refactor as PegasusClientInterface public interface PegasusTableInterface { /// < -------- Exist -------- @@ -709,10 +725,128 @@ public static interface TTLListener extends GenericFutureListenerNotice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchGet(BatchGet batchGet, BatchGetResult batchGetResult, int timeout) + throws PException; + + /** + * Get multiple values under the same hash key. + * + * @return true if all data is fetched; false if only partial data is fetched. + * @throws PException throws exception if any error occurs. + */ + public MultiGetResult multiGet(MultiGet multiGet, int timeout) throws PException; + + /** + * Get multiple key-values under the same hashKey with sortKey range limited. + * + * @return true if all data is fetched; false if only partial data is fetched. + * @throws PException throws exception if any error occurs. + */ + public MultiGetResult getRange(GetRange getRange, int timeout) throws PException; + + /** + * Get multiple sort keys under the same hash key. + * + * @return true if all data is fetched; false if only partial data is fetched. + * @throws PException throws exception if any error occurs. + */ + public boolean multiGetSortKeys(MultiGet multiGet, int timeout) throws PException; + + /** + * Set value. + * + * @throws PException throws exception if any error occurs. + */ + public void set(Set set, int timeout) throws PException; + + /** + * Batch set lots of values. Will terminate immediately if any error occurs. + * + * @throws PException throws exception if any error occurs. + *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchSet(BatchSet batchSet, BatchSetResult batchSetResult, int timeout) + throws PException; + + /** + * Set multiple value under the same hash key. + * + * @throws PException throws exception if any error occurs. + */ + public void multiSet(MultiSet multiSet, int timeout) throws PException; + + /** + * Delete value. + * + * @throws PException throws exception if any error occurs. + */ + public void del(Delete delete, int timeout) throws PException; + + /** + * Batch delete values of different keys. Will terminate immediately if any error occurs. + * + * @throws PException throws exception if any error occurs. + *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys + * failed. + */ + public void batchDel(BatchDelete batchDelete, BatchDelResult batchDelResult, int timeout) + throws PException; + + public void multiDel(MultiDelete multiDelete, int timeout) throws PException; + + /** + * Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate + * immediately if any error occurs. + * + * @throws PException throws exception if any error occurs. + */ + public void delRange(DelRange delRange, int timeout) throws PException; + + /** + * Get ttl time. + * + * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. + * @throws PException throws exception if any error occurs. + */ + public int ttl(Get get, int timeout) throws PException; + + /** + * Atomically increment value. + * + * @return the new value. + * @throws PException throws exception if any error occurs. + */ + public long incr(Increment increment, int timeout) throws PException; + /** * sync version of Exist, please refer to the async version {@link #asyncExist(byte[], byte[], * int)} */ + @Deprecated public boolean exist(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) throws PException; /** @@ -724,6 +858,7 @@ public static interface TTLListener extends GenericFutureListenerNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchGet(List> keys, List values, int timeout /*ms*/) throws PException; @@ -759,6 +895,7 @@ public void batchGet(List> keys, List values, int t *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchGet2( List> keys, List> results, int timeout /*ms*/) throws PException; @@ -767,6 +904,7 @@ public int batchGet2( * sync version of MultiGet, please refer to the async version {@link #asyncMultiGet(byte[], List, * int, int, int)} and {@link #asyncMultiGet(byte[], List, int)} */ + @Deprecated public MultiGetResult multiGet( byte[] hashKey, List sortKeys, @@ -775,6 +913,7 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException; + @Deprecated public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeout /*ms*/) throws PException; @@ -783,6 +922,7 @@ public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeou * byte[], byte[], MultiGetOptions, int, int, int)} and {@link #asyncMultiGet(byte[], byte[], * byte[], MultiGetOptions, int)} */ + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -793,6 +933,7 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException; + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -815,6 +956,7 @@ public MultiGetResult multiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiGet( List>> keys, List values, int timeout /*ms*/) throws PException; @@ -834,6 +976,7 @@ public void batchMultiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiGet2( List>> keys, List> results, @@ -844,9 +987,11 @@ public int batchMultiGet2( * sync version of MultiGetSortKeys, please refer to the async version {@link * #asyncMultiGetSortKeys(byte[], int, int, int)} and {@link #asyncMultiGetSortKeys(byte[], int)} */ + @Deprecated public MultiGetSortKeysResult multiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/) throws PException; + @Deprecated public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms*/) throws PException; @@ -854,9 +999,11 @@ public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms* * sync version of Set, please refer to the async version {@link #asyncSet(byte[], byte[], byte[], * int, int)} and {@link #asyncSet(byte[], byte[], byte[], int)} */ + @Deprecated public void set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/) throws PException; + @Deprecated public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/) throws PException; @@ -871,6 +1018,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchSet(List items, int timeout /*ms*/) throws PException; /** @@ -889,6 +1037,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchSet2(List items, List results, int timeout /*ms*/) throws PException; @@ -896,10 +1045,12 @@ public int batchSet2(List items, List results, int timeout * sync version of MultiSet, please refer to the async version {@link #asyncMultiSet(byte[], List, * int, int)} and {@link #asyncMultiSet(byte[], List, int)} */ + @Deprecated public void multiSet( byte[] hashKey, List> values, int ttlSeconds, int timeout /*ms*/) throws PException; + @Deprecated public void multiSet(byte[] hashKey, List> values, int timeout /*ms*/) throws PException; @@ -916,6 +1067,7 @@ public void multiSet(byte[] hashKey, List> values, int time *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiSet(List items, int ttlSeconds, int timeout /*ms*/) throws PException; @@ -937,6 +1089,7 @@ public void batchMultiSet(List items, int ttlSeconds, int timeout / *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiSet2( List items, int ttlSeconds, List results, int timeout /*ms*/) throws PException; @@ -944,6 +1097,7 @@ public int batchMultiSet2( /** * sync version of Del, please refer to the async version {@link #asyncDel(byte[], byte[], int)} */ + @Deprecated public void del(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) throws PException; /** @@ -957,6 +1111,7 @@ public int batchMultiSet2( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchDel(List> keys, int timeout /*ms*/) throws PException; /** @@ -976,6 +1131,7 @@ public int batchMultiSet2( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchDel2( List> keys, List results, int timeout /*ms*/) throws PException; @@ -984,6 +1140,7 @@ public int batchDel2( * sync version of MultiDel, please refer to the async version {@link #asyncMultiDel(byte[], List, * int)} */ + @Deprecated public void multiDel(byte[] hashKey, List sortKeys, int timeout /*ms*/) throws PException; /** @@ -999,6 +1156,7 @@ public int batchDel2( * used. * @throws PException throws exception if any error occurs. */ + @Deprecated public void delRange( byte[] hashKey, byte[] startSortKey, @@ -1019,6 +1177,7 @@ public void delRange( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiDel(List>> keys, int timeout /*ms*/) throws PException; @@ -1039,6 +1198,7 @@ public void batchMultiDel(List>> keys, int timeout /*m *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiDel2( List>> keys, List results, int timeout /*ms*/) throws PException; @@ -1047,6 +1207,7 @@ public int batchMultiDel2( * sync version of Incr, please refer to the async version {@link #asyncIncr(byte[], byte[], long, * int, int)} */ + @Deprecated public long incr( byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds, int timeout /*ms*/) throws PException; @@ -1055,6 +1216,7 @@ public long incr( * sync version of Incr, please refer to the async version {@link #asyncIncr(byte[], byte[], long, * int)} */ + @Deprecated public long incr(byte[] hashKey, byte[] sortKey, long increment, int timeout /*ms*/) throws PException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java new file mode 100644 index 00000000..40637656 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java @@ -0,0 +1,5 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class BatchDelete { + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java new file mode 100644 index 00000000..8268e6c3 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class BatchGet { + boolean forceComplete; + + public List multiGetList = new ArrayList<>(); + public List getList = new ArrayList<>(); + + public BatchGet(boolean forceComplete) { + this.forceComplete = forceComplete; + } + + public void add(MultiGet multiGet){ + multiGetList.add(multiGet); + } + + public void add(Get get){ + getList.add(get); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java new file mode 100644 index 00000000..e5b76cec --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java @@ -0,0 +1,18 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class BatchSet { + public List multiSetList = new ArrayList<>(); + public List setList = new ArrayList<>(); + + public void add(MultiSet multiSet){ + multiSetList.add(multiSet); + } + + public void add(Set set){ + setList.add(set); + } + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java new file mode 100644 index 00000000..a325b058 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.DelRangeOptions; + +public class DelRange { + + byte[] hashKey; + byte[] startSortKey; + byte[] stopSortKey; + DelRangeOptions options; + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java new file mode 100644 index 00000000..b0139c65 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Delete extends Key{ + + public Delete(byte[] hashKey) { + super(hashKey); + } + + public Delete(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java new file mode 100644 index 00000000..af90ee96 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Get extends Key{ + + public Get(byte[] hashKey) { + super(hashKey); + } + + public Get(byte[] hashKey, byte[] sortKey) { + super(hashKey,sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java new file mode 100644 index 00000000..a0cd8bf0 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java @@ -0,0 +1,14 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.MultiGetOptions; + +public class GetRange { + byte[] hashKey; + byte[] startSortKey; + byte[] stopSortKey; + int maxFetchCount; + int maxFetchSize; + + MultiGetOptions options; + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java new file mode 100644 index 00000000..5e80c196 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java @@ -0,0 +1,16 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Increment extends Key{ + int increment = 1; + int ttlSeconds = 0; + + public Increment(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } + + public Increment(byte[] hashKey, byte[] sortKey, int increment, int ttlSeconds) { + super(hashKey, sortKey); + this.ttlSeconds = ttlSeconds; + this.increment = increment; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java new file mode 100644 index 00000000..04f44468 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java @@ -0,0 +1,15 @@ +package com.xiaomi.infra.pegasus.client.request; + +class Key { + public byte[] hashKey = null; + public byte[] sortKey = null; + + Key(byte[] hashKey) { + this.hashKey = hashKey; + } + + Key(byte[] hashKey, byte[] sortKey) { + this.hashKey = hashKey; + this.sortKey = sortKey; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java new file mode 100644 index 00000000..acbf935a --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java @@ -0,0 +1,5 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class MultiDelete { + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java new file mode 100644 index 00000000..a98ca95f --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.List; + +public class MultiGet { + byte[] hashKey; + List sortKeys; + + int maxFetchCount; + int maxFetchSize; + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java new file mode 100644 index 00000000..33c5771b --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java @@ -0,0 +1,30 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client.request; + +import java.util.*; +import org.apache.commons.lang3.tuple.Pair; + +/** + * @author qinzuoyan + *

Store data under the same hashKey. + */ +public class MultiSet { + public byte[] hashKey; + public List> values = new ArrayList<>(); + public int ttlSeconds = 0; + + public MultiSet(byte[] hashKey) { + this.hashKey = hashKey; + } + + public MultiSet(byte[] hashKey, int ttlSeconds) { + this.hashKey = hashKey; + this.ttlSeconds = ttlSeconds; + } + + public void add(byte[] sortKey, byte[] value) { + values.add(Pair.of(sortKey, value)); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java new file mode 100644 index 00000000..f36a071e --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java @@ -0,0 +1,29 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client.request; + +import java.io.Serializable; + +public class Set implements Serializable { + public byte[] hashKey = null; + public byte[] sortKey = null; + public byte[] value = null; + public int ttlSeconds = 0; // 0 means no ttl + + public Set() {} + + public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) { + this.hashKey = hashKey; + this.sortKey = sortKey; + this.value = value; + this.ttlSeconds = ttlSeconds; + } + + public Set(byte[] hashKey, byte[] sortKey, byte[] value) { + this.hashKey = hashKey; + this.sortKey = sortKey; + this.value = value; + this.ttlSeconds = 0; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java new file mode 100644 index 00000000..d92bd3c2 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java @@ -0,0 +1,5 @@ +package com.xiaomi.infra.pegasus.client.response; + +public class BatchDelResult { + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java new file mode 100644 index 00000000..9f0a1897 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java @@ -0,0 +1,13 @@ +package com.xiaomi.infra.pegasus.client.response; + +import com.xiaomi.infra.pegasus.client.PException; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +public class BatchGetResult { + public List> results; + + public void add(PException pe, byte[] value) { + results.add(Pair.of(pe, value)); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java new file mode 100644 index 00000000..96dfdfbd --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java @@ -0,0 +1,19 @@ +package com.xiaomi.infra.pegasus.client.response; + +import com.xiaomi.infra.pegasus.client.PException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class BatchSetResult { + AtomicInteger successCount = new AtomicInteger(0); + List results = new ArrayList<>(); + + void add(PException pe) { + results.add(pe); + } + + void addSuccssCount(){ + successCount.incrementAndGet(); + } +} From ecb7e46a4acc7b89e35c359cf0831439927c7743 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 19 Aug 2020 19:44:41 +0800 Subject: [PATCH 2/5] init the api --- .../com/xiaomi/infra/pegasus/client/request/Set.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java index f36a071e..aca1a043 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java @@ -4,11 +4,15 @@ package com.xiaomi.infra.pegasus.client.request; import java.io.Serializable; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; public class Set implements Serializable { - public byte[] hashKey = null; - public byte[] sortKey = null; - public byte[] value = null; + public byte[] hashKey; + public byte[] sortKey; + @NotNull + public byte[] value ; + @Min(0) public int ttlSeconds = 0; // 0 means no ttl public Set() {} From 05aa92da1ac9cf3a84ea118f7a0711469b337f04 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 24 Aug 2020 18:05:16 +0800 Subject: [PATCH 3/5] complete base --- scripts/format-all.sh | 1 + .../infra/pegasus/client/FutureGroup.java | 44 +- .../infra/pegasus/client/PegasusClient.java | 28 +- .../client/PegasusClientInterface.java | 35 +- .../infra/pegasus/client/PegasusTable.java | 755 +++++++++++++++++- .../pegasus/client/PegasusTableInterface.java | 64 +- .../pegasus/client/request/BatchDelete.java | 19 + .../pegasus/client/request/BatchGet.java | 24 +- .../pegasus/client/request/BatchSet.java | 20 +- .../pegasus/client/request/DelRange.java | 12 - .../infra/pegasus/client/request/Delete.java | 14 +- .../infra/pegasus/client/request/Get.java | 14 +- .../pegasus/client/request/GetRange.java | 14 - .../pegasus/client/request/Increment.java | 23 +- .../infra/pegasus/client/request/Key.java | 18 +- .../pegasus/client/request/MultiDelete.java | 18 + .../pegasus/client/request/MultiGet.java | 27 +- .../pegasus/client/request/MultiSet.java | 31 +- .../pegasus/client/request/RangeDelete.java | 23 + .../pegasus/client/request/RangeGet.java | 31 + .../infra/pegasus/client/request/Set.java | 36 +- .../client/response/BatchDelResult.java | 5 - .../client/response/BatchGetResult.java | 13 - .../client/response/BatchSetResult.java | 19 - .../infra/pegasus/rpc/async/TableHandler.java | 2 +- .../infra/pegasus/client/TestFutureGroup.java | 4 +- 26 files changed, 1055 insertions(+), 239 deletions(-) delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 995bc3f4..e3f9218a 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -11,6 +11,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/operator/*.java src/main/java/com/xiaomi/infra/pegasus/tools/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java + src/main/java/com/xiaomi/infra/pegasus/client/request/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java src/test/java/com/xiaomi/infra/pegasus/metrics/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java index c7a92ba9..bb21b95b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java @@ -6,29 +6,39 @@ import io.netty.util.concurrent.Future; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; public class FutureGroup { + private boolean forceComplete; public FutureGroup(int initialCapacity) { - asyncTasks = new ArrayList<>(initialCapacity); + this(initialCapacity, true); + } + + public FutureGroup(int initialCapacity, boolean forceComplete) { + this.asyncTasks = new ArrayList<>(initialCapacity); + this.forceComplete = forceComplete; } public void add(Future task) { asyncTasks.add(task); } - public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException { - waitAllCompleteOrOneFail(null, timeoutMillis); + public void waitAllComplete(int timeoutMillis) throws PException { + List> results = new ArrayList<>(); + waitAllComplete(results, timeoutMillis); } /** * Waits until all future tasks complete but terminate if one fails. * - * @param results is nullable, each element is the result of the Future. + * @param results . */ - public void waitAllCompleteOrOneFail(List results, int timeoutMillis) throws PException { + public int waitAllComplete(List> results, int timeoutMillis) + throws PException { int timeLimit = timeoutMillis; long duration = 0; + int count = 0; for (int i = 0; i < asyncTasks.size(); i++) { Future fu = asyncTasks.get(i); try { @@ -40,20 +50,26 @@ public void waitAllCompleteOrOneFail(List results, int timeoutMillis) th throw new PException("async task #[" + i + "] await failed: " + e.toString()); } - if (fu.isSuccess() && timeLimit >= 0) { - if (results != null) { - results.set(i, fu.getNow()); - } + if (timeLimit < 0) { + throw new PException( + String.format("async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis)); + } + + if (fu.isSuccess()) { + count++; + results.add(Pair.of(null, fu.getNow())); } else { Throwable cause = fu.cause(); - if (cause == null) { - throw new PException( - String.format( - "async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis)); + if (forceComplete) { + throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause); } - throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause); + results.add( + Pair.of( + new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause), + null)); } } + return count; } private List> asyncTasks; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index dd4238c6..49832e7e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -7,18 +7,15 @@ import com.xiaomi.infra.pegasus.client.request.BatchDelete; import com.xiaomi.infra.pegasus.client.request.BatchGet; import com.xiaomi.infra.pegasus.client.request.BatchSet; -import com.xiaomi.infra.pegasus.client.request.DelRange; import com.xiaomi.infra.pegasus.client.request.Delete; import com.xiaomi.infra.pegasus.client.request.Get; -import com.xiaomi.infra.pegasus.client.request.GetRange; import com.xiaomi.infra.pegasus.client.request.Increment; import com.xiaomi.infra.pegasus.client.request.MultiDelete; import com.xiaomi.infra.pegasus.client.request.MultiGet; import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; import com.xiaomi.infra.pegasus.client.request.Set; -import com.xiaomi.infra.pegasus.client.response.BatchDelResult; -import com.xiaomi.infra.pegasus.client.response.BatchGetResult; -import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import com.xiaomi.infra.pegasus.rpc.*; import com.xiaomi.infra.pegasus.tools.Tools; import java.nio.ByteBuffer; @@ -228,10 +225,10 @@ public byte[] get(String tableName, Get get) throws PException { } @Override - public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGetResult) + public void batchGet(String tableName, BatchGet batchGet, List> results) throws PException { PegasusTable tb = getTable(tableName); - tb.batchGet(batchGet, batchGetResult, 0); + tb.batchGet(batchGet, results, 0); } @Override @@ -241,9 +238,9 @@ public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PExce } @Override - public MultiGetResult getRange(String tableName, GetRange getRange) throws PException { + public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException { PegasusTable tb = getTable(tableName); - return tb.getRange(getRange, 0); + return tb.rangeGet(rangeGet, 0); } @Override @@ -253,10 +250,10 @@ public void set(String tableName, Set set) throws PException { } @Override - public void batchSet(String tableName, BatchSet batchSet, BatchSetResult batchSetResult) + public void batchSet(String tableName, BatchSet batchSet, List> results) throws PException { PegasusTable tb = getTable(tableName); - tb.batchSet(batchSet, batchSetResult, 0); + tb.batchSet(batchSet, results, 0); } @Override @@ -272,10 +269,11 @@ public void del(String tableName, Delete delete) throws PException { } @Override - public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult batchDelResult) + public void batchDel( + String tableName, BatchDelete batchDelete, List> results) throws PException { PegasusTable tb = getTable(tableName); - tb.batchDel(batchDelete, batchDelResult, 0); + tb.batchDel(batchDelete, results, 0); } @Override @@ -285,9 +283,9 @@ public void multiDel(String tableName, MultiDelete multiDelete) throws PExceptio } @Override - public void delRange(String tableName, DelRange delRange) throws PException { + public void rangeDelete(String tableName, RangeDelete rangeDelete) throws PException { PegasusTable tb = getTable(tableName); - tb.delRange(delRange, 0); + tb.rangeDelete(rangeDelete, 0); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 76e1847e..fcb11ba2 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -7,18 +7,15 @@ import com.xiaomi.infra.pegasus.client.request.BatchDelete; import com.xiaomi.infra.pegasus.client.request.BatchGet; import com.xiaomi.infra.pegasus.client.request.BatchSet; -import com.xiaomi.infra.pegasus.client.request.DelRange; import com.xiaomi.infra.pegasus.client.request.Delete; import com.xiaomi.infra.pegasus.client.request.Get; -import com.xiaomi.infra.pegasus.client.request.GetRange; import com.xiaomi.infra.pegasus.client.request.Increment; import com.xiaomi.infra.pegasus.client.request.MultiDelete; import com.xiaomi.infra.pegasus.client.request.MultiGet; import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; import com.xiaomi.infra.pegasus.client.request.Set; -import com.xiaomi.infra.pegasus.client.response.BatchDelResult; -import com.xiaomi.infra.pegasus.client.response.BatchGetResult; -import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import java.util.*; import org.apache.commons.lang3.tuple.Pair; @@ -106,7 +103,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGetResult) + public void batchGet(String tableName, BatchGet batchGet, List> results) throws PException; /** @@ -125,7 +122,7 @@ public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGe * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ - public MultiGetResult getRange(String tableName, GetRange getRange) throws PException; + public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException; /** * Set value. @@ -143,7 +140,7 @@ public void batchGet(String tableName, BatchGet batchGet, BatchGetResult batchGe *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchSet(String tableName, BatchSet batchSet, BatchSetResult batchSetResult) + public void batchSet(String tableName, BatchSet batchSet, List> results) throws PException; /** @@ -170,7 +167,8 @@ public void batchSet(String tableName, BatchSet batchSet, BatchSetResult batchSe *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult batchDelResult) + public void batchDel( + String tableName, BatchDelete batchDelete, List> results) throws PException; public void multiDel(String tableName, MultiDelete multiDelete) throws PException; @@ -182,7 +180,7 @@ public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult b * @param tableName table name * @throws PException throws exception if any error occurs. */ - public void delRange(String tableName, DelRange delRange) throws PException; + public void rangeDelete(String tableName, RangeDelete rangeDelete) throws PException; /** * Get ttl time. @@ -194,7 +192,7 @@ public void batchDel(String tableName, BatchDelete batchDelete, BatchDelResult b public int ttl(String tableName, Get get) throws PException; /** - * Atomically increment value. + * Atomically value value. * * @return the new value. * @throws PException throws exception if any error occurs. @@ -635,18 +633,17 @@ public int batchMultiDel2( public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** - * Atomically increment value. + * Atomically value value. * * @param tableName the table name. - * @param hashKey the hash key to increment. - * @param sortKey the sort key to increment. - * @param increment the increment to be added to the old value. + * @param hashKey the hash key to value. + * @param sortKey the sort key to value. + * @param increment the value to be added to the old value. * @param ttlSeconds time to live in seconds for the new value. should be no less than -1. for the * second method, the ttlSeconds is 0. - if ttlSeconds == 0, the semantic is the same as - * redis: - normally, increment will preserve the original ttl. - if old data is expired by - * ttl, then set initial value to 0 and set no ttl. - if ttlSeconds > 0, then update with the - * new ttl if increment succeed. - if ttlSeconds == -1, then update to no ttl if increment - * succeed. + * redis: - normally, value will preserve the original ttl. - if old data is expired by ttl, + * then set initial value to 0 and set no ttl. - if ttlSeconds > 0, then update with the new + * ttl if value succeed. - if ttlSeconds == -1, then update to no ttl if value succeed. * @return the new value. * @throws PException throws exception if any error occurs. */ diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 15f64e43..a26d19bb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -7,6 +7,18 @@ import com.xiaomi.infra.pegasus.base.blob; import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; +import com.xiaomi.infra.pegasus.client.request.Set; import com.xiaomi.infra.pegasus.operator.*; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; @@ -43,6 +55,443 @@ public PegasusTable(PegasusClient client, Table table) { this.metaList = client.getMetaList(); } + @Override + public Future asyncExist(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + asyncTTL(get, timeout) + .addListener( + new TTLListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + promise.setSuccess(future.get() != -2); + } else { + promise.setFailure(future.cause()); + } + } + }); + return promise; + } + + @Override + public Future asyncTTL(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(get.hashKey, get.sortKey)); + + long partitionHash = table.getHash(request.data); + gpid pid = table.getGpidByHash(partitionHash); + rrdb_ttl_operator op = new rrdb_ttl_operator(pid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(get.hashKey, get.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0 && op2.get_response().error != 1) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + // On success: ttl time in seconds; -1 if no ttl set; -2 if not exist. + // If not exist, the error code of rpc response is kNotFound(1). + promise.setSuccess( + op2.get_response().error == 1 ? -2 : op2.get_response().ttl_seconds); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncGet(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(get.hashKey, get.sortKey)); + long partitionHash = table.getHash(request.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_get_operator op = + new rrdb_get_operator(gpid, table.getTableName(), request, partitionHash); + Table.ClientOPCallback callback = + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_get_operator gop = (rrdb_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(get.hashKey, get.sortKey), promise, op, table, timeout); + } else if (gop.get_response().error == 1) { // rocksdb::kNotFound + promise.setSuccess(null); + } else if (gop.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + promise.setSuccess(gop.get_response().value.data); + } + } + }; + + table.asyncOperate(op, callback, timeout); + return promise; + } + + @Override + public Future asyncRangeGet(RangeGet rangeGet, int timeout) throws PException { + final DefaultPromise promise = table.newPromise(); + + blob hashKeyBlob = new blob(rangeGet.hashKey); + blob startSortKeyBlob = + (rangeGet.startSortKey == null ? null : new blob(rangeGet.startSortKey)); + blob stopSortKeyBlob = (rangeGet.stopSortKey == null ? null : new blob(rangeGet.stopSortKey)); + blob sortKeyFilterPatternBlob = + (rangeGet.externOptions.sortKeyFilterPattern == null + ? null + : new blob(rangeGet.externOptions.sortKeyFilterPattern)); + + multi_get_request request = + new multi_get_request( + hashKeyBlob, + null, + rangeGet.maxFetchCount, + rangeGet.maxFetchSize, + rangeGet.externOptions.noValue, + startSortKeyBlob, + stopSortKeyBlob, + rangeGet.externOptions.startInclusive, + rangeGet.externOptions.stopInclusive, + filter_type.findByValue(rangeGet.externOptions.sortKeyFilterType.getValue()), + sortKeyFilterPatternBlob, + rangeGet.externOptions.reverse); + long partitionHash = table.getKeyHash(request.hash_key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_get_operator op = + new rrdb_multi_get_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(rangeGet.hashKey, rangeGet.maxFetchCount), + promise, + op, + table, + timeout); + } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { + // rocksdb::Status::kOk && rocksdb::Status::kIncomplete + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + MultiGetResult result = new MultiGetResult(); + result.allFetched = (gop.get_response().error == 0); + result.values = new ArrayList>(gop.get_response().kvs.size()); + for (key_value kv : gop.get_response().kvs) { + result.values.add(new ImmutablePair(kv.key.data, kv.value.data)); + } + promise.setSuccess(result); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiGet(MultiGet multiGet, int timeout) { + final DefaultPromise promise = table.newPromise(); + + blob hashKeyBlob = new blob(multiGet.hashKey); + List sortKeyBlobs = new ArrayList(); + Map setKeyMap = null; + + if (multiGet.sortKeys != null && multiGet.sortKeys.size() > 0) { + setKeyMap = new TreeMap(); + for (int i = 0; i < multiGet.sortKeys.size(); i++) { + byte[] sortKey = multiGet.sortKeys.get(i); + if (sortKey == null) { + promise.setFailure( + new PException("Invalid parameter: sortKeys[" + i + "] should not be null")); + return promise; + } + setKeyMap.put(ByteBuffer.wrap(sortKey), sortKey); + } + for (Map.Entry entry : setKeyMap.entrySet()) { + sortKeyBlobs.add(new blob(entry.getValue())); + } + } + + multi_get_request request = + new multi_get_request( + hashKeyBlob, + sortKeyBlobs, + multiGet.maxFetchCount, + multiGet.maxFetchCount, + multiGet.noValue, + null, + null, + true, + false, + filter_type.FT_NO_FILTER, + null, + false); + long partitionHash = table.getKeyHash(request.hash_key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_get_operator op = + new rrdb_multi_get_operator(gpid, table.getTableName(), request, partitionHash); + final Map finalSetKeyMap = setKeyMap; + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiGet.hashKey, sortKeyBlobs.size()), promise, op, table, timeout); + } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { + // rocksdb::Status::kOk && rocksdb::Status::kIncomplete + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + MultiGetResult result = new MultiGetResult(); + result.allFetched = (gop.get_response().error == 0); + result.values = new ArrayList>(gop.get_response().kvs.size()); + if (finalSetKeyMap == null) { + for (key_value kv : gop.get_response().kvs) { + result.values.add(new ImmutablePair(kv.key.data, kv.value.data)); + } + } else { + for (key_value kv : gop.get_response().kvs) { + byte[] sortKey = finalSetKeyMap.get(ByteBuffer.wrap(kv.key.data)); + if (sortKey != null) { + result.values.add(new ImmutablePair(sortKey, kv.value.data)); + } + } + } + promise.setSuccess(result); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncSet(Set set, int timeout) { + final DefaultPromise promise = table.newPromise(); + + try { + writeLimiter.validateSingleSet(set.hashKey, set.sortKey, set.value); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + + blob k = new blob(PegasusClient.generateKey(set.hashKey, set.sortKey)); + blob v = new blob(set.value); + int expireSeconds = (set.ttlSeconds == 0 ? 0 : set.ttlSeconds + (int) Tools.epoch_now()); + update_request req = new update_request(k, v, expireSeconds); + + long partitionHash = table.getHash(k.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, partitionHash); + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_put_operator gop = (rrdb_put_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(set.hashKey, set.sortKey), promise, op, table, timeout); + } else if (gop.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiSet(MultiSet multiSet, int timeout) { + final DefaultPromise promise = table.newPromise(); + if (multiSet.values == null || multiSet.values.size() == 0) { + promise.setFailure(new PException("Invalid parameter: values should not be null or empty")); + return promise; + } + + try { + writeLimiter.validateMultiSet(multiSet.hashKey, multiSet.values); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + + blob hash_key_blob = new blob(multiSet.hashKey); + List values_blob = new ArrayList(); + for (int i = 0; i < multiSet.values.size(); i++) { + byte[] k = multiSet.values.get(i).getKey(); + if (k == null) { + promise.setFailure( + new PException("Invalid parameter: values[" + i + "].key should not be null")); + return promise; + } + byte[] v = multiSet.values.get(i).getValue(); + if (v == null) { + promise.setFailure( + new PException("Invalid parameter: values[" + i + "].value should not be null")); + return promise; + } + values_blob.add(new key_value(new blob(k), new blob(v))); + } + int expireTsSseconds = + (multiSet.ttlSeconds == 0 ? 0 : multiSet.ttlSeconds + (int) Tools.epoch_now()); + multi_put_request request = new multi_put_request(hash_key_blob, values_blob, expireTsSseconds); + + long partitionHash = table.getKeyHash(multiSet.hashKey); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_put_operator op = + new rrdb_multi_put_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiSet.hashKey, values_blob.size()), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncDel(Delete delete, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(delete.hashKey, delete.sortKey)); + long partitionHash = table.getHash(request.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_remove_operator op = + new rrdb_remove_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(delete.hashKey, delete.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiDel(MultiDelete multiDelete, int timeout) { + final DefaultPromise promise = table.newPromise(); + if (multiDelete.sortKeys == null || multiDelete.sortKeys.isEmpty()) { + promise.setFailure(new PException("Invalid parameter: sortKeys size should be at lease 1")); + return promise; + } + + List sortKeyBlobs = new ArrayList(multiDelete.sortKeys.size()); + for (int i = 0; i < multiDelete.sortKeys.size(); i++) { + byte[] sortKey = multiDelete.sortKeys.get(i); + if (sortKey == null) { + promise.setFailure( + new PException("Invalid parameter: sortKeys[" + i + "] should not be null")); + return promise; + } + sortKeyBlobs.add(new blob(sortKey)); + } + multi_remove_request request = + new multi_remove_request(new blob(multiDelete.hashKey), sortKeyBlobs, 100); + + long partitionHash = table.getKeyHash(multiDelete.hashKey); + gpid pid = table.getGpidByHash(partitionHash); + rrdb_multi_remove_operator op = + new rrdb_multi_remove_operator(pid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + public void onCompletion(client_operator clientOP) { + rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiDelete.hashKey, sortKeyBlobs.size()), + promise, + op, + table, + timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + Validate.isTrue(op2.get_response().count == multiDelete.sortKeys.size()); + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncIncr(Increment increment, int timeout) { + final DefaultPromise promise = table.newPromise(); + + blob key = new blob(PegasusClient.generateKey(increment.hashKey, increment.sortKey)); + int expireSeconds = + (increment.ttlSeconds <= 0 + ? increment.ttlSeconds + : increment.ttlSeconds + (int) Tools.epoch_now()); + incr_request request = new incr_request(key, increment.value, expireSeconds); + long partitionHash = table.getHash(request.key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_incr_operator op = + new rrdb_incr_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(increment.hashKey, increment.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(op2.get_response().new_value); + } + } + }, + timeout); + return promise; + } + @Override public Future asyncExist(byte[] hashKey, byte[] sortKey, int timeout) { final DefaultPromise promise = table.newPromise(); @@ -914,6 +1363,310 @@ public void onCompletion(client_operator clientOP) { return promise; } + @Override + public boolean exist(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncExist(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public byte[] get(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncGet(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void batchGet(BatchGet batchGet, List> results, int timeout) + throws PException { + results.clear(); + FutureGroup futureGroup = + new FutureGroup<>(batchGet.getList.size(), batchGet.forceComplete); + for (Get get : batchGet.getList) { + futureGroup.add(asyncGet(get, timeout)); + } + futureGroup.waitAllComplete(results, timeout); + } + + @Override + public MultiGetResult multiGet(MultiGet multiGet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncMultiGet(multiGet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiGet.hashKey, multiGet.sortKeys.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public MultiGetResult rangeGet(RangeGet rangeGet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncRangeGet(rangeGet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(rangeGet.hashKey, rangeGet.maxFetchCount), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void set(Set set, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncSet(set, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(set.hashKey, set.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public int batchSet(BatchSet batchSet, List> results, int timeout) + throws PException { + if (results == null) { + throw new PException("Invalid parameter: results should not be null"); + } + results.clear(); + FutureGroup futureGroup = + new FutureGroup<>(batchSet.setList.size(), batchSet.forceComplete); + + for (Set set : batchSet.setList) { + futureGroup.add(asyncSet(set, timeout)); + } + return futureGroup.waitAllComplete(results, timeout); + } + + @Override + public void multiSet(MultiSet multiSet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncMultiSet(multiSet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiSet.hashKey, multiSet.values.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void del(Delete delete, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncDel(delete, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(delete.hashKey, delete.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public int batchDel(BatchDelete batchDelete, List> results, int timeout) + throws PException { + + FutureGroup futureGroup = + new FutureGroup<>(batchDelete.deleteList.size(), batchDelete.forceComplete); + + for (Delete delete : batchDelete.deleteList) { + futureGroup.add(asyncDel(delete, timeout)); + } + + return futureGroup.waitAllComplete(results, timeout); + } + + @Override + public void multiDel(MultiDelete multiDelete, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncMultiDel(multiDelete, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiDelete.hashKey, multiDelete.sortKeys.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void rangeDelete(RangeDelete rangeDelete, int timeout) throws PException { + + if (timeout <= 0) timeout = defaultTimeout; + long startTime = System.currentTimeMillis(); + long lastCheckTime = startTime; + long deadlineTime = startTime + timeout; + int count = 0; + final int maxBatchDelCount = 100; + + ScanOptions scanOptions = new ScanOptions(); + scanOptions.noValue = true; + scanOptions.startInclusive = rangeDelete.options.startInclusive; + scanOptions.stopInclusive = rangeDelete.options.stopInclusive; + scanOptions.sortKeyFilterType = rangeDelete.options.sortKeyFilterType; + scanOptions.sortKeyFilterPattern = rangeDelete.options.sortKeyFilterPattern; + + rangeDelete.options.nextSortKey = rangeDelete.startSortKey; + PegasusScannerInterface pegasusScanner = + getScanner( + rangeDelete.hashKey, rangeDelete.startSortKey, rangeDelete.stopSortKey, scanOptions); + lastCheckTime = System.currentTimeMillis(); + if (lastCheckTime >= deadlineTime) { + throw new PException( + "Getting pegasusScanner takes too long time when delete hashKey:" + + new String(rangeDelete.hashKey) + + ",startSortKey:" + + new String(rangeDelete.startSortKey) + + ",stopSortKey:" + + new String(rangeDelete.stopSortKey) + + ",timeUsed:" + + (lastCheckTime - startTime) + + ":", + new ReplicationException(error_code.error_types.ERR_TIMEOUT)); + } + + int remainingTime = (int) (deadlineTime - lastCheckTime); + List sortKeys = new ArrayList(); + try { + Pair, byte[]> pairs; + while ((pairs = pegasusScanner.next()) != null) { + sortKeys.add(pairs.getKey().getValue()); + if (sortKeys.size() == maxBatchDelCount) { + rangeDelete.options.nextSortKey = sortKeys.get(0); + asyncMultiDel(rangeDelete.hashKey, sortKeys, remainingTime) + .get(remainingTime, TimeUnit.MILLISECONDS); + lastCheckTime = System.currentTimeMillis(); + remainingTime = (int) (deadlineTime - lastCheckTime); + if (remainingTime <= 0) { + throw new TimeoutException(); + } + count++; + sortKeys.clear(); + } + } + if (!sortKeys.isEmpty()) { + asyncMultiDel(new MultiDelete(rangeDelete.hashKey, sortKeys), remainingTime) + .get(remainingTime, TimeUnit.MILLISECONDS); + rangeDelete.options.nextSortKey = null; + } + } catch (InterruptedException | ExecutionException e) { + String nextSortKeyStr = + rangeDelete.options.nextSortKey == null + ? "" + : new String(rangeDelete.options.nextSortKey); + throw new PException( + "RangeDelete of hashKey:" + + new String(rangeDelete.hashKey) + + " from sortKey:" + + nextSortKeyStr + + "[index:" + + count * maxBatchDelCount + + "]" + + " failed:", + e); + } catch (TimeoutException e) { + String sortKey = sortKeys.isEmpty() ? null : new String(sortKeys.get(0)); + int timeUsed = (int) (System.currentTimeMillis() - startTime); + throw new PException( + "RangeDelete of hashKey:" + + new String(rangeDelete.hashKey) + + " from sortKey:" + + sortKey + + "[index:" + + count * maxBatchDelCount + + "]" + + " failed, timeUsed:" + + timeUsed, + new ReplicationException(error_code.error_types.ERR_TIMEOUT)); + } + } + + @Override + public int ttl(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncTTL(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public long incr(Increment increment, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncIncr(increment, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(increment.hashKey, increment.sortKey), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + @Override public boolean exist(byte[] hashKey, byte[] sortKey, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; @@ -1241,7 +1994,7 @@ public void batchSet(List items, int timeout) throws PException { for (SetItem i : items) { group.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout)); } - group.waitAllCompleteOrOneFail(timeout); + group.waitAllComplete(timeout); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index 897ece10..1656a8af 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -6,18 +6,15 @@ import com.xiaomi.infra.pegasus.client.request.BatchDelete; import com.xiaomi.infra.pegasus.client.request.BatchGet; import com.xiaomi.infra.pegasus.client.request.BatchSet; -import com.xiaomi.infra.pegasus.client.request.DelRange; import com.xiaomi.infra.pegasus.client.request.Delete; import com.xiaomi.infra.pegasus.client.request.Get; -import com.xiaomi.infra.pegasus.client.request.GetRange; import com.xiaomi.infra.pegasus.client.request.Increment; import com.xiaomi.infra.pegasus.client.request.MultiDelete; import com.xiaomi.infra.pegasus.client.request.MultiGet; import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; import com.xiaomi.infra.pegasus.client.request.Set; -import com.xiaomi.infra.pegasus.client.response.BatchDelResult; -import com.xiaomi.infra.pegasus.client.response.BatchGetResult; -import com.xiaomi.infra.pegasus.client.response.BatchSetResult; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.List; @@ -87,6 +84,27 @@ public static interface ExistListener extends GenericFutureListener future) throws Exception; } + public Future asyncExist(Get get, int timeout /*ms*/); + + public Future asyncTTL(Get get, int timeout /*ms*/); + + public Future asyncGet(Get get, int timeout /*ms*/); + + public Future asyncRangeGet(RangeGet rangeGet, int timeout /*ms*/) + throws PException; + + public Future asyncMultiGet(MultiGet multiGet, int timeout /*ms*/); + + public Future asyncMultiSet(MultiSet multiSet, int timeout /*ms*/); + + public Future asyncSet(Set set, int timeout /*ms*/); + + public Future asyncDel(Delete delete, int timeout /*ms*/); + + public Future asyncMultiDel(MultiDelete multiDelete, int timeout /*ms*/); + + public Future asyncIncr(Increment increment, int timeout /*ms*/); + /** * Check value existence for a specific (hashKey, sortKey) pair of current table, async version * @@ -481,17 +499,17 @@ public static interface IncrListener extends GenericFutureListener> } /** - * atomically increment value by key, async version + * atomically value value by key, async version * - * @param hashKey the hash key to increment. - * @param sortKey the sort key to increment. - * @param increment the increment to be added to the old value. + * @param hashKey the hash key to value. + * @param sortKey the sort key to value. + * @param increment the value to be added to the old value. * @param ttlSeconds time to live in seconds for the new value. for the second method, the * ttlSeconds is 0. should be no less than -1. for the second method, the ttlSeconds is 0. - - * if ttlSeconds == 0, the semantic is the same as redis: - normally, increment will preserve - * the original ttl. - if old data is expired by ttl, then set initial value to 0 and set no - * ttl. - if ttlSeconds > 0, then update with the new ttl if increment succeed. - if - * ttlSeconds == -1, then update to no ttl if increment succeed. + * if ttlSeconds == 0, the semantic is the same as redis: - normally, value will preserve the + * original ttl. - if old data is expired by ttl, then set initial value to 0 and set no ttl. + * - if ttlSeconds > 0, then update with the new ttl if value succeed. - if ttlSeconds == -1, + * then update to no ttl if value succeed. * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a * timeout value for current op, else the timeout value in the configuration file will be * used. @@ -748,7 +766,7 @@ public static interface TTLListener extends GenericFutureListenerNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchGet(BatchGet batchGet, BatchGetResult batchGetResult, int timeout) + public void batchGet(BatchGet batchGet, List> results, int timeout) throws PException; /** @@ -765,15 +783,7 @@ public void batchGet(BatchGet batchGet, BatchGetResult batchGetResult, int timeo * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ - public MultiGetResult getRange(GetRange getRange, int timeout) throws PException; - - /** - * Get multiple sort keys under the same hash key. - * - * @return true if all data is fetched; false if only partial data is fetched. - * @throws PException throws exception if any error occurs. - */ - public boolean multiGetSortKeys(MultiGet multiGet, int timeout) throws PException; + public MultiGetResult rangeGet(RangeGet rangeGet, int timeout) throws PException; /** * Set value. @@ -789,7 +799,7 @@ public void batchGet(BatchGet batchGet, BatchGetResult batchGetResult, int timeo *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchSet(BatchSet batchSet, BatchSetResult batchSetResult, int timeout) + public int batchSet(BatchSet batchSet, List> results, int timeout) throws PException; /** @@ -813,7 +823,7 @@ public void batchSet(BatchSet batchSet, BatchSetResult batchSetResult, int timeo *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ - public void batchDel(BatchDelete batchDelete, BatchDelResult batchDelResult, int timeout) + public int batchDel(BatchDelete batchDelete, List> results, int timeout) throws PException; public void multiDel(MultiDelete multiDelete, int timeout) throws PException; @@ -824,7 +834,7 @@ public void batchDel(BatchDelete batchDelete, BatchDelResult batchDelResult, int * * @throws PException throws exception if any error occurs. */ - public void delRange(DelRange delRange, int timeout) throws PException; + public void rangeDelete(RangeDelete rangeDelete, int timeout) throws PException; /** * Get ttl time. @@ -835,7 +845,7 @@ public void batchDel(BatchDelete batchDelete, BatchDelResult batchDelResult, int public int ttl(Get get, int timeout) throws PException; /** - * Atomically increment value. + * Atomically value value. * * @return the new value. * @throws PException throws exception if any error occurs. diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java index 40637656..379aae5f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java @@ -1,5 +1,24 @@ package com.xiaomi.infra.pegasus.client.request; +import java.util.ArrayList; +import java.util.List; + public class BatchDelete { + public boolean forceComplete; + + public List deleteList = new ArrayList<>(); + + public BatchDelete(boolean forceComplete) { + this.forceComplete = forceComplete; + } + + public BatchDelete(boolean forceComplete, List deleteList) { + this.forceComplete = forceComplete; + this.deleteList = deleteList; + } + + public void add(Delete delete) { + deleteList.add(delete); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java index 8268e6c3..56e5e236 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java @@ -4,20 +4,20 @@ import java.util.List; public class BatchGet { - boolean forceComplete; + public boolean forceComplete; - public List multiGetList = new ArrayList<>(); - public List getList = new ArrayList<>(); + public List getList = new ArrayList<>(); - public BatchGet(boolean forceComplete) { - this.forceComplete = forceComplete; - } + public BatchGet(boolean forceComplete) { + this.forceComplete = forceComplete; + } - public void add(MultiGet multiGet){ - multiGetList.add(multiGet); - } + public BatchGet(boolean forceComplete, List getList) { + this.forceComplete = forceComplete; + this.getList = getList; + } - public void add(Get get){ - getList.add(get); - } + public void add(Get get) { + getList.add(get); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java index e5b76cec..d6ad6868 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java @@ -4,15 +4,19 @@ import java.util.List; public class BatchSet { - public List multiSetList = new ArrayList<>(); - public List setList = new ArrayList<>(); + public boolean forceComplete; + public List setList = new ArrayList<>(); - public void add(MultiSet multiSet){ - multiSetList.add(multiSet); - } + public BatchSet(boolean forceComplete) { + this.forceComplete = forceComplete; + } - public void add(Set set){ - setList.add(set); - } + public BatchSet(boolean forceComplete, List setList) { + this.forceComplete = forceComplete; + this.setList = setList; + } + public void add(Set set) { + setList.add(set); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java deleted file mode 100644 index a325b058..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/DelRange.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.xiaomi.infra.pegasus.client.request; - -import com.xiaomi.infra.pegasus.client.DelRangeOptions; - -public class DelRange { - - byte[] hashKey; - byte[] startSortKey; - byte[] stopSortKey; - DelRangeOptions options; - -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java index b0139c65..07a83a9d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java @@ -1,12 +1,12 @@ package com.xiaomi.infra.pegasus.client.request; -public class Delete extends Key{ +public class Delete extends Key { - public Delete(byte[] hashKey) { - super(hashKey); - } + public Delete(byte[] hashKey) { + super(hashKey); + } - public Delete(byte[] hashKey, byte[] sortKey) { - super(hashKey, sortKey); - } + public Delete(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java index af90ee96..92579139 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java @@ -1,12 +1,12 @@ package com.xiaomi.infra.pegasus.client.request; -public class Get extends Key{ +public class Get extends Key { - public Get(byte[] hashKey) { - super(hashKey); - } + public Get(byte[] hashKey) { + super(hashKey); + } - public Get(byte[] hashKey, byte[] sortKey) { - super(hashKey,sortKey); - } + public Get(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java deleted file mode 100644 index a0cd8bf0..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetRange.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.xiaomi.infra.pegasus.client.request; - -import com.xiaomi.infra.pegasus.client.MultiGetOptions; - -public class GetRange { - byte[] hashKey; - byte[] startSortKey; - byte[] stopSortKey; - int maxFetchCount; - int maxFetchSize; - - MultiGetOptions options; - -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java index 5e80c196..3743092d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java @@ -1,16 +1,17 @@ package com.xiaomi.infra.pegasus.client.request; -public class Increment extends Key{ - int increment = 1; - int ttlSeconds = 0; +public class Increment extends Key { + public int value; + public int ttlSeconds; - public Increment(byte[] hashKey, byte[] sortKey) { - super(hashKey, sortKey); - } + public Increment(byte[] hashKey, byte[] sortKey) { + this(hashKey, sortKey, 1, 0); + } - public Increment(byte[] hashKey, byte[] sortKey, int increment, int ttlSeconds) { - super(hashKey, sortKey); - this.ttlSeconds = ttlSeconds; - this.increment = increment; - } + public Increment(byte[] hashKey, byte[] sortKey, int value, int ttlSeconds) { + super(hashKey, sortKey); + assert (ttlSeconds > -1); + this.ttlSeconds = ttlSeconds; + this.value = value; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java index 04f44468..fc9ef46f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java @@ -1,15 +1,15 @@ package com.xiaomi.infra.pegasus.client.request; class Key { - public byte[] hashKey = null; - public byte[] sortKey = null; + public byte[] hashKey = null; + public byte[] sortKey = null; - Key(byte[] hashKey) { - this.hashKey = hashKey; - } + Key(byte[] hashKey) { + this.hashKey = hashKey; + } - Key(byte[] hashKey, byte[] sortKey) { - this.hashKey = hashKey; - this.sortKey = sortKey; - } + Key(byte[] hashKey, byte[] sortKey) { + this.hashKey = hashKey; + this.sortKey = sortKey; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java index acbf935a..c529bf66 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java @@ -1,5 +1,23 @@ package com.xiaomi.infra.pegasus.client.request; +import java.util.ArrayList; +import java.util.List; + public class MultiDelete { + public byte[] hashKey; + public List sortKeys = new ArrayList<>(); + + public MultiDelete(byte[] hashKey) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF); + this.hashKey = hashKey; + } + + public MultiDelete(byte[] hashKey, List sortKeys) { + this.hashKey = hashKey; + this.sortKeys = sortKeys; + } + public void add(byte[] sortKey) { + sortKeys.add(sortKey); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java index a98ca95f..52b94f28 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java @@ -1,12 +1,31 @@ package com.xiaomi.infra.pegasus.client.request; +import java.util.ArrayList; import java.util.List; public class MultiGet { - byte[] hashKey; - List sortKeys; + public byte[] hashKey; + public List sortKeys; - int maxFetchCount; - int maxFetchSize; + public int maxFetchCount; + public int maxFetchSize; + public boolean noValue; + public MultiGet(byte[] hashKey) { + this(hashKey, new ArrayList<>(), 100, 1000, false); + } + + public MultiGet( + byte[] hashKey, List sortKeys, int maxFetchCount, int maxFetchSize, boolean noValue) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF && sortKeys != null); + this.hashKey = hashKey; + this.sortKeys = sortKeys; + this.maxFetchCount = maxFetchCount; + this.maxFetchSize = maxFetchSize; + this.noValue = noValue; + } + + public void add(byte[] sortKey) { + sortKeys.add(sortKey); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java index 33c5771b..ea5c94b1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java @@ -6,25 +6,22 @@ import java.util.*; import org.apache.commons.lang3.tuple.Pair; -/** - * @author qinzuoyan - *

Store data under the same hashKey. - */ public class MultiSet { - public byte[] hashKey; - public List> values = new ArrayList<>(); - public int ttlSeconds = 0; + public byte[] hashKey; + public List> values = new ArrayList<>(); + public int ttlSeconds; - public MultiSet(byte[] hashKey) { - this.hashKey = hashKey; - } + public MultiSet(byte[] hashKey) { + this(hashKey, 0); + } - public MultiSet(byte[] hashKey, int ttlSeconds) { - this.hashKey = hashKey; - this.ttlSeconds = ttlSeconds; - } + public MultiSet(byte[] hashKey, int ttlSeconds) { + assert (hashKey != null && hashKey.length > 0 && hashKey.length < 0xFFFF && ttlSeconds > 0); + this.hashKey = hashKey; + this.ttlSeconds = ttlSeconds; + } - public void add(byte[] sortKey, byte[] value) { - values.add(Pair.of(sortKey, value)); - } + public void add(byte[] sortKey, byte[] value) { + values.add(Pair.of(sortKey, value)); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java new file mode 100644 index 00000000..9acd044b --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.DelRangeOptions; + +public class RangeDelete { + + public byte[] hashKey; + public byte[] startSortKey; + public byte[] stopSortKey; + + public DelRangeOptions options = new DelRangeOptions(); + + public RangeDelete(byte[] hashKey) { + this(hashKey, "".getBytes(), "".getBytes()); + } + + public RangeDelete(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey) { + assert (hashKey != null && hashKey.length > 0); + this.hashKey = hashKey; + this.startSortKey = startSortKey; + this.stopSortKey = stopSortKey; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java new file mode 100644 index 00000000..7f5e6c7a --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java @@ -0,0 +1,31 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.MultiGetOptions; + +public class RangeGet { + public byte[] hashKey; + public byte[] startSortKey; + public byte[] stopSortKey; + public int maxFetchCount; + public int maxFetchSize; + + public MultiGetOptions externOptions = new MultiGetOptions(); + + public RangeGet(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey) { + this(hashKey, startSortKey, stopSortKey, 100, 1000); + } + + public RangeGet( + byte[] hashKey, + byte[] startSortKey, + byte[] stopSortKey, + int maxFetchCount, + int maxFetchSize) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF); + this.hashKey = hashKey; + this.startSortKey = startSortKey; + this.stopSortKey = stopSortKey; + this.maxFetchCount = maxFetchCount; + this.maxFetchSize = maxFetchSize; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java index aca1a043..7a8265e3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java @@ -4,30 +4,22 @@ package com.xiaomi.infra.pegasus.client.request; import java.io.Serializable; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; public class Set implements Serializable { - public byte[] hashKey; - public byte[] sortKey; - @NotNull - public byte[] value ; - @Min(0) - public int ttlSeconds = 0; // 0 means no ttl + public byte[] hashKey; + public byte[] sortKey; + public byte[] value; + public int ttlSeconds; // 0 means no ttl - public Set() {} + public Set(byte[] hashKey, byte[] sortKey, byte[] value) { + this(hashKey, sortKey, value, 0); + } - public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) { - this.hashKey = hashKey; - this.sortKey = sortKey; - this.value = value; - this.ttlSeconds = ttlSeconds; - } - - public Set(byte[] hashKey, byte[] sortKey, byte[] value) { - this.hashKey = hashKey; - this.sortKey = sortKey; - this.value = value; - this.ttlSeconds = 0; - } + public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) { + assert (value != null && ttlSeconds >= 0); + this.hashKey = hashKey; + this.sortKey = sortKey; + this.value = value; + this.ttlSeconds = ttlSeconds; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java deleted file mode 100644 index d92bd3c2..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchDelResult.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.xiaomi.infra.pegasus.client.response; - -public class BatchDelResult { - -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java deleted file mode 100644 index 9f0a1897..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchGetResult.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.xiaomi.infra.pegasus.client.response; - -import com.xiaomi.infra.pegasus.client.PException; -import java.util.List; -import org.apache.commons.lang3.tuple.Pair; - -public class BatchGetResult { - public List> results; - - public void add(PException pe, byte[] value) { - results.add(Pair.of(pe, value)); - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java deleted file mode 100644 index 96dfdfbd..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/client/response/BatchSetResult.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.xiaomi.infra.pegasus.client.response; - -import com.xiaomi.infra.pegasus.client.PException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -public class BatchSetResult { - AtomicInteger successCount = new AtomicInteger(0); - List results = new ArrayList<>(); - - void add(PException pe) { - results.add(pe); - } - - void addSuccssCount(){ - successCount.incrementAndGet(); - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 3c9605c9..998e7785 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -158,7 +158,7 @@ void initTableConfiguration(query_cfg_response resp) { // Warm up the connections during client.openTable, so RPCs thereafter can // skip the connect process. try { - futureGroup.waitAllCompleteOrOneFail(manager_.getTimeout()); + futureGroup.waitAllComplete(manager_.getTimeout()); } catch (PException e) { logger.warn("failed to connect with some replica servers: ", e); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java index 349ade4d..6605c4d5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java @@ -47,7 +47,7 @@ public void testBlockingOperationException() throws Exception { FutureGroup group = new FutureGroup<>(1); group.add(promise); try { - group.waitAllCompleteOrOneFail(10000); + group.waitAllComplete(10000); } catch (PException e) { success.set(false); System.err.println(name.getMethodName() + ": " + e.toString()); @@ -78,7 +78,7 @@ public void testFutureWaitTimeout() throws Exception { group.add(promise); try { // never wake up promise. - group.waitAllCompleteOrOneFail(10); + group.waitAllComplete(10); } catch (PException e) { // must throw exception System.err.println(name.getMethodName() + ": " + e.toString()); From 8e49d58af68d5d024183aa0b2ff0ecbf9cf89a55 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 24 Aug 2020 18:13:56 +0800 Subject: [PATCH 4/5] complete base --- .../pegasus/client/PegasusClientInterface.java | 8 ++++---- .../pegasus/client/PegasusTableInterface.java | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index fcb11ba2..d213e33e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -633,12 +633,12 @@ public int batchMultiDel2( public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** - * Atomically value value. + * Atomically increment value. * * @param tableName the table name. - * @param hashKey the hash key to value. - * @param sortKey the sort key to value. - * @param increment the value to be added to the old value. + * @param hashKey the hash key to increment. + * @param sortKey the sort key to increment. + * @param increment the increment to be added to the old value. * @param ttlSeconds time to live in seconds for the new value. should be no less than -1. for the * second method, the ttlSeconds is 0. - if ttlSeconds == 0, the semantic is the same as * redis: - normally, value will preserve the original ttl. - if old data is expired by ttl, diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index 1656a8af..1de03338 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -499,17 +499,17 @@ public static interface IncrListener extends GenericFutureListener> } /** - * atomically value value by key, async version + * atomically increment value by key, async version * - * @param hashKey the hash key to value. - * @param sortKey the sort key to value. - * @param increment the value to be added to the old value. + * @param hashKey the hash key to increment. + * @param sortKey the sort key to increment. + * @param increment the increment to be added to the old value. * @param ttlSeconds time to live in seconds for the new value. for the second method, the * ttlSeconds is 0. should be no less than -1. for the second method, the ttlSeconds is 0. - - * if ttlSeconds == 0, the semantic is the same as redis: - normally, value will preserve the - * original ttl. - if old data is expired by ttl, then set initial value to 0 and set no ttl. - * - if ttlSeconds > 0, then update with the new ttl if value succeed. - if ttlSeconds == -1, - * then update to no ttl if value succeed. + * if ttlSeconds == 0, the semantic is the same as redis: - normally, increment will preserve + * the original ttl. - if old data is expired by ttl, then set initial value to 0 and set no + * ttl. - if ttlSeconds > 0, then update with the new ttl if increment succeed. - if + * ttlSeconds == -1, then update to no ttl if increment succeed. * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a * timeout value for current op, else the timeout value in the configuration file will be * used. From fede9e06df2c7ce85e4c328e1e10700f3e56c8e7 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 24 Aug 2020 18:22:55 +0800 Subject: [PATCH 5/5] complete base --- .../infra/pegasus/client/PegasusClient.java | 4 +- .../client/PegasusClientInterface.java | 92 +---------------- .../infra/pegasus/client/PegasusTable.java | 2 +- .../pegasus/client/PegasusTableInterface.java | 99 ++++--------------- 4 files changed, 23 insertions(+), 174 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 49832e7e..c152e34c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -283,9 +283,9 @@ public void multiDel(String tableName, MultiDelete multiDelete) throws PExceptio } @Override - public void rangeDelete(String tableName, RangeDelete rangeDelete) throws PException { + public void rangeDel(String tableName, RangeDelete rangeDelete) throws PException { PegasusTable tb = getTable(tableName); - tb.rangeDelete(rangeDelete, 0); + tb.rangeDel(rangeDelete, 0); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index d213e33e..949ff6eb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -77,126 +77,36 @@ public interface PegasusClientInterface { public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException; - /** - * Check value exist by key from the cluster - * - * @param tableName TableHandler name - * @return true if exist, false if not exist - * @throws PException throws exception if any error occurs. - */ public boolean exist(String tableName, Get get) throws PException; - /** - * Get value. - * - * @param tableName TableHandler name - * @return value; null if not found - * @throws PException throws exception if any error occurs. - */ public byte[] get(String tableName, Get get) throws PException; - /** - * Batch get values of different keys. Will terminate immediately if any error occurs. - * - * @param tableName table name - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public void batchGet(String tableName, BatchGet batchGet, List> results) throws PException; - /** - * Get multiple values under the same hash key. - * - * @param tableName table name - * @return true if all data is fetched; false if only partial data is fetched. - * @throws PException throws exception if any error occurs. - */ public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException; - /** - * Get multiple key-values under the same hashKey with sortKey range limited. - * - * @param tableName table name - * @return true if all data is fetched; false if only partial data is fetched. - * @throws PException throws exception if any error occurs. - */ public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException; - /** - * Set value. - * - * @param tableName TableHandler name - * @throws PException throws exception if any error occurs. - */ public void set(String tableName, Set set) throws PException; - /** - * Batch set lots of values. Will terminate immediately if any error occurs. - * - * @param tableName TableHandler name - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public void batchSet(String tableName, BatchSet batchSet, List> results) throws PException; - /** - * Set multiple value under the same hash key. - * - * @param tableName table name - * @throws PException throws exception if any error occurs. - */ public void multiSet(String tableName, MultiSet multiSet) throws PException; - /** - * Delete value. - * - * @param tableName TableHandler name - * @throws PException throws exception if any error occurs. - */ public void del(String tableName, Delete delete) throws PException; - /** - * Batch delete values of different keys. Will terminate immediately if any error occurs. - * - * @param tableName table name - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public void batchDel( String tableName, BatchDelete batchDelete, List> results) throws PException; public void multiDel(String tableName, MultiDelete multiDelete) throws PException; - /** - * Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate - * immediately if any error occurs. - * - * @param tableName table name - * @throws PException throws exception if any error occurs. - */ - public void rangeDelete(String tableName, RangeDelete rangeDelete) throws PException; + public void rangeDel(String tableName, RangeDelete rangeDelete) throws PException; - /** - * Get ttl time. - * - * @param tableName TableHandler name - * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. - * @throws PException throws exception if any error occurs. - */ public int ttl(String tableName, Get get) throws PException; - /** - * Atomically value value. - * - * @return the new value. - * @throws PException throws exception if any error occurs. - */ public long incr(String tableName, Increment increment) throws PException; /** diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index a26d19bb..f31f3511 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1542,7 +1542,7 @@ public void multiDel(MultiDelete multiDelete, int timeout) throws PException { } @Override - public void rangeDelete(RangeDelete rangeDelete, int timeout) throws PException { + public void rangeDel(RangeDelete rangeDelete, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; long startTime = System.currentTimeMillis(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index 1de03338..17af6a95 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -67,7 +67,6 @@ * *

Please refer to the netty document for the usage of Future. */ -// TODO(jiashuo1): refactor as PegasusClientInterface public interface PegasusTableInterface { /// < -------- Exist -------- @@ -123,6 +122,7 @@ public Future asyncRangeGet(RangeGet rangeGet, int timeout /*ms* * are guaranteed to be executed as the same order as the listeners added. But listeners for * different tables are not guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncExist(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- SortKeyCount -------- @@ -154,6 +154,7 @@ public static interface SortKeyCountListener extends GenericFutureListener asyncSortKeyCount(byte[] hashKey, int timeout /*ms*/); /// < -------- Get -------- @@ -188,6 +189,7 @@ public static interface GetListener extends GenericFutureListener * are guaranteed to be executed as the same order as the listeners added. But listeners for * different tables are not guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- MultiGet -------- @@ -240,6 +242,7 @@ public static interface MultiGetListener extends GenericFutureListener asyncMultiGet( byte[] hashKey, List sortKeys, @@ -247,6 +250,7 @@ public Future asyncMultiGet( int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGet( byte[] hashKey, List sortKeys, int timeout /*ms*/); @@ -272,6 +276,7 @@ public Future asyncMultiGet( * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -281,6 +286,7 @@ public Future asyncMultiGet( int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -335,9 +341,11 @@ public static interface MultiGetSortKeysListener * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncMultiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGetSortKeys(byte[] hashKey, int timeout /*ms*/); /// < -------- Set -------- @@ -374,9 +382,11 @@ public static interface SetListener extends GenericFutureListener> * are guaranteed to be executed as the same order as the listeners added. But listeners for * different tables are not guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncSet( byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncSet(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/); /// < -------- MultiGet -------- @@ -411,9 +421,11 @@ public static interface MultiSetListener extends GenericFutureListener asyncMultiSet( byte[] hashKey, List> values, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncMultiSet( byte[] hashKey, List> values, int timeout /*ms*/); @@ -449,6 +461,7 @@ public static interface DelListener extends GenericFutureListener> * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- MultiDel -------- @@ -482,6 +495,7 @@ public static interface MultiDelListener extends GenericFutureListener asyncMultiDel(byte[] hashKey, List sortKeys, int timeout /*ms*/); /// < -------- Incr -------- @@ -521,9 +535,11 @@ public static interface IncrListener extends GenericFutureListener> * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncIncr( byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncIncr(byte[] hashKey, byte[] sortKey, long increment, int timeout /*ms*/); /// < -------- CheckAndSet -------- @@ -739,117 +755,40 @@ public static interface TTLListener extends GenericFutureListener asyncTTL(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- Sync Methods -------- - /** - * Check value exist by key from the cluster - * - * @return true if exist, false if not exist - * @throws PException throws exception if any error occurs. - */ public boolean exist(Get get, int timeout) throws PException; - /** - * Get value. - * - * @return value; null if not found - * @throws PException throws exception if any error occurs. - */ public byte[] get(Get get, int timeout) throws PException; - /** - * Batch get values of different keys. Will terminate immediately if any error occurs. - * - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public void batchGet(BatchGet batchGet, List> results, int timeout) throws PException; - /** - * Get multiple values under the same hash key. - * - * @return true if all data is fetched; false if only partial data is fetched. - * @throws PException throws exception if any error occurs. - */ public MultiGetResult multiGet(MultiGet multiGet, int timeout) throws PException; - /** - * Get multiple key-values under the same hashKey with sortKey range limited. - * - * @return true if all data is fetched; false if only partial data is fetched. - * @throws PException throws exception if any error occurs. - */ public MultiGetResult rangeGet(RangeGet rangeGet, int timeout) throws PException; - /** - * Set value. - * - * @throws PException throws exception if any error occurs. - */ public void set(Set set, int timeout) throws PException; - /** - * Batch set lots of values. Will terminate immediately if any error occurs. - * - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public int batchSet(BatchSet batchSet, List> results, int timeout) throws PException; - /** - * Set multiple value under the same hash key. - * - * @throws PException throws exception if any error occurs. - */ public void multiSet(MultiSet multiSet, int timeout) throws PException; - /** - * Delete value. - * - * @throws PException throws exception if any error occurs. - */ public void del(Delete delete, int timeout) throws PException; - /** - * Batch delete values of different keys. Will terminate immediately if any error occurs. - * - * @throws PException throws exception if any error occurs. - *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys - * failed. - */ public int batchDel(BatchDelete batchDelete, List> results, int timeout) throws PException; public void multiDel(MultiDelete multiDelete, int timeout) throws PException; - /** - * Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate - * immediately if any error occurs. - * - * @throws PException throws exception if any error occurs. - */ - public void rangeDelete(RangeDelete rangeDelete, int timeout) throws PException; + public void rangeDel(RangeDelete rangeDelete, int timeout) throws PException; - /** - * Get ttl time. - * - * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. - * @throws PException throws exception if any error occurs. - */ public int ttl(Get get, int timeout) throws PException; - /** - * Atomically value value. - * - * @return the new value. - * @throws PException throws exception if any error occurs. - */ public long incr(Increment increment, int timeout) throws PException; /**