diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..dd37215e --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Created by .ignore support plugin (hsz.mobi) +# User-specific stuff +.DS_Store +.idea/ + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +*.iml +*.ipr + +### Java template +# Compiled class file +*.class +target/ + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +pom.xml-E +pom.xml.versionsBackup + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# local dep +dep + diff --git a/LICENSE b/LICENSE index 03297876..293af4c0 100644 --- a/LICENSE +++ b/LICENSE @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +SOFTWARE. \ No newline at end of file diff --git a/compile.sh b/compile.sh index 155ec627..daf13efc 100644 --- a/compile.sh +++ b/compile.sh @@ -18,4 +18,4 @@ do cp target/*.pom dep/${hadoop_version}/ mvn versions:set -DnewVersion=${origin_version} -done \ No newline at end of file +done diff --git a/pom.xml b/pom.xml index c9a7d65d..728cfd86 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 5.10.0 + 8.0.1 jar Apache Hadoop Tencent Qcloud COS Support @@ -41,7 +41,7 @@ 1.7 1.7 3.3.0 - 5.6.60 + 5.6.62 24.1.1-jre 3.1 4.8 @@ -62,6 +62,13 @@ ${hadoop.version} provided + + + com.qcloud + cosn-ranger-interface + 1.0.3 + provided + @@ -88,7 +95,6 @@ - org.apache.maven.plugins @@ -156,6 +162,5 @@ - diff --git a/src/main/java/com/qcloud/chdfs/permission/RangerAccessType.java b/src/main/java/com/qcloud/chdfs/permission/RangerAccessType.java new file mode 100644 index 00000000..7082c857 --- /dev/null +++ b/src/main/java/com/qcloud/chdfs/permission/RangerAccessType.java @@ -0,0 +1,12 @@ +package com.qcloud.chdfs.permission; + +public enum RangerAccessType { + + LIST, + WRITE, + READ, + DELETE; + + private RangerAccessType() { + } +} diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 36c94970..9a305647 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -2,6 +2,8 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.qcloud.cos.model.HeadBucketResult; +import com.qcloud.chdfs.permission.RangerAccessType; import com.qcloud.cos.utils.StringUtils; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -12,6 +14,7 @@ import org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClient; import org.apache.hadoop.fs.cosn.ranger.security.authorization.AccessType; import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest; +import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionResponse; import org.apache.hadoop.fs.cosn.ranger.security.authorization.ServiceType; import org.apache.hadoop.fs.permission.FsPermission; import 
org.apache.hadoop.io.retry.RetryPolicies; @@ -20,6 +23,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +51,6 @@ public class CosFileSystem extends FileSystem { static final String SCHEME = "cosn"; static final String PATH_DELIMITER = Path.SEPARATOR; - static final int COS_MAX_LISTING_LENGTH = 999; static final Charset METADATA_ENCODING = StandardCharsets.UTF_8; // The length of name:value pair should be less than or equal to 1024 bytes. @@ -55,6 +58,10 @@ public class CosFileSystem extends FileSystem { private URI uri; private String bucket; + private boolean isMergeBucket; + private boolean checkMergeBucket; + private int mergeBucketMaxListNum; + private int normalBucketMaxListNum; private NativeFileSystemStore store; private Path workingDir; private String owner = "Unknown"; @@ -68,6 +75,8 @@ public class CosFileSystem extends FileSystem { private boolean enableRangerPluginPermissionCheck = false; public static RangerQcloudObjectStorageClient rangerQcloudObjectStorageStorageClient = null; + private String formatBucket; + public CosFileSystem() { } @@ -96,6 +105,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { initRangerClientImpl(conf); this.bucket = uri.getHost(); + this.formatBucket = CosNUtils.formatBucket(uri.getHost(), conf); if (this.store == null) { this.store = createDefaultStore(conf); @@ -113,6 +123,30 @@ public void initialize(URI uri, Configuration conf) throws IOException { uri, bucket, workingDir, owner, group, conf); BufferPool.getInstance().initialize(getConf()); + // head the bucket to determine whether it is a merge bucket + this.isMergeBucket = false; + this.checkMergeBucket = this.getConf().getBoolean( + CosNConfigKeys.OPEN_CHECK_MERGE_BUCKET, + CosNConfigKeys.DEFAULT_CHECK_MERGE_BUCKET + ); + + if (this.checkMergeBucket) { // this check can be disabled by configuration + HeadBucketResult headBucketResult = this.store.headBucket(this.bucket); + int mergeBucketMaxListNum = this.getConf().getInt( + CosNConfigKeys.MERGE_BUCKET_MAX_LIST_NUM, + CosNConfigKeys.DEFAULT_MERGE_BUCKET_MAX_LIST_NUM + ); + if (headBucketResult.isMergeBucket()) { + this.isMergeBucket = true; + this.mergeBucketMaxListNum = mergeBucketMaxListNum; + this.store.setMergeBucket(true); + } + } + LOG.info("cos file system bucket is a merge bucket: {}", this.isMergeBucket); + this.normalBucketMaxListNum = this.getConf().getInt( + CosNConfigKeys.NORMAL_BUCKET_MAX_LIST_NUM, + CosNConfigKeys.DEFAULT_NORMAL_BUCKET_MAX_LIST_NUM); + // initialize the thread pool int uploadThreadPoolSize = this.getConf().getInt( CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY, @@ -127,7 +161,6 @@ public void initialize(URI uri, Configuration conf) throws IOException { CosNConfigKeys.THREAD_KEEP_ALIVE_TIME_KEY, CosNConfigKeys.DEFAULT_THREAD_KEEP_ALIVE_TIME ); - this.boundedIOThreadPool = new ThreadPoolExecutor( ioThreadPoolSize / 2, ioThreadPoolSize, threadKeepAlive, TimeUnit.SECONDS, @@ -295,7 +328,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, long blockSize, Progressable progress) throws IOException { - checkPermission(f, AccessType.WRITE); + checkPermission(f, RangerAccessType.WRITE); if (exists(f) && !overwrite) { throw new FileAlreadyExistsException("File already exists: " + f); @@ -332,8 +365,7 @@ private boolean rejectRootDirectoryDelete(boolean isEmptyDir, public boolean delete(Path f, boolean recursive) throws IOException { LOG.debug("Ready to delete path: 
{}. recursive: {}.", f, recursive); - checkPermission(f, AccessType.DELETE); - + checkPermission(f, RangerAccessType.DELETE); FileStatus status; try { status = getFileStatus(f); @@ -341,6 +373,7 @@ public boolean delete(Path f, boolean recursive) throws IOException { LOG.debug("Delete called for '{}', but the file does not exist and returning the false.", f); return false; } + Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); if (key.compareToIgnoreCase("/") == 0) { @@ -358,61 +391,77 @@ public boolean delete(Path f, boolean recursive) throws IOException { + " as is a not empty directory and recurse option is" + " false"); } + // choose the delete strategy according to the bucket type + if (!isMergeBucket) { + internalRecursiveDelete(key); + } else { + internalAutoRecursiveDelete(key); + } + } else { + LOG.debug("Delete the cos key [{}].", key); + store.delete(key); + } - CosNDeleteFileContext deleteFileContext = new CosNDeleteFileContext(); - int deleteToFinishes = 0; - - String priorLastKey = null; - do { - CosNPartialListing listing = - store.list(key, COS_MAX_LISTING_LENGTH, priorLastKey, true); - for (FileMetadata file : listing.getFiles()) { - this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( - this.store, file.getKey(), deleteFileContext)); - deleteToFinishes++; - if (!deleteFileContext.isDeleteSuccess()) { - break; - } + if (!isMergeBucket) { + createParentDirectoryIfNecessary(f); + } + return true; + } + + + private void internalRecursiveDelete(String key) throws IOException { + CosNDeleteFileContext deleteFileContext = new CosNDeleteFileContext(); + int deleteToFinishes = 0; + + String priorLastKey = null; + do { + CosNPartialListing listing = + store.list(key, this.normalBucketMaxListNum, priorLastKey, true); + for (FileMetadata file : listing.getFiles()) { + this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( + this.store, file.getKey(), deleteFileContext)); + deleteToFinishes++; + if (!deleteFileContext.isDeleteSuccess()) { + break; } - for (FileMetadata commonPrefix : listing.getCommonPrefixes()) { - this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( - this.store, commonPrefix.getKey(), deleteFileContext)); - deleteToFinishes++; - if (!deleteFileContext.isDeleteSuccess()) { - break; - } + } + for (FileMetadata commonPrefix : listing.getCommonPrefixes()) { + this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( + this.store, commonPrefix.getKey(), deleteFileContext)); + deleteToFinishes++; + if (!deleteFileContext.isDeleteSuccess()) { + break; } - priorLastKey = listing.getPriorLastKey(); - } while (priorLastKey != null); - - deleteFileContext.lock(); - try { - deleteFileContext.awaitAllFinish(deleteToFinishes); - } catch (InterruptedException e) { - LOG.warn("interrupted when wait delete to finish"); - } finally { - deleteFileContext.unlock(); } + priorLastKey = listing.getPriorLastKey(); + } while (priorLastKey != null); - deleteFileContext.lock(); + try { + deleteFileContext.awaitAllFinish(deleteToFinishes); + } catch (InterruptedException e) { + LOG.warn("interrupted while waiting for the deletes to finish"); + } finally { + deleteFileContext.unlock(); + } - // according the flag and exception in thread opr to throw this out - if (!deleteFileContext.isDeleteSuccess() && deleteFileContext.hasException()) { - throw deleteFileContext.getIOException(); - } + // throw the exception recorded by the delete tasks, according to the context flag + 
if (!deleteFileContext.isDeleteSuccess() && deleteFileContext.hasException()) { + throw deleteFileContext.getIOException(); + } - } else { + try { LOG.debug("Delete the cos key [{}].", key); store.delete(key); + } catch (Exception e) { + LOG.error("Delete the key failed.", e); } + } - createParentDirectoryIfNecessary(f); - return true; + // used by merge buckets, which support recursive directory deletion via a flag parameter + private void internalAutoRecursiveDelete(String key) throws IOException { + LOG.debug("Delete the cos key auto recursive [{}].", key); + store.deleteRecursive(key); } @Override @@ -421,7 +470,7 @@ public FileStatus getFileStatus(Path f) throws IOException { Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); - if (key.length() == 0) { // root always exists + if (key.length() == 0 || key.equals(PATH_DELIMITER)) { // root always exists return newDirectory(absolutePath); } @@ -435,6 +484,10 @@ public FileStatus getFileStatus(Path f) throws IOException { LOG.debug("Retrieve the cos key [{}] to find that it is a directory.", key); return newDirectory(meta, absolutePath); } + } + + if (isMergeBucket) { + throw new FileNotFoundException("No such file or directory in merge bucket: '" + absolutePath + "'"); } if (!key.endsWith(PATH_DELIMITER)) { @@ -481,12 +534,17 @@ public URI getUri() { *

