Skip to content

Commit

Permalink
pass through cos ranger policy url and auth md5 (#71)
Browse files Browse the repository at this point in the history
change the version to v8.1.6 and begin to release

Co-authored-by: alantong(佟明达) <[email protected]>
  • Loading branch information
vintmd and vintmd authored Aug 11, 2022
1 parent ba4cbb7 commit 52ea809
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.1.5</version>
<version>8.1.6</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Cloud COS Support</name>
Expand Down
52 changes: 37 additions & 15 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,19 @@ public void initialize(URI uri, Configuration conf) throws IOException {
// network version start from the 2.7.
// sdk version start from the 1.0.4.
this.actualImplFS = getActualFileSystemByClassName(posixBucketFSImpl);
if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {

// judge normal impl first, skip the class nodef error when only use normal bucket
if (this.actualImplFS instanceof CosNFileSystem) {
this.nativeStore.isPosixBucket(true);
((CosNFileSystem) this.actualImplFS).withStore(this.nativeStore).withBucket(bucket)
.withPosixBucket(isPosixFSStore).withRangerCredentialsClient(rangerCredentialsClient);
} else if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
// judge whether ranger client contains policy url or other config need to pass to ofs
this.passThroughRangerConfig();
// before the init, must transfer the config and disable the range in ofs
this.transferOfsConfig();
this.nativeStore.close();
this.nativeStore = null;
} else if (this.actualImplFS instanceof CosNFileSystem) {
this.nativeStore.isPosixBucket(true);
((CosNFileSystem) this.actualImplFS).withStore(this.nativeStore).withBucket(bucket)
.withPosixBucket(isPosixFSStore).withRangerCredentialsClient(rangerCredentialsClient);
} else {
// Another class
throw new IOException(
Expand Down Expand Up @@ -202,7 +206,7 @@ public boolean delete(Path f, boolean recursive) throws IOException {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
LOG.debug("Get file status: {}.", f);
checkPermission(f, RangerAccessType.READ);
// keep same not change ranger permission here
return this.actualImplFS.getFileStatus(f);
}

Expand Down Expand Up @@ -367,6 +371,25 @@ public NativeFileSystemStore getStore() {
return this.nativeStore;
}

// pass ofs ranger client config to ofs
private void passThroughRangerConfig() {
if (!this.rangerCredentialsClient.isEnableRangerPluginPermissionCheck()) {
LOG.info("not enable ranger plugin permission check");
return;
}
// todo: alantong, ofs java sdk decide the key
if (this.rangerCredentialsClient.getRangerPolicyUrl() != null) {
String policyUrlKey = Constants.COSN_CONFIG_TRANSFER_PREFIX.
concat(Constants.COSN_POSIX_BUCKET_RANGER_POLICY_URL);
this.getConf().set(policyUrlKey, this.rangerCredentialsClient.getRangerPolicyUrl());
}
if (this.rangerCredentialsClient.getAuthJarMd5() != null) {
String authJarMd5Key = Constants.COSN_CONFIG_TRANSFER_PREFIX.
concat(Constants.COSN_POSIX_BUCKET_RANGER_AUTH_JAR_MD5);
this.getConf().set(authJarMd5Key, this.rangerCredentialsClient.getAuthJarMd5());
}
}

// exclude the ofs original config, filter the ofs config with COSN_CONFIG_TRANSFER_PREFIX
private void transferOfsConfig() {
// 1. list to get transfer prefix ofs config
Expand Down Expand Up @@ -404,11 +427,18 @@ public String getCanonicalServiceName() {
return this.rangerCredentialsClient.doGetCanonicalServiceName();
}


private void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException {
this.rangerCredentialsClient.doCheckPermission(f, rangerAccessType, getOwnerId(), getWorkingDirectory());
}

/**
* @param conf
* @throws IOException
*/
private void checkCustomAuth(Configuration conf) throws IOException {
this.rangerCredentialsClient.doCheckCustomAuth(conf);
}

private String getOwnerId() {
UserGroupInformation currentUgi;
try {
Expand All @@ -430,14 +460,6 @@ private String getOwnerId() {
return shortUserName;
}

/**
* @param conf
* @throws IOException
*/
private void checkCustomAuth(Configuration conf) throws IOException {
this.rangerCredentialsClient.doCheckCustomAuth(conf);
}

@Override
public void close() throws IOException {
LOG.info("begin to close cos file system");
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@InterfaceStability.Unstable
public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String USER_AGENT = "fs.cosn.user.agent";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.1.5";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.1.6";

public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version";

Expand Down Expand Up @@ -97,6 +97,14 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys";
public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 2;

// used by normal bucket to control max keys of list status
public static final String LISTSTATUS_LIST_MAX_KEYS = "fs.cosn.liststatus.list_max_keys";
public static final int DEFAULT_LISTSTATUS_LIST_MAX_KEYS = 999;

// used by posix bucket to control max keys of list status
public static final String LISTSTATUS_POSIX_BUCKET__LIST_MAX_KEYS = "fs.cosn.liststatus.posix_bucket.list_max_keys";
public static final int DEFAULT_LISTSTATUS_POSIX_BUCKET_LIST_MAX_KEYS = 5000;

// used for double check complete mpu in case of return cos client exception but status is 200 ok.
public static final String COSN_COMPLETE_MPU_CHECK = "fs.cosn.complete.mpu.check";
public static final boolean DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE = true;
Expand Down
39 changes: 26 additions & 13 deletions src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public class CosNFileSystem extends FileSystem {

static final String SCHEME = "cosn";
static final String PATH_DELIMITER = Path.SEPARATOR;
static final int BUCKET_LIST_LIMIT = 999;
static final int POSIX_BUCKET_LIST_LIMIT = 5000;
static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
// The length of name:value pair should be less than or equal to 1024 bytes.
static final int MAX_XATTR_SIZE = 1024;
Expand Down Expand Up @@ -357,7 +355,11 @@ public boolean delete(Path f, boolean recursive) throws IOException {
}
// how to tell the result
if (!isPosixBucket) {
internalRecursiveDelete(key);
int listMaxLength = this.getConf().getInt(
CosNConfigKeys.LISTSTATUS_LIST_MAX_KEYS,
CosNConfigKeys.DEFAULT_LISTSTATUS_LIST_MAX_KEYS
);
internalRecursiveDelete(key, listMaxLength);
} else {
internalAutoRecursiveDelete(key);
}
Expand All @@ -372,14 +374,14 @@ public boolean delete(Path f, boolean recursive) throws IOException {
return true;
}

private void internalRecursiveDelete(String key) throws IOException {
private void internalRecursiveDelete(String key, int listMaxLength) throws IOException {
CosNDeleteFileContext deleteFileContext = new CosNDeleteFileContext();
int deleteToFinishes = 0;

String priorLastKey = null;
do {
CosNPartialListing listing =
nativeStore.list(key, this.BUCKET_LIST_LIMIT, priorLastKey, true);
nativeStore.list(key, listMaxLength, priorLastKey, true);
for (FileMetadata file : listing.getFiles()) {
checkPermission(new Path(file.getKey()), RangerAccessType.DELETE);
this.boundedCopyThreadPool.execute(new CosNDeleteFileTask(
Expand Down Expand Up @@ -486,12 +488,18 @@ public FileStatus getFileStatus(Path f) throws IOException {
public FileStatus[] listStatus(Path f) throws IOException {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
int listMaxLength = CosNFileSystem.BUCKET_LIST_LIMIT;

int listMaxLength = this.getConf().getInt(
CosNConfigKeys.LISTSTATUS_LIST_MAX_KEYS,
CosNConfigKeys.DEFAULT_LISTSTATUS_LIST_MAX_KEYS
);
if (isPosixBucket) {
listMaxLength = CosNFileSystem.POSIX_BUCKET_LIST_LIMIT;
listMaxLength = this.getConf().getInt(
CosNConfigKeys.LISTSTATUS_POSIX_BUCKET__LIST_MAX_KEYS,
CosNConfigKeys.DEFAULT_LISTSTATUS_POSIX_BUCKET_LIST_MAX_KEYS
);
}


if (key.length() > 0) {
FileMetadata meta = nativeStore.retrieveMetadata(key);
if (meta != null && meta.isFile()) {
Expand Down Expand Up @@ -784,16 +792,21 @@ public boolean rename(Path src, Path dst) throws IOException {
}

if (!isPosixBucket) {
return internalCopyAndDelete(src, dst, srcFileStatus.isDirectory());
int listMaxLength = this.getConf().getInt(
CosNConfigKeys.LISTSTATUS_LIST_MAX_KEYS,
CosNConfigKeys.DEFAULT_LISTSTATUS_LIST_MAX_KEYS
);
return internalCopyAndDelete(src, dst, srcFileStatus.isDirectory(), listMaxLength);
} else {
return internalRename(src, dst);
}
}

private boolean internalCopyAndDelete(Path srcPath, Path dstPath, boolean isDir) throws IOException {
private boolean internalCopyAndDelete(Path srcPath, Path dstPath,
boolean isDir, int listMaxLength) throws IOException {
boolean result = false;
if (isDir) {
result = this.copyDirectory(srcPath, dstPath);
result = this.copyDirectory(srcPath, dstPath, listMaxLength);
} else {
result = this.copyFile(srcPath, dstPath);
}
Expand Down Expand Up @@ -822,7 +835,7 @@ private boolean copyFile(Path srcPath, Path dstPath) throws IOException {
return true;
}

private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
private boolean copyDirectory(Path srcPath, Path dstPath, int listMaxLength) throws IOException {
String srcKey = pathToKey(srcPath);
if (!srcKey.endsWith(PATH_DELIMITER)) {
srcKey += PATH_DELIMITER;
Expand All @@ -849,7 +862,7 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
String priorLastKey = null;
do {
CosNPartialListing objectList = this.nativeStore.list(srcKey,
this.BUCKET_LIST_LIMIT, priorLastKey, true);
listMaxLength, priorLastKey, true);
for (FileMetadata file : objectList.getFiles()) {
checkPermission(new Path(file.getKey()), RangerAccessType.DELETE);
this.boundedCopyThreadPool.execute(new CosNCopyFileTask(
Expand Down
36 changes: 32 additions & 4 deletions src/main/java/org/apache/hadoop/fs/RangerCredentialsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.auth.RangerCredentialsProvider;
import org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClient;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.AccessType;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionResponse;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.ServiceType;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.*;
import org.apache.hadoop.fs.cosn.ranger.security.sts.GetSTSResponse;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
Expand All @@ -29,6 +26,10 @@ public class RangerCredentialsClient {

private boolean enableRangerPluginPermissionCheck = false;

private String rangerPolicyUrl;

private String authJarMd5;

public RangerCredentialsClient() {
}

Expand Down Expand Up @@ -168,6 +169,20 @@ private void initRangerClientImpl(Configuration conf) throws IOException {
(RangerQcloudObjectStorageClient) rangerClientImplClass.newInstance();
tmpClient.init(conf);
RangerCredentialsClient.rangerQcloudObjectStorageStorageClient = tmpClient;

// set ranger policy url and other auth info.
// when use posix mode to query bucket. server side also auth the policy url.
// so need to pass these configurations to ofs java sdk which carried on when mount fs.
RangerAuthPolicyResponse rangerAuthPolicyResp =
rangerQcloudObjectStorageStorageClient.getRangerAuthPolicy();
if (rangerAuthPolicyResp != null) {
if (rangerAuthPolicyResp.getRangerPolicyUrl() != null) {
this.rangerPolicyUrl = rangerAuthPolicyResp.getRangerPolicyUrl();
}
if (rangerAuthPolicyResp.getAuthJarMd5() != null) {
this.authJarMd5 = rangerAuthPolicyResp.getAuthJarMd5();
}
}
} catch (Exception e) {
log.error(String.format("init %s failed", CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL), e);
throw new IOException(String.format("init %s failed",
Expand All @@ -176,6 +191,19 @@ private void initRangerClientImpl(Configuration conf) throws IOException {
}
}
}
} // end of init ranger impl

// must call after init
public String getRangerPolicyUrl() {
return this.rangerPolicyUrl;
}

public String getAuthJarMd5() {
return this.authJarMd5;
}

public boolean isEnableRangerPluginPermissionCheck() {
return this.enableRangerPluginPermissionCheck;
}

}
4 changes: 4 additions & 0 deletions src/main/java/org/apache/hadoop/fs/cosn/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ private Constants() {
public static final String COSN_POSIX_BUCKET_FS_CHDFS_IMPL="com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter";

public static final String CUSTOM_AUTHENTICATION = "custom authentication";

// posix bucket ranger config need to pass through
public static final String COSN_POSIX_BUCKET_RANGER_POLICY_URL = "fs.ofs.cosn.ranger.policy.url";
public static final String COSN_POSIX_BUCKET_RANGER_AUTH_JAR_MD5 = "fs.ofs.cosn.ranger.auth.jar.md5";
}

0 comments on commit 52ea809

Please sign in to comment.