Skip to content

Commit

Permalink
Merge pull request #43 from vintmd/retrieve-retry
Browse files Browse the repository at this point in the history
add the retrieve socket exception retry
  • Loading branch information
yuyang733 authored Jan 24, 2022
2 parents 4211625 + d220ce3 commit e3fc1f2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.0.2</version>
<version>8.0.3</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Qcloud COS Support</name>
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@InterfaceStability.Unstable
public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String USER_AGENT = "fs.cosn.user.agent";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.0.2";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.0.3";

public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version";

Expand Down Expand Up @@ -61,6 +61,8 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final long DEFAULT_RETRY_INTERVAL = 3;
public static final String CLIENT_MAX_RETRIES_KEY = "fs.cosn.client.maxRetries";
public static final int DEFAULT_CLIENT_MAX_RETRIES = 5;
public static final String CLIENT_SOCKET_ERROR_MAX_RETRIES = "fs.cosn.socket.error.maxRetries";
public static final int DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES = 5;

public static final String UPLOAD_THREAD_POOL_SIZE_KEY = "fs.cosn.upload_thread_pool";
public static final int DEFAULT_UPLOAD_THREAD_POOL_SIZE = 10;
Expand All @@ -87,7 +89,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {

public static final String CUSTOMER_DOMAIN = "fs.cosn.customer.domain";
public static final String OPEN_CHECK_MERGE_BUCKET = "fs.cosn.check.merge.bucket";
public static final boolean DEFAULT_CHECK_MERGE_BUCKET = true;
public static final boolean DEFAULT_CHECK_MERGE_BUCKET = false;
public static final String MERGE_BUCKET_MAX_LIST_NUM = "fs.cosn.merge.bucket.max.list.num";
public static final int DEFAULT_MERGE_BUCKET_MAX_LIST_NUM = 5000;
public static final String NORMAL_BUCKET_MAX_LIST_NUM = "fs.cosn.normal.bucket.max.list.num";
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public long getEnd() {
private final int maxReadPartNumber;
private byte[] buffer;
private boolean closed = false;
private final int socketErrMaxRetryTimes;

private final ExecutorService readAheadExecutorService;
private final Queue<ReadBuffer> readBufferQueue;
Expand Down Expand Up @@ -136,6 +137,9 @@ public CosNFSInputStream(
this.maxReadPartNumber = conf.getInt(
CosNConfigKeys.READ_AHEAD_QUEUE_SIZE,
CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE);
this.socketErrMaxRetryTimes = conf.getInt(
CosNConfigKeys.CLIENT_SOCKET_ERROR_MAX_RETRIES,
CosNConfigKeys.DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES);
this.readAheadExecutorService = readAheadExecutorService;
this.readBufferQueue =
new ArrayDeque<ReadBuffer>(this.maxReadPartNumber);
Expand Down Expand Up @@ -203,8 +207,8 @@ private synchronized void reopen(long pos) throws IOException {
readBuffer.setStatus(ReadBuffer.SUCCESS);
} else {
this.readAheadExecutorService.execute(
new CosNFileReadTask(
this.conf, this.key, this.store, readBuffer));
new CosNFileReadTask(this.conf, this.key, this.store,
readBuffer, this.socketErrMaxRetryTimes));
}

this.readBufferQueue.add(readBuffer);
Expand Down
92 changes: 71 additions & 21 deletions src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.util.concurrent.ThreadLocalRandom;

public class CosNFileReadTask implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(CosNFileReadTask.class);

private final Configuration conf;
private final String key;
private final NativeFileSystemStore store;
private final CosNFSInputStream.ReadBuffer readBuffer;
private final int socketErrMaxRetryTimes;

/**
* cos file read task
Expand All @@ -24,40 +28,86 @@ public class CosNFileReadTask implements Runnable {
*/
public CosNFileReadTask(Configuration conf, String key,
NativeFileSystemStore store,
CosNFSInputStream.ReadBuffer readBuffer) {
CosNFSInputStream.ReadBuffer readBuffer,
int socketErrMaxRetryTimes) {
this.conf = conf;
this.key = key;
this.store = store;
this.readBuffer = readBuffer;
this.socketErrMaxRetryTimes = socketErrMaxRetryTimes;
}

@Override
public void run() {
try {
this.readBuffer.lock();
try {
InputStream inputStream = this.store.retrieveBlock(
this.key, this.readBuffer.getStart(),
this.readBuffer.getEnd());
IOUtils.readFully(
inputStream, this.readBuffer.getBuffer(), 0,
readBuffer.getBuffer().length);
int readEof = inputStream.read();
if (readEof != -1) {
LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key);
int retryIndex = 1;
boolean needRetry = false;
while (true) {
try {
this.retrieveBlock();
needRetry = false;
} catch (SocketException se) {
// if we get stream success, but exceptions occurs when read cos input stream
String errMsg = String.format("retrieve block sdk socket failed, " +
"retryIndex: [%d / %d], key: %s, range: [%d , %d], exception: %s",
retryIndex, this.socketErrMaxRetryTimes, this.key,
this.readBuffer.getStart(), this.readBuffer.getEnd(), se.toString());
if (retryIndex <= this.socketErrMaxRetryTimes) {
LOG.info(errMsg, se);
long sleepLeast = retryIndex * 300L;
long sleepBound = retryIndex * 500L;
try {
Thread.sleep(ThreadLocalRandom.current().
nextLong(sleepLeast, sleepBound));
++retryIndex;
needRetry = true;
} catch (InterruptedException ie) {
this.setFailResult(errMsg, new IOException(ie.toString()));
break;
}
} else {
this.setFailResult(errMsg, se);
break;
}
} catch (IOException e) {
String errMsg = String.format("retrieve block sdk socket failed, " +
"retryIndex: [%d / %d], key: %s, range: [%d , %d], exception: %s",
retryIndex, this.socketErrMaxRetryTimes, this.key,
this.readBuffer.getStart(), this.readBuffer.getEnd(), e.toString());
this.setFailResult(errMsg, e);
break;
}
inputStream.close();
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS);
} catch (IOException e) {
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR);
this.readBuffer.setException(e);
LOG.error("Exception occurs when retrieve the block range " +
"start: "
+ String.valueOf(this.readBuffer.getStart()) + " " +
"end: " + this.readBuffer.getEnd(), e);
}

if (!needRetry) {
break;
}
} // end of retry
this.readBuffer.signalAll();
} finally {
this.readBuffer.unLock();
}
}

public void setFailResult(String msg, IOException e) {
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR);
this.readBuffer.setException(e);
LOG.error(msg);
}

// not thread safe
public void retrieveBlock() throws IOException {
InputStream inputStream = this.store.retrieveBlock(
this.key, this.readBuffer.getStart(),
this.readBuffer.getEnd());
IOUtils.readFully(
inputStream, this.readBuffer.getBuffer(), 0,
readBuffer.getBuffer().length);
int readEof = inputStream.read();
if (readEof != -1) {
LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key);
}
inputStream.close();
this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS);
}
}

0 comments on commit e3fc1f2

Please sign in to comment.