Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat: add a app create interface to the java client (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
cauchy1988 authored Mar 29, 2022
1 parent fe4bfa2 commit 9db2efc
Show file tree
Hide file tree
Showing 23 changed files with 3,568 additions and 69 deletions.
47 changes: 47 additions & 0 deletions idl/meta_admin.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

include "base.thrift"

namespace cpp dsn.replication
namespace java com.xiaomi.infra.pegasus.replication

struct create_app_options
{
1:i32 partition_count;
2:i32 replica_count;
3:bool success_if_exist;
4:string app_type;
5:bool is_stateful;
6:map<string, string> envs;
}

// client => meta_server
struct configuration_create_app_request
{
1:string app_name;
2:create_app_options options;
}

// meta_server => client
struct configuration_create_app_response
{
1:base.error_code err;
2:i32 appid;
}
5 changes: 3 additions & 2 deletions idl/recompile_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ rm -rf $TMP_DIR

mkdir -p $TMP_DIR
$thrift --gen java rrdb.thrift
$thrift --gen java replication.thrift
$thrift --gen java security.thrift
$thrift --gen java replication.thrift
$thrift --gen java security.thrift
$thrift --gen java meta_admin.thrift

for gen_file in `find $TMP_DIR -name "*.java"`; do
cat apache-licence-template $gen_file > $gen_file.tmp
Expand Down
2 changes: 2 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
include "base.thrift"
include "replication.thrift"
include "meta_admin.thrift"

namespace cpp dsn.apps
namespace java com.xiaomi.infra.pegasus.apps
Expand Down Expand Up @@ -298,4 +299,5 @@ service rrdb
service meta
{
replication.query_cfg_response query_cfg(1:replication.query_cfg_request query);
meta_admin.configuration_create_app_response create_app(1:meta_admin.configuration_create_app_request request);
}
902 changes: 898 additions & 4 deletions src/main/java/com/xiaomi/infra/pegasus/apps/meta.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/apps/rrdb.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
package com.xiaomi.infra.pegasus.apps;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2022-02-12")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2022-03-10")
public class rrdb {

public interface Iface {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.rpc.Cluster;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PegasusAbstractClient {
private static final Logger LOGGER = LoggerFactory.getLogger(PegasusAbstractClient.class);
protected final ClientOptions clientOptions;
protected Cluster cluster;

public PegasusAbstractClient(Properties properties) throws PException {
this(ClientOptions.create(properties));
}

public PegasusAbstractClient(String configPath) throws PException {
this(ClientOptions.create(configPath));
}

protected PegasusAbstractClient(ClientOptions options) throws PException {
this.clientOptions = options;
this.cluster = Cluster.createCluster(clientOptions);
LOGGER.info(
"Create Pegasus{}Client Instance By ClientOptions : {}",
this.clientType(),
this.clientOptions.toString());
}

protected String clientType() {
return "";
}

@Override
public void finalize() {
close();
}

public void close() {
synchronized (this) {
if (null != this.cluster) {
String metaList = StringUtils.join(cluster.getMetaList(), ",");
LOGGER.info("start to close pegasus {} client for [{}]", clientType(), metaList);
cluster.close();
cluster = null;
LOGGER.info("finish to close pegasus {} client for [{}]", clientType(), metaList);
}
}
}
}
197 changes: 197 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusAdminClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
import com.xiaomi.infra.pegasus.operator.create_app_operator;
import com.xiaomi.infra.pegasus.operator.query_cfg_operator;
import com.xiaomi.infra.pegasus.replication.*;
import com.xiaomi.infra.pegasus.rpc.Meta;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PegasusAdminClient extends PegasusAbstractClient
implements PegasusAdminClientInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class);
private static final String APP_TYPE = "pegasus";
private static final int META_RETRY_MIN_COUNT = 5;

private Meta meta;

@Override
public String clientType() {
return "Admin";
}

private void initMeta() {
this.meta = this.cluster.getMeta();
}

public PegasusAdminClient(Properties properties) throws PException {
super(properties);
initMeta();
}

public PegasusAdminClient(String configPath) throws PException {
super(configPath);
initMeta();
}

public PegasusAdminClient(ClientOptions options) throws PException {
super(options);
initMeta();
}

@Override
public void createApp(
String appName,
int partitionCount,
int replicaCount,
Map<String, String> envs,
long timeoutMs)
throws PException {
if (partitionCount < 1) {
throw new PException(
new IllegalArgumentException("createApp failed: partitionCount should >= 1!"));
}

if (replicaCount < 1) {
throw new PException(
new IllegalArgumentException("createApp failed: replicaCount should >= 1!"));
}

int i = 0;
for (; i < appName.length(); i++) {
char c = appName.charAt(i);
if (!((c >= 'a' && c <= 'z')
|| (c >= 'A' && c <= 'Z')
|| (c >= '0' && c <= '9')
|| c == '_'
|| c == '.'
|| c == ':')) {
break;
}
}

if (appName.isEmpty() || i < appName.length()) {
throw new PException(
new IllegalArgumentException(
String.format("createApp failed: invalid appName: %s", appName)));
}

if (timeoutMs <= 0) {
throw new PException(
new IllegalArgumentException(
String.format("createApp failed: invalid timeoutMs: %d", timeoutMs)));
}

long startTime = System.currentTimeMillis();

create_app_options options = new create_app_options();
options.setPartition_count(partitionCount);
options.setReplica_count(replicaCount);
options.setSuccess_if_exist(true);
options.setApp_type(APP_TYPE);
options.setEnvs(envs);
options.setIs_stateful(true);

configuration_create_app_request request = new configuration_create_app_request();
request.setApp_name(appName);
request.setOptions(options);

create_app_operator app_operator = new create_app_operator(appName, request);
error_code.error_types error = this.meta.operate(app_operator, META_RETRY_MIN_COUNT);
if (error != error_code.error_types.ERR_OK) {
throw new PException(
String.format(
"Create app:%s failed, partitionCount: %d, replicaCount: %s, error:%s.",
appName, partitionCount, replicaCount, error.toString()));
}

long remainDuration = timeoutMs - (System.currentTimeMillis() - startTime);
if (remainDuration <= 0) {
remainDuration = 8;
}

boolean isHealthy = false;
while (remainDuration > 0) {
isHealthy = this.isAppHealthy(appName, replicaCount);
if (isHealthy) {
break;
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
}

remainDuration = timeoutMs - (System.currentTimeMillis() - startTime);
}

if (!isHealthy) {
throw new PException(
String.format(
"The newly created app:%s is not fully healthy now, but the interface duration expires 'timeoutMs', partitionCount: %d, replicaCount: %s.",
appName, partitionCount, replicaCount));
}
}

@Override
public boolean isAppHealthy(String appName, int replicaCount) throws PException {
if (replicaCount < 1) {
throw new PException(
new IllegalArgumentException(
String.format("Query app:%s Status failed: replicaCount should >= 1!", appName)));
}

query_cfg_request request = new query_cfg_request();
request.setApp_name(appName);

query_cfg_operator query_op = new query_cfg_operator(new gpid(-1, -1), request);
error_code.error_types error = this.meta.operate(query_op, META_RETRY_MIN_COUNT);
if (error != error_code.error_types.ERR_OK) {
throw new PException(
String.format(
"Query app status failed, app:%s, replicaCount: %s, error:%s.",
appName, replicaCount, error.toString()));
}

query_cfg_response response = query_op.get_response();

int readyCount = 0;
for (int i = 0; i < response.partition_count; ++i) {
partition_configuration pc = response.partitions.get(i);
if (!pc.primary.isInvalid() && (pc.secondaries.size() + 1 >= replicaCount)) {
++readyCount;
}
}

LOGGER.info(
String.format(
"Check app healthy, appName:%s, partitionCount:%d, ready_count:%d.",
appName, response.partition_count, readyCount));

return readyCount == response.partition_count;
}
}
Loading

0 comments on commit 9db2efc

Please sign in to comment.