*/ @Override - public FileStatus[] listStatus(Path f) throws IOException { + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { LOG.debug("list status:" + f); - checkPermission(f, AccessType.LIST); + checkPermission(f, RangerAccessType.LIST); Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); + int listMaxLength = this.normalBucketMaxListNum; + if (checkMergeBucket && isMergeBucket) { + listMaxLength = this.mergeBucketMaxListNum; + } + if (key.length() > 0) { FileMetadata meta = store.retrieveMetadata(key); @@ -499,11 +557,21 @@ public FileStatus[] listStatus(Path f) throws IOException { key += PATH_DELIMITER; } + // calling getFileStatus here to detect a non-existent path would increase the list QPS, + // so for now it is only done for merge buckets. + if (this.isMergeBucket) { + try { + this.getFileStatus(f); + } catch (FileNotFoundException e) { + throw new FileNotFoundException("No such file or directory: " + f); + } + } + URI pathUri = absolutePath.toUri(); Set status = new TreeSet(); String priorLastKey = null; do { - CosNPartialListing listing = store.list(key, COS_MAX_LISTING_LENGTH, + CosNPartialListing listing = store.list(key, listMaxLength, priorLastKey, false); for (FileMetadata fileMetadata : listing.getFiles()) { Path subPath = keyToPath(fileMetadata.getKey()); @@ -578,7 +646,7 @@ private void validatePath(Path path) throws IOException { public boolean mkdirs(Path f, FsPermission permission) throws IOException { LOG.debug("mkdirs path: {}.", f); - checkPermission(f, AccessType.WRITE); + checkPermission(f, RangerAccessType.WRITE); try { FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { @@ -588,7 +656,13 @@ public boolean mkdirs(Path f, FsPermission permission) } } catch (FileNotFoundException e) { validatePath(f); - return mkDirRecursively(f, permission); + boolean result; + if (isMergeBucket) { + result = mkDirAutoRecursively(f, permission); + } else { + result = mkDirRecursively(f, permission); + } + return result; } } @@ -641,9 +715,30 @@ public boolean mkDirRecursively(Path f, FsPermission permission) return true; } + /** + * Create a directory recursively in a single request, used for merge buckets, + * where the put-object interface treats a key ending with "/" as a directory + * and creates the missing intermediate directories on the server side. + * + * @param f Absolute path to the directory + * @param permission Directory permissions + * @return Return true if the creation was successful; otherwise an IOException is thrown. + * @throws IOException An IOException occurred when creating a directory object on COS. + */ + public boolean mkDirAutoRecursively(Path f, FsPermission permission) + throws IOException { + LOG.debug("Make the directory recursively in one request. 
Path: {}, FsPermission: {}.", f, permission); + // add the end '/' to the path which server auto create middle dir + String folderPath = pathToKey(makeAbsolute(f)); + if (!folderPath.endsWith(PATH_DELIMITER)) { + folderPath += PATH_DELIMITER; + } + store.storeEmptyFile(folderPath); + return true; + } @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); + LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize); FileStatus fileStatus = getFileStatus(f); // will throw if the file doesn't // exist @@ -659,13 +754,12 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { bufferSize)); } - @Override public boolean rename(Path src, Path dst) throws IOException { LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst); - checkPermission(src, AccessType.DELETE); - checkPermission(dst, AccessType.WRITE); + checkPermission(src, RangerAccessType.DELETE); + checkPermission(dst, RangerAccessType.WRITE); // Renaming the root directory is not allowed if (src.isRoot()) { @@ -752,11 +846,18 @@ public boolean rename(Path src, Path dst) throws IOException { // The default root directory is definitely there. } + if (!isMergeBucket) { + return internalCopyAndDelete(src, dst, srcFileStatus.isDirectory()); + } else { + return internalRename(src, dst); + } + } + private boolean internalCopyAndDelete(Path srcPath, Path dstPath, boolean isDir) throws IOException { boolean result = false; - if (srcFileStatus.isDirectory()) { - result = this.copyDirectory(src, dst); + if (isDir) { + result = this.copyDirectory(srcPath, dstPath); } else { - result = this.copyFile(src, dst); + result = this.copyFile(srcPath, dstPath); } if (!result) { @@ -765,8 +866,14 @@ public boolean rename(Path src, Path dst) throws IOException { // ensure data security. return false; } else { - return this.delete(src, true); + return this.delete(srcPath, true); } + } + private boolean internalRename(Path srcPath, Path dstPath) throws IOException { + String srcKey = pathToKey(srcPath); + String dstKey = pathToKey(dstPath); + this.store.rename(srcKey, dstKey); + return true; } private boolean copyFile(Path srcPath, Path dstPath) throws IOException { @@ -803,7 +910,7 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { String priorLastKey = null; do { CosNPartialListing objectList = this.store.list(srcKey, - COS_MAX_LISTING_LENGTH, priorLastKey, true); + this.normalBucketMaxListNum, priorLastKey, true); for (FileMetadata file : objectList.getFiles()) { this.boundedCopyThreadPool.execute(new CosNCopyFileTask( this.store, @@ -898,7 +1005,7 @@ private void initRangerClientImpl(Configuration conf) throws IOException { synchronized (CosFileSystem.class) { if (rangerQcloudObjectStorageStorageClient == null) { try { - RangerQcloudObjectStorageClient tmpClient = + RangerQcloudObjectStorageClient tmpClient = (RangerQcloudObjectStorageClient) rangerClientImplClass.newInstance(); tmpClient.init(conf); rangerQcloudObjectStorageStorageClient = tmpClient; @@ -927,7 +1034,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { LOG.debug("call the checksum for the path: {}.", f); // The order of each file, must support both crc at same time, how to tell the difference crc request? 
- checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); if (this.getConf().getBoolean(CosNConfigKeys.CRC64_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC64_CHECKSUM_ENABLED)) { @@ -970,7 +1077,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { public void setXAttr(Path f, String name, byte[] value, EnumSet flag) throws IOException { LOG.debug("set XAttr: {}.", f); - checkPermission(f, AccessType.WRITE); + checkPermission(f, RangerAccessType.WRITE); // First, determine whether the length of the name and value exceeds the limit. if (name.getBytes(METADATA_ENCODING).length + value.length > MAX_XATTR_SIZE) { @@ -1008,7 +1115,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet fl public byte[] getXAttr(Path f, String name) throws IOException { LOG.debug("get XAttr: {}.", f); - checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); Path absolutePath = makeAbsolute(f); @@ -1037,7 +1144,7 @@ public byte[] getXAttr(Path f, String name) throws IOException { public Map getXAttrs(Path f, List names) throws IOException { LOG.debug("get XAttrs: {}.", f); - checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); Path absolutePath = makeAbsolute(f); @@ -1064,7 +1171,7 @@ public Map getXAttrs(Path f, List names) throws IOExcept public Map getXAttrs(Path f) throws IOException { LOG.debug("get XAttrs: {}.", f); - checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); Path absolutePath = makeAbsolute(f); @@ -1088,7 +1195,7 @@ public Map getXAttrs(Path f) throws IOException { public void removeXAttr(Path f, String name) throws IOException { LOG.debug("remove XAttr: {}.", f); - checkPermission(f, AccessType.WRITE); + checkPermission(f, RangerAccessType.WRITE); Path absolutPath = makeAbsolute(f); @@ -1115,7 +1222,7 @@ public void removeXAttr(Path f, String name) throws IOException { public List listXAttrs(Path f) throws IOException { LOG.debug("list XAttrs: {}.", f); - checkPermission(f, AccessType.READ); + checkPermission(f, RangerAccessType.READ); Path absolutePath = makeAbsolute(f); @@ -1130,17 +1237,37 @@ public List listXAttrs(Path f) throws IOException { @Override public Token getDelegationToken(String renewer) throws IOException { + LOG.info("getDelegationToken, renewer: {}, stack: {}", + renewer, Arrays.toString(Thread.currentThread().getStackTrace()).replace( ',', '\n' )); if (rangerQcloudObjectStorageStorageClient != null) { return rangerQcloudObjectStorageStorageClient.getDelegationToken(renewer); } return super.getDelegationToken(renewer); } - - private void checkPermission(Path f, AccessType accessType) throws IOException { + private void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException { if (!this.enableRangerPluginPermissionCheck) { return; } + + AccessType accessType = null; + switch (rangerAccessType) { + case LIST: + accessType = AccessType.LIST; + break; + case WRITE: + accessType = AccessType.WRITE; + break; + case READ: + accessType = AccessType.READ; + break; + case DELETE: + accessType = AccessType.DELETE; + break; + default: + throw new IOException(String.format("unknown access type %s", rangerAccessType.toString())); + } + Path absolutePath = makeAbsolute(f); String allowKey = pathToKey(absolutePath); if (allowKey.startsWith("/")) { @@ -1149,16 +1276,21 @@ private void checkPermission(Path f, AccessType accessType) throws IOException { PermissionRequest permissionReq = new PermissionRequest(ServiceType.COS, 
accessType, - this.bucket, allowKey, "", ""); - boolean allowed = rangerQcloudObjectStorageStorageClient.checkPermission(permissionReq); + this.formatBucket, allowKey, "", ""); + boolean allowed = false; + PermissionResponse permission = rangerQcloudObjectStorageStorageClient.checkPermission(permissionReq); + if (permission != null) { + allowed = permission.isAllowed(); + } if (!allowed) { throw new IOException(String.format("Permission denied, [key: %s], [user: %s], [operation: %s]", - allowKey, currentUser.getShortUserName(), accessType.name())); + allowKey, currentUser.getShortUserName(), rangerAccessType.name())); } } @Override public void close() throws IOException { + LOG.info("begin to close cos file system"); try { super.close(); } finally { diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index d0032a34..81c7c61b 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -4,6 +4,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.cosn.Unit; + /** * This class contains constants for configuration keys used in the cos file system. */ @@ -11,7 +12,7 @@ @InterfaceStability.Unstable public class CosNConfigKeys extends CommonConfigurationKeys { public static final String USER_AGENT = "fs.cosn.user.agent"; - public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.10.0"; + public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.0.1"; public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version"; @@ -74,7 +75,6 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB; public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size"; public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 8; - // used to control getFileStatus list to judge dir whether exist. public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys"; public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 2; @@ -82,12 +82,16 @@ public class CosNConfigKeys extends CommonConfigurationKeys { // used for double check complete mpu in case of return cos client exception but status is 200 ok. 
public static final String COSN_COMPLETE_MPU_CHECK = "fs.cosn.complete.mpu.check"; public static final boolean DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE = true; - public static final String MAX_CONNECTION_NUM = "fs.cosn.max.connection.num"; public static final int DEFAULT_MAX_CONNECTION_NUM = 2048; public static final String CUSTOMER_DOMAIN = "fs.cosn.customer.domain"; - + public static final String OPEN_CHECK_MERGE_BUCKET = "fs.cosn.check.merge.bucket"; + public static final boolean DEFAULT_CHECK_MERGE_BUCKET = true; + public static final String MERGE_BUCKET_MAX_LIST_NUM = "fs.cosn.merge.bucket.max.list.num"; + public static final int DEFAULT_MERGE_BUCKET_MAX_LIST_NUM = 5000; + public static final String NORMAL_BUCKET_MAX_LIST_NUM = "fs.cosn.normal.bucket.max.list.num"; + public static final int DEFAULT_NORMAL_BUCKET_MAX_LIST_NUM = 999; public static final String COSN_SERVER_SIDE_ENCRYPTION_ALGORITHM = "fs.cosn.server-side-encryption.algorithm"; public static final String COSN_SERVER_SIDE_ENCRYPTION_KEY = "fs.cosn.server-side-encryption.key"; public static final String COSN_SERVER_SIDE_ENCRYPTION_CONTEXT = "fs.cosn.server-side-encryption.context"; @@ -119,7 +123,6 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String COSN_RANGER_PLUGIN_CLIENT_IMPL = "fs.cosn.ranger.plugin.client.impl"; public static final String DEFAULT_COSN_RANGER_PLUGIN_CLIENT_IMPL = "org.apache.hadoop.fs.cosn.ranger.client.RangerQcloudObjectStorageClientImpl"; - public static final String COSN_CLIENT_SOCKET_TIMEOUTSEC = "fs.cosn.client.socket.timeoutsec"; public static final int DEFAULT_CLIENT_SOCKET_TIMEOUTSEC = 30; } diff --git a/src/main/java/org/apache/hadoop/fs/CosNEncryptionSecrets.java b/src/main/java/org/apache/hadoop/fs/CosNEncryptionSecrets.java index ede4699b..b0942814 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNEncryptionSecrets.java +++ b/src/main/java/org/apache/hadoop/fs/CosNEncryptionSecrets.java @@ -60,7 +60,6 @@ public CosNEncryptionSecrets(final CosNEncryptionMethods encryptionAlgorithm, final String encryptionKey, final String encryptionContext) throws IOException { this(encryptionAlgorithm.getMethod(), encryptionKey, encryptionContext); } - /** * Create a pair of secrets. * @@ -147,7 +146,6 @@ public String getEncryptionKey() { } public String getEncryptionContext() { return encryptionContext; } - /** * Does this instance have encryption options? * That is: is the algorithm non-null. 
diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSDataOutputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSDataOutputStream.java index f8741eba..ed4005c5 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFSDataOutputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFSDataOutputStream.java @@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.cosn.*; import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java index 8d15fc70..25462817 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java @@ -29,12 +29,14 @@ public static class ReadBuffer { private int status; private long start; private long end; + private IOException exception; public ReadBuffer(long start, long end) { this.start = start; this.end = end; this.buffer = new byte[(int) (this.end - this.start) + 1]; this.status = INIT; + this.exception = null; } public void lock() { @@ -67,6 +69,14 @@ public void setStatus(int status) { this.status = status; } + public void setException(IOException e) { + this.exception = e; + } + + public IOException getException() { + return this.exception; + } + public long getStart() { return start; } @@ -204,10 +214,12 @@ private synchronized void reopen(long pos) throws IOException { } ReadBuffer readBuffer = this.readBufferQueue.poll(); + IOException innerException = null; readBuffer.lock(); try { readBuffer.await(ReadBuffer.INIT); if (readBuffer.getStatus() == ReadBuffer.ERROR) { + innerException = readBuffer.getException(); this.buffer = null; this.bufferStart = -1; this.bufferEnd = -1; @@ -223,7 +235,7 @@ private synchronized void reopen(long pos) throws IOException { } if (null == this.buffer) { - throw new IOException("Null IO stream"); + throw new IOException("Null IO stream.", innerException); } this.position = pos; @@ -348,7 +360,6 @@ public int available() throws IOException { return (int) remaining; } - @Override public void close() throws IOException { if (this.closed) { diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java index 117e9aa2..d068e366 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java @@ -49,6 +49,7 @@ public void run() { this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS); } catch (IOException e) { this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR); + this.readBuffer.setException(e); LOG.error("Exception occurs when retrieve the block range " + "start: " + String.valueOf(this.readBuffer.getStart()) + " " + diff --git a/src/main/java/org/apache/hadoop/fs/CosNPartialListing.java b/src/main/java/org/apache/hadoop/fs/CosNPartialListing.java index d98f0660..df012406 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNPartialListing.java +++ b/src/main/java/org/apache/hadoop/fs/CosNPartialListing.java @@ -2,8 +2,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileMetadata; -import org.apache.hadoop.fs.NativeFileSystemStore; /** *

diff --git a/src/main/java/org/apache/hadoop/fs/CosNUtils.java b/src/main/java/org/apache/hadoop/fs/CosNUtils.java index f2126281..484fecca 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNUtils.java +++ b/src/main/java/org/apache/hadoop/fs/CosNUtils.java @@ -171,4 +171,15 @@ private static Method getFactoryMethod(Class cl, Class returnType, return null; } } + + public static String formatBucket(String originBucketName, Configuration conf) { + String appidStr = conf.get(CosNConfigKeys.COSN_APPID_KEY); + if (appidStr == null || appidStr.isEmpty()) { + return originBucketName; + } + if (originBucketName.endsWith("-" + appidStr)) { + return originBucketName; + } + return originBucketName + "-" + appidStr; + } } diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index cbafbf0a..a86cc0c3 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -51,6 +51,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private boolean crc32cEnabled; private boolean completeMPUCheckEnabled; private CosNEncryptionSecrets encryptionSecrets; + private boolean isMergeBucket; private CustomerDomainEndpointResolver customerDomainEndpointResolver; private void initCOSClient(URI uri, Configuration conf) throws IOException { @@ -103,10 +104,8 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { CosNConfigKeys.COSN_CLIENT_SOCKET_TIMEOUTSEC, CosNConfigKeys.DEFAULT_CLIENT_SOCKET_TIMEOUTSEC); config.setSocketTimeout(socketTimeoutSec * 1000); - this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); - this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK, CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE); @@ -194,6 +193,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { try { initCOSClient(uri, conf); this.bucketName = uri.getHost(); + this.isMergeBucket = false; String storageClass = conf.get(CosNConfigKeys.COSN_STORAGE_CLASS_KEY); if (null != storageClass && !storageClass.isEmpty()) { try { @@ -216,6 +216,11 @@ public void initialize(URI uri, Configuration conf) throws IOException { } } + @Override + public void setMergeBucket(boolean isMergeBucket) { + this.isMergeBucket = isMergeBucket; + } + private void storeFileWithRetry(String key, InputStream inputStream, byte[] md5Hash, long length) throws IOException { @@ -265,6 +270,20 @@ private void storeFileWithRetry(String key, InputStream inputStream, } } + @Override + public HeadBucketResult headBucket(String bucketName) throws IOException { + HeadBucketRequest headBucketRequest = new HeadBucketRequest(bucketName); + try { + HeadBucketResult result = (HeadBucketResult) callCOSClientWithRetry(headBucketRequest); + return result; + } catch (Exception e) { + String errMsg = String.format("head bucket [%s] occurred an exception: %s", + bucketName, e.toString()); + handleException(new Exception(errMsg), bucketName); + } + return null; // unreachable: handleException always throws + } + @Override public void storeFile(String key, File file, byte[] md5Hash) throws IOException { if (null != md5Hash) { @@ -511,9 +530,17 @@ private FileMetadata queryObjectMetadata(String key, } } } + } + boolean isFile = true; + if (isMergeBucket) { + if (objectMetadata.isFileModeDir() || key.equals(PATH_DELIMITER)) { + isFile = false; + } + } else { + 
isFile = !key.endsWith(PATH_DELIMITER); } FileMetadata fileMetadata = - new FileMetadata(key, fileSize, mtime, !key.endsWith(PATH_DELIMITER), + new FileMetadata(key, fileSize, mtime, isFile, ETag, crc64ecm, crc32cm, versionId, objectMetadata.getStorageClass(), userMetadata); // record the last request result info @@ -971,77 +998,118 @@ public void delete(String key) throws IOException { } /** - * rename operation - * @param srcKey src cos key - * @param dstKey dst cos key - * @throws IOException + * Recursive delete, only used on merge buckets to delete a directory in one request. + * @param key cos key + * @throws IOException on failure */ - public void rename(String srcKey, String dstKey) throws IOException { - LOG.debug("Rename the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); + @Override + public void deleteRecursive(String key) throws IOException { + LOG.debug("Delete the cos key recursively: {} from bucket: {}.", key, this.bucketName); + try { + DeleteObjectRequest deleteObjectRequest = + new DeleteObjectRequest(bucketName, key); + deleteObjectRequest.setRecursive(true); + callCOSClientWithRetry(deleteObjectRequest); + } catch (Exception e) { + String errMsg = String.format("Deleting the cos key recursively [%s] occurred an exception: " + "%s", key, e.toString()); + handleException(new Exception(errMsg), key); + } + } + + @Override + public void copy(String srcKey, String dstKey) throws IOException { try { ObjectMetadata objectMetadata = new ObjectMetadata(); if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } + CopyObjectRequest copyObjectRequest = - new CopyObjectRequest(bucketName, srcKey, bucketName, - dstKey); - // get the storage class of the source file + new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); FileMetadata sourceFileMetadata = this.retrieveMetadata(srcKey); if (null != sourceFileMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass()); } copyObjectRequest.setNewObjectMetadata(objectMetadata); this.setEncryptionMetadata(copyObjectRequest, objectMetadata); - if (null != this.customerDomainEndpointResolver) { if (null != this.customerDomainEndpointResolver.getEndpoint()) { copyObjectRequest.setSourceEndpointBuilder(this.customerDomainEndpointResolver); } } callCOSClientWithRetry(copyObjectRequest); - DeleteObjectRequest deleteObjectRequest = - new DeleteObjectRequest(bucketName, srcKey); - callCOSClientWithRetry(deleteObjectRequest); } catch (Exception e) { String errMsg = String.format( - "Rename object failed, source cos key: %s, dest cos key: %s, " + - "exception: %s", srcKey, - dstKey, e.toString()); + "Copy the object failed, src cos key: %s, dst cos key: %s, " + + "exception: %s", srcKey, dstKey, e.toString()); handleException(new Exception(errMsg), srcKey); } } + /** + * rename operation + * @param srcKey src cos key + * @param dstKey dst cos key + * @throws IOException + */ @Override - public void copy(String srcKey, String dstKey) throws IOException { + public void rename(String srcKey, String dstKey) throws IOException { + if (!isMergeBucket) { + normalBucketRename(srcKey, dstKey); + } else { + mergeBucketRename(srcKey, dstKey); + } + } + + public void normalBucketRename(String srcKey, String dstKey) throws IOException { + LOG.debug("Rename normal bucket key, the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); try { ObjectMetadata objectMetadata = new ObjectMetadata(); if (crc32cEnabled) { 
objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } - CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); + // get the storage class of the source file FileMetadata sourceFileMetadata = this.retrieveMetadata(srcKey); if (null != sourceFileMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass()); } copyObjectRequest.setNewObjectMetadata(objectMetadata); this.setEncryptionMetadata(copyObjectRequest, objectMetadata); + if (null != this.customerDomainEndpointResolver) { if (null != this.customerDomainEndpointResolver.getEndpoint()) { copyObjectRequest.setSourceEndpointBuilder(this.customerDomainEndpointResolver); } } callCOSClientWithRetry(copyObjectRequest); + DeleteObjectRequest deleteObjectRequest = + new DeleteObjectRequest(bucketName, srcKey); + callCOSClientWithRetry(deleteObjectRequest); } catch (Exception e) { String errMsg = String.format( - "Copy the object failed, src cos key: %s, dst cos key: %s, " + + "Rename object failed, normal bucket, source cos key: %s, dest cos key: %s, " + "exception: %s", srcKey, dstKey, e.toString()); handleException(new Exception(errMsg), srcKey); } } + public void mergeBucketRename(String srcKey, String dstKey) throws IOException { + LOG.debug("Rename merge bucket key, the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); + try { + RenameRequest renameRequest = new RenameRequest(bucketName, srcKey, dstKey); + callCOSClientWithRetry(renameRequest); + } catch (Exception e) { + String errMsg = String.format( + "Rename object failed, merge bucket, source cos key: %s, dest cos key: %s, " + + "exception: %s", srcKey, + dstKey, e.toString()); + handleException(new Exception(errMsg), srcKey); + } + } + @Override public void purge(String prefix) throws IOException { throw new IOException("purge not supported"); @@ -1100,7 +1168,6 @@ private void callCOSClientWithSSEKMS(X request, SSECOSKeyManagementParams ma LOG.error(errMsg); } } - private void callCOSClientWithSSECOS(X request, ObjectMetadata objectMetadata) { try { objectMetadata.setServerSideEncryption(SSEAlgorithm.AES256.getAlgorithm()); @@ -1214,6 +1281,9 @@ private void checkEncryptionMethod(ClientConfig config, } } + // For a merge bucket, mkdir returns a 500 error if an intermediate directory already exists, + // and rename likewise returns a 500 status if the destination already exists, + // which makes the generic 5xx retry useless; the response info must be improved to filter these cases. 
private Object callCOSClientWithRetry(X request) throws CosServiceException, IOException { String sdkMethod = ""; int retryIndex = 1; @@ -1233,6 +1303,13 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, .mark((int) ((UploadPartRequest) request).getPartSize()); } return this.cosClient.uploadPart((UploadPartRequest) request); + } else if (request instanceof HeadBucketRequest) { // use for checking bucket type + sdkMethod = "headBucket"; + return this.cosClient.headBucket((HeadBucketRequest) request); + } else if (request instanceof RenameRequest) { + sdkMethod = "rename"; + this.cosClient.rename((RenameRequest) request); + return new Object(); } else if (request instanceof GetObjectMetadataRequest) { sdkMethod = "queryObjectMeta"; return this.cosClient.getObjectMetadata((GetObjectMetadataRequest) request); @@ -1344,7 +1421,6 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, private static String ensureValidAttributeName(String attributeName) { return attributeName.replace('.', '-').toLowerCase(); } - private String getPluginVersionInfo() { Properties versionProperties = new Properties(); InputStream inputStream= null; @@ -1365,5 +1441,4 @@ private String getPluginVersionInfo() { } return versionStr; } - } diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index beb6b51e..5a32300e 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -1,6 +1,7 @@ package org.apache.hadoop.fs; import com.qcloud.cos.model.CompleteMultipartUploadResult; +import com.qcloud.cos.model.HeadBucketResult; import com.qcloud.cos.model.PartETag; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -23,6 +24,11 @@ public interface NativeFileSystemStore { void initialize(URI uri, Configuration conf) throws IOException; + void setMergeBucket(boolean isMergeBucket); + + HeadBucketResult headBucket(String bucketName) throws IOException; + + void storeFile(String key, File file, byte[] md5Hash) throws IOException; void storeFile(String key, InputStream inputStream, byte[] md5Hash, @@ -85,7 +91,9 @@ CosNPartialListing list(String prefix, int maxListingLength, void delete(String key) throws IOException; + void deleteRecursive(String key) throws IOException; void copy(String srcKey, String dstKey) throws IOException; + void rename(String srcKey, String dstKey) throws IOException; /** * Delete all keys with the given prefix. Used for testing. 
diff --git a/src/main/java/org/apache/hadoop/fs/auth/DLFInstanceCredentialsProvider.java b/src/main/java/org/apache/hadoop/fs/auth/DLFInstanceCredentialsProvider.java index 2bea36e4..124deb80 100644 --- a/src/main/java/org/apache/hadoop/fs/auth/DLFInstanceCredentialsProvider.java +++ b/src/main/java/org/apache/hadoop/fs/auth/DLFInstanceCredentialsProvider.java @@ -17,50 +17,50 @@ import java.util.Map; public class DLFInstanceCredentialsProvider extends AbstractCOSCredentialProvider implements COSCredentialsProvider { - private static final Logger LOG = LoggerFactory.getLogger(DLFInstanceCredentialsProvider.class); - private final COSCredentialsProvider cosCredentialsProvider; - private static final String UIN = "Uin"; - private static final String REQUEST_ID = "RequestId"; - private static final String TYPE = "Type"; + private static final Logger LOG = LoggerFactory.getLogger(DLFInstanceCredentialsProvider.class); + private final COSCredentialsProvider cosCredentialsProvider; + private static final String UIN = "Uin"; + private static final String REQUEST_ID = "RequestId"; + private static final String TYPE = "Type"; - private String url; - private String path; - private String uin; - private String requestId; + private String url; + private String path; + private String uin; + private String requestId; - public DLFInstanceCredentialsProvider (@Nullable URI uri, Configuration conf) { - super(uri, conf); - if (null != conf) { - this.url = conf.get(CosNConfigKeys.COS_REMOTE_CREDENTIAL_PROVIDER_URL); - this.path = conf.get(CosNConfigKeys.COS_REMOTE_CREDENTIAL_PROVIDER_PATH); - this.uin = conf.get(CosNConfigKeys.COSN_UIN_KEY); - this.requestId = conf.get(CosNConfigKeys.COSN_REQUEST_ID); + public DLFInstanceCredentialsProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + if (null != conf) { + this.url = conf.get(CosNConfigKeys.COS_REMOTE_CREDENTIAL_PROVIDER_URL); + this.path = conf.get(CosNConfigKeys.COS_REMOTE_CREDENTIAL_PROVIDER_PATH); + this.uin = conf.get(CosNConfigKeys.COSN_UIN_KEY); + this.requestId = conf.get(CosNConfigKeys.COSN_REQUEST_ID); - } + } - if (uin == null || requestId == null) { - throw new IllegalArgumentException("uin and request id must be exist"); - } + if (uin == null || requestId == null) { + throw new IllegalArgumentException("uin and request id must exist"); + } - Map header = ImmutableMap.of(UIN, uin, REQUEST_ID, requestId, TYPE, "DLF"); + Map header = ImmutableMap.of(UIN, uin, REQUEST_ID, requestId, TYPE, "DLF"); - HttpCredentialsEndpointProvider endpointProvider = new HttpCredentialsEndpointProvider(url, path, header); - InstanceCredentialsFetcher instanceCredentialsFetcher = new InstanceCredentialsFetcher(endpointProvider); - this.cosCredentialsProvider = new InstanceCredentialsProvider(instanceCredentialsFetcher); - } - @Override - public COSCredentials getCredentials() { - try { - return this.cosCredentialsProvider.getCredentials(); - } catch (CosClientException e) { - LOG.error("Failed to obtain the credentials from DLFInstanceCredentialsProvider.", e); + HttpCredentialsEndpointProvider endpointProvider = new HttpCredentialsEndpointProvider(url, path, header); + InstanceCredentialsFetcher instanceCredentialsFetcher = new InstanceCredentialsFetcher(endpointProvider); + this.cosCredentialsProvider = new InstanceCredentialsProvider(instanceCredentialsFetcher); } + @Override + public COSCredentials getCredentials() { + try { + return this.cosCredentialsProvider.getCredentials(); + } catch (CosClientException e) { + LOG.error("Failed to obtain 
the credentials from DLFInstanceCredentialsProvider.", e); + } - return null; - } + return null; + } - @Override - public void refresh() { - this.cosCredentialsProvider.refresh(); - } + @Override + public void refresh() { + this.cosCredentialsProvider.refresh(); + } } diff --git a/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java b/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java index 403b8e60..51ee16af 100644 --- a/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java +++ b/src/main/java/org/apache/hadoop/fs/auth/EMRInstanceCredentialsProvider.java @@ -46,7 +46,9 @@ public COSCredentials getCredentials() { } return cosCredentials; } catch (CosClientException e) { - LOG.error("Failed to obtain the credentials from CVMInstanceCredentialsProvider.", e); + LOG.error("Failed to obtain the credentials from EMRInstanceCredentialsProvider.", e); + } catch (Exception e) { + LOG.error("getCredentials failed", e); } return null; diff --git a/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java b/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java index 8fa1a94f..8eaad8e5 100644 --- a/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java +++ b/src/main/java/org/apache/hadoop/fs/auth/RangerCredentialsProvider.java @@ -6,6 +6,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CosFileSystem; import org.apache.hadoop.fs.CosNConfigKeys; +import org.apache.hadoop.fs.CosNUtils; import org.apache.hadoop.fs.cosn.ranger.security.sts.GetSTSResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,91 +14,94 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; -import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class RangerCredentialsProvider extends AbstractCOSCredentialProvider implements COSCredentialsProvider { private static final Logger log = LoggerFactory.getLogger(RangerCredentialsProvider.class); - private Thread credentialsFetcherDaemonThread; - private CredentialsFetcherDaemon credentialsFetcherDaemon; + private RangerCredentialsFetcher rangerCredentialsFetcher; private final String bucketName; private String bucketRegion; + private String appId; + public RangerCredentialsProvider(@Nullable URI uri, Configuration conf) { super(uri, conf); - this.bucketName = uri.getHost(); + if (null != conf) { + this.appId = conf.get(CosNConfigKeys.COSN_APPID_KEY); + } + this.bucketName = CosNUtils.formatBucket(uri.getHost(), conf); this.bucketRegion = conf.get(CosNConfigKeys.COSN_REGION_KEY); if (this.bucketRegion == null || this.bucketRegion.isEmpty()) { this.bucketRegion = conf.get(CosNConfigKeys.COSN_REGION_PREV_KEY); } - credentialsFetcherDaemon = new CredentialsFetcherDaemon( + rangerCredentialsFetcher = new RangerCredentialsFetcher( conf.getInt( CosNConfigKeys.COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL, CosNConfigKeys.DEFAULT_COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL)); - credentialsFetcherDaemonThread = new Thread(credentialsFetcherDaemon); - credentialsFetcherDaemonThread.setDaemon(true); - credentialsFetcherDaemonThread.start(); } - class CredentialsFetcherDaemon implements Runnable { + class RangerCredentialsFetcher { private int refreshIntervalSeconds; private AtomicReference lastCredentialsRef; - private Date lastFreshDate; + private AtomicLong lastGetCredentialsTimeStamp; - CredentialsFetcherDaemon(int refreshIntervalSeconds) { + 
RangerCredentialsFetcher(int refreshIntervalSeconds) { this.refreshIntervalSeconds = refreshIntervalSeconds; this.lastCredentialsRef = new AtomicReference<>(); + this.lastGetCredentialsTimeStamp = new AtomicLong(); } COSCredentials getCredentials() { - COSCredentials lastCred = lastCredentialsRef.get(); - if (lastCred == null) { - return fetchCredentials(); + if (needSyncFetchNewCredentials()) { + synchronized (this) { + if (needSyncFetchNewCredentials()) { + return fetchNewCredentials(); + } + } } - return lastCred; + return lastCredentialsRef.get(); } - private COSCredentials fetchCredentials() { + private boolean needSyncFetchNewCredentials() { + if (lastCredentialsRef.get() == null) { + return true; + } + long currentSec = System.currentTimeMillis() / 1000; + return currentSec - lastGetCredentialsTimeStamp.get() > this.refreshIntervalSeconds; + } + + private COSCredentials fetchNewCredentials() { try { GetSTSResponse stsResp = CosFileSystem.rangerQcloudObjectStorageStorageClient.getSTS(bucketRegion, bucketName); - return new BasicSessionCredentials(stsResp.getTempAK(), stsResp.getTempSK(), stsResp.getTempToken()); - } catch (IOException e) { - log.error("fetch credentials failed", e); - return null; - } - } - @Override - public void run() { - while (true) { - Date currentDate = new Date(); - if (lastFreshDate == null || (currentDate.getTime() / 1000 - lastFreshDate.getTime() / 1000) - < this.refreshIntervalSeconds) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - continue; + COSCredentials cosCredentials = null; + if (appId != null) { + cosCredentials = new BasicSessionCredentials(appId, stsResp.getTempAK(), stsResp.getTempSK(), + stsResp.getTempToken()); + } else { + cosCredentials = new BasicSessionCredentials(stsResp.getTempAK(), stsResp.getTempSK(), + stsResp.getTempToken()); } - COSCredentials newCred = fetchCredentials(); - if (newCred != null) { - this.lastFreshDate = currentDate; - this.lastCredentialsRef.set(newCred); - } + this.lastCredentialsRef.set(cosCredentials); + this.lastGetCredentialsTimeStamp.set(System.currentTimeMillis() / 1000); + return cosCredentials; + } catch (IOException e) { + log.error("fetch credentials failed", e); + return null; } } } @Override public COSCredentials getCredentials() { - return credentialsFetcherDaemon.getCredentials(); + return rangerCredentialsFetcher.getCredentials(); } @Override public void refresh() { - } } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/BufferInputStream.java b/src/main/java/org/apache/hadoop/fs/cosn/BufferInputStream.java index 8f031c9d..e15c23c5 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/BufferInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/BufferInputStream.java @@ -4,7 +4,6 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.InvalidMarkException; - import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer; public class BufferInputStream extends InputStream { diff --git a/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java b/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java index f5cf8b70..e68686cc 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java @@ -1,8 +1,8 @@ package org.apache.hadoop.fs.cosn; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CosNConfigKeys; import org.apache.hadoop.fs.cosn.buffer.*; +import org.apache.hadoop.fs.CosNConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git 
a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java index 8034a369..a89a1ff5 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java @@ -2,13 +2,14 @@ import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import org.apache.hadoop.io.nativeio.NativeIO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.nio.ch.DirectBuffer; +import sun.nio.ch.FileChannelImpl; /** * The base class for all CosN byte buffers. @@ -40,7 +41,19 @@ public void close() throws IOException { this.byteBuffer.clear(); if (this.isMapped()) { - NativeIO.POSIX.munmap((MappedByteBuffer) this.byteBuffer); + Method method = null; + try { + method = FileChannelImpl.class.getDeclaredMethod( + "unmap", + MappedByteBuffer.class); + method.setAccessible(true); + method.invoke( + FileChannelImpl.class, + (MappedByteBuffer)this.byteBuffer); + } catch (Exception e) { + LOG.error("failed to call reflect unmap", e); + throw new IOException("failed to call reflect unmap", e); + } } else if (this.byteBuffer.isDirect()) { ((DirectBuffer) this.byteBuffer).cleaner().clean(); } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNDirectBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNDirectBuffer.java index ef2c8b54..7421f803 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNDirectBuffer.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNDirectBuffer.java @@ -1,7 +1,5 @@ package org.apache.hadoop.fs.cosn.buffer; -import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer; - import java.nio.ByteBuffer; /** diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBuffer.java index c4f4ad28..a5132b4b 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBuffer.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBuffer.java @@ -6,7 +6,6 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNNonDirectBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNNonDirectBuffer.java index 28ce57f3..16d395f5 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNNonDirectBuffer.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNNonDirectBuffer.java @@ -1,7 +1,5 @@ package org.apache.hadoop.fs.cosn.buffer; -import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer; - import java.nio.ByteBuffer; /** diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java deleted file mode 100644 index 8e8cc96f..00000000 --- a/src/main/java/org/apache/hadoop/fs/cosn/ranger/client/RangerQcloudObjectStorageClient.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.cosn.ranger.security.authorization.PermissionRequest; -import org.apache.hadoop.fs.cosn.ranger.security.sts.GetSTSResponse; -import org.apache.hadoop.security.token.Token; - -import java.io.IOException; - 
-public interface RangerQcloudObjectStorageClient { - public void init(Configuration conf) throws IOException; - - public String getCanonicalServiceName(); - - public boolean checkPermission(PermissionRequest permissionRequest) throws IOException; - - public Token getDelegationToken(String renewer) throws IOException; - - /** - * Renew the given token. - * - * @return the new expiration time - * @throws IOException - * @throws InterruptedException - */ - public long renew(Token token, - Configuration conf - ) throws IOException, InterruptedException; - - /** - * Cancel the given token - * - * @throws IOException - * @throws InterruptedException - */ - - public void cancel(Token token, Configuration configuration) throws IOException, InterruptedException; - - public GetSTSResponse getSTS(String bucketRegion, String bucketName) throws IOException; - - public void close(); -} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java deleted file mode 100644 index c7ea6d9a..00000000 --- a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/AccessType.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.security.authorization; - -public enum AccessType { - LIST, - WRITE, - READ, - DELETE, -} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java deleted file mode 100644 index b4ec1706..00000000 --- a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/PermissionRequest.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.security.authorization; - -public class PermissionRequest { - private ServiceType serviceType; - private AccessType accessType; - - private String bucketName; - private String objectKey; - - private String fsMountPoint; - private String chdfsPath; - - public PermissionRequest(ServiceType serviceType, AccessType accessType, - String bucketName, String objectKey, String fsMountPoint, String chdfsPath) { - this.serviceType = serviceType; - this.accessType = accessType; - this.bucketName = bucketName; - this.objectKey = objectKey; - this.fsMountPoint = fsMountPoint; - this.chdfsPath = chdfsPath; - } - - public ServiceType getServiceType() { - return serviceType; - } - - public AccessType getAccessType() { - return accessType; - } - - public String getBucketName() { - return bucketName; - } - - public String getObjectKey() { - return objectKey; - } - - public String getFsMountPoint() { - return fsMountPoint; - } - - public String getChdfsPath() { - return chdfsPath; - } -} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java deleted file mode 100644 index d50f5bad..00000000 --- a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/authorization/ServiceType.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.security.authorization; - -public enum ServiceType { - COS, - CHDFS, -} diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java deleted file mode 100644 index 2dd8414a..00000000 --- 
a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSRequest.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.security.sts; - -public class GetSTSRequest { - private String bucketName; - private String bucketRegion; - private String allowPrefix; - - public String getBucketName() { - return bucketName; - } - - public void setBucketName(String bucketName) { - this.bucketName = bucketName; - } - - public String getBucketRegion() { - return bucketRegion; - } - - public void setBucketRegion(String bucketRegion) { - this.bucketRegion = bucketRegion; - } - - public String getAllowPrefix() { - return allowPrefix; - } - - public void setAllowPrefix(String allowPrefix) { - this.allowPrefix = allowPrefix; - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java b/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java deleted file mode 100644 index 80b43ac3..00000000 --- a/src/main/java/org/apache/hadoop/fs/cosn/ranger/security/sts/GetSTSResponse.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.hadoop.fs.cosn.ranger.security.sts; - -public class GetSTSResponse { - private String tempAK; - private String tempSK; - private String tempToken; - - public String getTempAK() { - return tempAK; - } - - public void setTempAK(String tempAK) { - this.tempAK = tempAK; - } - - public String getTempSK() { - return tempSK; - } - - public void setTempSK(String tempSK) { - this.tempSK = tempSK; - } - - public String getTempToken() { - return tempToken; - } - - public void setTempToken(String tempToken) { - this.tempToken = tempToken; - } -} \ No newline at end of file