diff --git a/src/main/java/org/apache/hadoop/fs/CosNSeekableFSDataOutputStream.java b/src/main/java/org/apache/hadoop/fs/CosNSeekableFSDataOutputStream.java index bdad7d52..f91758c0 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNSeekableFSDataOutputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNSeekableFSDataOutputStream.java @@ -39,22 +39,22 @@ public synchronized int ftruncate(long length) throws IOException { } @Override - public void seek(long pos) throws IOException { + public synchronized void seek(long pos) throws IOException { this.seekableOutputStream.seek(pos); } @Override - public boolean seekToNewSource(long pos) throws IOException { + public synchronized boolean seekToNewSource(long pos) throws IOException { return this.seekableOutputStream.seekToNewSource(pos); } @Override - public void doAbort() { + public synchronized void doAbort() { this.seekableOutputStream.doAbort(); } @Override - public long getPos() { + public synchronized long getPos() { return this.seekableOutputStream.getPos(); } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/multipart/upload/MultipartManager.java b/src/main/java/org/apache/hadoop/fs/cosn/multipart/upload/MultipartManager.java index f6c3c508..37bfba11 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/multipart/upload/MultipartManager.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/multipart/upload/MultipartManager.java @@ -50,6 +50,7 @@ public class MultipartManager { private final List localParts = Collections.synchronizedList( new ArrayList()); private final ListeningExecutorService listeningExecutorService; + private volatile boolean splitPartProcess; // 是否在拆分过程中 private volatile boolean committed; private volatile boolean aborted; private volatile boolean closed; @@ -107,7 +108,7 @@ public void splitParts(long newLen) throws IOException { // 拆块需要进行重置 this.reset(); - + this.splitPartProcess = true; long copyRemaining = Math.min(newLen, fileMetadata.getLength()); if (copyRemaining > 0) { long firstByte = 0; @@ -151,7 +152,7 @@ public void splitParts(long newLen) throws IOException { long endPos = newLen - 1; this.padBytes(startPos, endPos); } - + this.splitPartProcess = false; this.committed = false; this.aborted = false; } @@ -617,8 +618,9 @@ private void checkOpened() { private CosNRandomAccessMappedBuffer getLocalPartResource(String fileName, int size) throws IOException { this.checkOpened(); - - if (LocalRandomAccessMappedBufferPool.getInstance().shouldRelease()) { + + if (LocalRandomAccessMappedBufferPool.getInstance().shouldRelease() && !this.splitPartProcess) { + LOG.info("Begin to release the local cache for the seekable write."); // 本地的 POSIX extension 语义支持空间已经不够了,需要先尝试释放本地占用 // 将当前所有修改提交到远端 this.commitLocalToRemote();