From 789b652e82a6dc41f6b29b57733a6d5a4a5cfc72 Mon Sep 17 00:00:00 2001 From: vintmd <61688729+vintmd@users.noreply.github.com> Date: Tue, 31 Jan 2023 20:24:50 +0800 Subject: [PATCH] add list part to double check upload part conflict (#102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix import Co-authored-by: alantong(佟明达) --- .../org/apache/hadoop/fs/CosNConfigKeys.java | 3 + .../hadoop/fs/CosNativeFileSystemStore.java | 84 +++++++++++++++++-- .../hadoop/fs/NativeFileSystemStore.java | 4 + .../hadoop/fs/cosn/CosNPartListing.java | 33 ++++++++ 4 files changed, 118 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/CosNPartListing.java diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index c7023677..8e92e9ef 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -229,4 +229,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String COSN_FILESTATUS_LIST_OP_ENABLED = "fs.cosn.filestatus.list.op.enabled"; public static final boolean DEFAULT_FILESTATUS_LIST_OP_ENABLED = true; + + public static final String COSN_PART_CONFLICT_CHECK_ENABLED = "fs.cosn.part.conflict.check.enabled"; + public static final boolean DEFAULT_COSN_PART_CONFLICT_CHECK_ENABLED = true; } diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index f586c35a..e9c42e35 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -46,6 +46,9 @@ import com.qcloud.cos.model.StorageClass; import com.qcloud.cos.model.UploadPartRequest; import com.qcloud.cos.model.UploadPartResult; +import com.qcloud.cos.model.ListPartsRequest; +import 
com.qcloud.cos.model.PartSummary; +import com.qcloud.cos.model.PartListing; import com.qcloud.cos.region.Region; import com.qcloud.cos.utils.Base64; import com.qcloud.cos.utils.IOUtils; @@ -60,6 +63,7 @@ import org.apache.hadoop.fs.cosn.CustomerDomainEndpointResolver; import org.apache.hadoop.fs.cosn.ResettableFileInputStream; import org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolver; +import org.apache.hadoop.fs.cosn.CosNPartListing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +81,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.LinkedList; import java.util.concurrent.ThreadLocalRandom; @InterfaceAudience.Private @@ -102,6 +107,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private int trafficLimit; private boolean crc32cEnabled; private boolean completeMPUCheckEnabled; + private boolean partConflictCheckEnabled; private long partSize; private boolean clientEncryptionEnabled; private CosNEncryptionSecrets encryptionSecrets; @@ -215,6 +221,8 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK, CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE); + this.partConflictCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_PART_CONFLICT_CHECK_ENABLED, + CosNConfigKeys.DEFAULT_COSN_PART_CONFLICT_CHECK_ENABLED); this.clientEncryptionEnabled = conf.getBoolean(CosNConfigKeys.COSN_CLIENT_SIDE_ENCRYPTION_ENABLED, CosNConfigKeys.DEFAULT_COSN_CLIENT_SIDE_ENCRYPTION_ENABLED); @@ -522,9 +530,9 @@ public PartETag uploadPart(File file, String key, String uploadId, } @Override - public PartETag uploadPart( - InputStream inputStream, - String key, String uploadId, int partNum, long partSize, byte[] md5Hash, Boolean isLastPart) throws IOException { + public PartETag uploadPart(InputStream inputStream, String key, String 
uploadId, + int partNum, long partSize, byte[] md5Hash, + Boolean isLastPart) throws IOException { LOG.debug("Upload the part to the cos key [{}]. upload id: {}, part number: {}, part size: {}", key, uploadId, partNum, partSize); ObjectMetadata objectMetadata = new ObjectMetadata(); @@ -558,14 +566,37 @@ public PartETag uploadPart( UploadPartResult uploadPartResult = (UploadPartResult) callCOSClientWithRetry(uploadPartRequest); return uploadPartResult.getPartETag(); + } catch (CosServiceException cse) { + String errMsg = String.format("The current thread:%d, " + + "cos key: %s, upload id: %s, part num: %d, exception: %s", + Thread.currentThread().getId(), key, uploadId, partNum, cse); + if (!this.partConflictCheckEnabled) { + handleException(new Exception(errMsg), key); + } else { + int statusCode = cse.getStatusCode(); + if (409 == statusCode) { + // conflict upload same upload part, because of nginx syn retry, + // sometimes 31s reconnect to cgi, but client side default 30s timeout, + // which may cause the retry request and previous request arrive at cgi at same time. + // so we for now use list parts to double-check this part whether exist. + CosNPartListing partListing = listParts(key, uploadId); + PartETag partETag = isPartExist(partListing, partNum, partSize); + if (null == partETag) { + handleException(new Exception(errMsg), key); + } + LOG.warn("Upload the file [{}] uploadId [{}], part [{}] concurrently." 
+ + key, uploadId, partNum); + return partETag; + } else { + handleException(new Exception(errMsg), key); + } + } } catch (Exception e) { String errMsg = String.format("The current thread:%d, " - + "cos key: %s, upload id: %s, part num: %d, " + - "exception: %s", + + "cos key: %s, upload id: %s, part num: %d, exception: %s", Thread.currentThread().getId(), key, uploadId, partNum, e); handleException(new Exception(errMsg), key); } - return null; } @@ -1497,6 +1528,44 @@ public long getFileLength(String key) throws IOException { } } + @Override + public CosNPartListing listParts(String key, String uploadId) throws IOException { + LOG.debug("List parts key: {}, uploadId: {}", key, uploadId); + ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId); + PartListing partListing = null; + List partSummaries = new LinkedList<>(); + do { + try { + partListing = (PartListing) callCOSClientWithRetry(listPartsRequest); + } catch (Exception e) { + String errMsg = String.format("list parts occurs an exception, " + + "cos key: %s, upload id: %s, exception: %s", + key, uploadId, e.getMessage()); + LOG.error(errMsg); + handleException(new Exception(errMsg), key); + return null; + } + partSummaries.addAll(partListing.getParts()); + listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker()); + } while (partListing.isTruncated()); + return new CosNPartListing(partSummaries); + } + + private PartETag isPartExist(CosNPartListing partListing, int partNum, long partSize) { + PartETag ret = null; + if (null == partListing) { + return null; + } + for (PartSummary partSummary : partListing.getPartSummaries()){ + // for now only check number and size. 
+ if (partSummary.getPartNumber() == partNum && partSummary.getSize() == partSize) { + ret = new PartETag(partSummary.getPartNumber(), partSummary.getETag()); + break; + } + } + return ret; + } + private void callCOSClientWithSSEKMS(X request, SSECOSKeyManagementParams managementParams) { try { if (request instanceof PutObjectRequest) { @@ -1688,6 +1757,9 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, } else if (request instanceof GetSymlinkRequest) { sdkMethod = "getSymlink"; return this.cosClient.getSymlink((GetSymlinkRequest) request); + } else if (request instanceof ListPartsRequest) { + sdkMethod = "listParts"; + return this.cosClient.listParts((ListPartsRequest) request); } else { throw new IOException("no such method"); } diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index f3d7bc1f..f04d96cf 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -3,10 +3,12 @@ import com.qcloud.cos.model.CompleteMultipartUploadResult; import com.qcloud.cos.model.HeadBucketResult; import com.qcloud.cos.model.PartETag; +import com.qcloud.cos.model.PartListing; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import com.qcloud.cos.COSClient; +import org.apache.hadoop.fs.cosn.CosNPartListing; import java.io.File; import java.io.IOException; @@ -117,6 +119,8 @@ CosNPartialListing list(String prefix, int maxListingLength, String getSymlink(String symlink) throws IOException; + CosNPartListing listParts(String key, String uploadId) throws IOException; + /** * Delete all keys with the given prefix. Used for testing. 
* diff --git a/src/main/java/org/apache/hadoop/fs/cosn/CosNPartListing.java b/src/main/java/org/apache/hadoop/fs/cosn/CosNPartListing.java new file mode 100644 index 00000000..65badf23 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/CosNPartListing.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.fs.cosn; + +import com.qcloud.cos.model.PartSummary; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.NativeFileSystemStore; + +import java.util.List; + +/** + *

<p> + * Holds the result of listing the parts of one upload id in a + * {@link NativeFileSystemStore}. + * This includes the {@link PartSummary part summaries} + * (e.g. part number, size and ETag) contained in a single MPU. + * 
</p>
+ * + * @see NativeFileSystemStore#listParts(String, String) + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class CosNPartListing { + private final List<PartSummary> partSummaries; + + public CosNPartListing(List<PartSummary> partSummaries) { + this.partSummaries = partSummaries; + } + + public List<PartSummary> getPartSummaries() { + return this.partSummaries; + } +}