From 8284af28434b4253f67f52ca3d04d7d2f3516d6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Tue, 15 Dec 2020 16:21:45 +0800 Subject: [PATCH] cos ranger version --- README.md | 4 +- compile.sh | 21 +++ pom.xml | 30 ++- .../java/org/apache/hadoop/fs/BufferPool.java | 3 + .../hadoop/fs/CosEncryptionMethods.java | 19 +- .../org/apache/hadoop/fs/CosFileSystem.java | 163 ++++++++++++++-- .../hadoop/fs/CosFsDataOutputStream.java | 13 +- .../apache/hadoop/fs/CosFsInputStream.java | 18 +- .../org/apache/hadoop/fs/CosNConfigKeys.java | 10 +- .../apache/hadoop/fs/CosNCopyFileTask.java | 4 +- .../apache/hadoop/fs/CosNFileReadTask.java | 7 + .../java/org/apache/hadoop/fs/CosNUtils.java | 19 +- .../java/org/apache/hadoop/fs/CosNXAttr.java | 2 +- .../hadoop/fs/CosNativeFileSystemStore.java | 176 ++++++++++-------- .../java/org/apache/hadoop/fs/CrcUtil.java | 1 + .../hadoop/fs/NativeFileSystemStore.java | 2 + .../hadoop/fs/ResettableFileInputStream.java | 2 +- .../hadoop/fs/WriteConsistencyChecker.java | 12 +- .../auth/EMRInstanceCredentialsProvider.java | 59 ++++++ .../fs/auth/NoAuthWithCOSException.java | 1 + .../fs/auth/RangerCredentialsProvider.java | 103 ++++++++++ .../auth/SessionTokenCredentialProvider.java | 49 +++++ .../fs/buffer/CosNMappedBufferFactory.java | 3 +- .../RangerQcloudObjectStorageClient.java | 42 +++++ .../security/authorization/AccessType.java | 8 + .../authorization/PermissionRequest.java | 46 +++++ .../security/authorization/ServiceType.java | 6 + .../ranger/security/sts/GetSTSRequest.java | 31 +++ .../ranger/security/sts/GetSTSResponse.java | 31 +++ 29 files changed, 763 insertions(+), 122 deletions(-) create mode 100644 compile.sh create mode 100644 src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java create mode 100644 src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java create mode 100644 src/main/java/org/apache/hadoop/fs/auth/SessionTokenCredentialProvider.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java diff --git a/README.md b/README.md index e93bb257..abae3c0e 100644 --- a/README.md +++ b/README.md @@ -198,8 +198,8 @@ done | 属性键 | 说明 | 默认值 | 必填项 | |:-----------------------------------:|:--------------------|:-----:|:---:| -|fs.defaultFS | 配置hadoop默认使用的底层文件系统,如果想使用cos作为hadoop默认文件系统,则此项应设置为cosn://bucket-appid,此时可以通过文件路径访问cos对象,如/hadoop/inputdata/test.dat。若不想把cos作为hadoop默认文件系统,则不需要修改此项,当需要访问cos上的对象时,则指定完整的uri即可,如cosn://testbucket-1252681927/hadoop/inputdata/test.dat来访问。 -|fs.cosn.credentials.provider |配置secret id和secret key的获取方式。当前支持三种获取方式:1.org.apache.hadoop.fs.auth.SessionCredentialProvider:从请求URI中获取secret id和secret key,其格式为:cosn://{secretId}:{secretKey}@examplebucket-1250000000000/; 2.org.apache.hadoop.fs.auth.SimpleCredentialProvider:从core-site.xml配置文件中读取fs.cosn.userinfo.secretId和fs.cosn.userinfo.secretKey来获取secret id和secret key; 3.org.apache.hadoop.fs.auth.EnvironmentVariableCredentialProvider:从系统环境变量COS_SECRET_ID和COS_SECRET_KEY中获取;4.org.apache.hadoop.fs.auth.CVMInstanceCredentialsProvider:利用腾讯云云服务器(CVM)绑定的角色,获取访问COS的临时密钥; 5. org.apache.hadoop.fs.auth.CPMInstanceCredentialsProvider:利用腾讯云黑石物理机(CPM)绑定的角色,获取访问COS的临时密钥。|如果不指定改配置项,默认会按照以下顺序读取:1.org.apache.hadoop.fs.auth.SessionCredentialProvider; 2.org.apache.hadoop.fs.auth.SimpleCredentialProvider;3.org.apache.hadoop.fs.auth.EnvironmentVariableCredentialProvider; 4.org.apache.hadoop.fs.auth.CVMInstanceCredentialsProvider; 5.org.apache.hadoop.fs.auth.CPMInstanceCredentialsProvider|否| +|fs.defaultFS | 配置hadoop默认使用的底层文件系统,如果想使用cos作为hadoop默认文件系统,则此项应设置为cosn://bucket-appid,此时可以通过文件路径访问cos对象,如/hadoop/inputdata/test.dat。若不想把cos作为hadoop默认文件系统,则不需要修改此项,当需要访问cos上的对象时,则指定完整的uri即可,如cosn://testbucket-1252681927/hadoop/inputdata/test.dat来访问。| +|fs.cosn.credentials.provider |配置secret id和secret key的获取方式。当前支持三种获取方式:1.org.apache.hadoop.fs.auth.SessionCredentialProvider:从请求URI中获取secret id和secret key,其格式为:cosn://{secretId}:{secretKey}@examplebucket-1250000000000/; 2.org.apache.hadoop.fs.auth.SimpleCredentialProvider:从core-site.xml配置文件中读取fs.cosn.userinfo.secretId和fs.cosn.userinfo.secretKey来获取secret id和secret key; 3.org.apache.hadoop.fs.auth.EnvironmentVariableCredentialProvider:从系统环境变量COS_SECRET_ID和COS_SECRET_KEY中获取;4.org.apache.hadoop.fs.auth.SessionTokenCredentialProvider: 设置token;5.org.apache.hadoop.fs.auth.CVMInstanceCredentialsProvider:利用腾讯云云服务器(CVM)绑定的角色,获取访问COS的临时密钥; 6. org.apache.hadoop.fs.auth.CPMInstanceCredentialsProvider:利用腾讯云黑石物理机(CPM)绑定的角色,获取访问COS的临时密钥。7. org.apache.hadoop.fs.auth.RangerCredentialsProvider 使用ranger进行获取秘钥 |如果不指定改配置项,默认会按照以下顺序读取:1.org.apache.hadoop.fs.auth.SessionCredentialProvider; 2.org.apache.hadoop.fs.auth.SimpleCredentialProvider;3.org.apache.hadoop.fs.auth.EnvironmentVariableCredentialProvider;4.org.apache.hadoop.fs.auth.SessionTokenCredentialProvider;5.org.apache.hadoop.fs.auth.CVMInstanceCredentialsProvider;6.org.apache.hadoop.fs.auth.CPMInstanceCredentialsProvider|否| |fs.cosn.useHttps|配置是否使用https协议。|false|否| |fs.cosn.bucket.endpoint_suffix|指定要连接的COS endpoint,该项为非必填项目。对于公有云COS用户而言,只需要正确填写上述的region配置即可。兼容原配置项:fs.cosn.userinfo.endpoint_suffix。|无|否| |fs.cosn.userinfo.secretId/secretKey| 填写您账户的API 密钥信息。可通过 [云 API 密钥 控制台](https://console.cloud.tencent.com/capi) 查看。| 无 | 是| diff --git a/compile.sh b/compile.sh new file mode 100644 index 00000000..e84cc24e --- /dev/null +++ b/compile.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +base_dir=$(cd `dirname $0`;pwd) +cd ${base_dir} +hadoop_version_array=("2.6.5" "2.7.5" "2.8.5" "3.1.0" "3.3.0") + +origin_version=$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec) + +for hadoop_version in ${hadoop_version_array[@]} +do + sed -i -E "s/.*<\/hadoop\.version>/${hadoop_version}<\/hadoop\.version>/g" pom.xml + mvn versions:set -DnewVersion=${hadoop_version}-${origin_version} + mvn clean verify + rm -rf dep/${hadoop_version} + mkdir -p dep/${hadoop_version} + cp target/*.jar dep/${hadoop_version}/ + cp target/*.asc dep/${hadoop_version}/ + cp target/*.pom dep/${hadoop_version}/ + + mvn versions:set -DnewVersion=${origin_version} +done \ No newline at end of file diff --git a/pom.xml b/pom.xml index df02e500..9d4df3bf 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 3.1.0-5.8.7 + 5.9.0 jar Apache Hadoop Tencent Qcloud COS Support @@ -38,9 +38,9 @@ UTF-8 - 1.8 - 1.8 - 3.1.0 + 1.7 + 1.7 + 3.3.0 5.6.32 24.1.1-jre 3.1 @@ -80,10 +80,31 @@ + + org.apache.maven.plugins + maven-source-plugin + 3.2.0 + + true + + + + compile + + jar + + + + + + org.apache.maven.plugins maven-javadoc-plugin 2.9.1 + + -Xdoclint:none + javadoc-jar @@ -126,5 +147,4 @@ - diff --git a/src/main/java/org/apache/hadoop/fs/BufferPool.java b/src/main/java/org/apache/hadoop/fs/BufferPool.java index 55e98b2b..70953200 100644 --- a/src/main/java/org/apache/hadoop/fs/BufferPool.java +++ b/src/main/java/org/apache/hadoop/fs/BufferPool.java @@ -258,6 +258,9 @@ public void returnBuffer(CosNByteBuffer buffer) } } + /** + * close + */ public synchronized void close() { LOG.info("Close a buffer pool instance."); diff --git a/src/main/java/org/apache/hadoop/fs/CosEncryptionMethods.java b/src/main/java/org/apache/hadoop/fs/CosEncryptionMethods.java index 76896e0b..81173f61 100644 --- a/src/main/java/org/apache/hadoop/fs/CosEncryptionMethods.java +++ b/src/main/java/org/apache/hadoop/fs/CosEncryptionMethods.java @@ -25,14 +25,6 @@ public String getMethod() { return method; } - /** - * Flag to indicate this is a server-side encryption option. - * @return true if this is server side. - */ - public boolean isServerSide() { - return serverSide; - } - /** * Get the encryption mechanism from the value provided. * @param name algorithm name @@ -40,7 +32,7 @@ public boolean isServerSide() { * @throws IOException if the algorithm is unknown */ public static CosEncryptionMethods getMethod(String name) throws IOException { - if(StringUtils.isNullOrEmpty(name)) { + if (StringUtils.isNullOrEmpty(name)) { return NONE; } for (CosEncryptionMethods v : values()) { @@ -50,4 +42,13 @@ public static CosEncryptionMethods getMethod(String name) throws IOException { } throw new IOException(UNKNOWN_ALGORITHM_MESSAGE + name); } + + /** + * Flag to indicate this is a server-side encryption option. + * @return true if this is server side. + */ + public boolean isServerSide() { + return serverSide; + } + } diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 3eabc88c..ee5d0855 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -7,10 +7,17 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.auth.RangerCredentialsProvider; +import org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClient; +import org.apache.hadoop.fs.cosn.ranger.security.authorization.AccessType; +import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest; +import org.apache.hadoop.fs.cosn.ranger.security.authorization.ServiceType; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +31,7 @@ import java.util.*; import java.util.concurrent.*; + /** * A {@link FileSystem} for reading and writing files stored on * Tencent Qcloud Cos @@ -45,7 +53,7 @@ public class CosFileSystem extends FileSystem { static final int MAX_XATTR_SIZE = 1024; private URI uri; - String bucket; + private String bucket; private NativeFileSystemStore store; private Path workingDir; private String owner = "Unknown"; @@ -54,6 +62,11 @@ public class CosFileSystem extends FileSystem { private ExecutorService boundedIOThreadPool; private ExecutorService boundedCopyThreadPool; + private UserGroupInformation currentUser; + + private boolean enableRangerPluginPermissionCheck = false; + public static RangerQcloudObjectStorageClient rangerQcloudObjectStorageStorageClient = null; + public CosFileSystem() { } @@ -74,12 +87,20 @@ public String getScheme() { @Override public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); + setConf(conf); + + UserGroupInformation.setConfiguration(conf); + this.currentUser = UserGroupInformation.getCurrentUser(); + + initRangerClientImpl(conf); + this.bucket = uri.getHost(); + if (this.store == null) { this.store = createDefaultStore(conf); } + this.store.initialize(uri, conf); - setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", System.getProperty("user.name")) @@ -268,6 +289,9 @@ public FSDataOutputStream create(Path f, FsPermission permission, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + + checkPermission(f, AccessType.WRITE); + if (exists(f) && !overwrite) { throw new FileAlreadyExistsException("File already exists: " + f); } @@ -302,6 +326,9 @@ private boolean rejectRootDirectoryDelete(boolean isEmptyDir, @Override public boolean delete(Path f, boolean recursive) throws IOException { LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive); + + checkPermission(f, AccessType.DELETE); + FileStatus status; try { status = getFileStatus(f); @@ -437,6 +464,7 @@ public URI getUri() { @Override public FileStatus[] listStatus(Path f) throws IOException { LOG.debug("list status:" + f); + checkPermission(f, AccessType.LIST); Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); @@ -531,6 +559,7 @@ private void validatePath(Path path) throws IOException { public boolean mkdirs(Path f, FsPermission permission) throws IOException { LOG.debug("mkdirs path: {}.", f); + checkPermission(f, AccessType.WRITE); try { FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { @@ -595,6 +624,8 @@ public boolean mkDirRecursively(Path f, FsPermission permission) @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + checkPermission(f, AccessType.READ); + FileStatus fileStatus = getFileStatus(f); // will throw if the file doesn't // exist if (fileStatus.isDirectory()) { @@ -613,6 +644,9 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { public boolean rename(Path src, Path dst) throws IOException { LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst); + checkPermission(src, AccessType.DELETE); + checkPermission(dst, AccessType.WRITE); + // Renaming the root directory is not allowed if (src.isRoot()) { LOG.debug("Cannot rename the root directory of a filesystem."); @@ -809,22 +843,77 @@ public Path getWorkingDirectory() { return workingDir; } + + private void initRangerClientImpl(Configuration conf) throws IOException { + Class[] cosClasses = CosNUtils.loadCosProviderClasses( + conf, + CosNConfigKeys.COSN_CREDENTIALS_PROVIDER); + + if (cosClasses.length == 0) { + this.enableRangerPluginPermissionCheck = false; + return; + } + + for (Class credClass : cosClasses) { + if (credClass.getName().contains(RangerCredentialsProvider.class.getName())) { + this.enableRangerPluginPermissionCheck = true; + break; + } + } + + if (!this.enableRangerPluginPermissionCheck) { + return; + } + + Class rangerClientImplClass = conf.getClass(CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL, null); + if (rangerClientImplClass == null) { + try { + rangerClientImplClass = conf.getClassByName(CosNConfigKeys.DEFAULT_COSN_RANGER_PLUGIN_CLIENT_IMPL); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + if (rangerQcloudObjectStorageStorageClient == null) { + synchronized (CosFileSystem.class) { + if (rangerQcloudObjectStorageStorageClient == null) { + try { + RangerQcloudObjectStorageClient tmpClient = + (RangerQcloudObjectStorageClient) rangerClientImplClass.newInstance(); + tmpClient.init(conf); + rangerQcloudObjectStorageStorageClient = tmpClient; + } catch (Exception e) { + LOG.error(String.format("init %s failed", CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL), e); + throw new IOException(String.format("init %s failed", + CosNConfigKeys.COSN_RANGER_PLUGIN_CLIENT_IMPL), e); + } + } + } + } + + } + @Override public String getCanonicalServiceName() { - // Does not support Token + if (rangerQcloudObjectStorageStorageClient != null) { + return rangerQcloudObjectStorageStorageClient.getCanonicalServiceName(); + } return null; } @Override public FileChecksum getFileChecksum(Path f, long length) throws IOException { Preconditions.checkArgument(length >= 0); - LOG.debug("Call the checksum for the path: {}.", f); + LOG.debug("call the checksum for the path: {}.", f); // The order of each file, must support both crc at same time, how to tell the difference crc request? + checkPermission(f, AccessType.READ); + if (this.getConf().getBoolean(CosNConfigKeys.CRC64_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC64_CHECKSUM_ENABLED)) { Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); + FileMetadata fileMetadata = this.store.retrieveMetadata(key); if (null == fileMetadata) { throw new FileNotFoundException("File or directory doesn't exist: " + f); @@ -847,6 +936,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { } } + /** * Set the value of an attribute for a path * @@ -860,10 +950,12 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { public void setXAttr(Path f, String name, byte[] value, EnumSet flag) throws IOException { LOG.debug("set XAttr: {}.", f); + checkPermission(f, AccessType.WRITE); + // First, determine whether the length of the name and value exceeds the limit. if (name.getBytes(METADATA_ENCODING).length + value.length > MAX_XATTR_SIZE) { throw new HadoopIllegalArgumentException(String.format("The maximum combined size of " + - "the name and value of an extended attribute in bytes should be less than or equal to %d", + "the name and value of an extended attribute in bytes should be less than or equal to %d", MAX_XATTR_SIZE)); } @@ -896,6 +988,8 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet fl public byte[] getXAttr(Path f, String name) throws IOException { LOG.debug("get XAttr: {}.", f); + checkPermission(f, AccessType.READ); + Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); @@ -923,6 +1017,8 @@ public byte[] getXAttr(Path f, String name) throws IOException { public Map getXAttrs(Path f, List names) throws IOException { LOG.debug("get XAttrs: {}.", f); + checkPermission(f, AccessType.READ); + Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); @@ -944,6 +1040,23 @@ public Map getXAttrs(Path f, List names) throws IOExcept return attrs; } + @Override + public Map getXAttrs(Path f) throws IOException { + LOG.debug("get XAttrs: {}.", f); + + checkPermission(f, AccessType.READ); + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + return fileMetadata.getUserAttributes(); + } + /** * Removes an xattr of a cosn file or directory. * @@ -955,6 +1068,8 @@ public Map getXAttrs(Path f, List names) throws IOExcept public void removeXAttr(Path f, String name) throws IOException { LOG.debug("remove XAttr: {}.", f); + checkPermission(f, AccessType.WRITE); + Path absolutPath = makeAbsolute(f); String key = pathToKey(absolutPath); @@ -977,8 +1092,10 @@ public void removeXAttr(Path f, String name) throws IOException { } @Override - public Map getXAttrs(Path f) throws IOException { - LOG.debug("get XAttrs: {}.", f); + public List listXAttrs(Path f) throws IOException { + LOG.debug("list XAttrs: {}.", f); + + checkPermission(f, AccessType.READ); Path absolutePath = makeAbsolute(f); @@ -988,22 +1105,36 @@ public Map getXAttrs(Path f) throws IOException { throw new FileNotFoundException("File or directory doesn't exist: " + f); } - return fileMetadata.getUserAttributes(); + return new ArrayList<>(fileMetadata.getUserAttributes().keySet()); } @Override - public List listXAttrs(Path f) throws IOException { - LOG.debug("list XAttrs: {}.", f); + public Token getDelegationToken(String renewer) throws IOException { + if (rangerQcloudObjectStorageStorageClient != null) { + return rangerQcloudObjectStorageStorageClient.getDelegationToken(renewer); + } + return super.getDelegationToken(renewer); + } - Path absolutePath = makeAbsolute(f); - String key = pathToKey(absolutePath); - FileMetadata fileMetadata = store.retrieveMetadata(key); - if (null == fileMetadata) { - throw new FileNotFoundException("File or directory doesn't exist: " + f); + private void checkPermission(Path f, AccessType accessType) throws IOException { + if (!this.enableRangerPluginPermissionCheck) { + return; + } + Path absolutePath = makeAbsolute(f); + String allowKey = pathToKey(absolutePath); + if (allowKey.startsWith("/")) { + allowKey = allowKey.substring(1); } - return new ArrayList<>(fileMetadata.getUserAttributes().keySet()); + + PermissionRequest permissionReq = new PermissionRequest(ServiceType.COS, accessType, + this.bucket, allowKey, "", ""); + boolean allowed = rangerQcloudObjectStorageStorageClient.checkPermission(permissionReq); + if (!allowed) { + throw new IOException(String.format("Permission denied, [key: %s], [user: %s], [operation: %s]", + allowKey, currentUser.getShortUserName(), accessType.name())); + } } @Override diff --git a/src/main/java/org/apache/hadoop/fs/CosFsDataOutputStream.java b/src/main/java/org/apache/hadoop/fs/CosFsDataOutputStream.java index f3faee07..5a380176 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFsDataOutputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosFsDataOutputStream.java @@ -41,6 +41,17 @@ public class CosFsDataOutputStream extends OutputStream { private WriteConsistencyChecker writeConsistencyChecker = null; private boolean closed = false; + /** + * Output stream + * + * @param conf config + * @param store native file system + * @param key cos key + * @param blockSize block config size + * @param executorService thread executor + * @param checksEnabled check flag + * @throws IOException + */ public CosFsDataOutputStream( Configuration conf, NativeFileSystemStore store, @@ -207,7 +218,7 @@ private void uploadPart() throws IOException { this.currentBlockId++; LOG.debug("upload part blockId: {}, uploadId: {}.", this.currentBlockId, this.uploadId); - byte[] md5Hash = this.digest == null ? null : this.digest.digest(); + final byte[] md5Hash = this.digest == null ? null : this.digest.digest(); ListenableFuture partETagListenableFuture = this.executorService.submit(new Callable() { private final CosNByteBuffer buffer = currentBlockBuffer; diff --git a/src/main/java/org/apache/hadoop/fs/CosFsInputStream.java b/src/main/java/org/apache/hadoop/fs/CosFsInputStream.java index 9b31fa16..8ae54ce0 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFsInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosFsInputStream.java @@ -87,7 +87,7 @@ public long getEnd() { private long partRemaining; private long bufferStart; private long bufferEnd; - private final long PreReadPartSize; + private final long preReadPartSize; private final int maxReadPartNumber; private byte[] buffer; private boolean closed = false; @@ -95,6 +95,16 @@ public long getEnd() { private final ExecutorService readAheadExecutorService; private final Queue readBufferQueue; + /** + * Input Stream + * + * @param conf config + * @param store native file system + * @param statistics statis + * @param key cos key + * @param fileSize file size + * @param readAheadExecutorService thread executor + */ public CosFsInputStream( Configuration conf, NativeFileSystemStore store, @@ -110,7 +120,7 @@ public CosFsInputStream( this.fileSize = fileSize; this.bufferStart = -1; this.bufferEnd = -1; - this.PreReadPartSize = conf.getLong( + this.preReadPartSize = conf.getLong( CosNConfigKeys.READ_AHEAD_BLOCK_SIZE_KEY, CosNConfigKeys.DEFAULT_READ_AHEAD_BLOCK_SIZE); this.maxReadPartNumber = conf.getInt( @@ -130,10 +140,10 @@ private synchronized void reopen(long pos) throws IOException { } else if (pos > this.fileSize) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } else { - if (pos + this.PreReadPartSize > this.fileSize) { + if (pos + this.preReadPartSize > this.fileSize) { partSize = this.fileSize - pos; } else { - partSize = this.PreReadPartSize; + partSize = this.preReadPartSize; } } diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index 79daaa3e..11de942d 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -20,6 +20,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String COSN_APPID_KEY = "fs.cosn.userinfo.appid"; public static final String COSN_USERINFO_SECRET_ID_KEY = "fs.cosn.userinfo.secretId"; public static final String COSN_USERINFO_SECRET_KEY_KEY = "fs.cosn.userinfo.secretKey"; + public static final String COSN_USERINFO_SESSION_TOKEN = "fs.cosn.userinfo.sessionToken"; public static final String COSN_REGION_KEY = "fs.cosn.bucket.region"; public static final String COSN_REGION_PREV_KEY = "fs.cosn.userinfo.region"; public static final String COSN_ENDPOINT_SUFFIX_KEY = "fs.cosn.bucket.endpoint_suffix"; @@ -76,7 +77,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String COSN_SERVER_SIDE_ENCRYPTION_ALGORITHM = "fs.cosn.server-side-encryption.algorithm"; public static final String COSN_SERVER_SIDE_ENCRYPTION_KEY = "fs.cosn.server-side-encryption.key"; - public static final String BASE64_Pattern = "^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9" + + public static final String BASE64_PATTERN = "^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9" + "+/]{2}==)$"; // traffic limit @@ -97,4 +98,11 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final int DEFAULT_HTTP_PROXY_PORT = -1; public static final String HTTP_PROXY_USERNAME = "fs.cosn.http.proxy.username"; public static final String HTTP_PROXY_PASSWORD = "fs.cosn.http.proxy.password"; + + public static final String COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL = "fs.cosn.ranger.temp.token.refresh.interval"; + public static final int DEFAULT_COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL = 20; + + public static final String COSN_RANGER_PLUGIN_CLIENT_IMPL = "fs.cosn.ranger.plugin.client.impl"; + public static final String DEFAULT_COSN_RANGER_PLUGIN_CLIENT_IMPL = + "org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClientImpl"; } diff --git a/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java b/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java index 64a4706c..13ddeb6f 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java @@ -29,8 +29,8 @@ public void run() { try { this.store.copy(srcKey, dstKey); } catch (IOException e) { - LOG.warn("Exception thrown when copy from {} to {}, exception:{}" - , this.srcKey, this.dstKey, e); + LOG.warn("Exception thrown when copy from {} to {}, exception:{}", + this.srcKey, this.dstKey, e); fail = true; } finally { this.cosCopyFileContext.lock(); diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java index 805c1420..fd53bcb4 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java @@ -22,6 +22,13 @@ public class CosNFileReadTask implements Runnable { private RetryPolicy retryPolicy = null; + /** + * cos file read task + * @param conf config + * @param key cos key + * @param store native file system + * @param readBuffer read buffer + */ public CosNFileReadTask(Configuration conf, String key, NativeFileSystemStore store, CosFsInputStream.ReadBuffer readBuffer) { diff --git a/src/main/java/org/apache/hadoop/fs/CosNUtils.java b/src/main/java/org/apache/hadoop/fs/CosNUtils.java index 7de88910..f2126281 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNUtils.java +++ b/src/main/java/org/apache/hadoop/fs/CosNUtils.java @@ -25,6 +25,13 @@ public final class CosNUtils { private CosNUtils() { } + /** + * create cos cred + * @param uri cos uri + * @param conf config + * @return provider list + * @throws IOException + */ public static COSCredentialProviderList createCosCredentialsProviderSet( URI uri, Configuration conf) throws IOException { @@ -39,6 +46,7 @@ public static COSCredentialProviderList createCosCredentialsProviderSet( conf)); credentialProviderList.add(new SimpleCredentialProvider(uri, conf)); credentialProviderList.add(new EnvironmentVariableCredentialProvider(uri, conf)); + credentialProviderList.add(new SessionTokenCredentialProvider(uri,conf)); credentialProviderList.add(new CVMInstanceCredentialsProvider(uri, conf)); credentialProviderList.add(new CPMInstanceCredentialsProvider(uri, conf)); } else { @@ -64,6 +72,14 @@ public static Class[] loadCosProviderClasses( } } + /** + * create cos cred + * @param uri cos uri + * @param conf config + * @param credClass cred class + * @return provider + * @throws IOException + */ public static COSCredentialsProvider createCOSCredentialProvider( URI uri, Configuration conf, @@ -134,8 +150,7 @@ private static Constructor getConstructor(Class cl, Class... args) { try { Constructor constructor = cl.getDeclaredConstructor(args); - return Modifier.isPublic(constructor.getModifiers()) ? - constructor : null; + return Modifier.isPublic(constructor.getModifiers()) ? constructor : null; } catch (NoSuchMethodException e) { return null; } diff --git a/src/main/java/org/apache/hadoop/fs/CosNXAttr.java b/src/main/java/org/apache/hadoop/fs/CosNXAttr.java index e867c567..1842f4ba 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNXAttr.java +++ b/src/main/java/org/apache/hadoop/fs/CosNXAttr.java @@ -21,4 +21,4 @@ public String getValue() { public void setValue(String value) { this.value = value; } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index ab097759..e36089fa 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -19,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.*; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -59,13 +60,13 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { if (null == region) { region = conf.get(CosNConfigKeys.COSN_REGION_PREV_KEY); } - String endpoint_suffix = + String endpointSuffix = conf.get(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY); - if (null == endpoint_suffix) { - endpoint_suffix = + if (null == endpointSuffix) { + endpointSuffix = conf.get(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_PREV_KEY); } - if (null == region && null == endpoint_suffix) { + if (null == region && null == endpointSuffix) { String exceptionMsg = String.format("config '%s' and '%s' specify at least one.", CosNConfigKeys.COSN_REGION_KEY, CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY); @@ -73,7 +74,7 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { } if (null == region) { config = new ClientConfig(new Region("")); - config.setEndPointSuffix(endpoint_suffix); + config.setEndPointSuffix(endpointSuffix); } else { config = new ClientConfig(new Region(region)); } @@ -150,13 +151,13 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { CosNConfigKeys.DEFAULT_MAX_CONNECTION_NUM)); // 设置是否进行服务器端加密 - String ServerSideEncryptionAlgorithm = conf.get(CosNConfigKeys.COSN_SERVER_SIDE_ENCRYPTION_ALGORITHM, ""); - CosEncryptionMethods CosSSE = CosEncryptionMethods.getMethod( - ServerSideEncryptionAlgorithm); - String SSEKey = conf.get( + String serverSideEncryptionAlgorithm = conf.get(CosNConfigKeys.COSN_SERVER_SIDE_ENCRYPTION_ALGORITHM, ""); + CosEncryptionMethods cosSSE = CosEncryptionMethods.getMethod( + serverSideEncryptionAlgorithm); + String sseKey = conf.get( CosNConfigKeys.COSN_SERVER_SIDE_ENCRYPTION_KEY, ""); - CheckEncryptionMethod(config, CosSSE, SSEKey); - this.encryptionSecrets = new CosEncryptionSecrets(CosSSE, SSEKey); + checkEncryptionMethod(config, cosSSE, sseKey); + this.encryptionSecrets = new CosEncryptionSecrets(cosSSE, sseKey); // Set the traffic limit this.trafficLimit = conf.getInt(CosNConfigKeys.TRAFFIC_LIMIT, CosNConfigKeys.DEFAULT_TRAFFIC_LIMIT); @@ -229,7 +230,7 @@ private void storeFileWithRetry(String key, InputStream inputStream, int statusCode = cse.getStatusCode(); if (statusCode == 409) { // Check一下这个文件是否已经存在 - FileMetadata fileMetadata = this.QueryObjectMetadata(key); + FileMetadata fileMetadata = this.queryObjectMetadata(key); if (null == fileMetadata) { // 如果文件不存在,则需要抛出异常 handleException(cse, key); @@ -249,7 +250,8 @@ private void storeFileWithRetry(String key, InputStream inputStream, @Override public void storeFile(String key, File file, byte[] md5Hash) throws IOException { if (null != md5Hash) { - LOG.debug("Store the file, local path: {}, length: {}, md5hash: {}.", file.getCanonicalPath(), file.length(), + LOG.debug("Store the file, local path: {}, length: {}, md5hash: {}.", + file.getCanonicalPath(), file.length(), Hex.encodeHexString(md5Hash)); } storeFileWithRetry(key, @@ -257,8 +259,8 @@ public void storeFile(String key, File file, byte[] md5Hash) throws IOException } @Override - public void storeFile(String key, InputStream inputStream, byte[] md5Hash - , long contentLength) throws IOException { + public void storeFile(String key, InputStream inputStream, + byte[] md5Hash, long contentLength) throws IOException { if (null != md5Hash) { LOG.debug("Store the file to the cos key: {}, input stream md5 hash: {}, content length: {}.", key, Hex.encodeHexString(md5Hash), @@ -292,7 +294,7 @@ public void storeEmptyFile(String key) throws IOException { int statusCode = cse.getStatusCode(); if (statusCode == 409) { // 并发上传文件导致,再check一遍文件是否存在 - FileMetadata fileMetadata = this.QueryObjectMetadata(key); + FileMetadata fileMetadata = this.queryObjectMetadata(key); if (null == fileMetadata) { // 文件还是不存在,必须要抛出异常 handleException(cse, key); @@ -374,6 +376,12 @@ public void abortMultipartUpload(String key, String uploadId) throws IOException } } + /** + * get cos upload Id + * @param key cos key + * @return uploadId + * @throws IOException + */ public String getUploadId(String key) throws IOException { if (null == key || key.length() == 0) { return ""; @@ -405,6 +413,14 @@ public String getUploadId(String key) throws IOException { return null; } + /** + * complete cos mpu + * @param key cos key + * @param uploadId upload id + * @param partETagList each part etag list + * @return result + * @throws IOException + */ public CompleteMultipartUploadResult completeMultipartUpload( String key, String uploadId, List partETagList) throws IOException { Collections.sort(partETagList, new Comparator() { @@ -431,7 +447,7 @@ public int compare(PartETag o1, PartETag o2) { return null; } - private FileMetadata QueryObjectMetadata(String key) throws IOException { + private FileMetadata queryObjectMetadata(String key) throws IOException { LOG.debug("Query Object metadata. cos key: {}.", key); GetObjectMetadataRequest getObjectMetadataRequest = new GetObjectMetadataRequest(bucketName, key); @@ -456,7 +472,8 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException { userMetadata = new HashMap<>(); for (Map.Entry userMetadataEntry : objectMetadata.getUserMetadata().entrySet()) { if (userMetadataEntry.getKey().startsWith(ensureValidAttributeName(XATTR_PREFIX))) { - String xAttrJsonStr = new String(Base64.decode(userMetadataEntry.getValue()), StandardCharsets.UTF_8); + String xAttrJsonStr = new String(Base64.decode(userMetadataEntry.getValue()), + StandardCharsets.UTF_8); CosNXAttr cosNXAttr = null; try { cosNXAttr = Jackson.fromJsonString(xAttrJsonStr, CosNXAttr.class); @@ -473,9 +490,9 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException { } } FileMetadata fileMetadata = - new FileMetadata(key, fileSize, mtime, - !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, crc32cm, versionId, objectMetadata.getStorageClass(), - userMetadata); + new FileMetadata(key, fileSize, mtime, !key.endsWith(PATH_DELIMITER), + ETag, crc64ecm, crc32cm, versionId, + objectMetadata.getStorageClass(), userMetadata); LOG.debug("Retrieve the file metadata. cos key: {}, ETag:{}, length:{}, crc64ecm: {}.", key, objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getCrc64Ecma()); return fileMetadata; @@ -497,14 +514,14 @@ public FileMetadata retrieveMetadata(String key) throws IOException { } if (!key.isEmpty()) { - FileMetadata fileMetadata = QueryObjectMetadata(key); + FileMetadata fileMetadata = queryObjectMetadata(key); if (fileMetadata != null) { return fileMetadata; } } // judge if the key is directory key = key + PATH_DELIMITER; - return QueryObjectMetadata(key); + return queryObjectMetadata(key); } @Override @@ -633,6 +650,8 @@ private void storeAttribute(String key, String attribute, byte[] value, boolean } /** + * retrieve cos key + * * @param key The key is the object name that is being retrieved from the * cos bucket. * @return This method returns null if the key is not found. @@ -729,6 +748,37 @@ public InputStream retrieveBlock(String key, long byteRangeStart, return null; } + @Override + public boolean retrieveBlock(String key, long byteRangeStart, + long blockSize, + String localBlockPath) throws IOException { + long fileSize = getFileLength(key); + long byteRangeEnd = 0; + try { + GetObjectRequest request = new GetObjectRequest(this.bucketName, + key); + if (this.trafficLimit >= 0) { + request.setTrafficLimit(this.trafficLimit); + } + this.setEncryptionMetadata(request, new ObjectMetadata()); + if (fileSize > 0) { + byteRangeEnd = Math.min(fileSize - 1, + byteRangeStart + blockSize - 1); + request.setRange(byteRangeStart, byteRangeEnd); + } + cosClient.getObject(request, new File(localBlockPath)); + return true; + } catch (Exception e) { + String errMsg = + String.format("Retrieving block key [%s] with range [%d - %d] " + + "occurs an exception: %s", + key, byteRangeStart, byteRangeEnd, e.getMessage()); + handleException(new Exception(errMsg), key); + return false; // never will get here + } + } + + @Override public PartialListing list(String prefix, int maxListingLength) throws IOException { return list(prefix, maxListingLength, null, false); @@ -739,8 +789,7 @@ public PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recurse) throws IOException { - return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength - , priorLastKey); + return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey); } /** @@ -811,10 +860,13 @@ private PartialListing list(String prefix, String delimiter, long fileLen = cosObjectSummary.getSize(); String fileEtag = cosObjectSummary.getETag(); if (cosObjectSummary.getKey().endsWith(PATH_DELIMITER) && cosObjectSummary.getSize() == 0) { - fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, null, cosObjectSummary.getStorageClass())); + fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, + fileEtag, null, null, null, + cosObjectSummary.getStorageClass())); } else { fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, - true, fileEtag, null, null, null, cosObjectSummary.getStorageClass())); + true, fileEtag, null, null, null, + cosObjectSummary.getStorageClass())); } } List commonPrefixes = objectListing.getCommonPrefixes(); @@ -858,6 +910,12 @@ public void delete(String key) throws IOException { } } + /** + * rename operation + * @param srcKey src cos key + * @param dstKey dst cos key + * @throws IOException + */ public void rename(String srcKey, String dstKey) throws IOException { LOG.debug("Rename the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); try { @@ -948,36 +1006,6 @@ private void handleException(Exception e, String key) throws IOException { throw new IOException(exceptInfo); } - @Override - public boolean retrieveBlock(String key, long byteRangeStart, - long blockSize, - String localBlockPath) throws IOException { - long fileSize = getFileLength(key); - long byteRangeEnd = 0; - try { - GetObjectRequest request = new GetObjectRequest(this.bucketName, - key); - if (this.trafficLimit >= 0) { - request.setTrafficLimit(this.trafficLimit); - } - this.setEncryptionMetadata(request, new ObjectMetadata()); - if (fileSize > 0) { - byteRangeEnd = Math.min(fileSize - 1, - byteRangeStart + blockSize - 1); - request.setRange(byteRangeStart, byteRangeEnd); - } - cosClient.getObject(request, new File(localBlockPath)); - return true; - } catch (Exception e) { - String errMsg = - String.format("Retrieving block key [%s] with range [%d - %d] " + - "occurs an exception: %s", - key, byteRangeStart, byteRangeEnd, e.getMessage()); - handleException(new Exception(errMsg), key); - return false; // never will get here - } - } - @Override public long getFileLength(String key) throws IOException { GetObjectMetadataRequest getObjectMetadataRequest = @@ -1018,21 +1046,21 @@ private void callCOSClientWithSSECOS(X request, ObjectMetadata objectMetadat } } - private void callCOSClientWithSSEC(X request, SSECustomerKey SSEKey) { + private void callCOSClientWithSSEC(X request, SSECustomerKey sseKey) { try { if (request instanceof PutObjectRequest) { - ((PutObjectRequest) request).setSSECustomerKey(SSEKey); + ((PutObjectRequest) request).setSSECustomerKey(sseKey); } else if (request instanceof UploadPartRequest) { - ((UploadPartRequest) request).setSSECustomerKey(SSEKey); + ((UploadPartRequest) request).setSSECustomerKey(sseKey); } else if (request instanceof GetObjectMetadataRequest) { - ((GetObjectMetadataRequest) request).setSSECustomerKey(SSEKey); + ((GetObjectMetadataRequest) request).setSSECustomerKey(sseKey); } else if (request instanceof CopyObjectRequest) { - ((CopyObjectRequest) request).setDestinationSSECustomerKey(SSEKey); - ((CopyObjectRequest) request).setSourceSSECustomerKey(SSEKey); + ((CopyObjectRequest) request).setDestinationSSECustomerKey(sseKey); + ((CopyObjectRequest) request).setSourceSSECustomerKey(sseKey); } else if (request instanceof GetObjectRequest) { - ((GetObjectRequest) request).setSSECustomerKey(SSEKey); + ((GetObjectRequest) request).setSSECustomerKey(sseKey); } else if (request instanceof InitiateMultipartUploadRequest) { - ((InitiateMultipartUploadRequest) request).setSSECustomerKey(SSEKey); + ((InitiateMultipartUploadRequest) request).setSSECustomerKey(sseKey); } else { throw new IOException("Set SSE_C request no such method"); } @@ -1058,12 +1086,12 @@ private void setEncryptionMetadata(X request, ObjectMetadata objectMetadata) } } - private void CheckEncryptionMethod(ClientConfig config, - CosEncryptionMethods CosSSE, String SSEKey) throws IOException { - int sseKeyLen = StringUtils.isNullOrEmpty(SSEKey) ? 0 : SSEKey.length(); + private void checkEncryptionMethod(ClientConfig config, + CosEncryptionMethods cosSSE, String sseKey) throws IOException { + int sseKeyLen = StringUtils.isNullOrEmpty(sseKey) ? 0 : sseKey.length(); String description = "Encryption key:"; - if (SSEKey == null) { + if (sseKey == null) { description += "null "; } else { switch (sseKeyLen) { @@ -1075,17 +1103,17 @@ private void CheckEncryptionMethod(ClientConfig config, break; default: description = description + " of length " + sseKeyLen + " ending with " - + SSEKey.charAt(sseKeyLen - 1); + + sseKey.charAt(sseKeyLen - 1); } } - switch (CosSSE) { + switch (cosSSE) { case SSE_C: LOG.debug("Using SSE_C with {}", description); config.setHttpProtocol(HttpProtocol.https); if (sseKeyLen == 0) { throw new IOException("missing encryption key for SSE_C "); - } else if (!SSEKey.matches(CosNConfigKeys.BASE64_Pattern)) { + } else if (!sseKey.matches(CosNConfigKeys.BASE64_PATTERN)) { throw new IOException("encryption key need to Base64 encoding for SSE_C "); } break; @@ -1161,8 +1189,8 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, String errorCode = cse.getErrorCode(); LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode); // 对5xx错误进行重试 - if (request instanceof CopyObjectRequest && statusCode / 100 == 2 && - errorCode != null && !errorCode.isEmpty()) { + if (request instanceof CopyObjectRequest && statusCode / 100 == 2 + && errorCode != null && !errorCode.isEmpty()) { if (retryIndex <= this.maxRetryTimes) { LOG.info(errMsg, cse); ++retryIndex; diff --git a/src/main/java/org/apache/hadoop/fs/CrcUtil.java b/src/main/java/org/apache/hadoop/fs/CrcUtil.java index 7065539b..d3d6f603 100644 --- a/src/main/java/org/apache/hadoop/fs/CrcUtil.java +++ b/src/main/java/org/apache/hadoop/fs/CrcUtil.java @@ -7,6 +7,7 @@ private CrcUtil() { } /** + * int turn to bytes * @return 4-byte array holding the big-endian representation of * {@code value}. */ diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index 4ca8ecf1..b4cf7475 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -48,9 +48,11 @@ PartETag uploadPart(InputStream inputStream, String key, String uploadId, byte[] retrieveAttribute(String key, String attribute) throws IOException; void storeDirAttribute(String key, String attribute, byte[] value) throws IOException; + void storeFileAttribute(String key, String attribute, byte[] value) throws IOException; void removeDirAttribute(String key, String attribute) throws IOException; + void removeFileAttribute(String key, String attribute) throws IOException; InputStream retrieve(String key) throws IOException; diff --git a/src/main/java/org/apache/hadoop/fs/ResettableFileInputStream.java b/src/main/java/org/apache/hadoop/fs/ResettableFileInputStream.java index a6d29b60..26cc1bf5 100644 --- a/src/main/java/org/apache/hadoop/fs/ResettableFileInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/ResettableFileInputStream.java @@ -96,4 +96,4 @@ public long skip(final long count) position += count; return inputStream.skip(count); } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/fs/WriteConsistencyChecker.java b/src/main/java/org/apache/hadoop/fs/WriteConsistencyChecker.java index d0cc76ed..91956e43 100644 --- a/src/main/java/org/apache/hadoop/fs/WriteConsistencyChecker.java +++ b/src/main/java/org/apache/hadoop/fs/WriteConsistencyChecker.java @@ -42,6 +42,10 @@ public String getKey() { return key; } + /** + * judge the success operation + * @return whether success + */ public boolean isSucceeded() { if (this.expectedLength < 0 && this.realLength < 0) { // The expected length and the real length are invalid. @@ -74,6 +78,10 @@ public long getRealLength() { return realLength; } + /** + * get description string + * @return description + */ public String getDescription() { if (!this.description.isEmpty()) { return this.description; @@ -99,8 +107,8 @@ public String getDescription() { // The expected length and real length is valid. if (this.expectedLength == this.realLength) { - this.description = String.format("File verification succeeded. expected length: %d, real length: %d" - , this.expectedLength, this.realLength); + this.description = String.format("File verification succeeded. expected length: %d, real length: %d" , + this.expectedLength, this.realLength); } else { this.description = String.format("File verification failure. expected length: %d, real length: %d", this.expectedLength, this.realLength); diff --git a/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java b/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java new file mode 100644 index 00000000..403b8e60 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java @@ -0,0 +1,59 @@ +package org.apache.hadoop.fs.auth; + +import com.qcloud.cos.auth.*; +import com.qcloud.cos.exception.CosClientException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CosNConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * Provide the credentials when the CosN connector is instantiated on Tencent Cloud Virtual Machine(CVM) + */ +public class EMRInstanceCredentialsProvider extends AbstractCOSCredentialProvider implements COSCredentialsProvider { + private static final Logger LOG = LoggerFactory.getLogger(EMRInstanceCredentialsProvider.class); + + private String appId; + private final COSCredentialsProvider cosCredentialsProvider; + + public EMRInstanceCredentialsProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + if (null != conf) { + this.appId = conf.get(CosNConfigKeys.COSN_APPID_KEY); + } + InstanceMetadataCredentialsEndpointProvider endpointProvider = + new InstanceMetadataCredentialsEndpointProvider( + InstanceMetadataCredentialsEndpointProvider.Instance.EMR); + InstanceCredentialsFetcher instanceCredentialsFetcher = new InstanceCredentialsFetcher(endpointProvider); + this.cosCredentialsProvider = new InstanceCredentialsProvider(instanceCredentialsFetcher); + } + + @Override + public COSCredentials getCredentials() { + try { + COSCredentials cosCredentials = this.cosCredentialsProvider.getCredentials(); + // Compatible appId + if (null != this.appId) { + if (cosCredentials instanceof InstanceProfileCredentials) { + return new InstanceProfileCredentials(this.appId, cosCredentials.getCOSAccessKeyId(), + cosCredentials.getCOSSecretKey(), + ((InstanceProfileCredentials) cosCredentials).getSessionToken(), + ((InstanceProfileCredentials) cosCredentials).getExpiredTime()); + } + } + return cosCredentials; + } catch (CosClientException e) { + LOG.error("Failed to obtain the credentials from CVMInstanceCredentialsProvider.", e); + } + + return null; + } + + @Override + public void refresh() { + this.cosCredentialsProvider.refresh(); + } +} diff --git a/src/main/java/org/apache/hadoop/fs/auth/NoAuthWithCOSException.java b/src/main/java/org/apache/hadoop/fs/auth/NoAuthWithCOSException.java index 37ea5906..763b49d9 100644 --- a/src/main/java/org/apache/hadoop/fs/auth/NoAuthWithCOSException.java +++ b/src/main/java/org/apache/hadoop/fs/auth/NoAuthWithCOSException.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.fs.auth; import com.qcloud.cos.exception.CosClientException; diff --git a/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java b/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java new file mode 100644 index 00000000..8fa1a94f --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java @@ -0,0 +1,103 @@ +package org.apache.hadoop.fs.auth; + +import com.qcloud.cos.auth.BasicSessionCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.auth.COSCredentialsProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CosFileSystem; +import org.apache.hadoop.fs.CosNConfigKeys; +import org.apache.hadoop.fs.cosn.ranger.security.sts.GetSTSResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +public class RangerCredentialsProvider extends AbstractCOSCredentialProvider implements COSCredentialsProvider { + private static final Logger log = LoggerFactory.getLogger(RangerCredentialsProvider.class); + private Thread credentialsFetcherDaemonThread; + private CredentialsFetcherDaemon credentialsFetcherDaemon; + private final String bucketName; + private String bucketRegion; + + public RangerCredentialsProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + this.bucketName = uri.getHost(); + this.bucketRegion = conf.get(CosNConfigKeys.COSN_REGION_KEY); + if (this.bucketRegion == null || this.bucketRegion.isEmpty()) { + this.bucketRegion = conf.get(CosNConfigKeys.COSN_REGION_PREV_KEY); + } + + credentialsFetcherDaemon = new CredentialsFetcherDaemon( + conf.getInt( + CosNConfigKeys.COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL, + CosNConfigKeys.DEFAULT_COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL)); + credentialsFetcherDaemonThread = new Thread(credentialsFetcherDaemon); + credentialsFetcherDaemonThread.setDaemon(true); + credentialsFetcherDaemonThread.start(); + } + + class CredentialsFetcherDaemon implements Runnable { + private int refreshIntervalSeconds; + private AtomicReference lastCredentialsRef; + private Date lastFreshDate; + + CredentialsFetcherDaemon(int refreshIntervalSeconds) { + this.refreshIntervalSeconds = refreshIntervalSeconds; + this.lastCredentialsRef = new AtomicReference<>(); + } + + COSCredentials getCredentials() { + COSCredentials lastCred = lastCredentialsRef.get(); + if (lastCred == null) { + return fetchCredentials(); + } + return lastCred; + } + + private COSCredentials fetchCredentials() { + try { + GetSTSResponse stsResp = CosFileSystem.rangerQcloudObjectStorageStorageClient.getSTS(bucketRegion, + bucketName); + return new BasicSessionCredentials(stsResp.getTempAK(), stsResp.getTempSK(), stsResp.getTempToken()); + } catch (IOException e) { + log.error("fetch credentials failed", e); + return null; + } + } + + @Override + public void run() { + while (true) { + Date currentDate = new Date(); + if (lastFreshDate == null || (currentDate.getTime() / 1000 - lastFreshDate.getTime() / 1000) + < this.refreshIntervalSeconds) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + continue; + } + + COSCredentials newCred = fetchCredentials(); + if (newCred != null) { + this.lastFreshDate = currentDate; + this.lastCredentialsRef.set(newCred); + } + } + } + } + + @Override + public COSCredentials getCredentials() { + return credentialsFetcherDaemon.getCredentials(); + } + + @Override + public void refresh() { + + } +} diff --git a/src/main/java/org/apache/hadoop/fs/auth/SessionTokenCredentialProvider.java b/src/main/java/org/apache/hadoop/fs/auth/SessionTokenCredentialProvider.java new file mode 100644 index 00000000..284005fe --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/auth/SessionTokenCredentialProvider.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.fs.auth; + +import com.qcloud.cos.auth.BasicSessionCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.auth.COSCredentialsProvider; +import com.qcloud.cos.utils.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CosNConfigKeys; + +import javax.annotation.Nullable; +import java.net.URI; + +public class SessionTokenCredentialProvider extends AbstractCOSCredentialProvider + implements COSCredentialsProvider { + private String appId; + private String secretId; + private String secretKey; + private String sessionToken; + + public SessionTokenCredentialProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + if (null != conf) { + this.appId = conf.get(CosNConfigKeys.COSN_APPID_KEY); + this.secretId = conf.get( + CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY); + this.secretKey = conf.get( + CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY); + this.sessionToken = conf.get( + CosNConfigKeys.COSN_USERINFO_SESSION_TOKEN); + } + } + + @Override + public COSCredentials getCredentials() { + if (!StringUtils.isNullOrEmpty(this.secretId) + && !StringUtils.isNullOrEmpty(this.secretKey)) { + if (null != this.appId) { + return new BasicSessionCredentials(this.appId, this.secretId, this.secretKey,this.sessionToken); + } else { + return new BasicSessionCredentials(this.secretId, this.secretKey, this.sessionToken); + } + } + return null; + } + + @Override + public void refresh() { + } +} diff --git a/src/main/java/org/apache/hadoop/fs/buffer/CosNMappedBufferFactory.java b/src/main/java/org/apache/hadoop/fs/buffer/CosNMappedBufferFactory.java index f27e5fd8..a5075195 100644 --- a/src/main/java/org/apache/hadoop/fs/buffer/CosNMappedBufferFactory.java +++ b/src/main/java/org/apache/hadoop/fs/buffer/CosNMappedBufferFactory.java @@ -72,8 +72,7 @@ public CosNByteBuffer create(int size) { "rw"); randomAccessFile.setLength(size); MappedByteBuffer buf = - randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0 - , size); + randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0 , size); return new CosNMappedBuffer(buf, randomAccessFile, tmpFile); } catch (IOException e) { LOG.error("Create tmp file failed. Tmp dir: {}", this.tmpDir, e); diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java new file mode 100644 index 00000000..8e8cc96f --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java @@ -0,0 +1,42 @@ +package org.apache.hadoop.fs.cosn.ranger.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest; +import org.apache.hadoop.fs.cosn.ranger.security.sts.GetSTSResponse; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; + +public interface RangerQcloudObjectStorageClient { + public void init(Configuration conf) throws IOException; + + public String getCanonicalServiceName(); + + public boolean checkPermission(PermissionRequest permissionRequest) throws IOException; + + public Token getDelegationToken(String renewer) throws IOException; + + /** + * Renew the given token. + * + * @return the new expiration time + * @throws IOException + * @throws InterruptedException + */ + public long renew(Token token, + Configuration conf + ) throws IOException, InterruptedException; + + /** + * Cancel the given token + * + * @throws IOException + * @throws InterruptedException + */ + + public void cancel(Token token, Configuration configuration) throws IOException, InterruptedException; + + public GetSTSResponse getSTS(String bucketRegion, String bucketName) throws IOException; + + public void close(); +} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java new file mode 100644 index 00000000..c7ea6d9a --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java @@ -0,0 +1,8 @@ +package org.apache.hadoop.fs.cosn.ranger.security.authorization; + +public enum AccessType { + LIST, + WRITE, + READ, + DELETE, +} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java new file mode 100644 index 00000000..b4ec1706 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java @@ -0,0 +1,46 @@ +package org.apache.hadoop.fs.cosn.ranger.security.authorization; + +public class PermissionRequest { + private ServiceType serviceType; + private AccessType accessType; + + private String bucketName; + private String objectKey; + + private String fsMountPoint; + private String chdfsPath; + + public PermissionRequest(ServiceType serviceType, AccessType accessType, + String bucketName, String objectKey, String fsMountPoint, String chdfsPath) { + this.serviceType = serviceType; + this.accessType = accessType; + this.bucketName = bucketName; + this.objectKey = objectKey; + this.fsMountPoint = fsMountPoint; + this.chdfsPath = chdfsPath; + } + + public ServiceType getServiceType() { + return serviceType; + } + + public AccessType getAccessType() { + return accessType; + } + + public String getBucketName() { + return bucketName; + } + + public String getObjectKey() { + return objectKey; + } + + public String getFsMountPoint() { + return fsMountPoint; + } + + public String getChdfsPath() { + return chdfsPath; + } +} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java new file mode 100644 index 00000000..d50f5bad --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.fs.cosn.ranger.security.authorization; + +public enum ServiceType { + COS, + CHDFS, +} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java new file mode 100644 index 00000000..2dd8414a --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.fs.cosn.ranger.security.sts; + +public class GetSTSRequest { + private String bucketName; + private String bucketRegion; + private String allowPrefix; + + public String getBucketName() { + return bucketName; + } + + public void setBucketName(String bucketName) { + this.bucketName = bucketName; + } + + public String getBucketRegion() { + return bucketRegion; + } + + public void setBucketRegion(String bucketRegion) { + this.bucketRegion = bucketRegion; + } + + public String getAllowPrefix() { + return allowPrefix; + } + + public void setAllowPrefix(String allowPrefix) { + this.allowPrefix = allowPrefix; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java new file mode 100644 index 00000000..80b43ac3 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.fs.cosn.ranger.security.sts; + +public class GetSTSResponse { + private String tempAK; + private String tempSK; + private String tempToken; + + public String getTempAK() { + return tempAK; + } + + public void setTempAK(String tempAK) { + this.tempAK = tempAK; + } + + public String getTempSK() { + return tempSK; + } + + public void setTempSK(String tempSK) { + this.tempSK = tempSK; + } + + public String getTempToken() { + return tempToken; + } + + public void setTempToken(String tempToken) { + this.tempToken = tempToken; + } +} \ No newline at end of file