Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(compress) #126

Merged
merged 36 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
7fea1c6
fix options
foreverneverer Sep 3, 2020
e1602fe
fix options
foreverneverer Sep 3, 2020
fa94399
fix options
foreverneverer Sep 3, 2020
88b5e88
fix options
foreverneverer Sep 3, 2020
a098625
fix comment
foreverneverer Sep 4, 2020
2a09369
fix comment
foreverneverer Sep 4, 2020
d9acc5a
Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInt…
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
return getTable(tableName, 0, false);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
private PegasusTable getTable(
String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
TableOptions options =
new TableOptions(new PegasusHasher(), backupRequestDelayMs, enableCompression);
Table internalTable = cluster.openTable(tableName, options);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
Expand Down Expand Up @@ -189,9 +191,9 @@ public PegasusTableInterface openTable(String tableName) throws PException {
}

@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
public PegasusTableInterface openTable(
String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException {
return getTable(tableName, backupRequestDelayMs, enableCompression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ public interface PegasusClientInterface {
* administrator
* @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <=
* 0, The backup request is disabled.
* @param enableCompression whether to enable the table data open auto-compress
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;
public PegasusTableInterface openTable(
String tableName, int backupRequestDelayMs, boolean enableCompression) throws PException;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

/**
* Check value exist by key from the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
10 changes: 8 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
public class TableOptions {
private final KeyHasher keyHasher;
private final int backupRequestDelayMs;
private final boolean enableCompression;

public KeyHasher keyHasher() {
return this.keyHasher;
Expand All @@ -17,15 +18,20 @@ public int backupRequestDelayMs() {
}

public static TableOptions forTest() {
return new TableOptions(KeyHasher.DEFAULT, 0);
return new TableOptions(KeyHasher.DEFAULT, 0, false);
}

public TableOptions(KeyHasher h, int backupRequestDelay) {
public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompression) {
this.keyHasher = h;
this.backupRequestDelayMs = backupRequestDelay;
this.enableCompression = enableCompression;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public boolean enableCompression() {
return enableCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@

public class BackupRequestInterceptor implements TableInterceptor {

private boolean isOpen;

public BackupRequestInterceptor(boolean isOpen) {
this.isOpen = isOpen;
}

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
backupCall(clientRequestRound, tableHandler);
Expand All @@ -33,7 +27,7 @@ public void after(
}

private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) {
if (!clientRequestRound.getOperator().supportBackupRequest()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.apps.key_value;
import com.xiaomi.infra.pegasus.apps.mutate;
import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.operator.client_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_scan_operator;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.tools.ZstdWrapper;
import java.util.List;

public class CompressionInterceptor implements TableInterceptor {

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
tryCompress(clientRequestRound);
}

@Override
public void after(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {
if (errno != error_types.ERR_OK) {
return;
}
tryDecompress(clientRequestRound);
}

private void tryCompress(ClientRequestRound clientRequestRound) {
client_operator operator = clientRequestRound.getOperator();
if (operator instanceof rrdb_put_operator) {
rrdb_put_operator put = (rrdb_put_operator) operator;
put.get_request().value.data = ZstdWrapper.compress(put.get_request().value.data);
return;
}

if (operator instanceof rrdb_multi_put_operator) {
List<key_value> kvs = ((rrdb_multi_put_operator) operator).get_request().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.compress(kv.value.data);
}
return;
}

if (operator instanceof rrdb_check_and_set_operator) {
rrdb_check_and_set_operator check_and_set = (rrdb_check_and_set_operator) operator;
check_and_set.get_request().set_value.data =
ZstdWrapper.compress(check_and_set.get_request().set_value.data);
return;
}

if (operator instanceof rrdb_check_and_mutate_operator) {
List<mutate> mutates = ((rrdb_check_and_mutate_operator) operator).get_request().mutate_list;
for (mutate mu : mutates) {
mu.value.data = ZstdWrapper.compress(mu.value.data);
}
}
}

private void tryDecompress(ClientRequestRound clientRequestRound) {
client_operator operator = clientRequestRound.getOperator();

if (operator instanceof rrdb_get_operator) {
rrdb_get_operator get = (rrdb_get_operator) operator;
get.get_response().value.data = ZstdWrapper.tryDecompress(get.get_response().value.data);
return;
}

if (operator instanceof rrdb_multi_get_operator) {
List<key_value> kvs = ((rrdb_multi_get_operator) operator).get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
return;
}

if (operator instanceof rrdb_scan_operator) {
List<key_value> kvs = ((rrdb_scan_operator) operator).get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ public class InterceptorManger {
private List<TableInterceptor> interceptors = new ArrayList<>();

public InterceptorManger(TableOptions options) {
register(new BackupRequestInterceptor(options.enableBackupRequest()));
this.register(new BackupRequestInterceptor(), options.enableBackupRequest())
.register(new CompressionInterceptor(), options.enableCompression());
}

private InterceptorManger register(TableInterceptor interceptor) {
interceptors.add(interceptor);
private InterceptorManger register(TableInterceptor interceptor, boolean enable) {
if (enable) {
interceptors.add(interceptor);
}
return this;
}

Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ public class ZstdWrapper {

private ZstdWrapper() {}

/**
* try decompress the `src`, return `src` directly if failed
*
* @param src
* @return
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
*/
public static byte[] tryDecompress(byte[] src) {
byte[] decompressedValue;
try {
decompressedValue = decompress(src);
} catch (PException e) {
// decompress fail
decompressedValue = src;
}
return decompressedValue;
}

/**
* Compresses the `src` and returns the compressed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.xiaomi.infra.pegasus.rpc.async;

import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusClientFactory;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class InterceptorTest {
@Test
public void testCompressionInterceptor() throws PException {
PegasusTableInterface commonTable =
PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, false);
PegasusTableInterface compressTable =
PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp", 0, true);

byte[] hashKey = "hashKey".getBytes();
byte[] sortKey = "sortKey".getBytes();
byte[] commonValue = "commonValue".getBytes();
byte[] compressionValue = "compressionValue".getBytes();

// if origin value was not compressed, both commonTable and compressTable can read origin value
commonTable.set(hashKey, sortKey, commonValue, 10000);
Assertions.assertEquals(
new String(commonTable.get(hashKey, sortKey, 10000)), new String(commonValue));
Assertions.assertEquals(
new String(compressTable.get(hashKey, sortKey, 10000)), new String(commonValue));

// if origin value was compressed, only compressTable can read successfully
compressTable.set(hashKey, sortKey, compressionValue, 10000);
Assertions.assertNotEquals(
new String(commonTable.get(hashKey, sortKey, 10000)), new String(compressionValue));
Assertions.assertEquals(
new String(compressTable.get(hashKey, sortKey, 10000)), new String(compressionValue));
}
}