diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index 39506d39..8f436686 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -129,7 +129,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String READ_AHEAD_BLOCK_SIZE_KEY = "fs.cosn.read.ahead.block.size"; public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB; public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size"; - public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 8; + public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 6; // used to control getFileStatus list to judge dir whether exist. public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys"; public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 1; diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java index 6c742a28..14da9f08 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java @@ -7,7 +7,7 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayDeque; -import java.util.Queue; +import java.util.Deque; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -91,9 +91,9 @@ public long getEnd() { private final Configuration conf; private final NativeFileSystemStore store; private final String key; - private long position = 0; - private long nextPos = 0; - private long lastByteStart = -1; + private long position; + private long nextPos; + private long lastByteStart; private long fileSize; private long partRemaining; private long bufferStart; @@ -105,7 +105,9 @@ public long getEnd() { private final int socketErrMaxRetryTimes; 
private final ExecutorService readAheadExecutorService; - private final Queue readBufferQueue; + private final Deque readBufferQueue; + // Keep a previous buffer that holds the head element most recently evicted from the queue, to speed up short-range random reads + private ReadBuffer previousReadBuffer; /** * Input Stream @@ -130,6 +132,9 @@ public CosNFSInputStream( this.statistics = statistics; this.key = key; this.fileSize = fileSize; + this.position = 0; + this.nextPos = 0; + this.lastByteStart = -1; this.bufferStart = -1; this.bufferEnd = -1; this.preReadPartSize = conf.getLong( @@ -143,23 +148,15 @@ public CosNFSInputStream( CosNConfigKeys.DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES); this.readAheadExecutorService = readAheadExecutorService; this.readBufferQueue = - new ArrayDeque(this.maxReadPartNumber); + new ArrayDeque<>(this.maxReadPartNumber); this.closed = false; } private synchronized void reopen(long pos) throws IOException { - long partSize = 0; - if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } else if (pos > this.fileSize) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } else { - if (pos + this.preReadPartSize > this.fileSize) { - partSize = this.fileSize - pos; - } else { - partSize = this.preReadPartSize; - } } this.buffer = null; @@ -170,20 +167,34 @@ private synchronized void reopen(long pos) throws IOException { if (pos == this.nextPos) { isRandomIO = false; } else { - while (this.readBufferQueue.size() != 0) { - if (this.readBufferQueue.element().getStart() != pos) { - this.readBufferQueue.poll(); - } else { - break; - } + // A random read occurred: for a short backward seek, first check whether it hits the head read buffer that was just evicted. + // If it does not, the random read is either a backward seek beyond the previous block, or it falls inside or beyond the read-ahead queue range. + // Inside the queue range, the lookup in the read-ahead queue locates the target block; beyond it, the queue is drained and the target block and position are located afresh. NOTE(review): the hit path below uses the evicted buffer without taking its lock or checking its status; confirm it is always fully loaded. + if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) { + this.buffer = this.previousReadBuffer.getBuffer(); + this.bufferStart = this.previousReadBuffer.getStart(); + this.bufferEnd = 
this.previousReadBuffer.getEnd(); + this.position = pos; + this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart); + this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize; + return; } } - - this.nextPos = pos + partSize; + // Locate the block to read within the read-ahead queue + while (!this.readBufferQueue.isEmpty()) { + if (pos < this.readBufferQueue.getFirst().getStart() || pos > this.readBufferQueue.getFirst().getEnd()) { + // Head is not the target block: evict it and keep it so a short backward random read can reuse it + this.previousReadBuffer = this.readBufferQueue.poll(); + } else { + break; + } + } + // Align nextPos to the start of the element after the queue head. NOTE(review): the value is pos plus preReadPartSize, which matches that start only when pos is block-aligned; confirm. + this.nextPos = pos + this.preReadPartSize; int currentBufferQueueSize = this.readBufferQueue.size(); if (currentBufferQueueSize == 0) { - this.lastByteStart = pos - partSize; + this.lastByteStart = pos - this.preReadPartSize; } else { ReadBuffer[] readBuffers = this.readBufferQueue.toArray(new ReadBuffer[currentBufferQueueSize]); @@ -193,12 +204,12 @@ private synchronized void reopen(long pos) throws IOException { int maxLen = this.maxReadPartNumber - currentBufferQueueSize; for (int i = 0; i < maxLen && i < (currentBufferQueueSize + 1) * 2; i++) { - if (this.lastByteStart + partSize * (i + 1) > this.fileSize) { + if (this.lastByteStart + this.preReadPartSize * (i + 1) > this.fileSize) { break; } - long byteStart = this.lastByteStart + partSize * (i + 1); - long byteEnd = byteStart + partSize - 1; + long byteStart = this.lastByteStart + this.preReadPartSize * (i + 1); + long byteEnd = byteStart + this.preReadPartSize - 1; if (byteEnd >= this.fileSize) { byteEnd = this.fileSize - 1; } @@ -218,7 +229,7 @@ private synchronized void reopen(long pos) throws IOException { } } - ReadBuffer readBuffer = this.readBufferQueue.poll(); + ReadBuffer readBuffer = this.readBufferQueue.peek(); IOException innerException = null; readBuffer.lock(); try { @@ -245,7 +256,7 @@ private synchronized void reopen(long pos) throws IOException { } this.position = 
pos; - this.partRemaining = partSize; + this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart); } @Override @@ -263,11 +274,21 @@ public void seek(long pos) throws IOException { return; } if (pos >= this.bufferStart && pos <= this.bufferEnd) { + // Random read inside the current block is served from the buffer already held LOG.debug("seek cache hit last pos {}, pos {}, this buffer start {}, end {}", this.position, pos, this.bufferStart, this.bufferEnd); this.position = pos; this.partRemaining = this.bufferEnd - pos + 1; + } else if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) { + // Target lies in the read-ahead block that was just evicted + this.position = pos; + this.partRemaining = -1; // trigger reopen + } else if (!this.readBufferQueue.isEmpty() && pos >= this.readBufferQueue.getFirst().getStart() && pos <= this.readBufferQueue.getLast().getEnd()) { + // Target lies inside the read-ahead queue + this.position = pos; + this.partRemaining = -1; // trigger reopen + } else { + // Neither in the read-ahead queue nor in the just-evicted block: locate the target block and position directly (same assignments as the two branches above) this.position = pos; this.partRemaining = -1; }