diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java index e4163f4..6bc7c37 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -18,6 +19,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; + public class CosNFSInputStream extends FSInputStream { public static final Logger LOG = @@ -61,8 +64,13 @@ public void signalAll() { readyCondition.signalAll(); } + @Nullable public byte[] getBuffer() { - return this.memory.array(); + final MemoryAllocator.Memory finalMemory = memory; + if (finalMemory == null) { + return null; + } + return finalMemory.array(); } public int length() { @@ -358,6 +366,7 @@ public int read() throws IOException { int byteRead = -1; if (this.partRemaining != 0) { byte[] buffer = currentReadBuffer.getBuffer(); + Objects.requireNonNull(buffer); byteRead = buffer[(int) (buffer.length - this.partRemaining)] & 0xff; } if (byteRead >= 0) { @@ -391,6 +400,7 @@ public int read(byte[] b, int off, int len) throws IOException { int bytes = 0; byte[] buffer = currentReadBuffer.getBuffer(); + Objects.requireNonNull(buffer); for (int i = buffer.length - (int) partRemaining; i < buffer.length; i++) { b[off + bytesRead] = buffer[i]; diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java index 0d4462d..4d528e0 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java @@ -8,10 +8,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.SocketException; +import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hadoop.fs.CosNConfigKeys.DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS; + public class CosNFileReadTask implements Runnable { static final Logger LOG = LoggerFactory.getLogger(CosNFileReadTask.class); @@ -59,14 +62,11 @@ public void run() { currentThread.setContextClassLoader(this.getClass().getClassLoader()); try { this.readBuffer.lock(); - if (closed.get()) { - this.setFailResult("the input stream has been canceled.", new IOException("the input stream has been canceled.")); - return; - } + checkStreamClosed(); try { this.readBuffer.allocate( - conf.getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5), - TimeUnit.SECONDS); + conf.getLong(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, + DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS), TimeUnit.SECONDS); } catch (Exception e) { this.setFailResult("allocate read buffer failed.", new IOException(e)); return; @@ -107,15 +107,17 @@ public void run() { this.readBuffer.getStart(), this.readBuffer.getEnd(), ioException); this.setFailResult(errMsg, ioException); break; - } catch (Throwable throwable) { - this.setFailResult("retrieve block failed", new IOException(throwable)); - break; } if (!needRetry) { break; } } // end of retry + } catch (Throwable throwable) { + this.setFailResult( + String.format("retrieve block failed, key: %s, range: [%d , %d], exception: %s", + this.key, this.readBuffer.getStart(), this.readBuffer.getEnd(), throwable), + new IOException(throwable)); } finally { this.readBuffer.signalAll(); this.readBuffer.unLock(); @@ -125,17 +127,25 @@ public void run() { public void setFailResult(String msg, IOException e) { this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR); this.readBuffer.setException(e); - LOG.error(msg); + if (e.getCause() != null && e.getCause() instanceof CancelledException) { + // 预期操作,以warn级别导出 + LOG.warn(msg); + } else { + LOG.error(msg); + } } // not thread safe - public void retrieveBlock() throws IOException { + private void retrieveBlock() throws IOException, CancelledException { + byte[] dataBuf = readBuffer.getBuffer(); + checkStreamClosed(); + Objects.requireNonNull(dataBuf); InputStream inputStream = this.store.retrieveBlock( this.key, this.readBuffer.getStart(), this.readBuffer.getEnd()); IOUtils.readFully( - inputStream, this.readBuffer.getBuffer(), 0, - readBuffer.getBuffer().length); + inputStream, dataBuf, 0, + dataBuf.length); int readEof = inputStream.read(); if (readEof != -1) { LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key); @@ -143,4 +153,18 @@ public void retrieveBlock() throws IOException { inputStream.close(); this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS); } + + private void checkStreamClosed() throws CancelledException { + if (closed.get()) { + throw new CancelledException("the input stream has been canceled."); + } + } + + + private static class CancelledException extends Exception { + public CancelledException(String message) { + super(message); + } + } } +