Skip to content

Commit

Permalink
add list part to double check upload part conflict (#102)
Browse files Browse the repository at this point in the history
fix import

Co-authored-by: alantong(佟明达) <[email protected]>
  • Loading branch information
vintmd and vintmd authored Jan 31, 2023
1 parent 04a3d48 commit 789b652
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 6 deletions.
3 changes: 3 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {

public static final String COSN_FILESTATUS_LIST_OP_ENABLED = "fs.cosn.filestatus.list.op.enabled";
public static final boolean DEFAULT_FILESTATUS_LIST_OP_ENABLED = true;

public static final String COSN_PART_CONFLICT_CHECK_ENABLED = "fs.cosn.part.conflict.check.enabled";
public static final boolean DEFAULT_COSN_PART_CONFLICT_CHECK_ENABLED = true;
}
84 changes: 78 additions & 6 deletions src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import com.qcloud.cos.model.StorageClass;
import com.qcloud.cos.model.UploadPartRequest;
import com.qcloud.cos.model.UploadPartResult;
import com.qcloud.cos.model.ListPartsRequest;
import com.qcloud.cos.model.PartSummary;
import com.qcloud.cos.model.PartListing;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.Base64;
import com.qcloud.cos.utils.IOUtils;
Expand All @@ -60,6 +63,7 @@
import org.apache.hadoop.fs.cosn.CustomerDomainEndpointResolver;
import org.apache.hadoop.fs.cosn.ResettableFileInputStream;
import org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolver;
import org.apache.hadoop.fs.cosn.CosNPartListing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,6 +81,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;

@InterfaceAudience.Private
Expand All @@ -102,6 +107,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore {
private int trafficLimit;
private boolean crc32cEnabled;
private boolean completeMPUCheckEnabled;
private boolean partConflictCheckEnabled;
private long partSize;
private boolean clientEncryptionEnabled;
private CosNEncryptionSecrets encryptionSecrets;
Expand Down Expand Up @@ -215,6 +221,8 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {
CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED);
this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK,
CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);
this.partConflictCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_PART_CONFLICT_CHECK_ENABLED,
CosNConfigKeys.DEFAULT_COSN_PART_CONFLICT_CHECK_ENABLED);
this.clientEncryptionEnabled = conf.getBoolean(CosNConfigKeys.COSN_CLIENT_SIDE_ENCRYPTION_ENABLED,
CosNConfigKeys.DEFAULT_COSN_CLIENT_SIDE_ENCRYPTION_ENABLED);

Expand Down Expand Up @@ -522,9 +530,9 @@ public PartETag uploadPart(File file, String key, String uploadId,
}

@Override
public PartETag uploadPart(
InputStream inputStream,
String key, String uploadId, int partNum, long partSize, byte[] md5Hash, Boolean isLastPart) throws IOException {
public PartETag uploadPart(InputStream inputStream, String key, String uploadId,
int partNum, long partSize, byte[] md5Hash,
Boolean isLastPart) throws IOException {
LOG.debug("Upload the part to the cos key [{}]. upload id: {}, part number: {}, part size: {}",
key, uploadId, partNum, partSize);
ObjectMetadata objectMetadata = new ObjectMetadata();
Expand Down Expand Up @@ -558,14 +566,37 @@ public PartETag uploadPart(
UploadPartResult uploadPartResult =
(UploadPartResult) callCOSClientWithRetry(uploadPartRequest);
return uploadPartResult.getPartETag();
} catch (CosServiceException cse) {
String errMsg = String.format("The current thread:%d, "
+ "cos key: %s, upload id: %s, part num: %d, exception: %s",
Thread.currentThread().getId(), key, uploadId, partNum, cse);
if (!this.partConflictCheckEnabled) {
handleException(new Exception(errMsg), key);
} else {
int statusCode = cse.getStatusCode();
if (409 == statusCode) {
// conflict upload same upload part, because of nginx syn retry,
// sometimes 31s reconnect to cgi, but client side default 30s timeout,
// which may cause the retry request and previous request arrive at cgi at same time.
// so we for now use list parts to double-check this part whether exist.
CosNPartListing partListing = listParts(key, uploadId);
PartETag partETag = isPartExist(partListing, partNum, partSize);
if (null == partETag) {
handleException(new Exception(errMsg), key);
}
LOG.warn("Upload the file [{}] uploadId [{}], part [{}] concurrently." +
key, uploadId, partNum);
return partETag;
} else {
handleException(new Exception(errMsg), key);
}
}
} catch (Exception e) {
String errMsg = String.format("The current thread:%d, "
+ "cos key: %s, upload id: %s, part num: %d, " +
"exception: %s",
+ "cos key: %s, upload id: %s, part num: %d, exception: %s",
Thread.currentThread().getId(), key, uploadId, partNum, e);
handleException(new Exception(errMsg), key);
}

return null;
}

Expand Down Expand Up @@ -1497,6 +1528,44 @@ public long getFileLength(String key) throws IOException {
}
}

