Skip to content

Commit

Permalink
expose ranger to upper, and fix ranger bug (#55)
Browse files Browse the repository at this point in the history
* expose ranger to upper, and fix ranger bug

* fix ranger bugs which change the init order before create native store

Co-authored-by: alantong(佟明达) <[email protected]>
  • Loading branch information
vintmd and vintmd authored May 17, 2022
1 parent 4db3fcb commit 50cceb1
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 166 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.1.0</version>
<version>8.1.1</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Cloud COS Support</name>
Expand Down
161 changes: 159 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.Constants;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.qcloud.chdfs.permission.RangerAccessType;
import org.apache.hadoop.fs.auth.RangerCredentialsProvider;
import org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClient;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.AccessType;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionResponse;
import org.apache.hadoop.fs.cosn.ranger.security.authorization.ServiceType;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -46,6 +55,14 @@ public class CosFileSystem extends FileSystem {
private boolean isDefaultNativeStore;
private FileSystem actualImplFS = null;

private URI uri;
private String bucket;
private Path workingDir;
// Authorization related.
private UserGroupInformation userGroupInformation;
private boolean enableRangerPluginPermissionCheck = false;
public static RangerQcloudObjectStorageClient rangerQcloudObjectStorageStorageClient = null;


public CosFileSystem() {
}
Expand All @@ -70,7 +87,17 @@ public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);

// initialize the things authorization related.
UserGroupInformation.setConfiguration(conf);
this.userGroupInformation = UserGroupInformation.getCurrentUser();
this.initRangerClientImpl(conf);

String bucket = uri.getHost();
this.bucket = bucket;
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", System.getProperty("user.name"))
.makeQualified(this.uri, this.getWorkingDirectory());

if (null == this.nativeStore) {
this.nativeStore = CosNUtils.createDefaultStore(conf);
this.nativeStore.initialize(uri, conf);
Expand Down Expand Up @@ -116,6 +143,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
((CosNFileSystem) this.actualImplFS).withPosixBucket(this.isPosixFSStore);
}


this.actualImplFS.initialize(uri, conf);
}

Expand Down Expand Up @@ -145,12 +173,14 @@ public Path getHomeDirectory() {
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
LOG.debug("append file [{}] in COS.", f);
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.append(f, bufferSize, progress);
}

@Override
public boolean truncate(Path f, long newLength) throws IOException {
LOG.debug("truncate file [{}] in COS.", f);
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.truncate(f, newLength);
}

Expand All @@ -161,6 +191,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
long blockSize, Progressable progress)
throws IOException {
LOG.debug("Creating a new file [{}] in COS.", f);
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.create(f, permission, overwrite, bufferSize,
replication, blockSize, progress);
}
Expand All @@ -169,12 +200,14 @@ public FSDataOutputStream create(Path f, FsPermission permission,
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive);
checkPermission(f, RangerAccessType.DELETE);
return this.actualImplFS.delete(f, recursive);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LOG.debug("Get file status: {}.", f);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getFileStatus(f);
}

Expand All @@ -196,25 +229,30 @@ public URI getUri() {
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("list status:" + f);
checkPermission(f, RangerAccessType.LIST);
return this.actualImplFS.listStatus(f);
}

@Override
public boolean mkdirs(Path f, FsPermission permission)
throws IOException {
LOG.debug("mkdirs path: {}.", f);
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.mkdirs(f, permission);
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.open(f, bufferSize);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst);
checkPermission(src, RangerAccessType.DELETE);
checkPermission(dst, RangerAccessType.WRITE);
return this.actualImplFS.rename(src, dst);
}

Expand All @@ -228,17 +266,19 @@ public long getDefaultBlockSize() {
*/
@Override
public void setWorkingDirectory(Path newDir) {
this.workingDir = newDir;
this.actualImplFS.setWorkingDirectory(newDir);
}

@Override
public Path getWorkingDirectory() {
return this.actualImplFS.getWorkingDirectory();
return this.workingDir;
}

@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
LOG.debug("call the checksum for the path: {}.", f);
checkPermission(f, RangerAccessType.READ);
Preconditions.checkArgument(length >= 0);
return this.actualImplFS.getFileChecksum(f, length);
}
Expand All @@ -256,6 +296,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
@Override
public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
LOG.debug("set XAttr: {}.", f);
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.setXAttr(f, name, value, flag);
}

Expand All @@ -270,6 +311,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> fl
@Override
public byte[] getXAttr(Path f, String name) throws IOException {
LOG.debug("get XAttr: {}.", f);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttr(f, name);
}

Expand All @@ -284,12 +326,14 @@ public byte[] getXAttr(Path f, String name) throws IOException {
@Override
public Map<String, byte[]> getXAttrs(Path f, List<String> names) throws IOException {
LOG.debug("get XAttrs: {}.", f);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f, names);
}

