Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat: add delRange api (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer committed Oct 30, 2019
1 parent 65589fd commit 6f38c0b
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/DelRangeOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

public class DelRangeOptions {
public String nextSortKey = "";
public boolean startInclusive = true; // if the startSortKey is included
public boolean stopInclusive = false; // if the stopSortKey is included
public FilterType sortKeyFilterType = FilterType.FT_NO_FILTER; // filter type for sort key
public byte[] sortKeyFilterPattern = null; // filter pattern for sort key

public DelRangeOptions() {}

public DelRangeOptions(DelRangeOptions o) {
nextSortKey = o.nextSortKey;
startInclusive = o.startInclusive;
stopInclusive = o.stopInclusive;
sortKeyFilterType = o.sortKeyFilterType;
sortKeyFilterPattern = o.sortKeyFilterPattern;
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,18 @@ public void multiDel(String tableName, byte[] hashKey, List<byte[]> sortKeys) th
tb.multiDel(hashKey, sortKeys, 0);
}

@Override
public void delRange(
String tableName,
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
DelRangeOptions options)
throws PException {
PegasusTable tb = getTable(tableName);
tb.delRange(hashKey, startSortKey, stopSortKey, options, 0);
}

@Override
public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> keys)
throws PException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,25 @@ public int batchDel2(String tableName, List<Pair<byte[], byte[]>> keys, List<PEx
*/
public void multiDel(String tableName, byte[] hashKey, List<byte[]> sortKeys) throws PException;

/**
* Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate
* immediately if any error occurs.
*
* @param tableName table name
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
* @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
* @param options del range options.
* @throws PException throws exception if any error occurs.
*/
public void delRange(
String tableName,
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
DelRangeOptions options)
throws PException;

/**
* Batch delete specified sort keys under the same hash key. Will terminate immediately if any
* error occurs.
Expand Down
86 changes: 86 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,92 @@ public void multiDel(byte[] hashKey, List<byte[]> sortKeys, int timeout) throws
}
}

@Override
public void delRange(
byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, DelRangeOptions options, int timeout)
throws PException {
if (timeout <= 0) timeout = defaultTimeout;
long startTime = System.currentTimeMillis();
long lastCheckTime = startTime;
long deadlineTime = startTime + timeout;
int count = 0;
final int maxBatchDelCount = 100;

ScanOptions scanOptions = new ScanOptions();
scanOptions.noValue = true;
scanOptions.startInclusive = options.startInclusive;
scanOptions.stopInclusive = options.stopInclusive;
scanOptions.sortKeyFilterType = options.sortKeyFilterType;
scanOptions.sortKeyFilterPattern = options.sortKeyFilterPattern;

options.nextSortKey = new String(startSortKey);
PegasusScannerInterface pegasusScanner =
getScanner(hashKey, startSortKey, stopSortKey, scanOptions);
lastCheckTime = System.currentTimeMillis();
if (lastCheckTime >= deadlineTime) {
throw new PException(
"Getting pegasusScanner takes too long time when delete hashKey:"
+ new String(hashKey)
+ ",startSortKey:"
+ new String(startSortKey)
+ ",stopSortKey:"
+ new String(stopSortKey)
+ ",timeUsed:"
+ (lastCheckTime - startTime)
+ ":",
new ReplicationException(error_code.error_types.ERR_TIMEOUT));
}

int remainingTime = (int) (deadlineTime - lastCheckTime);
List<byte[]> sortKeys = new ArrayList<byte[]>();
try {
Pair<Pair<byte[], byte[]>, byte[]> pairs;
while ((pairs = pegasusScanner.next()) != null) {
sortKeys.add(pairs.getKey().getValue());
if (sortKeys.size() == maxBatchDelCount) {
options.nextSortKey = new String(sortKeys.get(0));
asyncMultiDel(hashKey, sortKeys, remainingTime).get(remainingTime, TimeUnit.MILLISECONDS);
lastCheckTime = System.currentTimeMillis();
remainingTime = (int) (deadlineTime - lastCheckTime);
if (remainingTime <= 0) {
throw new TimeoutException();
}
count++;
sortKeys.clear();
}
}
if (!sortKeys.isEmpty()) {
asyncMultiDel(hashKey, sortKeys, remainingTime).get(remainingTime, TimeUnit.MILLISECONDS);
options.nextSortKey = null;
}
} catch (InterruptedException | ExecutionException e) {
throw new PException(
"delRange of hashKey:"
+ new String(hashKey)
+ " from sortKey:"
+ options.nextSortKey
+ "[index:"
+ count * maxBatchDelCount
+ "]"
+ " failed:",
e);
} catch (TimeoutException e) {
String sortKey = sortKeys.isEmpty() ? null : new String(sortKeys.get(0));
int timeUsed = (int) (System.currentTimeMillis() - startTime);
throw new PException(
"delRange of hashKey:"
+ new String(hashKey)
+ " from sortKey:"
+ sortKey
+ "[index:"
+ count * maxBatchDelCount
+ "]"
+ " failed, timeUsed:"
+ timeUsed,
new ReplicationException(error_code.error_types.ERR_TIMEOUT));
}
}

@Override
public void batchMultiDel(List<Pair<byte[], List<byte[]>>> keys, int timeout) throws PException {
if (keys == null || keys.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,27 @@ public int batchDel2(
*/
public void multiDel(byte[] hashKey, List<byte[]> sortKeys, int timeout /*ms*/) throws PException;

/**
* Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate
* immediately if any error occurs.
*
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
* @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
* @param options del range options.
* @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
* timeout value for current op, else the timeout value in the configuration file will be
* used.
* @throws PException throws exception if any error occurs.
*/
public void delRange(
byte[] hashKey,
byte[] startSortKey,
byte[] stopSortKey,
DelRangeOptions options,
int timeout /*ms*/)
throws PException;

/**
* Batch delete specified sort keys under the same hash key. Will terminate immediately if any
* error occurs.
Expand Down
125 changes: 125 additions & 0 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
Original file line number Diff line number Diff line change
Expand Up @@ -2357,4 +2357,129 @@ public void createClient() throws PException {

PegasusClientFactory.closeSingletonClient();
}

@Test
public void delRange() throws PException {
PegasusClientInterface client = PegasusClientFactory.getSingletonClient();
DelRangeOptions delRangeOptions = new DelRangeOptions();

// multi set values
List<Pair<byte[], byte[]>> values = new ArrayList<Pair<byte[], byte[]>>();
int count = 0;
try {
while (count < 150) {
values.add(Pair.of(("k_" + count).getBytes(), ("v_" + count).getBytes()));
count++;
}
client.multiSet("temp", "delRange".getBytes(), values);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}

// delRange with default delRangeOptions
try {
client.delRange(
"temp", "delRange".getBytes(), "k_0".getBytes(), "k_90".getBytes(), delRangeOptions);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}

Assert.assertTrue(delRangeOptions.nextSortKey == null);
List<byte[]> remainingSortKey = new ArrayList<byte[]>();

remainingSortKey.add("k_90".getBytes());
remainingSortKey.add("k_91".getBytes());
remainingSortKey.add("k_92".getBytes());
remainingSortKey.add("k_93".getBytes());
remainingSortKey.add("k_94".getBytes());
remainingSortKey.add("k_95".getBytes());
remainingSortKey.add("k_96".getBytes());
remainingSortKey.add("k_97".getBytes());
remainingSortKey.add("k_98".getBytes());
remainingSortKey.add("k_99".getBytes());
List<Pair<byte[], byte[]>> remainingValue = new ArrayList<Pair<byte[], byte[]>>();

try {
client.multiGet("temp", "delRange".getBytes(), remainingSortKey, remainingValue);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}

List<String> valueStr = new ArrayList<String>();
for (Pair<byte[], byte[]> pair : remainingValue) {
valueStr.add(new String(pair.getValue()));
}
Assert.assertEquals(10, valueStr.size());
Assert.assertTrue(valueStr.contains("v_90"));
Assert.assertTrue(valueStr.contains("v_91"));
Assert.assertTrue(valueStr.contains("v_92"));
Assert.assertTrue(valueStr.contains("v_93"));
Assert.assertTrue(valueStr.contains("v_94"));
Assert.assertTrue(valueStr.contains("v_95"));
Assert.assertTrue(valueStr.contains("v_96"));
Assert.assertTrue(valueStr.contains("v_97"));
Assert.assertTrue(valueStr.contains("v_98"));
Assert.assertTrue(valueStr.contains("v_99"));

// delRange with FT_MATCH_POSTFIX option
delRangeOptions.sortKeyFilterType = FilterType.FT_MATCH_POSTFIX;
delRangeOptions.sortKeyFilterPattern = "k_93".getBytes();
try {
client.delRange(
"temp", "delRange".getBytes(), "k_90".getBytes(), "k_95".getBytes(), delRangeOptions);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}

remainingValue.clear();
valueStr.clear();
try {
client.multiGet("temp", "delRange".getBytes(), remainingSortKey, remainingValue);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
for (Pair<byte[], byte[]> pair : remainingValue) {
valueStr.add(new String(pair.getValue()));
}

Assert.assertEquals(9, valueStr.size());
Assert.assertTrue(!valueStr.contains("v_93"));

// delRange with "*Inclusive" option
delRangeOptions.startInclusive = false;
delRangeOptions.stopInclusive = true;
delRangeOptions.sortKeyFilterType = FilterType.FT_NO_FILTER;
delRangeOptions.sortKeyFilterPattern = null;
try {
client.delRange(
"temp", "delRange".getBytes(), "k_90".getBytes(), "k_95".getBytes(), delRangeOptions);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}

remainingValue.clear();
valueStr.clear();
try {
client.multiGet("temp", "delRange".getBytes(), remainingSortKey, remainingValue);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
for (Pair<byte[], byte[]> pair : remainingValue) {
valueStr.add(new String(pair.getValue()));
}

Assert.assertEquals(5, valueStr.size());
Assert.assertTrue(valueStr.contains("v_90"));
Assert.assertTrue(valueStr.contains("v_96"));
Assert.assertTrue(valueStr.contains("v_97"));
Assert.assertTrue(valueStr.contains("v_98"));
Assert.assertTrue(valueStr.contains("v_99"));
}
}

0 comments on commit 6f38c0b

Please sign in to comment.