Skip to content

Commit

Permalink
add the 503 mpu retry, support get client from native store and fix t…
Browse files Browse the repository at this point in the history
…he bug of deploy script
  • Loading branch information
vintmd committed May 13, 2022
1 parent 3288399 commit 052a511
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
2 changes: 1 addition & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ do
# 外部maven 中央仓库
deploy_repository_id="oss"
deploy_repository_url="https://oss.sonatype.org/service/local/staging/deploy/maven2"
elif ["$OPT" = "$INTER" ]; then
elif [ "$OPT" = "$INTER" ]; then
deploy_repository_id="cos-inner-maven-repository"
deploy_repository_url="http://mirrors.tencent.com/repository/maven/QCLOUD_COS"
fi
Expand Down
30 changes: 26 additions & 4 deletions src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1437,17 +1437,15 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
String errorCode = cse.getErrorCode();
LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode);
// 对5xx错误进行重试
if (request instanceof CopyObjectRequest && statusCode / 100 == 2
&& errorCode != null && !errorCode.isEmpty()) {
if (request instanceof CopyObjectRequest && hasErrorCode(statusCode, errorCode)) {
if (retryIndex <= this.maxRetryTimes) {
LOG.info(errMsg, cse);
++retryIndex;
} else {
LOG.error(errMsg, cse);
throw new IOException(errMsg);
}
} else if (request instanceof CompleteMultipartUploadRequest && statusCode / 100 ==2
&& errorCode != null && !errorCode.isEmpty()) {
} else if (request instanceof CompleteMultipartUploadRequest && hasErrorCode(statusCode, errorCode)) {
// complete mpu error code might be in body when status code is 200
// double check to head object only works in big data job case which key is not same.
String key = ((CompleteMultipartUploadRequest) request).getKey();
Expand Down Expand Up @@ -1507,6 +1505,21 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
throw cse;
}
}

// mpu might occur 503 access time out but already completed,
// if direct retry may occur 403 not found the upload id.
if (request instanceof CompleteMultipartUploadRequest && statusCode == 503) {
String key = ((CompleteMultipartUploadRequest) request).getKey();
FileMetadata fileMetadata = this.queryObjectMetadata(key);
if (null != fileMetadata) {
// if file exist direct return.
LOG.info("complete mpu error might access time out, " +
"but key {} already exist, length {}",
key, fileMetadata.getLength());
return new CompleteMultipartUploadResult();
}
}

Thread.sleep(
ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound));
++retryIndex;
Expand All @@ -1529,6 +1542,15 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
private static String ensureValidAttributeName(String attributeName) {
return attributeName.replace('.', '-').toLowerCase();
}

private boolean hasErrorCode(int statusCode, String errCode) {
return statusCode / 100 == 2 && errCode != null && !errCode.isEmpty();
}

public COSClient getCOSClient(){
return this.cosClient;
}

private String getPluginVersionInfo() {
Properties versionProperties = new Properties();
InputStream inputStream= null;
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import com.qcloud.cos.COSClient;

import java.io.File;
import java.io.IOException;
Expand All @@ -24,6 +25,9 @@ public interface NativeFileSystemStore {

void initialize(URI uri, Configuration conf) throws IOException;

// must init first
COSClient getCOSClient();

HeadBucketResult headBucket(String bucketName) throws IOException;

void storeFile(String key, File file, byte[] md5Hash) throws IOException;
Expand Down

0 comments on commit 052a511

Please sign in to comment.