From 3d5cc5ca8c213d1620dc2e55799f2c9917876965 Mon Sep 17 00:00:00 2001 From: liuyongqing <815331793@qq.com> Date: Sat, 13 May 2023 01:05:41 +0800 Subject: [PATCH] support multiple dir and add option to disable create interface head request (#119) Co-authored-by: yongqingliu --- .../org/apache/hadoop/fs/CosFileSystem.java | 11 +++++ .../org/apache/hadoop/fs/CosNFileSystem.java | 48 ++++++++++++------- .../hadoop/fs/CosNativeFileSystemStore.java | 6 +-- .../hadoop/fs/NativeFileSystemStore.java | 2 +- .../org/apache/hadoop/fs/cosn/BufferPool.java | 4 +- .../cosn/buffer/CosNMappedBufferFactory.java | 32 +++++++++---- 6 files changed, 72 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index dc47365f..2603b085 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -609,6 +609,17 @@ public void disableSSE() throws IOException { } } + // CosNFileSystem Support Only ignore exist file and folder check + public void disableCreateOpFileExistCheck() throws IOException { + LOG.debug("create op file exist check"); + checkInitialized(); + if (this.actualImplFS instanceof CosNFileSystem) { + ((CosNFileSystem) this.actualImplFS).disableCreateOpFileExistCheck(); + } else { + throw new UnsupportedOperationException("Not supported currently"); + } + } + @Override public String getCanonicalServiceName() { if (useOFSRanger()) { diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java index 503758e9..5f280bbc 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java @@ -71,6 +71,10 @@ public class CosNFileSystem extends FileSystem { static final String CSE_ALGORITHM_USER_METADATA = "client-side-encryption-cek-alg"; private int symbolicLinkSizeThreshold; + // for fileSystem based on CosN like GooseFS will do folder/file check + // so in this situation, we need disable this duplicate folder/file check in CosN + private boolean createOpCheckExistFile = true; + // todo: flink or some other case must replace with inner structure. public CosNFileSystem() { } @@ -101,6 +105,10 @@ public CosNFileSystem withRangerCredentialsClient(RangerCredentialsClient rc) { return this; } + public void disableCreateOpFileExistCheck() { + this.createOpCheckExistFile = false; + } + @Override public String getScheme() { return CosNFileSystem.SCHEME; @@ -399,28 +407,29 @@ public FSDataOutputStream create(Path f, FsPermission permission, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - // preconditions - try { - FileStatus targetFileStatus = this.getFileStatus(f); - if (targetFileStatus.isSymlink()) { - f = this.getLinkTarget(f); - // call the getFileStatus for the latest path again. - targetFileStatus = getFileStatus(f); - } - - if (targetFileStatus.isFile() && !overwrite) { - throw new FileAlreadyExistsException("File already exists: " + f); - } - if (targetFileStatus.isDirectory()) { - throw new FileAlreadyExistsException("Directory already exists: " + f); - } - } catch (FileNotFoundException ignore) { + if (createOpCheckExistFile) { + // preconditions + try { + FileStatus targetFileStatus = this.innerGetFileStatus(f, !overwrite); + if (targetFileStatus.isSymlink()) { + f = this.getLinkTarget(f); + // call the getFileStatus for the latest path again. + targetFileStatus = getFileStatus(f); + } + if (targetFileStatus.isFile() && !overwrite) { + throw new FileAlreadyExistsException("File already exists: " + f); + } + if (targetFileStatus.isDirectory()) { + throw new FileAlreadyExistsException("Directory already exists: " + f); + } + } catch (FileNotFoundException ignore) { // NOTE: 这里之前认为可能会出现从 COS 的 SDK 或者 API 上传了一个 / 结尾的有内容对象 // 那么再在这个文件前缀下面成功创建新的对象而不报错的话,其实是不符合文件系统语义规范的。 // 同时,也是为了保证一个完整的目录结构,但是确实会带来元数据查询请求的明显放大。 // 不过这里,因为一般不会出现 / 结尾的内容对象,即使出现也不会覆盖丢失(因为这里相当于它的一个commonPrefix,原始对象还在COS里面) // 所以决定去掉这个检查,来改善优化性能。 // validatePath(f) + } } Path absolutePath = makeAbsolute(f); @@ -553,6 +562,11 @@ private void internalAutoRecursiveDelete(String key) throws IOException { @Override public FileStatus getFileStatus(Path f) throws IOException { + return innerGetFileStatus(f, true); + } + + public FileStatus innerGetFileStatus(Path f, boolean checkFile) throws IOException { + Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); @@ -561,7 +575,7 @@ public FileStatus getFileStatus(Path f) throws IOException { } CosNResultInfo getObjectMetadataResultInfo = new CosNResultInfo(); - FileMetadata meta = this.nativeStore.retrieveMetadata(key, getObjectMetadataResultInfo); + FileMetadata meta = this.nativeStore.retrieveMetadata(key, getObjectMetadataResultInfo, checkFile); if (meta != null) { if (meta.isFile()) { LOG.debug("Retrieve the cos key [{}] to find that it is a file.", key); diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index 610bf09e..50d3cfa4 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -846,18 +846,18 @@ private FileMetadata queryObjectMetadata(String key, @Override public FileMetadata retrieveMetadata(String key) throws IOException { - return retrieveMetadata(key, null); + return retrieveMetadata(key, null, true); } // this method only used in getFileStatus to get the head request result info @Override public FileMetadata retrieveMetadata(String key, - CosNResultInfo info) throws IOException { + CosNResultInfo info, boolean checkFile) throws IOException { if (key.endsWith(CosNFileSystem.PATH_DELIMITER)) { key = key.substring(0, key.length() - 1); } - if (!key.isEmpty()) { + if (!key.isEmpty() && checkFile) { FileMetadata fileMetadata = queryObjectMetadata(key, info); if (fileMetadata != null) { return fileMetadata; diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index 17e9f45e..7950dd75 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -65,7 +65,7 @@ PartETag uploadPartCopy(String uploadId, String srcKey, String destKey, int part FileMetadata retrieveMetadata(String key) throws IOException; - FileMetadata retrieveMetadata(String key, CosNResultInfo info) throws IOException; + FileMetadata retrieveMetadata(String key, CosNResultInfo info, boolean checkFile) throws IOException; CosNSymlinkMetadata retrieveSymlinkMetadata(String symlink) throws IOException; diff --git a/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java b/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java index eab11078..d4a3ca9c 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java @@ -126,7 +126,9 @@ public synchronized void initialize(Configuration conf) CosNConfigKeys.DEFAULT_TMP_DIR); boolean deleteOnExit = conf.getBoolean(CosNConfigKeys.COSN_MAPDISK_DELETEONEXIT_ENABLED, CosNConfigKeys.DEFAULT_COSN_MAPDISK_DELETEONEXIT_ENABLED); - this.bufferFactory = new CosNMappedBufferFactory(tmpDir, deleteOnExit); + String[] tmpDirList = tmpDir.split(","); + LOG.info("tmp dir list", String.join(";", tmpDirList)); + this.bufferFactory = new CosNMappedBufferFactory(tmpDirList, deleteOnExit); } else { String exceptionMsg = String.format("The type of the upload " + "buffer is " diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java index 4b0bc8a8..ed39035d 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java @@ -9,16 +9,23 @@ import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + public class CosNMappedBufferFactory implements CosNBufferFactory { private static final Logger LOG = LoggerFactory.getLogger(CosNMappedBufferFactory.class); - private final File tmpDir; + private final List tmpDirs = new ArrayList<>(); private final boolean deleteOnExit; - public CosNMappedBufferFactory(String tmpDir, boolean deleteOnExit) throws IOException { - this.tmpDir = CosNMappedBufferFactory.createDir(tmpDir); + public CosNMappedBufferFactory(String[] tmpDirList, boolean deleteOnExit) throws IOException { + for (String tmpDir: tmpDirList) { + File createDir = CosNMappedBufferFactory.createDir(tmpDir); + tmpDirs.add(createDir); + } this.deleteOnExit = deleteOnExit; } @@ -58,19 +65,26 @@ public CosNMappedBuffer create(int size) { Constants.BLOCK_TMP_FILE_SUFFIX, size); } + private final AtomicInteger currentIndex = new AtomicInteger(); + + private File getTmpDir() { + return tmpDirs.get(Math.abs(currentIndex.getAndIncrement() % tmpDirs.size())); + } + public CosNMappedBuffer create(String prefix, String suffix, int size) { - if (null == this.tmpDir) { + File tmpDir = getTmpDir(); + if (null == tmpDir) { LOG.error("The tmp dir is null. no mapped buffer will be created."); return null; } - if (!this.tmpDir.exists()) { + if (!tmpDir.exists()) { LOG.warn("The tmp dir does not exist."); // try to create the tmp directory. try { - CosNMappedBufferFactory.createDir(this.tmpDir.getAbsolutePath()); + CosNMappedBufferFactory.createDir(tmpDir.getAbsolutePath()); } catch (IOException e) { - LOG.error("Try to create the tmp dir [{}] failed.", this.tmpDir.getAbsolutePath(), e); + LOG.error("Try to create the tmp dir [{}] failed.", tmpDir.getAbsolutePath(), e); return null; } } @@ -79,7 +93,7 @@ public CosNMappedBuffer create(String prefix, String suffix, int size) { File tmpFile = File.createTempFile( Constants.BLOCK_TMP_FILE_PREFIX, Constants.BLOCK_TMP_FILE_SUFFIX, - this.tmpDir + tmpDir ); if (this.deleteOnExit) { @@ -92,7 +106,7 @@ public CosNMappedBuffer create(String prefix, String suffix, int size) { randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size); return (null != buf) ? new CosNMappedBuffer(buf, randomAccessFile, tmpFile) : null; } catch (IOException e) { - LOG.error("Create tmp file failed. Tmp dir: {}", this.tmpDir, e); + LOG.error("Create tmp file failed. Tmp dir: {}", tmpDir, e); return null; } }