Skip to content

Commit

Permalink
feat: multi thread copy (#118)
Browse files Browse the repository at this point in the history
* chore: update the version to 8.2.8

* feat: support multi thread copy for random write

---------

Co-authored-by: xhaopan <[email protected]>
  • Loading branch information
suninsky and suninsky authored Apr 7, 2023
1 parent 24ca174 commit 6a1ee6b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
2 changes: 1 addition & 1 deletion compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

base_dir=$(cd `dirname $0`;pwd)
cd ${base_dir}
hadoop_version_array=("2.7.5" "2.8.5" "3.1.0" "3.3.0")
hadoop_version_array=("2.7.5" "2.8.5" "3.1.0" "3.2.2" "3.3.0")

origin_version=$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec)

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ public FSDataOutputStream append(Path f, int bufferSize,
try {
seekableOutputStreamClass = Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream");
Constructor<?> constructor = seekableOutputStreamClass.getConstructor(Configuration.class, NativeFileSystemStore.class,
String.class, ExecutorService.class);
seekableOutputStream = constructor.newInstance(this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool);
String.class, ExecutorService.class, ExecutorService.class);
seekableOutputStream = constructor.newInstance(this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool, this.boundedCopyThreadPool);
} catch (ClassNotFoundException e) {
throw new IOException("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream can not be found. " +
"please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static class SeekableOutputStream extends OutputStream implements PosixSe
private boolean closed;

public SeekableOutputStream(Configuration conf, NativeFileSystemStore nativeStore,
String cosKey, ExecutorService executorService) throws IOException {
String cosKey, ExecutorService executorService, ExecutorService copyExecutor)
throws IOException {
Preconditions.checkNotNull(conf, "hadoop configuration");
this.nativeStore = Preconditions.checkNotNull(nativeStore, "nativeStore");
this.cosKey = Preconditions.checkNotNull(cosKey, "cosKey");
Expand All @@ -85,7 +86,7 @@ public SeekableOutputStream(Configuration conf, NativeFileSystemStore nativeStor
partSize = Constants.MAX_PART_SIZE;
}
this.multipartManager = new MultipartManager(
this.nativeStore, this.cosKey, partSize, executorService);
this.nativeStore, this.cosKey, partSize, executorService, copyExecutor);
this.multipartManager.resumeForWrite();
// 把 pos 置于末尾
this.pos = this.multipartManager.getCurrentSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ public class MultipartManager {
private final List<LocalPart> localParts = Collections.synchronizedList(
new ArrayList<LocalPart>());
private final ListeningExecutorService listeningExecutorService;
private final ListeningExecutorService listeningCopyExecutorService;
private volatile boolean splitPartProcess; // 是否在拆分过程中
private volatile boolean committed;
private volatile boolean aborted;
private volatile boolean closed;

public MultipartManager(NativeFileSystemStore nativeStore,
String cosKey, long partSize, ExecutorService executorService) {
String cosKey, long partSize, ExecutorService executorService, ExecutorService copyExecutor) {
this.partSize = partSize;
this.MAX_FILE_SIZE = this.partSize * 10000L;
this.nativeStore = nativeStore;
Expand All @@ -67,6 +68,7 @@ public MultipartManager(NativeFileSystemStore nativeStore,
this.closed = false;

this.listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
this.listeningCopyExecutorService = MoreExecutors.listeningDecorator(copyExecutor);
}

/**
Expand Down Expand Up @@ -116,20 +118,28 @@ public void splitParts(long newLen) throws IOException {
if (copyRemaining >= this.partSize) {
// 使用服务端copy
this.initializeMultipartUploadIfNeed();
List<ListenableFuture<PartETag>> uploadPartFutures = new ArrayList<>();
try {
lastByte = firstByte + this.partSize - 1;
while (copyRemaining >= this.partSize) {
LOG.debug("Executing the uploadPartCopy [cosKey: {}, uploadId: {}, partNumber: {}].",
cosKey, this.uploadId, this.localParts.size() + 1);
UploadPartCopy uploadPartCopy = new UploadPartCopy(cosKey, cosKey,
final UploadPartCopy uploadPartCopy = new UploadPartCopy(cosKey, cosKey,
this.localParts.size() + 1, firstByte, lastByte);
this.uploadPartCopy(uploadPartCopy);
uploadPartFutures.add(listeningCopyExecutorService.submit(new Callable<PartETag>() {
@Override
public PartETag call() throws Exception {
uploadPartCopy(uploadPartCopy);
return null;
}
}));
// 补位
this.localParts.add(null);
copyRemaining -= ((lastByte - firstByte) + 1);
firstByte = lastByte + 1;
lastByte = firstByte + this.partSize - 1;
}
Futures.allAsList(uploadPartFutures).get();
} catch (Exception exception) {
LOG.error("Failed to breakDown the cos key [{}]. Abort it.", cosKey, exception);
this.abort();
Expand Down

0 comments on commit 6a1ee6b

Please sign in to comment.