Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/tencentyun/hadoop-cos
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang733 committed Sep 11, 2022
2 parents 64cf60e + 8ae8414 commit a13545c
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 8 deletions.
75 changes: 67 additions & 8 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;


/**
Expand All @@ -47,6 +43,9 @@ public class CosFileSystem extends FileSystem {
private NativeFileSystemStore nativeStore;
private boolean isPosixFSStore;
private boolean isDefaultNativeStore;
private volatile boolean healthyFlag = false;
private boolean isPosixUseOFSRanger;
private boolean isPosixImpl = false;
private FileSystem actualImplFS = null;

private URI uri;
Expand Down Expand Up @@ -93,6 +92,9 @@ public void initialize(URI uri, Configuration conf) throws IOException {
this.isDefaultNativeStore = true;
}
this.rangerCredentialsClient = this.nativeStore.getRangerCredentialsClient();
this.isPosixUseOFSRanger = this.getConf().
getBoolean(CosNConfigKeys.COSN_POSIX_BUCKET_USE_OFS_RANGER_ENABLED,
CosNConfigKeys.DEFAULT_COSN_POSIX_BUCKET_USE_OFS_RANGER_ENABLED);

// required checkCustomAuth if ranger is enabled and custom authentication is enabled
checkCustomAuth(conf);
Expand All @@ -108,8 +110,8 @@ public void initialize(URI uri, Configuration conf) throws IOException {
CosNConfigKeys.DEFAULT_COSN_POSIX_BUCKET_FS_IMPL);
}

LOG.info("The posix bucket [{}] use the class [{}] as the filesystem implementation.",
bucket, posixBucketFSImpl);
LOG.info("The posix bucket [{}] use the class [{}] as the filesystem implementation, " +
"use each ranger [{}]", bucket, posixBucketFSImpl, this.isPosixUseOFSRanger);
// if ofs impl.
// network version start from the 2.7.
// sdk version start from the 1.0.4.
Expand All @@ -121,6 +123,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
((CosNFileSystem) this.actualImplFS).withStore(this.nativeStore).withBucket(bucket)
.withPosixBucket(isPosixFSStore).withRangerCredentialsClient(rangerCredentialsClient);
} else if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
this.isPosixImpl = true;
// judge whether ranger client contains policy url or other config need to pass to ofs
this.passThroughRangerConfig();
// before the init, must transfer the config and disable the range in ofs
Expand All @@ -141,6 +144,8 @@ public void initialize(URI uri, Configuration conf) throws IOException {


this.actualImplFS.initialize(uri, conf);
// init status
this.healthyFlag = true;
}

// load class to get relate file system
Expand Down Expand Up @@ -169,13 +174,15 @@ public Path getHomeDirectory() {
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
LOG.debug("append file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.append(f, bufferSize, progress);
}

@Override
public boolean truncate(Path f, long newLength) throws IOException {
LOG.debug("truncate file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.truncate(f, newLength);
}
Expand All @@ -187,6 +194,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
long blockSize, Progressable progress)
throws IOException {
LOG.debug("Creating a new file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.create(f, permission, overwrite, bufferSize,
replication, blockSize, progress);
Expand All @@ -196,13 +204,15 @@ public FSDataOutputStream create(Path f, FsPermission permission,
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive);
healthyCheck();
checkPermission(f, RangerAccessType.DELETE);
return this.actualImplFS.delete(f, recursive);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LOG.debug("Get file status: {}.", f);
healthyCheck();
// keep same not change ranger permission here
return this.actualImplFS.getFileStatus(f);
}
Expand All @@ -225,6 +235,7 @@ public URI getUri() {
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("list status:" + f);
healthyCheck();
checkPermission(f, RangerAccessType.LIST);
return this.actualImplFS.listStatus(f);
}
Expand All @@ -233,20 +244,23 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
public boolean mkdirs(Path f, FsPermission permission)
throws IOException {
LOG.debug("mkdirs path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.mkdirs(f, permission);
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.open(f, bufferSize);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst);
healthyCheck();
checkPermission(src, RangerAccessType.DELETE);
checkPermission(dst, RangerAccessType.WRITE);
return this.actualImplFS.rename(src, dst);
Expand Down Expand Up @@ -274,6 +288,7 @@ public Path getWorkingDirectory() {
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
LOG.debug("call the checksum for the path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
Preconditions.checkArgument(length >= 0);
return this.actualImplFS.getFileChecksum(f, length);
Expand All @@ -292,6 +307,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
@Override
public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
LOG.debug("set XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.setXAttr(f, name, value, flag);
}
Expand All @@ -307,6 +323,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> fl
@Override
public byte[] getXAttr(Path f, String name) throws IOException {
LOG.debug("get XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttr(f, name);
}
Expand All @@ -322,13 +339,15 @@ public byte[] getXAttr(Path f, String name) throws IOException {
@Override
public Map<String, byte[]> getXAttrs(Path f, List<String> names) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f, names);
}

@Override
public Map<String, byte[]> getXAttrs(Path f) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f);
}
Expand All @@ -343,13 +362,15 @@ public Map<String, byte[]> getXAttrs(Path f) throws IOException {
@Override
public void removeXAttr(Path f, String name) throws IOException {
LOG.debug("remove XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.removeXAttr(f, name);
}

@Override
public List<String> listXAttrs(Path f) throws IOException {
LOG.debug("list XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.listXAttrs(f);
}
Expand All @@ -358,6 +379,9 @@ public List<String> listXAttrs(Path f) throws IOException {
public Token<?> getDelegationToken(String renewer) throws IOException {
LOG.info("getDelegationToken, renewer: {}, stack: {}",
renewer, Arrays.toString(Thread.currentThread().getStackTrace()).replace(',', '\n'));
if (useOFSRanger()) {
return this.actualImplFS.getDelegationToken(renewer);
}
Token<?> token = this.rangerCredentialsClient.doGetDelegationToken(renewer);
if (token != null)
return token;
Expand All @@ -370,11 +394,23 @@ public NativeFileSystemStore getStore() {

// pass ofs ranger client config to ofs
private void passThroughRangerConfig() {
// ofs ranger init get ranger policy auto
String ofsRangerKey = Constants.COSN_CONFIG_TRANSFER_PREFIX.
concat(Constants.COSN_POSIX_BUCKCET_OFS_RANGER_FLAG);
if (useOFSRanger()) {
// set ofs ranger open
this.getConf().setBoolean(ofsRangerKey, true);
return;
} else {
// set false, avoid sdk change the default value
this.getConf().setBoolean(ofsRangerKey, false);
}

if (!this.rangerCredentialsClient.isEnableRangerPluginPermissionCheck()) {
LOG.info("not enable ranger plugin permission check");
return;
}
// todo: alantong, ofs java sdk decide the key

if (this.rangerCredentialsClient.getRangerPolicyUrl() != null) {
String policyUrlKey = Constants.COSN_CONFIG_TRANSFER_PREFIX.
concat(Constants.COSN_POSIX_BUCKET_RANGER_POLICY_URL);
Expand Down Expand Up @@ -412,6 +448,7 @@ private void transferOfsConfig() {
// CHDFS Support Only
public void releaseFileLock(Path f) throws IOException {
LOG.debug("Release the file lock: {}.", f);
healthyCheck();
if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
((CHDFSHadoopFileSystemAdapter) this.actualImplFS).releaseFileLock(f);
} else {
Expand All @@ -421,18 +458,33 @@ public void releaseFileLock(Path f) throws IOException {

@Override
public String getCanonicalServiceName() {
if (useOFSRanger()) {
return this.actualImplFS.getCanonicalServiceName();
}
return this.rangerCredentialsClient.doGetCanonicalServiceName();
}

private void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException {
if (useOFSRanger()) {
return;
}
this.rangerCredentialsClient.doCheckPermission(f, rangerAccessType, getOwnerId(), getWorkingDirectory());
}

private boolean useOFSRanger() {
if (this.isPosixImpl && this.isPosixUseOFSRanger) {
return true;
}
return false;
}

/**
* @param conf
* @throws IOException
*/
private void checkCustomAuth(Configuration conf) throws IOException {
// todo: need get token first
healthyCheck();
this.rangerCredentialsClient.doCheckCustomAuth(conf);
}

