From 2aff02958edc428a0166d81b218ce36f2469f406 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Sat, 8 Apr 2023 18:42:38 +0800 Subject: [PATCH] refactor: Refactor the output stream copy thread pool of the the append and truncate operation. --- .../fs/CosNExtendedFSDataOutputStream.java | 17 +++++++++++------ .../org/apache/hadoop/fs/CosNFileSystem.java | 8 ++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/hadoop/fs/CosNExtendedFSDataOutputStream.java b/src/main/java/org/apache/hadoop/fs/CosNExtendedFSDataOutputStream.java index 17721177..5bfbe65d 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNExtendedFSDataOutputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNExtendedFSDataOutputStream.java @@ -1,6 +1,8 @@ package org.apache.hadoop.fs; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.qcloud.cos.model.PartETag; import com.qcloud.cos.utils.CRC64; import org.apache.hadoop.conf.Configuration; @@ -21,15 +23,17 @@ public class CosNExtendedFSDataOutputStream extends CosNFSDataOutputStream { private static final Logger LOG = LoggerFactory.getLogger(CosNExtendedFSDataOutputStream.class); + private final ListeningExecutorService copyExecutoService; + public CosNExtendedFSDataOutputStream(Configuration conf, NativeFileSystemStore nativeStore, - String cosKey, ExecutorService executorService) throws IOException { - this(conf, nativeStore, cosKey, executorService, false); + String cosKey, ExecutorService ioExecutorService, ExecutorService copyExecutorService) throws IOException { + this(conf, nativeStore, cosKey, ioExecutorService, copyExecutorService, false); } public CosNExtendedFSDataOutputStream(Configuration conf, NativeFileSystemStore nativeStore, - String cosKey, ExecutorService executorService, boolean appendFlag) throws IOException { - super(conf, nativeStore, cosKey, executorService); - + String cosKey, ExecutorService ioExecutorService, ExecutorService copyExecutorService, boolean appendFlag) throws IOException { + super(conf, nativeStore, cosKey, ioExecutorService); + this.copyExecutoService = MoreExecutors.listeningDecorator(copyExecutorService); if (appendFlag) { this.resumeForWrite(); } @@ -139,7 +143,8 @@ protected void uploadPartCopyAsync(final UploadPartCopy uploadPartCopy) throws I partsSubmitted.incrementAndGet(); bytesSubmitted.addAndGet(uploadPartCopy.getLastByte() - uploadPartCopy.getFirstByte() + 1); - ListenableFuture partETagListenableFuture = executorService.submit(new Callable() { + ListenableFuture partETagListenableFuture = + CosNExtendedFSDataOutputStream.this.copyExecutoService.submit(new Callable() { @Override public PartETag call() throws Exception { LOG.debug("Start to copy the part: {}.", uploadPartCopy); diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java index 8236a178..503758e9 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java @@ -324,7 +324,7 @@ public FSDataOutputStream append(Path f, int bufferSize, } } else { return new FSDataOutputStream(new CosNExtendedFSDataOutputStream( - this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool, true), + this.getConf(), this.nativeStore, cosKey, this.boundedCopyThreadPool, this.boundedCopyThreadPool,true), statistics, fileStatus.getLen()); } } @@ -374,8 +374,8 @@ public boolean truncate(Path f, long newLength) throws IOException { // Use the single thread to truncate. try (OutputStream outputStream = new FSDataOutputStream( - new CosNExtendedFSDataOutputStream(this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool), - statistics)) { + new CosNExtendedFSDataOutputStream(this.getConf(), this.nativeStore, cosKey, + this.boundedIOThreadPool, this.boundedCopyThreadPool), statistics)) { // If the newLength is equal to 0, just wait for 'try finally' to close. if (newLength > 0) { try (InputStream inputStream = @@ -430,7 +430,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, // Need to support the synchronous flush. return new FSDataOutputStream( new CosNExtendedFSDataOutputStream(this.getConf(), nativeStore, key, - this.boundedIOThreadPool), statistics); + this.boundedIOThreadPool, this.boundedCopyThreadPool), statistics); } else { return new FSDataOutputStream( new CosNFSDataOutputStream(this.getConf(), nativeStore, key,