From faac162d938c454f16d6a933e7830d681f321c4c Mon Sep 17 00:00:00 2001 From: iainyu Date: Thu, 10 Sep 2020 21:57:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E7=9A=84xAttr?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 支持文件系统的xAttr; 2. 优化日志输出 --- pom.xml | 2 +- .../org/apache/hadoop/fs/CosFileSystem.java | 363 ++++++++++++------ .../org/apache/hadoop/fs/CosNConfigKeys.java | 4 +- .../apache/hadoop/fs/CosNCopyFileTask.java | 10 +- .../apache/hadoop/fs/CosNDeleteFileTask.java | 12 +- .../java/org/apache/hadoop/fs/CosNXAttr.java | 24 ++ .../hadoop/fs/CosNativeFileSystemStore.java | 224 +++++++++-- .../org/apache/hadoop/fs/FileMetadata.java | 15 +- .../hadoop/fs/NativeFileSystemStore.java | 8 + 9 files changed, 481 insertions(+), 181 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/CosNXAttr.java diff --git a/pom.xml b/pom.xml index e7352993..991a5214 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 3.1.0-5.8.4 + 3.1.0-5.8.5 jar Apache Hadoop Tencent Qcloud COS Support diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 4626d401..de683d46 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -2,6 +2,8 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.qcloud.cos.utils.StringUtils; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -17,6 +19,8 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; @@ -36,6 +40,10 @@ public class CosFileSystem extends FileSystem { static final String PATH_DELIMITER = Path.SEPARATOR; static final int COS_MAX_LISTING_LENGTH = 999; + static final Charset METADATA_ENCODING = StandardCharsets.UTF_8; + // The length of name:value pair should be less than or equal to 1024 bytes. + static final int MAX_XATTR_SIZE = 1024; + private URI uri; String bucket; private NativeFileSystemStore store; @@ -78,9 +86,9 @@ public void initialize(URI uri, Configuration conf) throws IOException { .makeQualified(this.uri, this.getWorkingDirectory()); this.owner = getOwnerId(); this.group = getGroupId(); - if (LOG.isDebugEnabled()) { - LOG.debug("owner:" + owner + ", group:" + group); - } + LOG.debug("uri: {}, bucket: {}, working dir: {}, owner: {}, group: {}.\n" + + "configuration: {}.", + uri, bucket, workingDir, owner, group, conf); BufferPool.getInstance().initialize(getConf()); // initialize the thread pool @@ -101,8 +109,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { ioThreadPoolSize / 2, ioThreadPoolSize, threadKeepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue(ioThreadPoolSize * 2), - new ThreadFactoryBuilder().setNameFormat("cos-transfer-shared" + - "-%d").setDaemon(true).build(), + new ThreadFactoryBuilder().setNameFormat("cos-transfer-shared-%d").setDaemon(true).build(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, @@ -161,8 +168,7 @@ private static NativeFileSystemStore createDefaultStore(Configuration conf) { RetryPolicy methodPolicy = RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); - Map methodNameToPolicyMap = new HashMap(); + Map methodNameToPolicyMap = new HashMap(); methodNameToPolicyMap.put("storeFile", methodPolicy); methodNameToPolicyMap.put("rename", methodPolicy); @@ -252,9 +258,8 @@ public Path getHomeDirectory() { */ @Override public FSDataOutputStream append(Path f, int bufferSize, - Progressable progress) - throws IOException { - throw new IOException("Not supported"); + Progressable progress) { + throw new UnsupportedOperationException("Not supported currently"); } @Override @@ -263,14 +268,12 @@ public FSDataOutputStream create(Path f, FsPermission permission, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - if (exists(f) && !overwrite) { throw new FileAlreadyExistsException("File already exists: " + f); } - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new file '" + f + "' in COS"); - } + LOG.debug("Creating a new file [{}] in COS.", f); + Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); long uploadPartSize = this.getConf().getLong( @@ -280,8 +283,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, return new FSDataOutputStream( new CosFsDataOutputStream(getConf(), store, key, uploadPartSize, - this.boundedIOThreadPool, uploadChecksEnabled), - statistics); + this.boundedIOThreadPool, uploadChecksEnabled), statistics); } private boolean rejectRootDirectoryDelete(boolean isEmptyDir, @@ -299,17 +301,12 @@ private boolean rejectRootDirectoryDelete(boolean isEmptyDir, @Override public boolean delete(Path f, boolean recursive) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("ready to delete path:" + f + ", recursive:" + recursive); - } + LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive); FileStatus status; try { status = getFileStatus(f); } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete called for '" + f - + "' but file does not exist, so returning false"); - } + LOG.debug("Delete called for '{}', but the file does not exist and returning the false.", f); return false; } Path absolutePath = makeAbsolute(f); @@ -330,18 +327,16 @@ public boolean delete(Path f, boolean recursive) throws IOException { " false"); } - createParent(f); CosNDeleteFileContext deleteFileContext = new CosNDeleteFileContext(); int deleteToFinishes = 0; String priorLastKey = null; do { PartialListing listing = - store.list(key, COS_MAX_LISTING_LENGTH, priorLastKey, - true); + store.list(key, COS_MAX_LISTING_LENGTH, priorLastKey, true); for (FileMetadata file : listing.getFiles()) { this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( - this.store, file.getKey(), deleteFileContext)); + this.store, file.getKey(), deleteFileContext)); deleteToFinishes++; if (!deleteFileContext.isDeleteSuccess()) { break; @@ -349,7 +344,7 @@ public boolean delete(Path f, boolean recursive) throws IOException { } for (FileMetadata commonPrefix : listing.getCommonPrefixes()) { this.boundedCopyThreadPool.execute(new CosNDeleteFileTask( - this.store, commonPrefix.getKey(), deleteFileContext)); + this.store, commonPrefix.getKey(), deleteFileContext)); deleteToFinishes++; if (!deleteFileContext.isDeleteSuccess()) { break; @@ -373,26 +368,24 @@ public boolean delete(Path f, boolean recursive) throws IOException { } try { + LOG.debug("Delete the cos key [{}].", key); store.delete(key); } catch (Exception e) { LOG.error("Delete the key failed."); } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting file '" + f + "'"); - } - createParent(f); + LOG.debug("Delete the cos key [{}].", key); store.delete(key); } + + createParentDirectoryIfNecessary(f); return true; } @Override public FileStatus getFileStatus(Path f) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus: " + f); - } + LOG.debug("Get file status: {}.", f); Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); @@ -400,21 +393,13 @@ public FileStatus getFileStatus(Path f) throws IOException { return newDirectory(absolutePath); } - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus retrieving metadata for key '" + key + - "'"); - } FileMetadata meta = store.retrieveMetadata(key); if (meta != null) { if (meta.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); - } + LOG.debug("Retrieve the cos key [{}] to find that it is a file.", key); return newFile(meta, absolutePath); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus returning 'dir' for key '" + key + "'"); - } + LOG.debug("Retrieve the cos key [{}] to find that it is a directory.", key); return newDirectory(meta, absolutePath); } } @@ -422,27 +407,14 @@ public FileStatus getFileStatus(Path f) throws IOException { if (!key.endsWith(PATH_DELIMITER)) { key += PATH_DELIMITER; } - - meta = store.retrieveMetadata(key); - if (null != meta) { - return newDirectory(absolutePath); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus listing key '" + key + "'"); - } + LOG.debug("List the cos key [{}] to judge whether it is a directory or not.", key); PartialListing listing = store.list(key, 1); if (listing.getFiles().length > 0 || listing.getCommonPrefixes().length > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus returning 'directory' for key '" + key - + "' as it has contents"); - } + LOG.debug("List the cos key [{}] to find that it is a directory.", key); return newDirectory(absolutePath); } - if (LOG.isDebugEnabled()) { - LOG.debug("getFileStatus could not find key '" + key + "'"); - } + LOG.debug("Can not find the cos key [{}] on COS.", key); throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); } @@ -488,9 +460,10 @@ public FileStatus[] listStatus(Path f) throws IOException { priorLastKey, false); for (FileMetadata fileMetadata : listing.getFiles()) { Path subPath = keyToPath(fileMetadata.getKey()); - if (fileMetadata.getKey().equals(key)) { // this is just the directory we have been asked to list + LOG.debug("This is just the directory we have been asked to list. cos key: {}.", + fileMetadata.getKey()); } else { status.add(newFile(fileMetadata, subPath)); } @@ -524,18 +497,17 @@ private FileStatus newDirectory(FileMetadata meta, Path path) { if (meta == null) { return newDirectory(path); } - CosNFileStatus status = new CosNFileStatus(0, true, 1, 0, + return new CosNFileStatus(0, true, 1, 0, meta.getLastModified(), 0, null, this.owner, this.group, path.makeQualified(this.getUri(), this.getWorkingDirectory()), meta.getETag(), meta.getCrc64ecm(), meta.getVersionId()); - return status; } /** * Validate the path from the bottom up. * - * @param path - * @throws IOException + * @param path the absolute path to check. + * @throws IOException an IOException occurred when getting the path's metadata. */ private void validatePath(Path path) throws IOException { Path parent = path.getParent(); @@ -549,7 +521,7 @@ private void validatePath(Path path) throws IOException { "Can't make directory for path '%s', it is a file" + ".", parent)); } - } catch (FileNotFoundException e) { + } catch (FileNotFoundException ignored) { } parent = parent.getParent(); } while (parent != null); @@ -558,6 +530,7 @@ private void validatePath(Path path) throws IOException { @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + LOG.debug("mkdirs path: {}.", f); try { FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { @@ -567,21 +540,21 @@ public boolean mkdirs(Path f, FsPermission permission) } } catch (FileNotFoundException e) { validatePath(f); + return mkDirRecursively(f, permission); } - - return mkDirRecursively(f, permission); } /** - * Recursively create a directory. + * Create a directory recursively. * * @param f Absolute path to the directory * @param permission Directory permissions * @return Return true if the creation was successful, throw a IOException. - * @throws IOException + * @throws IOException An IOException occurred when creating a directory object on COS. */ public boolean mkDirRecursively(Path f, FsPermission permission) throws IOException { + LOG.debug("Make the directory recursively. Path: {}, FsPermission: {}.", f, permission); Path absolutePath = makeAbsolute(f); List paths = new ArrayList(); do { @@ -590,7 +563,7 @@ public boolean mkDirRecursively(Path f, FsPermission permission) } while (absolutePath != null); for (Path path : paths) { - if (path.equals(new Path(CosFileSystem.PATH_DELIMITER))) { + if (path.isRoot()) { break; } try { @@ -602,14 +575,15 @@ public boolean mkDirRecursively(Path f, FsPermission permission) " it is a file.", f)); } if (fileStatus.isDirectory()) { - break; + if (fileStatus.getModificationTime() > 0) { + break; + } else { + throw new FileNotFoundException("Dir '" + path + "' doesn't exist in COS"); + } } } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Making dir '" + f + "' in COS"); - } - - String folderPath = pathToKey(makeAbsolute(f)); + LOG.debug("Make the directory [{}] on COS.", path); + String folderPath = pathToKey(makeAbsolute(path)); if (!folderPath.endsWith(PATH_DELIMITER)) { folderPath += PATH_DELIMITER; } @@ -619,50 +593,25 @@ public boolean mkDirRecursively(Path f, FsPermission permission) return true; } - private boolean mkdir(Path f) throws IOException { - LOG.debug("mkdir " + f); - try { - FileStatus fileStatus = getFileStatus(f); - if (fileStatus.isFile()) { - throw new FileAlreadyExistsException( - String.format( - "Can't make directory for path '%s' since it " + - "is a file.", f)); - } - } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Making dir '" + f + "' in COS"); - } - - String folderPath = pathToKey(makeAbsolute(f)); - if (!folderPath.endsWith(PATH_DELIMITER)) { - folderPath += PATH_DELIMITER; - } - store.storeEmptyFile(folderPath); - } - return true; - } - @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - FileStatus fs = getFileStatus(f); // will throw if the file doesn't + FileStatus fileStatus = getFileStatus(f); // will throw if the file doesn't // exist - if (fs.isDirectory()) { + if (fileStatus.isDirectory()) { throw new FileNotFoundException("'" + f + "' is a directory"); } LOG.info("Opening '" + f + "' for reading"); Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); - long fileSize = store.getFileLength(key); return new FSDataInputStream(new BufferedFSInputStream( new CosFsInputStream(this.getConf(), store, statistics, key, - fileSize, this.boundedIOThreadPool), + fileStatus.getLen(), this.boundedIOThreadPool), bufferSize)); } @Override public boolean rename(Path src, Path dst) throws IOException { - LOG.debug("input rename: src:" + src + " , dst:" + dst); + LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst); // Renaming the root directory is not allowed if (src.isRoot()) { @@ -675,28 +624,27 @@ public boolean rename(Path src, Path dst) throws IOException { try { srcFileStatus = this.getFileStatus(src); } catch (FileNotFoundException e) { - LOG.debug(e.getMessage()); + LOG.debug("The source path [{}] is not exist.", src); return false; } // Source path and destination path are not allowed to be the same if (src.equals(dst)) { - LOG.debug("source path and dest path refer to the same file or " + + LOG.debug("The source path and the dest path refer to the same file or " + "directory: {}", dst); - throw new IOException("source path and dest path refer to the " + + throw new IOException("the source path and dest path refer to the " + "same file or directory"); } // It is not allowed to rename a parent directory to its subdirectory Path dstParentPath; - for (dstParentPath = dst.getParent(); - null != dstParentPath && !src.equals(dstParentPath); - dstParentPath = dstParentPath.getParent()) { + dstParentPath = dst.getParent(); + while (null != dstParentPath && !src.equals(dstParentPath)) { + dstParentPath = dstParentPath.getParent(); } if (null != dstParentPath) { LOG.debug("It is not allowed to rename a parent directory:{} to " + - "its subdirectory:{}.", - src, dst); + "its subdirectory:{}.", src, dst); throw new IOException(String.format( "It is not allowed to rename a parent directory:%s to its" + " subdirectory:%s", @@ -789,7 +737,12 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { " of self"); } - this.store.storeEmptyFile(dstKey); + if (this.store.retrieveMetadata(srcKey) == null) { + this.store.storeEmptyFile(srcKey); + } else { + this.store.copy(srcKey, dstKey); + } + CosNCopyFileContext copyFileContext = new CosNCopyFileContext(); int copiesToFinishes = 0; @@ -822,20 +775,16 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { return copyFileContext.isCopySuccess(); } - private void createParent(Path path) throws IOException { + private void createParentDirectoryIfNecessary(Path path) throws IOException { Path parent = path.getParent(); - if (parent != null) { + if (null != parent && !parent.isRoot()) { String parentKey = pathToKey(parent); - LOG.debug("createParent parentKey:" + parentKey); - if (!parentKey.equals(PATH_DELIMITER)) { - String key = pathToKey(makeAbsolute(parent)); - if (key.length() > 0) { - try { - store.storeEmptyFile(key + PATH_DELIMITER); - } catch (Exception e) { - LOG.debug("storeEmptyFile exception: " + e.toString()); - } + if (!StringUtils.isNullOrEmpty(parentKey) && !exists(parent)) { + LOG.debug("Create a parent directory [{}] for the path [{}].", parent, path); + if (!parentKey.endsWith("/")) { + parentKey += "/"; } + store.storeEmptyFile(parentKey); } } } @@ -869,7 +818,7 @@ public String getCanonicalServiceName() { @Override public FileChecksum getFileChecksum(Path f, long length) throws IOException { Preconditions.checkArgument(length >= 0); - LOG.debug("call the checksum for the path: {}.", f); + LOG.debug("Call the checksum for the path: {}.", f); if (this.getConf().getBoolean(CosNConfigKeys.CRC64_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC64_CHECKSUM_ENABLED)) { @@ -884,6 +833,164 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { } } + /** + * Set the value of an attribute for a path + * + * @param f The path on which to set the attribute + * @param name The attribute to set + * @param value The byte value of the attribute to set (encoded in utf-8) + * @param flag The mode in which to set the attribute + * @throws IOException If there was an issue setting the attributing on COS + */ + @Override + public void setXAttr(Path f, String name, byte[] value, EnumSet flag) throws IOException { + LOG.debug("set XAttr: {}.", f); + + // First, determine whether the length of the name and value exceeds the limit. + if (name.getBytes(METADATA_ENCODING).length + value.length > MAX_XATTR_SIZE) { + throw new HadoopIllegalArgumentException("The maximum combined size of " + + "the name and value of an extended attribute in bytes should be less than or equal to 32768."); + } + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + boolean xAttrExists = (null != fileMetadata.getUserAttributes() + && fileMetadata.getUserAttributes().containsKey(name)); + XAttrSetFlag.validate(name, xAttrExists, flag); + if (fileMetadata.isFile()) { + store.storeFileAttribute(key, name, value); + } else { + store.storeDirAttribute(key, name, value); + } + } + + /** + * get the value of an attribute for a path + * + * @param f The path on which to set the attribute + * @param name The attribute to set + * @return The byte value of the attribute to set (encoded in utf-8) + * @throws IOException If there was an issue setting the attribute on COS + */ + @Override + public byte[] getXAttr(Path f, String name) throws IOException { + LOG.debug("get XAttr: {}.", f); + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + if (null != fileMetadata.getUserAttributes()) { + return fileMetadata.getUserAttributes().get(name); + } + + return null; + } + + /** + * Get all of the xattrs name/value pairs for a cosn file or directory. + * + * @param f Path to get extended attributes + * @param names XAttr names. + * @return Map describing the XAttrs of the file or directory + * @throws IOException If there was an issue gettting the attribute on COS + */ + @Override + public Map getXAttrs(Path f, List names) throws IOException { + LOG.debug("get XAttrs: {}.", f); + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + Map attrs = null; + if (null != fileMetadata.getUserAttributes()) { + attrs = new HashMap<>(); + for (String name : names) { + if (fileMetadata.getUserAttributes().containsKey(name)) { + attrs.put(name, fileMetadata.getUserAttributes().get(name)); + } + } + } + + return attrs; + } + + /** + * Removes an xattr of a cosn file or directory. + * + * @param f Path to remove extended attribute + * @param name xattr name + * @throws IOException If there was an issue setting the attribute on COS + */ + @Override + public void removeXAttr(Path f, String name) throws IOException { + LOG.debug("remove XAttr: {}.", f); + + Path absolutPath = makeAbsolute(f); + + String key = pathToKey(absolutPath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + boolean xAttrExists = (null != fileMetadata.getUserAttributes() + && fileMetadata.getUserAttributes().containsKey(name)); + if (xAttrExists) { + if (fileMetadata.isFile()) { + store.removeFileAttribute(key, name); + } else { + store.removeDirAttribute(key, name); + } + } + + // Nothing to do if the specified attribute is not found. + } + + @Override + public Map getXAttrs(Path f) throws IOException { + LOG.debug("get XAttrs: {}.", f); + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + return fileMetadata.getUserAttributes(); + } + + @Override + public List listXAttrs(Path f) throws IOException { + LOG.debug("list XAttrs: {}.", f); + + Path absolutePath = makeAbsolute(f); + + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } + + return new ArrayList<>(fileMetadata.getUserAttributes().keySet()); + } + @Override public void close() throws IOException { try { diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index 21882e14..7f1b83aa 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -10,7 +10,7 @@ @InterfaceStability.Unstable public class CosNConfigKeys extends CommonConfigurationKeys { public static final String USER_AGENT = "fs.cosn.user.agent"; - public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.8.4"; + public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.8.5"; public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version"; @@ -43,7 +43,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String COSN_UPLOAD_BUFFER_SIZE_KEY = "fs.cosn.upload.buffer.size"; public static final String COSN_UPLOAD_BUFFER_SIZE_PREV_KEY = "fs.cosn.buffer.size"; - public static final int DEFAULT_UPLOAD_BUFFER_SIZE = -1; // default is 128MB + public static final int DEFAULT_UPLOAD_BUFFER_SIZE = -1; public static final String COSN_BLOCK_SIZE_KEY = "fs.cosn.block.size"; public static final long DEFAULT_BLOCK_SIZE = 128 * Unit.MB; diff --git a/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java b/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java index 8e09c840..64a4706c 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNCopyFileTask.java @@ -6,13 +6,13 @@ import java.io.IOException; public class CosNCopyFileTask implements Runnable { - private static Logger LOG = LoggerFactory.getLogger(CosNCopyFileTask.class); + private static final Logger LOG = LoggerFactory.getLogger(CosNCopyFileTask.class); - private NativeFileSystemStore store; + private final NativeFileSystemStore store; - private String srcKey; - private String dstKey; - private CosNCopyFileContext cosCopyFileContext; + private final String srcKey; + private final String dstKey; + private final CosNCopyFileContext cosCopyFileContext; public CosNCopyFileTask(NativeFileSystemStore store, String srcKey, String dstKey, diff --git a/src/main/java/org/apache/hadoop/fs/CosNDeleteFileTask.java b/src/main/java/org/apache/hadoop/fs/CosNDeleteFileTask.java index 27f81bc3..d90b7e4a 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNDeleteFileTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNDeleteFileTask.java @@ -6,12 +6,12 @@ import java.io.IOException; public class CosNDeleteFileTask implements Runnable { - private static Logger LOG = LoggerFactory.getLogger(CosNCopyFileTask.class); + private static final Logger LOG = LoggerFactory.getLogger(CosNCopyFileTask.class); - private NativeFileSystemStore store; + private final NativeFileSystemStore store; - private String srcKey; - private CosNDeleteFileContext cosDeleteFileContext; + private final String srcKey; + private final CosNDeleteFileContext cosDeleteFileContext; public CosNDeleteFileTask(NativeFileSystemStore store, String srcKey, CosNDeleteFileContext cosDeleteFileContext) { @@ -24,10 +24,10 @@ public CosNDeleteFileTask(NativeFileSystemStore store, String srcKey, public void run() { boolean fail = false; try { + LOG.debug("Delete the cos key: {}.", srcKey); this.store.delete(srcKey); } catch (IOException e) { - LOG.warn("Exception thrown when delete file{}, exception:{}" - , this.srcKey, e); + LOG.warn("Exception thrown when delete file [{}], exception: ", this.srcKey, e); fail = true; cosDeleteFileContext.setIOException(e); } finally { diff --git a/src/main/java/org/apache/hadoop/fs/CosNXAttr.java b/src/main/java/org/apache/hadoop/fs/CosNXAttr.java new file mode 100644 index 00000000..e867c567 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/CosNXAttr.java @@ -0,0 +1,24 @@ +package org.apache.hadoop.fs; + +import java.io.Serializable; + +public class CosNXAttr implements Serializable { + private String name; + private String value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index d7e9e26b..cfe4d0b4 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -8,6 +8,7 @@ import com.qcloud.cos.model.*; import com.qcloud.cos.region.Region; import com.qcloud.cos.utils.Base64; +import com.qcloud.cos.utils.Jackson; import com.qcloud.cos.utils.StringUtils; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.classification.InterfaceAudience; @@ -19,17 +20,21 @@ import java.io.*; import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; +import java.nio.charset.StandardCharsets; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.hadoop.fs.CosFileSystem.METADATA_ENCODING; import static org.apache.hadoop.fs.CosFileSystem.PATH_DELIMITER; @InterfaceAudience.Private @InterfaceStability.Unstable public class CosNativeFileSystemStore implements NativeFileSystemStore { + public static final Logger LOG = + LoggerFactory.getLogger(CosNativeFileSystemStore.class); + + private static final String XATTR_PREFIX = "cosn-xattr-"; + private COSClient cosClient; private COSCredentialProviderList cosCredentialProviderList; private String bucketName; @@ -39,10 +44,6 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private CosEncryptionSecrets encryptionSecrets; private CustomerDomainEndpointResolver customerDomainEndpointResolver; - public static final Logger LOG = - LoggerFactory.getLogger(CosNativeFileSystemStore.class); - - private void initCOSClient(URI uri, Configuration conf) throws IOException { this.cosCredentialProviderList = CosNUtils.createCosCredentialsProviderSet(uri, conf); @@ -52,7 +53,6 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { String customerDomain = conf.get(CosNConfigKeys.CUSTOMER_DOMAIN); if (null == customerDomain) { - String region = conf.get(CosNConfigKeys.COSN_REGION_KEY); if (null == region) { region = conf.get(CosNConfigKeys.COSN_REGION_PREV_KEY); @@ -195,7 +195,7 @@ private void storeFileWithRetry(String key, InputStream inputStream, throws IOException { try { ObjectMetadata objectMetadata = new ObjectMetadata(); - if(null != md5Hash){ + if (null != md5Hash) { objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash)); } objectMetadata.setContentLength(length); @@ -238,19 +238,18 @@ private void storeFileWithRetry(String key, InputStream inputStream, @Override public void storeFile(String key, File file, byte[] md5Hash) throws IOException { - if(null != md5Hash){ + if (null != md5Hash) { LOG.debug("Store the file, local path: {}, length: {}, md5hash: {}.", file.getCanonicalPath(), file.length(), Hex.encodeHexString(md5Hash)); } storeFileWithRetry(key, - new BufferedInputStream(new FileInputStream(file)), md5Hash, - file.length()); + new BufferedInputStream(new FileInputStream(file)), md5Hash, file.length()); } @Override public void storeFile(String key, InputStream inputStream, byte[] md5Hash , long contentLength) throws IOException { - if(null != md5Hash){ + if (null != md5Hash) { LOG.debug("Store the file to the cos key: {}, input stream md5 hash: {}, content length: {}.", key, Hex.encodeHexString(md5Hash), contentLength); @@ -261,11 +260,7 @@ public void storeFile(String key, InputStream inputStream, byte[] md5Hash // for cos, storeEmptyFile means create a directory @Override public void storeEmptyFile(String key) throws IOException { - if (!key.endsWith(PATH_DELIMITER)) { - key = key + PATH_DELIMITER; - } - - LOG.debug("Store a empty file to the key: {}.", key); + LOG.debug("Store an empty file to the key: {}.", key); ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentLength(0); @@ -320,7 +315,7 @@ public PartETag uploadPart( uploadPartRequest.setInputStream(inputStream); uploadPartRequest.setPartNumber(partNum); uploadPartRequest.setPartSize(partSize); - if(null != md5Hash){ + if (null != md5Hash) { uploadPartRequest.setMd5Digest(Base64.encodeAsString(md5Hash)); } uploadPartRequest.setKey(key); @@ -350,8 +345,7 @@ public void abortMultipartUpload(String key, String uploadId) throws IOException try { AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest( - bucketName, key, uploadId - ); + bucketName, key, uploadId); this.callCOSClientWithRetry(abortMultipartUploadRequest); } catch (Exception e) { String errMsg = String.format("Aborting the multipart upload failed. cos key: %s, upload id: %s. " + @@ -425,9 +419,31 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException { String ETag = objectMetadata.getETag(); String crc64ecm = objectMetadata.getCrc64Ecma(); String versionId = objectMetadata.getVersionId(); + Map userMetadata = null; + if (objectMetadata.getUserMetadata() != null) { + userMetadata = new HashMap<>(); + for (Map.Entry userMetadataEntry : objectMetadata.getUserMetadata().entrySet()) { + if (userMetadataEntry.getKey().startsWith(ensureValidAttributeName(XATTR_PREFIX))) { + String xAttrJsonStr = new String(Base64.decode(userMetadataEntry.getValue()), StandardCharsets.UTF_8); + CosNXAttr cosNXAttr = null; + try { + cosNXAttr = Jackson.fromJsonString(xAttrJsonStr, CosNXAttr.class); + } catch (CosClientException e) { + LOG.warn("Parse the xAttr failed. name: {}, XAttJsonStr: {}.", + userMetadataEntry.getKey(), xAttrJsonStr); + continue; // Skip + } + + if (null != cosNXAttr) { + userMetadata.put(cosNXAttr.getName(), cosNXAttr.getValue().getBytes(METADATA_ENCODING)); + } + } + } + } FileMetadata fileMetadata = new FileMetadata(key, fileSize, mtime, - !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, versionId, objectMetadata.getStorageClass()); + !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, versionId, objectMetadata.getStorageClass(), + userMetadata); LOG.debug("Retrieve the file metadata. cos key: {}, ETag:{}, length:{}, crc64ecm: {}.", key, objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getCrc64Ecma()); return fileMetadata; @@ -459,11 +475,132 @@ public FileMetadata retrieveMetadata(String key) throws IOException { return QueryObjectMetadata(key); } + @Override + public byte[] retrieveAttribute(String key, String attribute) throws IOException { + LOG.debug("Get the extended attribute. cos key: {}, attribute: {}.", key, attribute); + + FileMetadata fileMetadata = retrieveMetadata(key); + if (null != fileMetadata) { + if (null != fileMetadata.getUserAttributes()) { + return fileMetadata.getUserAttributes().get(attribute); + } + } + + return null; + } + + @Override + public void storeDirAttribute(String key, String attribute, byte[] value) throws IOException { + LOG.debug("Store a attribute to the specified directory. cos key: {}, attribute: {}, value: {}.", + key, attribute, new String(value, METADATA_ENCODING)); + if (!key.endsWith(PATH_DELIMITER)) { + key = key + PATH_DELIMITER; + } + storeAttribute(key, attribute, value, false); + } + + @Override + public void storeFileAttribute(String key, String attribute, byte[] value) throws IOException { + LOG.debug("Store a attribute to the specified file. cos key: {}, attribute: {}, value: {}.", + key, attribute, new String(value, METADATA_ENCODING)); + storeAttribute(key, attribute, value, false); + } + + @Override + public void removeDirAttribute(String key, String attribute) throws IOException { + LOG.debug("Remove the attribute from the specified directory. cos key: {}, attribute: {}.", + key, attribute); + if (!key.endsWith(PATH_DELIMITER)) { + key = key + PATH_DELIMITER; + } + storeAttribute(key, attribute, null, true); + } + + @Override + public void removeFileAttribute(String key, String attribute) throws IOException { + LOG.debug("Remove the attribute from the specified file. cos key: {}, attribute: {}.", + key, attribute); + storeAttribute(key, attribute, null, true); + } + + private void storeAttribute(String key, String attribute, byte[] value, boolean deleted) throws IOException { + if (deleted) { + LOG.debug("Delete the extended attribute. cos key: {}, attribute: {}.", key, attribute); + } + + if (null != value && !deleted) { + LOG.debug("Store the extended attribute. cos key: {}, attribute: {}, value: {}.", + key, attribute, new String(value, StandardCharsets.UTF_8)); + } + + if (null == value && !deleted) { + throw new IOException("The attribute value to be set can not be null."); + } + + GetObjectMetadataRequest getObjectMetadataRequest = new GetObjectMetadataRequest(bucketName, key); + this.setEncryptionMetadata(getObjectMetadataRequest, new ObjectMetadata()); + ObjectMetadata objectMetadata = null; + try { + objectMetadata = (ObjectMetadata) callCOSClientWithRetry(getObjectMetadataRequest); + } catch (CosServiceException e) { + if (e.getStatusCode() != 404) { + String errorMessage = String.format("Retrieve the file metadata failed. " + + "cos key: %s, exception: %s.", key, e.toString()); + handleException(new Exception(errorMessage), key); + } + } + + if (null != objectMetadata) { + Map userMetadata = objectMetadata.getUserMetadata(); + if (deleted) { + if (null != userMetadata) { + userMetadata.remove(ensureValidAttributeName(attribute)); + } else { + return; + } + } else { + if (null == userMetadata) { + userMetadata = new HashMap<>(); + } + CosNXAttr cosNXAttr = new CosNXAttr(); + cosNXAttr.setName(attribute); + cosNXAttr.setValue(new String(value, METADATA_ENCODING)); + String xAttrJsonStr = Jackson.toJsonString(cosNXAttr); + userMetadata.put(ensureValidAttributeName(XATTR_PREFIX + attribute), + Base64.encodeAsString(xAttrJsonStr.getBytes(StandardCharsets.UTF_8))); + } + objectMetadata.setUserMetadata(userMetadata); + + // 构造原地copy请求来设置用户自定义属性 + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, key, bucketName, key); + if (null != objectMetadata.getStorageClass()) { + copyObjectRequest.setStorageClass(objectMetadata.getStorageClass()); + } + copyObjectRequest.setNewObjectMetadata(objectMetadata); + copyObjectRequest.setRedirectLocation("Replaced"); + this.setEncryptionMetadata(copyObjectRequest, objectMetadata); + if (null != this.customerDomainEndpointResolver) { + if (null != this.customerDomainEndpointResolver.getEndpoint()) { + copyObjectRequest.setSourceEndpointBuilder(this.customerDomainEndpointResolver); + } + } + + try { + callCOSClientWithRetry(copyObjectRequest); + } catch (Exception e) { + String errMsg = String.format("Failed to modify the user-defined attributes. " + + "cos key: %s, attribute: %s, exception: %s.", + key, attribute, e.toString()); + handleException(new Exception(errMsg), key); + } + } + } + /** * @param key The key is the object name that is being retrieved from the - * cos bucket - * @return This method returns null if the key is not found - * @throws IOException + * cos bucket. + * @return This method returns null if the key is not found. + * @throws IOException Retrieve the specified cos key failed. */ @Override public InputStream retrieve(String key) throws IOException { @@ -489,9 +626,9 @@ public InputStream retrieve(String key) throws IOException { /** * @param key The key is the object name that is being retrieved from the - * cos bucket - * @return This method returns null if the key is not found - * @throws IOException + * cos bucket. + * @return This method returns null if the key is not found. + * @throws IOException Retrieve the specified cos key with a specified byte range start failed. */ @Override @@ -610,11 +747,18 @@ private PartialListing list(String prefix, String delimiter, LOG.error(errMsg); handleException(new Exception(errMsg), prefix); } + if (null == objectListing) { + String errMessage = String.format( + "List objects failed for the prefix: %s, delimiter: %s, maxListingLength:%d, priorLastKey: %s.", + prefix, (delimiter == null) ? "" : delimiter, + maxListingLength, priorLastKey); + handleException(new Exception(errMessage), prefix); + } + ArrayList fileMetadataArray = new ArrayList(); ArrayList commonPrefixArray = new ArrayList(); - List summaries = objectListing.getObjectSummaries(); for (COSObjectSummary cosObjectSummary : summaries) { String filePath = cosObjectSummary.getKey(); @@ -630,8 +774,12 @@ private PartialListing list(String prefix, String delimiter, } long fileLen = cosObjectSummary.getSize(); String fileEtag = cosObjectSummary.getETag(); - fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, - true, fileEtag, null, null, cosObjectSummary.getStorageClass())); + if (cosObjectSummary.getKey().endsWith(PATH_DELIMITER) && cosObjectSummary.getSize() == 0) { + fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, cosObjectSummary.getStorageClass())); + } else { + fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, + true, fileEtag, null, null, cosObjectSummary.getStorageClass())); + } } List commonPrefixes = objectListing.getCommonPrefixes(); for (String commonPrefix : commonPrefixes) { @@ -707,11 +855,9 @@ public void rename(String srcKey, String dstKey) throws IOException { @Override public void copy(String srcKey, String dstKey) throws IOException { - LOG.info("Copy the source key [{}] to dest key [{}].", srcKey, dstKey); try { CopyObjectRequest copyObjectRequest = - new CopyObjectRequest(bucketName, srcKey, bucketName, - dstKey); + new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); FileMetadata sourceFileMetadata = this.retrieveMetadata(srcKey); if (null != sourceFileMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass()); @@ -726,8 +872,7 @@ public void copy(String srcKey, String dstKey) throws IOException { } catch (Exception e) { String errMsg = String.format( "Copy the object failed, src cos key: %s, dst cos key: %s, " + - "exception: %s", srcKey, - dstKey, e.toString()); + "exception: %s", srcKey, dstKey, e.toString()); handleException(new Exception(errMsg), srcKey); } } @@ -788,7 +933,6 @@ public boolean retrieveBlock(String key, long byteRangeStart, @Override public long getFileLength(String key) throws IOException { - LOG.info("getFile Length, cos key: {}", key); GetObjectMetadataRequest getObjectMetadataRequest = new GetObjectMetadataRequest(bucketName, key); this.setEncryptionMetadata(getObjectMetadataRequest, new ObjectMetadata()); @@ -1025,4 +1169,8 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, } } } + + private static String ensureValidAttributeName(String attributeName) { + return attributeName.replace('.', '-').toLowerCase(); + } } diff --git a/src/main/java/org/apache/hadoop/fs/FileMetadata.java b/src/main/java/org/apache/hadoop/fs/FileMetadata.java index 17461b91..ae04a936 100644 --- a/src/main/java/org/apache/hadoop/fs/FileMetadata.java +++ b/src/main/java/org/apache/hadoop/fs/FileMetadata.java @@ -3,6 +3,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import java.util.Map; + /** *

* Holds basic metadata for a file stored in a {@link NativeFileSystemStore}. @@ -19,6 +21,7 @@ public class FileMetadata { private final String crc64ecm; private final String versionId; private final String storageClass; + private final Map userAttributes; public FileMetadata(String key, long length, long lastModified) { this(key, length, lastModified, true); @@ -35,11 +38,16 @@ public FileMetadata(String key, long length, long lastModified, boolean isFile, public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, String versionId) { - this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, null); + this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, null, null); } public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, String versionId, String storageClass) { + this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, storageClass, null); + } + + public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, + String versionId, String storageClass, Map userAttributes) { this.key = key; this.length = length; this.lastModified = lastModified; @@ -48,6 +56,7 @@ public FileMetadata(String key, long length, long lastModified, boolean isFile, this.crc64ecm = crc64ecm; this.versionId = versionId; this.storageClass = storageClass; + this.userAttributes = userAttributes; } public String getKey() { @@ -78,6 +87,10 @@ public String getStorageClass() { return storageClass; } + public Map getUserAttributes() { + return userAttributes; + } + @Override public String toString() { return "FileMetadata[" + key + ", " + length + ", " + lastModified + diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index 059eef92..4ca8ecf1 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -45,6 +45,14 @@ PartETag uploadPart(InputStream inputStream, String key, String uploadId, FileMetadata retrieveMetadata(String key) throws IOException; + byte[] retrieveAttribute(String key, String attribute) throws IOException; + + void storeDirAttribute(String key, String attribute, byte[] value) throws IOException; + void storeFileAttribute(String key, String attribute, byte[] value) throws IOException; + + void removeDirAttribute(String key, String attribute) throws IOException; + void removeFileAttribute(String key, String attribute) throws IOException; + InputStream retrieve(String key) throws IOException; InputStream retrieve(String key, long byteRangeStart) throws IOException;