Skip to content

Commit

Permalink
fix: 优化预读时流关闭的日志 (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
suninsky authored May 11, 2024
1 parent 529a883 commit e63bae5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
12 changes: 11 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.Nullable;


public class CosNFSInputStream extends FSInputStream {
public static final Logger LOG =
Expand Down Expand Up @@ -61,8 +64,13 @@ public void signalAll() {
readyCondition.signalAll();
}

@Nullable
public byte[] getBuffer() {
return this.memory.array();
final MemoryAllocator.Memory finalMemory = memory;
if (finalMemory == null) {
return null;
}
return finalMemory.array();
}

public int length() {
Expand Down Expand Up @@ -358,6 +366,7 @@ public int read() throws IOException {
int byteRead = -1;
if (this.partRemaining != 0) {
byte[] buffer = currentReadBuffer.getBuffer();
Objects.requireNonNull(buffer);
byteRead = buffer[(int) (buffer.length - this.partRemaining)] & 0xff;
}
if (byteRead >= 0) {
Expand Down Expand Up @@ -391,6 +400,7 @@ public int read(byte[] b, int off, int len) throws IOException {

int bytes = 0;
byte[] buffer = currentReadBuffer.getBuffer();
Objects.requireNonNull(buffer);
for (int i = buffer.length - (int) partRemaining;
i < buffer.length; i++) {
b[off + bytesRead] = buffer[i];
Expand Down
50 changes: 37 additions & 13 deletions src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.fs.CosNConfigKeys.DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS;

public class CosNFileReadTask implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(CosNFileReadTask.class);

Expand Down Expand Up @@ -59,14 +62,11 @@ public void run() {
currentThread.setContextClassLoader(this.getClass().getClassLoader());
try {
this.readBuffer.lock();
if (closed.get()) {
this.setFailResult("the input stream has been canceled.", new IOException("the input stream has been canceled."));
return;
}
checkStreamClosed();
try {
this.readBuffer.allocate(
conf.getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5),
TimeUnit.SECONDS);
conf.getLong(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS,
DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS), TimeUnit.SECONDS);
} catch (Exception e) {
this.setFailResult("allocate read buffer failed.", new IOException(e));
return;
Expand Down Expand Up @@ -107,15 +107,17 @@ public void run() {
this.readBuffer.getStart(), this.readBuffer.getEnd(), ioException);
this.setFailResult(errMsg, ioException);
break;
} catch (Throwable throwable) {
this.setFailResult("retrieve block failed", new IOException(throwable));
break;
}

if (!needRetry) {
break;
}
} // end of retry
} catch (Throwable throwable) {
this.setFailResult(
String.format("retrieve block failed, key: %s, range: [%d , %d], exception: %s",
this.key, this.readBuffer.getStart(), this.readBuffer.getEnd(), throwable),
new IOException(throwable));
} finally {
this.readBuffer.signalAll();
this.readBuffer.unLock();
Expand All @@ -125,22 +127,44 @@ public void run() {
public void setFailResult(String msg, IOException e) {
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR);
this.readBuffer.setException(e);
LOG.error(msg);
if (e.getCause() != null && e.getCause() instanceof CancelledException) {
// 预期操作,以warn级别导出
LOG.warn(msg);
} else {
LOG.error(msg);
}
}

// not thread safe
public void retrieveBlock() throws IOException {
private void retrieveBlock() throws IOException, CancelledException {
byte[] dataBuf = readBuffer.getBuffer();
checkStreamClosed();
Objects.requireNonNull(dataBuf);
InputStream inputStream = this.store.retrieveBlock(
this.key, this.readBuffer.getStart(),
this.readBuffer.getEnd());
IOUtils.readFully(
inputStream, this.readBuffer.getBuffer(), 0,
readBuffer.getBuffer().length);
inputStream, dataBuf, 0,
dataBuf.length);
int readEof = inputStream.read();
if (readEof != -1) {
LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key);
}
inputStream.close();
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS);
}

private void checkStreamClosed() throws CancelledException {
if (closed.get()) {
throw new CancelledException("the input stream has been canceled.");
}
}


private static class CancelledException extends Exception {
public CancelledException(String message) {
super(message);
}
}
}

0 comments on commit e63bae5

Please sign in to comment.