Skip to content

Commit

Permalink
Merge pull request #37 from vintmd/mpu-exception
Browse files Browse the repository at this point in the history
fix bugs and update to 5.10.0 version
  • Loading branch information
yuyang733 authored Oct 29, 2021
2 parents 641269d + 3954a16 commit 90af173
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>5.9.6</version>
<version>5.10.0</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Qcloud COS Support</name>
Expand Down Expand Up @@ -41,7 +41,7 @@
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<hadoop.version>3.3.0</hadoop.version>
<cos_api.version>5.6.42</cos_api.version>
<cos_api.version>5.6.60</cos_api.version>
<google.guava.version>24.1.1-jre</google.guava.version>
<commons_lang3.version>3.1</commons_lang3.version>
<junit.version>4.8</junit.version>
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,15 @@ public FileStatus getFileStatus(Path f) throws IOException {
if (!key.endsWith(PATH_DELIMITER)) {
key += PATH_DELIMITER;
}
LOG.debug("List the cos key [{}] to judge whether it is a directory or not.", key);

int maxKeys = this.getConf().getInt(
CosNConfigKeys.FILESTATUS_LIST_MAX_KEYS,
CosNConfigKeys.DEFAULT_FILESTATUS_LIST_MAX_KEYS
);

LOG.debug("List the cos key [{}] to judge whether it is a directory or not. max keys [{}]", key, maxKeys);
CosNResultInfo listInfo = new CosNResultInfo();
CosNPartialListing listing = store.list(key, 1, listInfo);
CosNPartialListing listing = store.list(key, maxKeys, listInfo);
if (listing.getFiles().length > 0 || listing.getCommonPrefixes().length > 0) {
LOG.debug("List the cos key [{}] to find that it is a directory.", key);
return newDirectory(absolutePath);
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@InterfaceStability.Unstable
public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String USER_AGENT = "fs.cosn.user.agent";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.9.7";
public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.10.0";

public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version";

Expand Down Expand Up @@ -75,6 +75,14 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size";
public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 8;

// used to control getFileStatus list to judge dir whether exist.
public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys";
public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 2;

// used for double check complete mpu in case of return cos client exception but status is 200 ok.
public static final String COSN_COMPLETE_MPU_CHECK = "fs.cosn.complete.mpu.check";
public static final boolean DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE = true;

public static final String MAX_CONNECTION_NUM = "fs.cosn.max.connection.num";
public static final int DEFAULT_MAX_CONNECTION_NUM = 2048;

Expand Down
25 changes: 19 additions & 6 deletions src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.exception.ResponseNotCompleteException;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.internal.SkipMd5CheckStrategy;
import com.qcloud.cos.model.*;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore {
private int maxRetryTimes;
private int trafficLimit;
private boolean crc32cEnabled;
private boolean completeMPUCheckEnabled;
private CosNEncryptionSecrets encryptionSecrets;
private CustomerDomainEndpointResolver customerDomainEndpointResolver;

Expand Down Expand Up @@ -104,6 +106,9 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {
this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED,
CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED);

this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK,
CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);

// Proxy settings
String httpProxyIp = conf.getTrimmed(CosNConfigKeys.HTTP_PROXY_IP);
int httpProxyPort = conf.getInt(CosNConfigKeys.HTTP_PROXY_PORT, CosNConfigKeys.DEFAULT_HTTP_PROXY_PORT);
Expand Down Expand Up @@ -1106,8 +1111,6 @@ private <X> void callCOSClientWithSSECOS(X request, ObjectMetadata objectMetadat
((CopyObjectRequest) request).setNewObjectMetadata(objectMetadata);
} else if (request instanceof InitiateMultipartUploadRequest) {
((InitiateMultipartUploadRequest) request).setObjectMetadata(objectMetadata);
} else {
throw new IOException("Set SSE_COS request no such method");
}
} catch (Exception e) {
String errMsg =
Expand All @@ -1132,8 +1135,6 @@ private <X> void callCOSClientWithSSEC(X request, SSECustomerKey sseKey) {
((GetObjectRequest) request).setSSECustomerKey(sseKey);
} else if (request instanceof InitiateMultipartUploadRequest) {
((InitiateMultipartUploadRequest) request).setSSECustomerKey(sseKey);
} else {
throw new IOException("Set SSE_C request no such method");
}
} catch (Exception e) {
String errMsg =
Expand Down Expand Up @@ -1260,12 +1261,24 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
} else {
throw new IOException("no such method");
}
} catch (ResponseNotCompleteException nce) {
if (this.completeMPUCheckEnabled && request instanceof CompleteMultipartUploadRequest) {
String key = ((CompleteMultipartUploadRequest) request).getKey();
FileMetadata fileMetadata = this.queryObjectMetadata(key);
if (null == fileMetadata) {
// if file not exist must throw the exception.
handleException(nce, key);
}
LOG.warn("Complete mpu resp not complete key [{}]", key);
// todo: some other double check after cgi unified the ctime of mpu.
} else {
throw new IOException(nce);
}
} catch (CosServiceException cse) {
String errMsg = String.format(
"all cos sdk failed, retryIndex: [%d / %d], call " +
"method: %s, exception: %s",
retryIndex, this.maxRetryTimes, sdkMethod,
cse.toString());
retryIndex, this.maxRetryTimes, sdkMethod, cse.toString());
int statusCode = cse.getStatusCode();
String errorCode = cse.getErrorCode();
LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ void storeFile(String key, InputStream inputStream, byte[] md5Hash,

void storeEmptyFile(String key) throws IOException;

// must notice some mpu chunk error might have double head check.
// which means some times CompleteMultipartUploadResult might be null.
CompleteMultipartUploadResult completeMultipartUpload(String key,
String uploadId,
List<PartETag> partETagList) throws IOException;
Expand Down

0 comments on commit 90af173

Please sign in to comment.