Skip to content

Commit

Permalink
Implement resource group interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo committed Nov 2, 2023
1 parent 0d596e9 commit 1167612
Show file tree
Hide file tree
Showing 12 changed files with 915 additions and 2 deletions.
197 changes: 197 additions & 0 deletions src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.milvus.param.highlevel.dml.response.*;
import io.milvus.param.index.*;
import io.milvus.param.partition.*;
import io.milvus.param.resourcegroup.*;
import io.milvus.param.role.*;
import io.milvus.response.*;
import lombok.NonNull;
Expand Down Expand Up @@ -567,6 +568,7 @@ public R<RpcStatus> loadCollection(@NonNull LoadCollectionParam requestParam) {
LoadCollectionRequest.Builder builder = LoadCollectionRequest.newBuilder()
.setCollectionName(requestParam.getCollectionName())
.setReplicaNumber(requestParam.getReplicaNumber())
.addAllResourceGroups(requestParam.getResourceGroups())
.setRefresh(requestParam.isRefresh());
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
builder.setDbName(requestParam.getDatabaseName());
Expand Down Expand Up @@ -999,6 +1001,7 @@ public R<RpcStatus> loadPartitions(@NonNull LoadPartitionsParam requestParam) {
.setCollectionName(requestParam.getCollectionName())
.setReplicaNumber(requestParam.getReplicaNumber())
.addAllPartitionNames(requestParam.getPartitionNames())
.addAllResourceGroups(requestParam.getResourceGroups())
.setRefresh(requestParam.isRefresh());
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
builder.setDbName(requestParam.getDatabaseName());
Expand Down Expand Up @@ -2817,6 +2820,7 @@ public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
}
}

@Override
public R<CheckHealthResponse> checkHealth() {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
Expand All @@ -2837,6 +2841,7 @@ public R<CheckHealthResponse> checkHealth() {
}
}

@Override
public R<GetVersionResponse> getVersion() {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
Expand All @@ -2857,6 +2862,198 @@ public R<GetVersionResponse> getVersion() {
}
}

@Override
public R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
CreateResourceGroupRequest request = CreateResourceGroupRequest.newBuilder()
.setResourceGroup(requestParam.getGroupName())
.build();

Status response = blockingStub().createResourceGroup(request);
if (response.getErrorCode() != ErrorCode.Success) {
return failedStatus("CreateResourceGroup", response);
}
logDebug("CreateResourceGroup successfully! Resource group name:{}",
requestParam.getGroupName());
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
} catch (StatusRuntimeException e) {
logError("CreateResourceGroup RPC failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
} catch (Exception e) {
logError("CreateResourceGroup failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
}
}

@Override
public R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
DropResourceGroupRequest request = DropResourceGroupRequest.newBuilder()
.setResourceGroup(requestParam.getGroupName())
.build();

Status response = blockingStub().dropResourceGroup(request);
if (response.getErrorCode() != ErrorCode.Success) {
return failedStatus("DropResourceGroup", response);
}
logDebug("DropResourceGroup successfully! Resource group name:{}",
requestParam.getGroupName());
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
} catch (StatusRuntimeException e) {
logError("DropResourceGroup RPC failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
} catch (Exception e) {
logError("DropResourceGroup failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
}
}

@Override
public R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
ListResourceGroupsRequest request = ListResourceGroupsRequest.newBuilder()
.build();

ListResourceGroupsResponse response = blockingStub().listResourceGroups(request);
if (response.getStatus().getErrorCode() != ErrorCode.Success) {
return failedStatus("ListResourceGroups", response.getStatus());
}
logDebug("ListResourceGroups successfully!");
return R.success(response);
} catch (StatusRuntimeException e) {
logError("ListResourceGroups RPC failed!", e);
return R.failed(e);
} catch (Exception e) {
logError("ListResourceGroups failed!", e);
return R.failed(e);
}
}

@Override
public R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
DescribeResourceGroupRequest request = DescribeResourceGroupRequest.newBuilder()
.setResourceGroup(requestParam.getGroupName())
.build();

DescribeResourceGroupResponse response = blockingStub().describeResourceGroup(request);
if (response.getStatus().getErrorCode() != ErrorCode.Success) {
return failedStatus("DescribeResourceGroup", response.getStatus());
}
logDebug("DescribeResourceGroup successfully! Resource group name:{}",
requestParam.getGroupName());
return R.success(response);
} catch (StatusRuntimeException e) {
logError("DescribeResourceGroup RPC failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
} catch (Exception e) {
logError("DescribeResourceGroup failed! Resource group name:{}",
requestParam.getGroupName(), e);
return R.failed(e);
}
}