@Override
public Map<String, byte[]> getXAttrs(Path f) throws IOException {
LOG.debug("get XAttrs: {}.", f);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f);
}

Expand All @@ -303,20 +347,25 @@ public Map<String, byte[]> getXAttrs(Path f) throws IOException {
@Override
public void removeXAttr(Path f, String name) throws IOException {
LOG.debug("remove XAttr: {}.", f);
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.removeXAttr(f, name);
}

@Override
public List<String> listXAttrs(Path f) throws IOException {
LOG.debug("list XAttrs: {}.", f);
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.listXAttrs(f);
}

@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
LOG.info("getDelegationToken, renewer: {}, stack: {}",
renewer, Arrays.toString(Thread.currentThread().getStackTrace()).replace(',', '\n'));
return this.actualImplFS.getDelegationToken(renewer);
if (rangerQcloudObjectStorageStorageClient != null) {
return rangerQcloudObjectStorageStorageClient.getDelegationToken(renewer);
}
return super.getDelegationToken(renewer);
}

public NativeFileSystemStore getStore() {
Expand Down Expand Up @@ -355,6 +404,114 @@ public void releaseFileLock(Path f) throws IOException {
}
}

@Override
public String getCanonicalServiceName() {
if (rangerQcloudObjectStorageStorageClient != null) {
return rangerQcloudObjectStorageStorageClient.getCanonicalServiceName();
}
return null;
}

private void initRangerClientImpl(Configuration conf) throws IOException {
Class<?>[] cosClasses = CosNUtils.loadCosProviderClasses(
conf,
CosNConfigKeys.COSN_CREDENTIALS_PROVIDER);

if (cosClasses.length == 0) {
this.enableRangerPluginPermissionCheck = false;
return;
}

for (Class<?> credClass : cosClasses) {
if (credClass.getName().contains(RangerCredentialsProvider.class.getName())) {
this.enableRangerPluginPermissionCheck = true;
break;
}
}

if (!this.enableRangerPluginPermissionCheck) {
return;
}

Class<?> rangerClientImplClass = conf.getClass(CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL, null);
if (rangerClientImplClass == null) {
try {
rangerClientImplClass = conf.getClassByName(CosNConfigKeys.DEFAULT_COSN_RANGER_PLUGIN_CLIENT_IMPL);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

if (rangerQcloudObjectStorageStorageClient == null) {
synchronized (CosFileSystem.class) {
if (rangerQcloudObjectStorageStorageClient == null) {
try {
RangerQcloudObjectStorageClient tmpClient =
(RangerQcloudObjectStorageClient) rangerClientImplClass.newInstance();
tmpClient.init(conf);
rangerQcloudObjectStorageStorageClient = tmpClient;
} catch (Exception e) {
LOG.error(String.format("init %s failed", CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL), e);
throw new IOException(String.format("init %s failed",
CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL), e);
}
}
}
}

}

private void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException {
if (!this.enableRangerPluginPermissionCheck) {
return;
}

AccessType accessType = null;
switch (rangerAccessType) {
case LIST:
accessType = AccessType.LIST;
break;
case WRITE:
accessType = AccessType.WRITE;
break;
case READ:
accessType = AccessType.READ;
break;
case DELETE:
accessType = AccessType.DELETE;
break;
default:
throw new IOException(String.format("unknown access type %s", rangerAccessType.toString()));
}

Path absolutePath = makeAbsolute(f);
String allowKey = CosNFileSystem.pathToKey(absolutePath);
if (allowKey.startsWith("/")) {
allowKey = allowKey.substring(1);
}

PermissionRequest permissionReq = new PermissionRequest(ServiceType.COS, accessType,
CosNUtils.getBucketNameWithoutAppid(this.bucket, this.getConf().get(CosNConfigKeys.COSN_APPID_KEY)),
allowKey, "", "");
boolean allowed = false;
PermissionResponse permission = rangerQcloudObjectStorageStorageClient.checkPermission(permissionReq);
if (permission != null) {
allowed = permission.isAllowed();
}
if (!allowed) {
throw new IOException(String.format("Permission denied, [key: %s], [user: %s], [operation: %s]",
allowKey, this.userGroupInformation.getShortUserName(), rangerAccessType.name()));
}
}

private Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workingDir, path);
}


@Override
public void close() throws IOException {
LOG.info("begin to close cos file system");
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@InterfaceStability.Unstable
public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String USER_AGENT = "fs.cosn.user.agent";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.1.0";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.1.1";

public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version";

Expand Down
Loading

0 comments on commit 50cceb1

Please sign in to comment.