@Override
public CosNPartListing listParts(String key, String uploadId) throws IOException {
LOG.debug("List parts key: {}, uploadId: {}", key, uploadId);
ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
PartListing partListing = null;
List<PartSummary> partSummaries = new LinkedList<>();
do {
try {
partListing = (PartListing) callCOSClientWithRetry(listPartsRequest);
} catch (Exception e) {
String errMsg = String.format("list parts occurs an exception, " +
"cos key: %s, upload id: %s, exception: %s",
key, uploadId, e.getMessage());
LOG.error(errMsg);
handleException(new Exception(errMsg), key);
return null;
}
partSummaries.addAll(partListing.getParts());
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
} while (partListing.isTruncated());
return new CosNPartListing(partSummaries);
}

private PartETag isPartExist(CosNPartListing partListing, int partNum, long partSize) {
PartETag ret = null;
if (null == partListing) {
return null;
}
for (PartSummary partSummary : partListing.getPartSummaries()){
// for now only check number and size.
if (partSummary.getPartNumber() == partNum && partSummary.getSize() == partSize) {
ret = new PartETag(partSummary.getPartNumber(), partSummary.getETag());
break;
}
}
return ret;
}

private <X> void callCOSClientWithSSEKMS(X request, SSECOSKeyManagementParams managementParams) {
try {
if (request instanceof PutObjectRequest) {
Expand Down Expand Up @@ -1688,6 +1757,9 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
} else if (request instanceof GetSymlinkRequest) {
sdkMethod = "getSymlink";
return this.cosClient.getSymlink((GetSymlinkRequest) request);
} else if (request instanceof ListPartsRequest) {
sdkMethod = "listParts";
return this.cosClient.listParts((ListPartsRequest) request);
} else {
throw new IOException("no such method");
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.qcloud.cos.model.CompleteMultipartUploadResult;
import com.qcloud.cos.model.HeadBucketResult;
import com.qcloud.cos.model.PartETag;
import com.qcloud.cos.model.PartListing;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import com.qcloud.cos.COSClient;
import org.apache.hadoop.fs.cosn.CosNPartListing;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -117,6 +119,8 @@ CosNPartialListing list(String prefix, int maxListingLength,

String getSymlink(String symlink) throws IOException;

CosNPartListing listParts(String key, String uploadId) throws IOException;

/**
* Delete all keys with the given prefix. Used for testing.
*
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/org/apache/hadoop/fs/cosn/CosNPartListing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.hadoop.fs.cosn;

import com.qcloud.cos.model.PartSummary;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.NativeFileSystemStore;

import java.util.List;

/**
* <p>
* Holds information of one upload id listing for part summary
* {@link NativeFileSystemStore}.
* This includes the {@link PartSummary part summary}
* (their names) contained in single MPU.
* </p>
*
* @see NativeFileSystemStore#listParts(String, String)
*/

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CosNPartListing {
private final List<PartSummary> partSummaries;

public CosNPartListing(List<PartSummary> partSummaries) {
this.partSummaries = partSummaries;
}

public List<PartSummary> getPartSummaries() {
return this.partSummaries;
}
}

0 comments on commit 789b652

Please sign in to comment.