diff --git a/pom.xml b/pom.xml index 8801779e..2a70968b 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,14 @@ https://cloud.tencent.com/ https://github.com/yuyang733 + + alantong + Mingda Tong + alantong@tencent.com + Tencent Cloud + https://cloud.tencent.com/ + https://github.com/vintmd + @@ -45,7 +53,7 @@ 1.7 1.7 3.3.0 - 5.6.112 + 5.6.137.2 24.1.1-jre 3.1 4.8 diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index 6556c7b6..610bf09e 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -193,6 +193,9 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { throw new IOException("The current version does not support L5 resolver.", e); } config.setEndpointResolver(l5EndpointResolver); + // used by cos java sdk to handle + config.turnOnRefreshEndpointAddrSwitch(); + config.setHandlerAfterProcess(l5EndpointResolver); } } } else { @@ -217,6 +220,7 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { CosNConfigKeys.COSN_CLIENT_SOCKET_TIMEOUTSEC, CosNConfigKeys.DEFAULT_CLIENT_SOCKET_TIMEOUTSEC); config.setSocketTimeout(socketTimeoutSec * 1000); + this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); this.completeMPUCheckEnabled = conf.getBoolean(CosNConfigKeys.COSN_COMPLETE_MPU_CHECK, @@ -1817,7 +1821,7 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, if (useL5Id) { if (l5ErrorCodeRetryIndex >= this.l5UpdateMaxRetryTimes) { // L5上报,进行重试 - l5EndpointResolver.updateRouteResult(-1); + l5EndpointResolver.handle(-1, 0); l5ErrorCodeRetryIndex = 1; } else { l5ErrorCodeRetryIndex = l5ErrorCodeRetryIndex + 1; @@ -1883,7 +1887,7 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, if (useL5Id) { if (l5ErrorCodeRetryIndex >= this.l5UpdateMaxRetryTimes) { // L5上报,进行重试 - l5EndpointResolver.updateRouteResult(-1); + l5EndpointResolver.handle(-1, 0); l5ErrorCodeRetryIndex = 1; } else { l5ErrorCodeRetryIndex = l5ErrorCodeRetryIndex + 1; diff --git a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java index 6ddf472e..4a23702d 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java @@ -1,9 +1,9 @@ package org.apache.hadoop.fs.cosn; import com.qcloud.cos.endpoint.EndpointResolver; +import com.qcloud.cos.http.HandlerAfterProcess; -public interface TencentCloudL5EndpointResolver extends EndpointResolver { +public interface TencentCloudL5EndpointResolver extends EndpointResolver, HandlerAfterProcess { public void setModId(int modId); public void setCmdId(int cmdId); - public void updateRouteResult(int status); } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java index 58fedcb4..c67629cf 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java @@ -50,21 +50,23 @@ public void setCmdId(int cmdId) { } @Override - public void updateRouteResult(int status) { + public void handle(int status, long costMs) { + // which is used by cos java sdk to refresh if (null != l5IP && l5Port > 0) { + int startUsec = (int)(System.currentTimeMillis() - costMs); L5API.L5QOSPacket packet = new L5API.L5QOSPacket(); packet.ip = this.l5IP; packet.port = l5Port; packet.cmdid = this.cmdId; packet.modid = this.modId; - packet.start = this.l5Start; + packet.start = startUsec; - for (int i = 0; i < 5; ++i) { - L5API.updateRoute(packet, status); - } + // because inner already retry many times, give the control into the java sdk. + L5API.updateRoute(packet, status); } else { - LOG.error("Update l5 modid: {} cmdid: {} ip: {} port {} failed.", - this.modId, this.cmdId, this.l5IP, this.l5Port); + LOG.error("Update l5 modid: {} cmdid: {} ip: {} port {}, " + + "status {}, costMs {} failed.", + this.modId, this.cmdId, this.l5IP, this.l5Port, status, costMs); } }