Skip to content

Commit

Permalink
perf: Optimize positioned read performance (#125)
Browse files Browse the repository at this point in the history
* perf: Optimize positioned read performance

* fix: Fixed a bug in the partRemaining calculation.
  • Loading branch information
yuyang733 authored Nov 10, 2023
1 parent bd0c515 commit 429e663
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String READ_AHEAD_BLOCK_SIZE_KEY = "fs.cosn.read.ahead.block.size";
public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB;
public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size";
public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 8;
public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 6;
// Controls the list request that getFileStatus uses to determine whether a directory exists.
public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys";
public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 1;
Expand Down
77 changes: 49 additions & 28 deletions src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -91,9 +91,9 @@ public long getEnd() {
private final Configuration conf;
private final NativeFileSystemStore store;
private final String key;
private long position = 0;
private long nextPos = 0;
private long lastByteStart = -1;
private long position;
private long nextPos;
private long lastByteStart;
private long fileSize;
private long partRemaining;
private long bufferStart;
Expand All @@ -105,7 +105,9 @@ public long getEnd() {
private final int socketErrMaxRetryTimes;

private final ExecutorService readAheadExecutorService;
private final Queue<ReadBuffer> readBufferQueue;
private final Deque<ReadBuffer> readBufferQueue;
// Keep a "previous" buffer that temporarily holds the head element evicted from the queue, to optimize small-range random reads
private ReadBuffer previousReadBuffer;

/**
* Input Stream
Expand All @@ -130,6 +132,9 @@ public CosNFSInputStream(
this.statistics = statistics;
this.key = key;
this.fileSize = fileSize;
this.position = 0;
this.nextPos = 0;
this.lastByteStart = -1;
this.bufferStart = -1;
this.bufferEnd = -1;
this.preReadPartSize = conf.getLong(
Expand All @@ -143,23 +148,15 @@ public CosNFSInputStream(
CosNConfigKeys.DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES);
this.readAheadExecutorService = readAheadExecutorService;
this.readBufferQueue =
new ArrayDeque<ReadBuffer>(this.maxReadPartNumber);
new ArrayDeque<>(this.maxReadPartNumber);
this.closed = false;
}

private synchronized void reopen(long pos) throws IOException {
long partSize = 0;

if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
} else if (pos > this.fileSize) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} else {
if (pos + this.preReadPartSize > this.fileSize) {
partSize = this.fileSize - pos;
} else {
partSize = this.preReadPartSize;
}
}

this.buffer = null;
Expand All @@ -170,20 +167,34 @@ private synchronized void reopen(long pos) throws IOException {
if (pos == this.nextPos) {
isRandomIO = false;
} else {
while (this.readBufferQueue.size() != 0) {
if (this.readBufferQueue.element().getStart() != pos) {
this.readBufferQueue.poll();
} else {
break;
}
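// A random read occurred. For a small-range backward random read, first check whether it hits the head read buffer that was just evicted.
// If it does not, the random read must be a backward read beyond the previous block, a read inside the read-ahead queue range, or a read beyond the read-ahead queue range.
// If it is inside the read-ahead queue range, the block to read is located by searching the queue; if it is beyond that range, the queue is drained and the block and position to read are located afresh.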
if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
this.buffer = this.previousReadBuffer.getBuffer();
this.bufferStart = this.previousReadBuffer.getStart();
this.bufferEnd = this.previousReadBuffer.getEnd();
this.position = pos;
this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart);
this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
return;
}
}

this.nextPos = pos + partSize;
// Locate the block to read within the read-ahead queue
while (!this.readBufferQueue.isEmpty()) {
if (pos < this.readBufferQueue.getFirst().getStart() || pos > this.readBufferQueue.getFirst().getEnd()) {
// While locating the block to read, keep the evicted head element so that a small-range backward random read can reuse it
this.previousReadBuffer = this.readBufferQueue.poll();
} else {
break;
}
}
// Align to the start position of the element that follows the queue head
this.nextPos = pos + this.preReadPartSize;

int currentBufferQueueSize = this.readBufferQueue.size();
if (currentBufferQueueSize == 0) {
this.lastByteStart = pos - partSize;
this.lastByteStart = pos - this.preReadPartSize;
} else {
ReadBuffer[] readBuffers =
this.readBufferQueue.toArray(new ReadBuffer[currentBufferQueueSize]);
Expand All @@ -193,12 +204,12 @@ private synchronized void reopen(long pos) throws IOException {

int maxLen = this.maxReadPartNumber - currentBufferQueueSize;
for (int i = 0; i < maxLen && i < (currentBufferQueueSize + 1) * 2; i++) {
if (this.lastByteStart + partSize * (i + 1) > this.fileSize) {
if (this.lastByteStart + this.preReadPartSize * (i + 1) > this.fileSize) {
break;
}

long byteStart = this.lastByteStart + partSize * (i + 1);
long byteEnd = byteStart + partSize - 1;
long byteStart = this.lastByteStart + this.preReadPartSize * (i + 1);
long byteEnd = byteStart + this.preReadPartSize - 1;
if (byteEnd >= this.fileSize) {
byteEnd = this.fileSize - 1;
}
Expand All @@ -218,7 +229,7 @@ private synchronized void reopen(long pos) throws IOException {
}
}

ReadBuffer readBuffer = this.readBufferQueue.poll();
ReadBuffer readBuffer = this.readBufferQueue.peek();
IOException innerException = null;
readBuffer.lock();
try {
Expand All @@ -245,7 +256,7 @@ private synchronized void reopen(long pos) throws IOException {
}

this.position = pos;
this.partRemaining = partSize;
this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart);
}

@Override
Expand All @@ -263,11 +274,21 @@ public void seek(long pos) throws IOException {
return;
}
if (pos >= this.bufferStart && pos <= this.bufferEnd) {
// Support random reads within the current block
LOG.debug("seek cache hit last pos {}, pos {}, this buffer start {}, end {}",
this.position, pos, this.bufferStart, this.bufferEnd);
this.position = pos;
this.partRemaining = this.bufferEnd - pos + 1;
} else if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
// Inside the read-ahead block that was just evicted
this.position = pos;
this.partRemaining = -1; // 触发 reopen
} else if (!this.readBufferQueue.isEmpty() && pos >= this.readBufferQueue.getFirst().getStart() && pos <= this.readBufferQueue.getLast().getEnd()) {
// Inside the read-ahead queue
this.position = pos;
this.partRemaining = -1; // 触发 reopen
} else {
// Neither in the read-ahead queue nor in the just-evicted read-ahead block, so locate the block and position to read directly
this.position = pos;
this.partRemaining = -1;
}
Expand Down

0 comments on commit 429e663

Please sign in to comment.