Skip to content

Commit

Permalink
feat: add zk auth (#3581)
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 authored Nov 15, 2023
1 parent 714369e commit 2fb650a
Show file tree
Hide file tree
Showing 34 changed files with 206 additions and 36 deletions.
7 changes: 7 additions & 0 deletions docs/en/deploy/conf.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# If you are deploying the standalone version, you do not need to configure zk_cluster and zk_root_path, just comment these two configurations. Deploying the cluster version needs to configure these two items, and the two configurations of all nodes in a cluster must be consistent
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# set the username and password of zookeeper if authentication is enabled
#--zk_cert=user:passwd
# The address of the tablet needs to be specified in the standalone version, and this configuration can be ignored in the cluster version
--tablet=127.0.0.1:9921
# Configure log directory
Expand Down Expand Up @@ -76,6 +78,8 @@
# If you start the cluster version, you need to specify the address of zk and the node path of the cluster in zk
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# set the username and password of zookeeper if authentication is enabled
#--zk_cert=user:passwd
# Configure the thread pool size, it is recommended to be consistent with the number of CPU cores
--thread_pool_size=24
Expand Down Expand Up @@ -218,6 +222,8 @@
# If the deployed openmldb is a cluster version, you need to specify the zk address and the cluster zk node directory
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# set the username and password of zookeeper if authentication is enabled
#--zk_cert=user:passwd
# configure log path
--openmldb_log_dir=./logs
Expand Down Expand Up @@ -249,6 +255,7 @@ zookeeper.connection_timeout=5000
zookeeper.max_retries=10
zookeeper.base_sleep_time=1000
zookeeper.max_connect_waitTime=30000
#zookeeper.cert=user:passwd
# Spark Config
spark.home=
Expand Down
7 changes: 7 additions & 0 deletions docs/zh/deploy/conf.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# 如果是部署单机版不需要配置zk_cluster和zk_root_path,把这俩配置注释即可. 部署集群版需要配置这两项,一个集群中所有节点的这两个配置必须保持一致
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# 配置zk认证的用户名和密码, 用冒号分割
#--zk_cert=user:passwd
# 单机版需要指定tablet的地址, 集群版此配置可忽略
--tablet=127.0.0.1:9921
# 配置log目录
Expand Down Expand Up @@ -76,6 +78,8 @@
# 如果启动集群版需要指定zk的地址和集群在zk的节点路径
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# 配置zk认证的用户名和密码, 用冒号分割
#--zk_cert=user:passwd
# 配置线程池大小,建议和cpu核数一致
--thread_pool_size=24
Expand Down Expand Up @@ -222,6 +226,8 @@
# 如果部署的openmldb是集群版,需要指定zk地址和集群zk节点目录
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster
# 配置zk认证的用户名和密码, 用冒号分割
#--zk_cert=user:passwd
# 配置日志路径
--openmldb_log_dir=./logs
Expand Down Expand Up @@ -254,6 +260,7 @@ zookeeper.connection_timeout=5000
zookeeper.max_retries=10
zookeeper.base_sleep_time=1000
zookeeper.max_connect_waitTime=30000
#zookeeper.cert=user:passwd
# Spark Config
spark.home=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

import java.util.concurrent.TimeUnit;
import java.util.List;
Expand All @@ -46,12 +49,26 @@ public CuratorFramework getClient() {
public boolean connect() throws InterruptedException {
log.info("ZKClient connect with config: {}", config);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTime(), config.getMaxRetries());
CuratorFramework client = CuratorFrameworkFactory.builder()
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(config.getCluster())
.sessionTimeoutMs(config.getSessionTimeout())
.connectionTimeoutMs(config.getConnectionTimeout())
.retryPolicy(retryPolicy)
.build();
.retryPolicy(retryPolicy);
if (!config.getCert().isEmpty()) {
builder.authorization("digest", config.getCert().getBytes())
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}

@Override
public List<ACL> getAclForPath(String s) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
CuratorFramework client = builder.build();
client.start();
if (!client.blockUntilConnected(config.getMaxConnectWaitTime(), TimeUnit.MILLISECONDS)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ public class ZKConfig {
private int baseSleepTime = 1000;
@Builder.Default
private int maxConnectWaitTime = 30000;
@Builder.Default
private String cert = "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class SdkOption {
private String sparkConfPath = "";
private int zkLogLevel = 3;
private String zkLogFile = "";
private String zkCert = "";

// options for standalone mode
private String host = "";
Expand Down Expand Up @@ -70,6 +71,7 @@ public SQLRouterOptions buildSQLRouterOptions() throws SqlException {
copt.setSpark_conf_path(getSparkConfPath());
copt.setZk_log_level(getZkLogLevel());
copt.setZk_log_file(getZkLogFile());
copt.setZk_cert(getZkCert());

// base
buildBaseOptions(copt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SyncToolConfig {
// public static int CHANNEL_KEEP_ALIVE_TIME;
public static String ZK_CLUSTER;
public static String ZK_ROOT_PATH;
public static String ZK_CERT;
public static String SYNC_TASK_PROGRESS_PATH;

public static String HADOOP_CONF_DIR;
Expand Down Expand Up @@ -86,6 +87,7 @@ private static void parseFromProperties(Properties prop) {
if (ZK_ROOT_PATH.isEmpty()) {
throw new RuntimeException("zookeeper.root_path should not be empty");
}
ZK_CERT = prop.getProperty("zookeeper.cert", "");

HADOOP_CONF_DIR = prop.getProperty("hadoop.conf.dir", "");
if (HADOOP_CONF_DIR.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ public SyncToolImpl(String endpoint) throws SqlException, InterruptedException {
this.zkClient = new ZKClient(ZKConfig.builder()
.cluster(SyncToolConfig.ZK_CLUSTER)
.namespace(SyncToolConfig.ZK_ROOT_PATH)
.cert(SyncToolConfig.ZK_CERT)
.build());
Preconditions.checkState(zkClient.connect(), "zk connect failed");
SdkOption option = new SdkOption();
option.setZkCluster(SyncToolConfig.ZK_CLUSTER);
option.setZkPath(SyncToolConfig.ZK_ROOT_PATH);
option.setZkCert(SyncToolConfig.ZK_CERT);
this.router = new SqlClusterExecutor(option);
this.zkCollectorPath = SyncToolConfig.ZK_ROOT_PATH + "/sync_tool/collector";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -59,16 +62,34 @@ public TaskManagerClient(String endpoint) {
}

public TaskManagerClient(String zkCluster, String zkPath) throws Exception {
this(zkCluster, zkPath, "");
}

public TaskManagerClient(String zkCluster, String zkPath, String zkCert) throws Exception {
if (zkCluster == null || zkPath == null) {
logger.info("Zookeeper address is wrong, please check the configuration");
}
String masterZnode = zkPath + "/taskmanager/leader";

zkClient = CuratorFrameworkFactory.builder()
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkCluster)
.sessionTimeoutMs(10000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
.retryPolicy(new ExponentialBackoffRetry(1000, 10));
if (!zkCert.isEmpty()) {
builder.authorization("digest", zkCert.getBytes())
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}

@Override
public List<ACL> getAclForPath(String s) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
zkClient = builder.build();
zkClient.start();
Stat stat = zkClient.checkExists().forPath(masterZnode);
if (stat != null) { // The original master exists and is directly connected to it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public static String getZkRootPath() {
return getString("zookeeper.root_path");
}

public static String getZkCert() {
return props.getProperty("zookeeper.cert", "");
}

public static int getZkConnectionTimeout() {
return getInt("zookeeper.connection_timeout");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private void initExternalFunction() throws InterruptedException {
.connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
.maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
.maxRetries(TaskManagerConfig.getZkMaxRetries())
.cert(TaskManagerConfig.getZkCert())
.build());
zkClient.connect();

Expand Down
2 changes: 2 additions & 0 deletions python/openmldb_sdk/openmldb/sdk/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def init(self):
options.zk_log_level = int(self.options_map['zkLogLevel'])
if 'zkLogFile' in self.options_map:
options.zk_log_file = self.options_map['zkLogFile']
if 'zkCert' in self.options_map:
options.zk_cert = self.options_map['zkCert']
else:
options = sql_router_sdk.StandaloneOptions()
# use host
Expand Down
1 change: 1 addition & 0 deletions release/conf/apiserver.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
--role=apiserver
--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_cert=user:passwd

--openmldb_log_dir=./logs
--log_level=info
Expand Down
1 change: 1 addition & 0 deletions release/conf/nameserver.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
--role=nameserver
--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_cert=user:passwd

--openmldb_log_dir=./logs
--log_level=info
Expand Down
1 change: 1 addition & 0 deletions release/conf/tablet.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_cert=user:passwd

# thread_pool_size建议和cpu核数一致
--thread_pool_size=24
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ DECLARE_string(nameserver);
DECLARE_int32(port);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(thread_pool_size);
DECLARE_int32(put_concurrency_limit);
DECLARE_int32(get_concurrency_limit);
Expand Down Expand Up @@ -3653,7 +3655,7 @@ void StartNsClient() {
std::shared_ptr<::openmldb::zk::ZkClient> zk_client;
if (!FLAGS_zk_cluster.empty()) {
zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "",
FLAGS_zk_session_timeout, "", FLAGS_zk_root_path);
FLAGS_zk_session_timeout, "", FLAGS_zk_root_path, FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client->Init()) {
std::cout << "zk client init failed" << std::endl;
return;
Expand Down Expand Up @@ -3876,6 +3878,8 @@ void StartAPIServer() {
cluster_options.zk_cluster = FLAGS_zk_cluster;
cluster_options.zk_path = FLAGS_zk_root_path;
cluster_options.zk_session_timeout = FLAGS_zk_session_timeout;
cluster_options.zk_auth_schema = FLAGS_zk_auth_schema;
cluster_options.zk_cert = FLAGS_zk_cert;
if (!api_service->Init(cluster_options)) {
PDLOG(WARNING, "Fail to init");
exit(1);
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/sql_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ DEFINE_string(spark_conf, "", "The config file of Spark job");
// cluster mode
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(zk_session_timeout);
DECLARE_uint32(zk_log_level);
DECLARE_string(zk_log_file);
Expand Down Expand Up @@ -267,6 +269,8 @@ bool InitClusterSDK() {
copt.zk_session_timeout = FLAGS_zk_session_timeout;
copt.zk_log_level = FLAGS_zk_log_level;
copt.zk_log_file = FLAGS_zk_log_file;
copt.zk_auth_schema = FLAGS_zk_auth_schema;
copt.zk_cert = FLAGS_zk_cert;

cs = new ::openmldb::sdk::ClusterSDK(copt);
if (!cs->Init()) {
Expand Down
5 changes: 4 additions & 1 deletion src/datacollector/data_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(thread_pool_size);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -179,7 +181,8 @@ bool DataCollectorImpl::Init(const std::string& endpoint) {
}
bool DataCollectorImpl::Init(const std::string& zk_cluster, const std::string& zk_path, const std::string& endpoint) {
zk_client_ = std::make_shared<zk::ZkClient>(zk_cluster, FLAGS_zk_session_timeout, endpoint, zk_path,
zk_path + kDataCollectorRegisterPath);
zk_path + kDataCollectorRegisterPath,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client_->Init()) {
LOG(WARNING) << "fail to init zk client";
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ DEFINE_uint32(tablet_heartbeat_timeout, 5 * 60 * 1000, "config the heartbeat of
DEFINE_uint32(tablet_offline_check_interval, 1000, "config the check interval of tablet offline. unit is milliseconds");
DEFINE_string(zk_cluster, "", "config the zookeeper cluster eg ip:2181,ip2:2181,ip3:2181");
DEFINE_string(zk_root_path, "/openmldb", "config the root path of zookeeper");
DEFINE_string(zk_auth_schema, "digest", "config the id of authentication schema");
DEFINE_string(zk_cert, "", "config the application credentials");
DEFINE_string(tablet, "", "config the endpoint of tablet");
DEFINE_string(nameserver, "", "config the endpoint of nameserver");
DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check. unit is milliseconds");
Expand Down
3 changes: 2 additions & 1 deletion src/nameserver/cluster_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void ClusterInfo::UpdateNSClient(const std::vector<std::string>& children) {

int ClusterInfo::Init(std::string& msg) {
zk_client_ = std::make_shared<::openmldb::zk::ZkClient>(cluster_add_.zk_endpoints(), FLAGS_zk_session_timeout, "",
cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader");
cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader",
cluster_add_.zk_auth_schema(), cluster_add_.zk_cert());
bool ok = zk_client_->Init();
for (int i = 1; i < 3; i++) {
if (ok) {
Expand Down
2 changes: 0 additions & 2 deletions src/nameserver/name_server_create_remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ DECLARE_uint32(name_server_task_max_concurrency);
DECLARE_uint32(system_table_replica_num);
DECLARE_bool(auto_failover);

using ::openmldb::zk::ZkClient;

namespace openmldb {
namespace nameserver {

Expand Down
5 changes: 4 additions & 1 deletion src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
DECLARE_string(endpoint);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_string(tablet);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -1411,7 +1413,8 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p
zone_info_.set_replica_alias("");
zone_info_.set_zone_term(1);
LOG(INFO) << "zone name " << zone_info_.zone_name();
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path);
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client_->Init()) {
PDLOG(WARNING, "fail to init zookeeper with cluster[%s]", zk_cluster.c_str());
return false;
Expand Down
5 changes: 4 additions & 1 deletion src/nameserver/name_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DECLARE_string(ssd_root_path);
DECLARE_string(hdd_root_path);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(request_timeout_ms);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -171,7 +173,8 @@ TEST_P(NameServerImplTest, MakesnapshotTask) {

sleep(5);

ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path);
ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
ok = zk_client.Init();
ASSERT_TRUE(ok);
std::string op_index_node = FLAGS_zk_root_path + "/op/op_index";
Expand Down
Loading

0 comments on commit 2fb650a

Please sign in to comment.