Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(compress) #126

Merged
merged 36 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
7fea1c6
fix options
foreverneverer Sep 3, 2020
e1602fe
fix options
foreverneverer Sep 3, 2020
fa94399
fix options
foreverneverer Sep 3, 2020
88b5e88
fix options
foreverneverer Sep 3, 2020
a098625
fix comment
foreverneverer Sep 4, 2020
2a09369
fix comment
foreverneverer Sep 4, 2020
d9acc5a
Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInt…
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
10 changes: 8 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
public class TableOptions {
private final KeyHasher keyHasher;
private final int backupRequestDelayMs;
private final boolean enableCompress;

public KeyHasher keyHasher() {
return this.keyHasher;
Expand All @@ -17,15 +18,20 @@ public int backupRequestDelayMs() {
}

public static TableOptions forTest() {
return new TableOptions(KeyHasher.DEFAULT, 0);
return new TableOptions(KeyHasher.DEFAULT, 0, false);
}

public TableOptions(KeyHasher h, int backupRequestDelay) {
public TableOptions(KeyHasher h, int backupRequestDelay, boolean enableCompress) {
this.keyHasher = h;
this.backupRequestDelayMs = backupRequestDelay;
this.enableCompress = enableCompress;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public boolean enableCompress() {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return enableCompress;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.apps.key_value;
import com.xiaomi.infra.pegasus.apps.mutate;
import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_scan_operator;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.tools.ZstdWrapper;
import java.util.List;

public class CompressInterceptor implements TableInterceptor {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
private boolean isOpen;

public CompressInterceptor(boolean isOpen) {
this.isOpen = isOpen;
}

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler)
throws PException {
if (!isOpen) {
return;
}
tryCompress(clientRequestRound);
}

@Override
public void after(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler)
throws PException {
if (errno != error_types.ERR_OK || !isOpen) {
return;
}
tryDecompress(clientRequestRound);
}

private void tryCompress(ClientRequestRound clientRequestRound) throws PException {
String operatorName = clientRequestRound.getOperator().name();
switch (operatorName) {
case "put":
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
{
rrdb_put_operator operator = (rrdb_put_operator) clientRequestRound.getOperator();
operator.get_request().value.data =
ZstdWrapper.compress(operator.get_request().value.data);
}
case "multi_put":
{
rrdb_multi_put_operator operator =
(rrdb_multi_put_operator) clientRequestRound.getOperator();
List<key_value> kvs = operator.get_request().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.compress(kv.value.data);
}
break;
}
case "check_and_set":
{
rrdb_check_and_set_operator operator =
(rrdb_check_and_set_operator) clientRequestRound.getOperator();
operator.get_request().set_value.data =
ZstdWrapper.compress(operator.get_request().set_value.data);
break;
}
case "check_and_mutate":
{
rrdb_check_and_mutate_operator operator =
(rrdb_check_and_mutate_operator) clientRequestRound.getOperator();
List<mutate> mutates = operator.get_request().mutate_list;
for (mutate mu : mutates) {
mu.value.data = ZstdWrapper.compress(mu.value.data);
}
}
default:
throw new PException("unsupported operator = " + operatorName);
}
}

private void tryDecompress(ClientRequestRound clientRequestRound) throws PException {
String operatorName = clientRequestRound.getOperator().name();
switch (operatorName) {
case "get":
{
ZstdWrapper.tryDecompress(
((rrdb_get_operator) clientRequestRound.getOperator()).get_response().value.data);
break;
}
case "multi_get":
{
rrdb_multi_get_operator operator =
(rrdb_multi_get_operator) clientRequestRound.getOperator();
List<key_value> kvs = operator.get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
break;
}
case "scan":
{
rrdb_scan_operator operator = (rrdb_scan_operator) clientRequestRound.getOperator();
List<key_value> kvs = operator.get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
break;
}
default:
throw new PException("unsupported operator = " + operatorName);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;

public class InterceptorManger {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(InterceptorManger.class);

private List<TableInterceptor> interceptors = new ArrayList<>();

public InterceptorManger(TableOptions options) {
register(new BackupRequestInterceptor(options.enableBackupRequest()));
this.register(new BackupRequestInterceptor(options.enableBackupRequest()))
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
.register(new CompressInterceptor(options.enableCompress()));
}

private InterceptorManger register(TableInterceptor interceptor) {
Expand All @@ -22,14 +26,22 @@ private InterceptorManger register(TableInterceptor interceptor) {

public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
for (TableInterceptor interceptor : interceptors) {
interceptor.before(clientRequestRound, tableHandler);
try {
interceptor.before(clientRequestRound, tableHandler);
} catch (PException e) {
logger.warn("interceptor-before execute failed!", e);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

public void after(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {
for (TableInterceptor interceptor : interceptors) {
interceptor.after(clientRequestRound, errno, tableHandler);
try {
interceptor.after(clientRequestRound, errno, tableHandler);
} catch (PException e) {
logger.warn("interceptor-after execute failed!", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;

public interface TableInterceptor {
// The behavior before sending the RPC to a table.
void before(ClientRequestRound clientRequestRound, TableHandler tableHandler);
void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) throws PException;
// The behavior after getting reply or failure of the RPC.
void after(ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler);
void after(ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler)
throws PException;
}
17 changes: 17 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ public class ZstdWrapper {

private ZstdWrapper() {}

/**
* try decompress the `src`, return `src` directly if failed
*
* @param src
* @return
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
*/
public static byte[] tryDecompress(byte[] src) {
byte[] decompressedValue;
try {
decompressedValue = decompress(src);
} catch (PException e) {
// decompress fail
decompressedValue = src;
}
return decompressedValue;
}

/**
* Compresses the `src` and returns the compressed.
*
Expand Down