Expand All @@ -457,6 +509,12 @@ private String getOwnerId() {
return shortUserName;
}

private void healthyCheck() throws IOException {
if (!this.healthyFlag) {
throw new IOException("fileSystem has been closed or not init");
}
}

@Override
public void close() throws IOException {
LOG.info("begin to close cos file system");
Expand All @@ -465,5 +523,6 @@ public void close() throws IOException {
// close range client later, inner native store
this.nativeStore.close();
}
this.healthyFlag = false;
}
}
5 changes: 5 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,9 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final boolean DEFAULT_COSN_FLUSH_ENABLED = true;
public static final String COSN_MAPDISK_DELETEONEXIT_ENABLED = "fs.cosn.map_disk.delete_on_exit.enabled";
public static final boolean DEFAULT_COSN_MAPDISK_DELETEONEXIT_ENABLED = true;

// range control, whether meta engine need query own ranger. can be used when transfer from ofs to cos ranger
public static final String COSN_POSIX_BUCKET_USE_OFS_RANGER_ENABLED = "fs.cosn.posix.bucket.use_ofs_ranger.enabled";
public static final boolean DEFAULT_COSN_POSIX_BUCKET_USE_OFS_RANGER_ENABLED = false;

}
1 change: 1 addition & 0 deletions src/main/java/org/apache/hadoop/fs/cosn/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ private Constants() {
// posix bucket ranger config need to pass through
public static final String COSN_POSIX_BUCKET_RANGER_POLICY_URL = "fs.ofs.cosn.ranger.policy.url";
public static final String COSN_POSIX_BUCKET_RANGER_AUTH_JAR_MD5 = "fs.ofs.cosn.ranger.auth.jar.md5";
public static final String COSN_POSIX_BUCKCET_OFS_RANGER_FLAG = "fs.ofs.ranger.enable.flag";
}

0 comments on commit a13545c

Please sign in to comment.