@Override
public R<RpcStatus> transferNode(TransferNodeParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
TransferNodeRequest request = TransferNodeRequest.newBuilder()
.setSourceResourceGroup(requestParam.getSourceGroupName())
.setTargetResourceGroup(requestParam.getTargetGroupName())
.setNumNode(requestParam.getNodeNumber())
.build();

Status response = blockingStub().transferNode(request);
if (response.getErrorCode() != ErrorCode.Success) {
return failedStatus("TransferNode", response);
}
logDebug("TransferNode successfully! Source group:{}, target group:{}, nodes number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(), requestParam.getNodeNumber());
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
} catch (StatusRuntimeException e) {
logError("TransferNode RPC failed! Source group:{}, target group:{}, nodes number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(), requestParam.getNodeNumber(), e);
return R.failed(e);
} catch (Exception e) {
logError("TransferNode failed! Source group:{}, target group:{}, nodes number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(), requestParam.getNodeNumber(), e);
return R.failed(e);
}
}

@Override
public R<RpcStatus> transferReplica(TransferReplicaParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
TransferReplicaRequest request = TransferReplicaRequest.newBuilder()
.setSourceResourceGroup(requestParam.getSourceGroupName())
.setTargetResourceGroup(requestParam.getTargetGroupName())
.setCollectionName(requestParam.getCollectionName())
.setDbName(requestParam.getDatabaseName())
.setNumReplica(requestParam.getReplicaNumber())
.build();

Status response = blockingStub().transferReplica(request);
if (response.getErrorCode() != ErrorCode.Success) {
return failedStatus("TransferReplica", response);
}
logDebug("TransferReplica successfully! Source group:{}, target group:{}, replica number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(),
requestParam.getReplicaNumber());
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
} catch (StatusRuntimeException e) {
logError("TransferReplica RPC failed! Source group:{}, target group:{}, replica number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(),
requestParam.getReplicaNumber(), e);
return R.failed(e);
} catch (Exception e) {
logError("TransferReplica failed! Source group:{}, target group:{}, replica number:{}",
requestParam.getSourceGroupName(), requestParam.getTargetGroupName(),
requestParam.getReplicaNumber(), e);
return R.failed(e);
}
}


///////////////////// High Level API//////////////////////
@Override
public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/io/milvus/client/MilvusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.milvus.param.index.*;
import io.milvus.param.partition.*;
import io.milvus.param.role.*;
import io.milvus.param.resourcegroup.*;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -679,6 +680,53 @@ default void close() {
*/
R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam);

/**
* Create a resource group.
*
* @param requestParam {@link CreateResourceGroupParam}
* @return {status:result code, data:RpcStatus{msg: result message}}
*/
R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam);

/**
* Drop a resource group.
*
* @param requestParam {@link DropResourceGroupParam}
* @return {status:result code, data:RpcStatus{msg: result message}}
*/
R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam);

/**
* List resource groups.
*
* @param requestParam {@link ListResourceGroupsParam}
* @return {status:result code, data:ListResourceGroupsResponse{status}}
*/
R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam);

/**
* Describe a resource group.
*
* @param requestParam {@link DescribeResourceGroupParam}
* @return {status:result code, data:DescribeResourceGroupResponse{status}}
*/
R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam);

/**
* Transfer a query node from source resource group to target resource_group.
*
* @param requestParam {@link TransferNodeParam}
* @return {status:result code, data:RpcStatus{msg: result message}}
*/
R<RpcStatus> transferNode(TransferNodeParam requestParam);

/**
* Transfer a replica from source resource group to target resource_group.
*
* @param requestParam {@link TransferReplicaParam}
* @return {status:result code, data:RpcStatus{msg: result message}}
*/
R<RpcStatus> transferReplica(TransferReplicaParam requestParam);


///////////////////// High Level API//////////////////////
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/io/milvus/client/MilvusMultiServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.milvus.param.highlevel.dml.response.*;
import io.milvus.param.index.*;
import io.milvus.param.partition.*;
import io.milvus.param.resourcegroup.*;
import io.milvus.param.role.*;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -567,6 +568,36 @@ public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
return this.clusterFactory.getMaster().getClient().getLoadState(requestParam);
}

@Override
public R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam) {
return this.clusterFactory.getMaster().getClient().createResourceGroup(requestParam);
}

@Override
public R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam) {
return this.clusterFactory.getMaster().getClient().dropResourceGroup(requestParam);
}

@Override
public R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam) {
return this.clusterFactory.getMaster().getClient().listResourceGroups(requestParam);
}

@Override
public R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam) {
return this.clusterFactory.getMaster().getClient().describeResourceGroup(requestParam);
}

@Override
public R<RpcStatus> transferNode(TransferNodeParam requestParam) {
return this.clusterFactory.getMaster().getClient().transferNode(requestParam);
}

@Override
public R<RpcStatus> transferReplica(TransferReplicaParam requestParam) {
return this.clusterFactory.getMaster().getClient().transferReplica(requestParam);
}

///////////////////// High Level API//////////////////////


Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/milvus/client/MilvusServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.milvus.param.highlevel.dml.response.*;
import io.milvus.param.index.*;
import io.milvus.param.partition.*;
import io.milvus.param.resourcegroup.*;
import io.milvus.param.role.*;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -624,7 +625,35 @@ public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
return retry(()-> super.getLoadState(requestParam));
}

@Override
public R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam) {
return retry(()-> super.createResourceGroup(requestParam));
}

@Override
public R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam) {
return retry(()-> super.dropResourceGroup(requestParam));
}

@Override
public R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam) {
return retry(()-> super.listResourceGroups(requestParam));
}

@Override
public R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam) {
return retry(()-> super.describeResourceGroup(requestParam));
}

@Override
public R<RpcStatus> transferNode(TransferNodeParam requestParam) {
return retry(()-> super.transferNode(requestParam));
}

@Override
public R<RpcStatus> transferReplica(TransferReplicaParam requestParam) {
return retry(()-> super.transferReplica(requestParam));
}

@Override
public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
Expand Down
Loading

0 comments on commit 1167612

Please sign in to comment.