Skip to content

Commit

Permalink
support multiple dir and add option to disable create interface head …
Browse files Browse the repository at this point in the history
…request (#119)

Co-authored-by: yongqingliu <[email protected]>
  • Loading branch information
liuyongqing and yongqingliu authored May 12, 2023
1 parent 1a1268f commit 3d5cc5c
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 31 deletions.
11 changes: 11 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,17 @@ public void disableSSE() throws IOException {
}
}

// CosNFileSystem Support Only ignore exist file and folder check
public void disableCreateOpFileExistCheck() throws IOException {
LOG.debug("create op file exist check");
checkInitialized();
if (this.actualImplFS instanceof CosNFileSystem) {
((CosNFileSystem) this.actualImplFS).disableCreateOpFileExistCheck();
} else {
throw new UnsupportedOperationException("Not supported currently");
}
}

@Override
public String getCanonicalServiceName() {
if (useOFSRanger()) {
Expand Down
48 changes: 31 additions & 17 deletions src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public class CosNFileSystem extends FileSystem {
static final String CSE_ALGORITHM_USER_METADATA = "client-side-encryption-cek-alg";
private int symbolicLinkSizeThreshold;

// for fileSystem based on CosN like GooseFS will do folder/file check
// so in this situation, we need disable this duplicate folder/file check in CosN
private boolean createOpCheckExistFile = true;

// todo: flink or some other case must replace with inner structure.
public CosNFileSystem() {
}
Expand Down Expand Up @@ -101,6 +105,10 @@ public CosNFileSystem withRangerCredentialsClient(RangerCredentialsClient rc) {
return this;
}

public void disableCreateOpFileExistCheck() {
this.createOpCheckExistFile = false;
}

@Override
public String getScheme() {
return CosNFileSystem.SCHEME;
Expand Down Expand Up @@ -399,28 +407,29 @@ public FSDataOutputStream create(Path f, FsPermission permission,
int bufferSize, short replication,
long blockSize, Progressable progress)
throws IOException {
// preconditions
try {
FileStatus targetFileStatus = this.getFileStatus(f);
if (targetFileStatus.isSymlink()) {
f = this.getLinkTarget(f);
// call the getFileStatus for the latest path again.
targetFileStatus = getFileStatus(f);
}

if (targetFileStatus.isFile() && !overwrite) {
throw new FileAlreadyExistsException("File already exists: " + f);
}
if (targetFileStatus.isDirectory()) {
throw new FileAlreadyExistsException("Directory already exists: " + f);
}
} catch (FileNotFoundException ignore) {
if (createOpCheckExistFile) {
// preconditions
try {
FileStatus targetFileStatus = this.innerGetFileStatus(f, !overwrite);
if (targetFileStatus.isSymlink()) {
f = this.getLinkTarget(f);
// call the getFileStatus for the latest path again.
targetFileStatus = getFileStatus(f);
}
if (targetFileStatus.isFile() && !overwrite) {
throw new FileAlreadyExistsException("File already exists: " + f);
}
if (targetFileStatus.isDirectory()) {
throw new FileAlreadyExistsException("Directory already exists: " + f);
}
} catch (FileNotFoundException ignore) {
// NOTE: 这里之前认为可能会出现从 COS 的 SDK 或者 API 上传了一个 / 结尾的有内容对象
// 那么再在这个文件前缀下面成功创建新的对象而不报错的话,其实是不符合文件系统语义规范的。
// 同时,也是为了保证一个完整的目录结构,但是确实会带来元数据查询请求的明显放大。
// 不过这里,因为一般不会出现 / 结尾的内容对象,即使出现也不会覆盖丢失(因为这里相当于它的一个commonPrefix,原始对象还在COS里面)
// 所以决定去掉这个检查,来改善优化性能。
// validatePath(f)
}
}

Path absolutePath = makeAbsolute(f);
Expand Down Expand Up @@ -553,6 +562,11 @@ private void internalAutoRecursiveDelete(String key) throws IOException {

@Override
public FileStatus getFileStatus(Path f) throws IOException {
return innerGetFileStatus(f, true);
}

public FileStatus innerGetFileStatus(Path f, boolean checkFile) throws IOException {

Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);

Expand All @@ -561,7 +575,7 @@ public FileStatus getFileStatus(Path f) throws IOException {
}

CosNResultInfo getObjectMetadataResultInfo = new CosNResultInfo();
FileMetadata meta = this.nativeStore.retrieveMetadata(key, getObjectMetadataResultInfo);
FileMetadata meta = this.nativeStore.retrieveMetadata(key, getObjectMetadataResultInfo, checkFile);
if (meta != null) {
if (meta.isFile()) {
LOG.debug("Retrieve the cos key [{}] to find that it is a file.", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,18 +846,18 @@ private FileMetadata queryObjectMetadata(String key,

@Override
public FileMetadata retrieveMetadata(String key) throws IOException {
return retrieveMetadata(key, null);
return retrieveMetadata(key, null, true);
}

// this method only used in getFileStatus to get the head request result info
@Override
public FileMetadata retrieveMetadata(String key,
CosNResultInfo info) throws IOException {
CosNResultInfo info, boolean checkFile) throws IOException {
if (key.endsWith(CosNFileSystem.PATH_DELIMITER)) {
key = key.substring(0, key.length() - 1);
}

if (!key.isEmpty()) {
if (!key.isEmpty() && checkFile) {
FileMetadata fileMetadata = queryObjectMetadata(key, info);
if (fileMetadata != null) {
return fileMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ PartETag uploadPartCopy(String uploadId, String srcKey, String destKey, int part

FileMetadata retrieveMetadata(String key) throws IOException;

FileMetadata retrieveMetadata(String key, CosNResultInfo info) throws IOException;
FileMetadata retrieveMetadata(String key, CosNResultInfo info, boolean checkFile) throws IOException;

CosNSymlinkMetadata retrieveSymlinkMetadata(String symlink) throws IOException;

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public synchronized void initialize(Configuration conf)
CosNConfigKeys.DEFAULT_TMP_DIR);
boolean deleteOnExit = conf.getBoolean(CosNConfigKeys.COSN_MAPDISK_DELETEONEXIT_ENABLED,
CosNConfigKeys.DEFAULT_COSN_MAPDISK_DELETEONEXIT_ENABLED);
this.bufferFactory = new CosNMappedBufferFactory(tmpDir, deleteOnExit);
String[] tmpDirList = tmpDir.split(",");
LOG.info("tmp dir list", String.join(";", tmpDirList));
this.bufferFactory = new CosNMappedBufferFactory(tmpDirList, deleteOnExit);
} else {
String exceptionMsg = String.format("The type of the upload " +
"buffer is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;


public class CosNMappedBufferFactory implements CosNBufferFactory {
private static final Logger LOG =
LoggerFactory.getLogger(CosNMappedBufferFactory.class);

private final File tmpDir;
private final List<File> tmpDirs = new ArrayList<>();
private final boolean deleteOnExit;

public CosNMappedBufferFactory(String tmpDir, boolean deleteOnExit) throws IOException {
this.tmpDir = CosNMappedBufferFactory.createDir(tmpDir);
public CosNMappedBufferFactory(String[] tmpDirList, boolean deleteOnExit) throws IOException {
for (String tmpDir: tmpDirList) {
File createDir = CosNMappedBufferFactory.createDir(tmpDir);
tmpDirs.add(createDir);
}
this.deleteOnExit = deleteOnExit;
}

Expand Down Expand Up @@ -58,19 +65,26 @@ public CosNMappedBuffer create(int size) {
Constants.BLOCK_TMP_FILE_SUFFIX, size);
}

private final AtomicInteger currentIndex = new AtomicInteger();

private File getTmpDir() {
return tmpDirs.get(Math.abs(currentIndex.getAndIncrement() % tmpDirs.size()));
}

public CosNMappedBuffer create(String prefix, String suffix, int size) {
if (null == this.tmpDir) {
File tmpDir = getTmpDir();
if (null == tmpDir) {
LOG.error("The tmp dir is null. no mapped buffer will be created.");
return null;
}

if (!this.tmpDir.exists()) {
if (!tmpDir.exists()) {
LOG.warn("The tmp dir does not exist.");
// try to create the tmp directory.
try {
CosNMappedBufferFactory.createDir(this.tmpDir.getAbsolutePath());
CosNMappedBufferFactory.createDir(tmpDir.getAbsolutePath());
} catch (IOException e) {
LOG.error("Try to create the tmp dir [{}] failed.", this.tmpDir.getAbsolutePath(), e);
LOG.error("Try to create the tmp dir [{}] failed.", tmpDir.getAbsolutePath(), e);
return null;
}
}
Expand All @@ -79,7 +93,7 @@ public CosNMappedBuffer create(String prefix, String suffix, int size) {
File tmpFile = File.createTempFile(
Constants.BLOCK_TMP_FILE_PREFIX,
Constants.BLOCK_TMP_FILE_SUFFIX,
this.tmpDir
tmpDir
);

if (this.deleteOnExit) {
Expand All @@ -92,7 +106,7 @@ public CosNMappedBuffer create(String prefix, String suffix, int size) {
randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
return (null != buf) ? new CosNMappedBuffer(buf, randomAccessFile, tmpFile) : null;
} catch (IOException e) {
LOG.error("Create tmp file failed. Tmp dir: {}", this.tmpDir, e);
LOG.error("Create tmp file failed. Tmp dir: {}", tmpDir, e);
return null;
}
}
Expand Down

0 comments on commit 3d5cc5c

Please sign in to comment.