From 052a511dbeb3210f1536ccc41e79599e5bba08f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Fri, 13 May 2022 09:46:14 +0800 Subject: [PATCH] add the 503 mpu retry, support get client from native store and fix the bug of deploy script --- deploy.sh | 2 +- .../hadoop/fs/CosNativeFileSystemStore.java | 30 ++++++++++++++++--- .../hadoop/fs/NativeFileSystemStore.java | 4 +++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/deploy.sh b/deploy.sh index 8e6f8a27..5670f6b2 100644 --- a/deploy.sh +++ b/deploy.sh @@ -21,7 +21,7 @@ do # 外部maven 中央仓库 deploy_repository_id="oss" deploy_repository_url="https://oss.sonatype.org/service/local/staging/deploy/maven2" - elif ["$OPT" = "$INTER" ]; then + elif [ "$OPT" = "$INTER" ]; then deploy_repository_id="cos-inner-maven-repository" deploy_repository_url="http://mirrors.tencent.com/repository/maven/QCLOUD_COS" fi diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index e3fef690..838bf949 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -1437,8 +1437,7 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, String errorCode = cse.getErrorCode(); LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode); // 对5xx错误进行重试 - if (request instanceof CopyObjectRequest && statusCode / 100 == 2 - && errorCode != null && !errorCode.isEmpty()) { + if (request instanceof CopyObjectRequest && hasErrorCode(statusCode, errorCode)) { if (retryIndex <= this.maxRetryTimes) { LOG.info(errMsg, cse); ++retryIndex; @@ -1446,8 +1445,7 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, LOG.error(errMsg, cse); throw new IOException(errMsg); } - } else if (request instanceof CompleteMultipartUploadRequest && statusCode / 100 ==2 - && errorCode != null && !errorCode.isEmpty()) { + } else if (request instanceof CompleteMultipartUploadRequest && hasErrorCode(statusCode, errorCode)) { // complete mpu error code might be in body when status code is 200 // double check to head object only works in big data job case which key is not same. String key = ((CompleteMultipartUploadRequest) request).getKey(); @@ -1507,6 +1505,21 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, throw cse; } } + + // mpu might occur 503 access time out but already completed, + // if direct retry may occur 403 not found the upload id. + if (request instanceof CompleteMultipartUploadRequest && statusCode == 503) { + String key = ((CompleteMultipartUploadRequest) request).getKey(); + FileMetadata fileMetadata = this.queryObjectMetadata(key); + if (null != fileMetadata) { + // if file exist direct return. + LOG.info("complete mpu error might access time out, " + + "but key {} already exist, length {}", + key, fileMetadata.getLength()); + return new CompleteMultipartUploadResult(); + } + } + Thread.sleep( ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound)); ++retryIndex; @@ -1529,6 +1542,15 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, private static String ensureValidAttributeName(String attributeName) { return attributeName.replace('.', '-').toLowerCase(); } + + private boolean hasErrorCode(int statusCode, String errCode) { + return statusCode / 100 == 2 && errCode != null && !errCode.isEmpty(); + } + + public COSClient getCOSClient(){ + return this.cosClient; + } + private String getPluginVersionInfo() { Properties versionProperties = new Properties(); InputStream inputStream= null; diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index 237ed1a3..49511814 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -6,6 +6,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import com.qcloud.cos.COSClient; import java.io.File; import java.io.IOException; @@ -24,6 +25,9 @@ public interface NativeFileSystemStore { void initialize(URI uri, Configuration conf) throws IOException; + // must init first + COSClient getCOSClient(); + HeadBucketResult headBucket(String bucketName) throws IOException; void storeFile(String key, File file, byte[] md5Hash) throws IOException;