diff --git a/pom.xml b/pom.xml index f9372182..0cb56e50 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 5.9.6 + 5.10.0 jar Apache Hadoop Tencent Qcloud COS Support @@ -41,7 +41,7 @@ 1.7 1.7 3.3.0 - 5.6.42 + 5.6.60 24.1.1-jre 3.1 4.8 diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 31491ce1..36c94970 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -440,9 +440,15 @@ public FileStatus getFileStatus(Path f) throws IOException { if (!key.endsWith(PATH_DELIMITER)) { key += PATH_DELIMITER; } - LOG.debug("List the cos key [{}] to judge whether it is a directory or not.", key); + + int maxKeys = this.getConf().getInt( + CosNConfigKeys.FILESTATUS_LIST_MAX_KEYS, + CosNConfigKeys.DEFAULT_FILESTATUS_LIST_MAX_KEYS + ); + + LOG.debug("List the cos key [{}] to judge whether it is a directory or not. max keys [{}]", key, maxKeys); CosNResultInfo listInfo = new CosNResultInfo(); - CosNPartialListing listing = store.list(key, 1, listInfo); + CosNPartialListing listing = store.list(key, maxKeys, listInfo); if (listing.getFiles().length > 0 || listing.getCommonPrefixes().length > 0) { LOG.debug("List the cos key [{}] to find that it is a directory.", key); return newDirectory(absolutePath); diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index 18e794fe..d0032a34 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -11,7 +11,7 @@ @InterfaceStability.Unstable public class CosNConfigKeys extends CommonConfigurationKeys { public static final String USER_AGENT = "fs.cosn.user.agent"; - public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.9.7"; + public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.10.0"; public static final String TENCENT_EMR_VERSION_KEY = "fs.emr.version"; @@ -75,6 +75,14 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size"; public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 8; + // used to control getFileStatus list to judge dir whether exist. + public static final String FILESTATUS_LIST_MAX_KEYS = "fs.cosn.filestatus.list_max_keys"; + public static final int DEFAULT_FILESTATUS_LIST_MAX_KEYS = 2; + + // used for double check complete mpu in case of return cos client exception but status is 200 ok. + public static final String COSN_COMPLETE_MPU_CHECK = "fs.cosn.complete.mpu.check"; + public static final boolean DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE = true; + public static final String MAX_CONNECTION_NUM = "fs.cosn.max.connection.num"; public static final int DEFAULT_MAX_CONNECTION_NUM = 2048; diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index 40fe861f..690f4aea 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -4,6 +4,7 @@ import com.qcloud.cos.ClientConfig; import com.qcloud.cos.exception.CosClientException; import com.qcloud.cos.exception.CosServiceException; +import com.qcloud.cos.exception.ResponseNotCompleteException; import com.qcloud.cos.http.HttpProtocol; import com.qcloud.cos.internal.SkipMd5CheckStrategy; import com.qcloud.cos.model.*; @@ -47,6 +48,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private int maxRetryTimes; private int trafficLimit; private boolean crc32cEnabled; + private boolean completeMPUCheckEnabled; private CosNEncryptionSecrets encryptionSecrets; private CustomerDomainEndpointResolver customerDomainEndpointResolver; @@ -104,6 +106,9 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); + this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK, + CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE); + // Proxy settings String httpProxyIp = conf.getTrimmed(CosNConfigKeys.HTTP_PROXY_IP); int httpProxyPort = conf.getInt(CosNConfigKeys.HTTP_PROXY_PORT, CosNConfigKeys.DEFAULT_HTTP_PROXY_PORT); @@ -1106,8 +1111,6 @@ private void callCOSClientWithSSECOS(X request, ObjectMetadata objectMetadat ((CopyObjectRequest) request).setNewObjectMetadata(objectMetadata); } else if (request instanceof InitiateMultipartUploadRequest) { ((InitiateMultipartUploadRequest) request).setObjectMetadata(objectMetadata); - } else { - throw new IOException("Set SSE_COS request no such method"); } } catch (Exception e) { String errMsg = @@ -1132,8 +1135,6 @@ private void callCOSClientWithSSEC(X request, SSECustomerKey sseKey) { ((GetObjectRequest) request).setSSECustomerKey(sseKey); } else if (request instanceof InitiateMultipartUploadRequest) { ((InitiateMultipartUploadRequest) request).setSSECustomerKey(sseKey); - } else { - throw new IOException("Set SSE_C request no such method"); } } catch (Exception e) { String errMsg = @@ -1260,12 +1261,24 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, } else { throw new IOException("no such method"); } + } catch (ResponseNotCompleteException nce) { + if (this.completeMPUCheckEnabled && request instanceof CompleteMultipartUploadRequest) { + String key = ((CompleteMultipartUploadRequest) request).getKey(); + FileMetadata fileMetadata = this.queryObjectMetadata(key); + if (null == fileMetadata) { + // if file not exist must throw the exception. + handleException(nce, key); + } + LOG.warn("Complete mpu resp not complete key [{}]", key); + // todo: some other double check after cgi unified the ctime of mpu. + } else { + throw new IOException(nce); + } } catch (CosServiceException cse) { String errMsg = String.format( "all cos sdk failed, retryIndex: [%d / %d], call " + "method: %s, exception: %s", - retryIndex, this.maxRetryTimes, sdkMethod, - cse.toString()); + retryIndex, this.maxRetryTimes, sdkMethod, cse.toString()); int statusCode = cse.getStatusCode(); String errorCode = cse.getErrorCode(); LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode); diff --git a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java index 71bca390..beb6b51e 100644 --- a/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java @@ -30,6 +30,8 @@ void storeFile(String key, InputStream inputStream, byte[] md5Hash, void storeEmptyFile(String key) throws IOException; + // must notice some mpu chunk error might have double head check. + // which means some times CompleteMultipartUploadResult might be null. CompleteMultipartUploadResult completeMultipartUpload(String key, String uploadId, List partETagList) throws IOException;