diff --git a/pom.xml b/pom.xml index 19d28250..35110f27 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 8.0.2 + 8.0.3 jar Apache Hadoop Tencent Qcloud COS Support diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index 1a7c09d2..81d572ae 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -12,7 +12,7 @@ @InterfaceStability.Unstable public class CosNConfigKeys extends CommonConfigurationKeys { public static final String USER_AGENT = "fs.cosn.user.agent"; - public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.0.2"; + public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v8.0.3"; public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version"; @@ -61,6 +61,8 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final long DEFAULT_RETRY_INTERVAL = 3; public static final String CLIENT_MAX_RETRIES_KEY = "fs.cosn.client.maxRetries"; public static final int DEFAULT_CLIENT_MAX_RETRIES = 5; + public static final String CLIENT_SOCKET_ERROR_MAX_RETRIES = "fs.cosn.socket.error.maxRetries"; + public static final int DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES = 5; public static final String UPLOAD_THREAD_POOL_SIZE_KEY = "fs.cosn.upload_thread_pool"; public static final int DEFAULT_UPLOAD_THREAD_POOL_SIZE = 10; @@ -87,7 +89,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String CUSTOMER_DOMAIN = "fs.cosn.customer.domain"; public static final String OPEN_CHECK_MERGE_BUCKET = "fs.cosn.check.merge.bucket"; - public static final boolean DEFAULT_CHECK_MERGE_BUCKET = true; + public static final boolean DEFAULT_CHECK_MERGE_BUCKET = false; public static final String MERGE_BUCKET_MAX_LIST_NUM = "fs.cosn.merge.bucket.max.list.num"; public static final int DEFAULT_MERGE_BUCKET_MAX_LIST_NUM = 5000; public static final String NORMAL_BUCKET_MAX_LIST_NUM = "fs.cosn.normal.bucket.max.list.num"; diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java index 25462817..e7cfaf26 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java @@ -101,6 +101,7 @@ public long getEnd() { private final int maxReadPartNumber; private byte[] buffer; private boolean closed = false; + private final int socketErrMaxRetryTimes; private final ExecutorService readAheadExecutorService; private final Queue readBufferQueue; @@ -136,6 +137,9 @@ public CosNFSInputStream( this.maxReadPartNumber = conf.getInt( CosNConfigKeys.READ_AHEAD_QUEUE_SIZE, CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE); + this.socketErrMaxRetryTimes = conf.getInt( + CosNConfigKeys.CLIENT_SOCKET_ERROR_MAX_RETRIES, + CosNConfigKeys.DEFAULT_CLIENT_SOCKET_ERROR_MAX_RETRIES); this.readAheadExecutorService = readAheadExecutorService; this.readBufferQueue = new ArrayDeque(this.maxReadPartNumber); @@ -203,8 +207,8 @@ private synchronized void reopen(long pos) throws IOException { readBuffer.setStatus(ReadBuffer.SUCCESS); } else { this.readAheadExecutorService.execute( - new CosNFileReadTask( - this.conf, this.key, this.store, readBuffer)); + new CosNFileReadTask(this.conf, this.key, this.store, + readBuffer, this.socketErrMaxRetryTimes)); } this.readBufferQueue.add(readBuffer); diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java index d068e366..ae89bff1 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java @@ -7,13 +7,17 @@ import java.io.IOException; import java.io.InputStream; +import java.net.SocketException; +import java.util.concurrent.ThreadLocalRandom; public class CosNFileReadTask implements Runnable { static final Logger LOG = LoggerFactory.getLogger(CosNFileReadTask.class); + private final Configuration conf; private final String key; private final NativeFileSystemStore store; private final CosNFSInputStream.ReadBuffer readBuffer; + private final int socketErrMaxRetryTimes; /** * cos file read task @@ -24,40 +28,86 @@ public class CosNFileReadTask implements Runnable { */ public CosNFileReadTask(Configuration conf, String key, NativeFileSystemStore store, - CosNFSInputStream.ReadBuffer readBuffer) { + CosNFSInputStream.ReadBuffer readBuffer, + int socketErrMaxRetryTimes) { + this.conf = conf; this.key = key; this.store = store; this.readBuffer = readBuffer; + this.socketErrMaxRetryTimes = socketErrMaxRetryTimes; } @Override public void run() { try { this.readBuffer.lock(); - try { - InputStream inputStream = this.store.retrieveBlock( - this.key, this.readBuffer.getStart(), - this.readBuffer.getEnd()); - IOUtils.readFully( - inputStream, this.readBuffer.getBuffer(), 0, - readBuffer.getBuffer().length); - int readEof = inputStream.read(); - if (readEof != -1) { - LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key); + int retryIndex = 1; + boolean needRetry = false; + while (true) { + try { + this.retrieveBlock(); + needRetry = false; + } catch (SocketException se) { + // if we get stream success, but exceptions occurs when read cos input stream + String errMsg = String.format("retrieve block sdk socket failed, " + + "retryIndex: [%d / %d], key: %s, range: [%d , %d], exception: %s", + retryIndex, this.socketErrMaxRetryTimes, this.key, + this.readBuffer.getStart(), this.readBuffer.getEnd(), se.toString()); + if (retryIndex <= this.socketErrMaxRetryTimes) { + LOG.info(errMsg, se); + long sleepLeast = retryIndex * 300L; + long sleepBound = retryIndex * 500L; + try { + Thread.sleep(ThreadLocalRandom.current(). + nextLong(sleepLeast, sleepBound)); + ++retryIndex; + needRetry = true; + } catch (InterruptedException ie) { + this.setFailResult(errMsg, new IOException(ie.toString())); + break; + } + } else { + this.setFailResult(errMsg, se); + break; + } + } catch (IOException e) { + String errMsg = String.format("retrieve block sdk socket failed, " + + "retryIndex: [%d / %d], key: %s, range: [%d , %d], exception: %s", + retryIndex, this.socketErrMaxRetryTimes, this.key, + this.readBuffer.getStart(), this.readBuffer.getEnd(), e.toString()); + this.setFailResult(errMsg, e); + break; } - inputStream.close(); - this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS); - } catch (IOException e) { - this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR); - this.readBuffer.setException(e); - LOG.error("Exception occurs when retrieve the block range " + - "start: " - + String.valueOf(this.readBuffer.getStart()) + " " + - "end: " + this.readBuffer.getEnd(), e); - } + + if (!needRetry) { + break; + } + } // end of retry this.readBuffer.signalAll(); } finally { this.readBuffer.unLock(); } } + + public void setFailResult(String msg, IOException e) { + this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.ERROR); + this.readBuffer.setException(e); + LOG.error(msg); + } + + // not thread safe + public void retrieveBlock() throws IOException { + InputStream inputStream = this.store.retrieveBlock( + this.key, this.readBuffer.getStart(), + this.readBuffer.getEnd()); + IOUtils.readFully( + inputStream, this.readBuffer.getBuffer(), 0, + readBuffer.getBuffer().length); + int readEof = inputStream.read(); + if (readEof != -1) { + LOG.error("Expect to read the eof, but the return is not -1. key: {}.", this.key); + } + inputStream.close(); + this.readBuffer.setStatus(CosNFSInputStream.ReadBuffer.SUCCESS); + } }