Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat: adapt partition split (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Aug 30, 2021
1 parent 034c114 commit b31c506
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 29 deletions.
12 changes: 12 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/base/error_code.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ public enum error_types {
ERR_APP_DROPPED,
ERR_MOCK_INTERNAL,
ERR_ZOOKEEPER_OPERATION,
ERR_CHILD_REGISTERED,
ERR_INGESTION_FAILED,
ERR_UNAUTHENTICATED,
ERR_KRB5_INTERNAL,

ERR_SASL_INTERNAL,
ERR_SASL_INCOMPLETE,
ERR_ACL_DENY,
ERR_SPLITTING,
ERR_PARENT_PARTITION_MISUSED,
ERR_CHILD_NOT_READY,
ERR_DISK_INSUFFICIENT,

// ERROR_CODE defined by client
ERR_SESSION_RESET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,13 @@ public void handleReplicaException(
break;
case ERR_INVALID_DATA:
message = " The request maybe too large!";
break;
case ERR_SPLITTING:
message = " The table is executing partition split!";
break;
case ERR_PARENT_PARTITION_MISUSED:
message = " The partition split finished, is updating config!";
break;
}
promise.setFailure(
new PException(new ReplicationException(op.rpc_error.errno, header + message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public static interface ExistListener extends GenericFutureListener<Future<Boole
@Override
public void operationComplete(Future<Boolean> future) throws Exception;
}

/**
* Check value existence for a specific (hashKey, sortKey) pair of current table, async version
*
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ public final long getKeyHash(byte[] data) {
return KeyHasher.DEFAULT.hash(data);
}

public final gpid getGpidByHash(long hash_value) {
com.xiaomi.infra.pegasus.base.gpid result = new com.xiaomi.infra.pegasus.base.gpid(appID_, -1);
result.set_pidx((int) remainder_unsigned(hash_value, getPartitionCount()));
return result;
}

public final gpid[] getAllGpid() {
int count = getPartitionCount();
com.xiaomi.infra.pegasus.base.gpid[] ret = new com.xiaomi.infra.pegasus.base.gpid[count];
Expand Down Expand Up @@ -71,6 +65,8 @@ public final <T> DefaultPromise<T> newPromise() {

public abstract void asyncOperate(client_operator op, ClientOPCallback callback, int timeoutMs);

public abstract gpid getGpidByHash(long hashValue);

public abstract EventExecutor getExecutor();

protected String tableName_;
Expand Down
80 changes: 58 additions & 22 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,45 +131,71 @@ public ReplicaConfiguration getReplicaConfig(int index) {
return tableConfig_.get().replicas.get(index);
}

public gpid getGpidByHash(long hashValue) {
int index = (int) remainder_unsigned(hashValue, getPartitionCount());
final ReplicaConfiguration replicaConfiguration = tableConfig_.get().replicas.get(index);
// table is partition split, and child partition is not ready
// child requests should be redirected to its parent partition
if (tableConfig_.get().replicas.get(index).ballot < 0) {
logger.info(
"Table[{}] is executing partition split, partition[{}] is not ready, requests will send to parent partition[{}]",
tableName_,
index,
index - getPartitionCount() / 2);
index -= getPartitionCount() / 2;
}
return new gpid(appID_, index);
}

// update the table configuration & appID_ according to to queried response
// there should only be one thread to do the table config update
void initTableConfiguration(query_cfg_response resp) {
int partitionCount = resp.getPartition_count();
TableConfiguration oldConfig = tableConfig_.get();

TableConfiguration newConfig = new TableConfiguration();
newConfig.updateVersion = (oldConfig == null) ? 1 : (oldConfig.updateVersion + 1);
newConfig.replicas = new ArrayList<>(resp.getPartition_count());
for (int i = 0; i != resp.getPartition_count(); ++i) {
newConfig.replicas = new ArrayList<>(partitionCount);
for (int i = 0; i != partitionCount; ++i) {
ReplicaConfiguration newReplicaConfig = new ReplicaConfiguration();
newReplicaConfig.pid.set_app_id(resp.getApp_id());
newReplicaConfig.pid.set_pidx(i);
newConfig.replicas.add(newReplicaConfig);
}

// create sessions for primary and secondaries
FutureGroup<Void> futureGroup = new FutureGroup<>(resp.getPartition_count());
FutureGroup<Void> futureGroup = new FutureGroup<>(partitionCount);
for (partition_configuration pc : resp.getPartitions()) {
ReplicaConfiguration s = newConfig.replicas.get(pc.getPid().get_pidx());
s.ballot = pc.ballot;
int index = pc.getPid().get_pidx();
ReplicaConfiguration replicaConfig = newConfig.replicas.get(index);
replicaConfig.ballot = pc.ballot;

// table is partition split, and child partition is not ready
// child requests should be redirected to its parent partition
// this will be happened when query meta is called during partition split
if (replicaConfig.ballot < 0) {
continue;
}

replicaConfig.primaryAddress = pc.getPrimary();
// If the primary address is invalid, we don't create secondary session either.
// Because all of these sessions will be recreated later.
s.primaryAddress = pc.primary;
if (!pc.primary.isInvalid()) {
s.primarySession = tryConnect(pc.primary, futureGroup);

// backup request is enabled, get all secondary sessions
s.secondarySessions.clear();
if (isBackupRequestEnabled()) {
// secondary sessions
pc.secondaries.forEach(
secondary -> {
ReplicaSession session = tryConnect(secondary, futureGroup);
if (session != null) {
s.secondarySessions.add(session);
}
});
}
if (replicaConfig.primaryAddress.isInvalid()) {
continue;
}
replicaConfig.primarySession = tryConnect(replicaConfig.primaryAddress, futureGroup);

replicaConfig.secondarySessions.clear();
// backup request is enabled, get all secondary sessions
if (isBackupRequestEnabled()) {
// secondary sessions
pc.secondaries.forEach(
secondary -> {
ReplicaSession session = tryConnect(secondary, futureGroup);
if (session != null) {
replicaConfig.secondarySessions.add(session);
}
});
}
}

Expand Down Expand Up @@ -200,14 +226,22 @@ public ReplicaSession tryConnect(final rpc_address addr, FutureGroup<Void> futur
return session;
}

boolean isPartitionCountValid(int old_count, int resp_count) {
return ((old_count == resp_count) // normal case
|| (old_count * 2 == resp_count) // table start partition split
|| (old_count == resp_count * 2) // table partition split cancel
);
}

void onUpdateConfiguration(final query_cfg_operator op) {
error_types err = MetaSession.getMetaServiceError(op);
if (err != error_types.ERR_OK) {
logger.warn("query meta for table({}) failed, error_code({})", tableName_, err.toString());
} else {
logger.info("query meta for table({}) received response", tableName_);
query_cfg_response resp = op.get_response();
if (resp.app_id != appID_ || resp.partition_count != tableConfig_.get().replicas.size()) {
if (resp.app_id != appID_
|| !isPartitionCountValid(tableConfig_.get().replicas.size(), resp.partition_count)) {
logger.warn(
"table({}) meta reset, app_id({}->{}), partition_count({}->{})",
tableName_,
Expand Down Expand Up @@ -293,6 +327,8 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin
case ERR_SESSION_RESET: // <- connection with the server failed
case ERR_OBJECT_NOT_FOUND: // <- replica server doesn't serve this gpid
case ERR_INVALID_STATE: // <- replica server is not primary
case ERR_PARENT_PARTITION_MISUSED: // <- send request to wrong partition because of partition
// split
logger.warn(
"{}: replica server({}) doesn't serve gpid({}), operator({}), try({}), error_code({}), need query meta",
tableName_,
Expand Down

0 comments on commit b31c506

Please sign in to comment.