From 0cc1eb4c9d3a98ea585ebe6c204dd22e9f399f13 Mon Sep 17 00:00:00 2001 From: yuyang733 Date: Mon, 11 Apr 2022 16:51:43 +0800 Subject: [PATCH] build: fix the L5 support build --- pom.xml | 7 +- .../hadoop/fs/CosNativeFileSystemStore.java | 12 +- .../cosn/TencentCloudL5EndpointResolver.java | 103 +---------------- .../TencentCloudL5EndpointResolverImpl.java | 106 ++++++++++++++++++ 4 files changed, 118 insertions(+), 110 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java diff --git a/pom.xml b/pom.xml index f835f6b0..4d8470ee 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ normal - false + true @@ -98,7 +98,7 @@ maven-compiler-plugin - org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java + org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java @@ -109,9 +109,6 @@ internal - - true - com.tencent.jungle diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index e4e9588c..9705a236 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -55,7 +55,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private boolean isMergeBucket; private CustomerDomainEndpointResolver customerDomainEndpointResolver; - private EndpointResolver l5EndpointResolver; + private TencentCloudL5EndpointResolver l5EndpointResolver; private boolean useL5Id = false; private int l5UpdateMaxRetryTimes; @@ -104,10 +104,10 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { Class l5EndpointResolverClass; try { - l5EndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolver"); - this.l5EndpointResolver = (EndpointResolver) l5EndpointResolverClass.newInstance(); - ((TencentCloudL5EndpointResolver)this.l5EndpointResolver).setModId(l5modId); - ((TencentCloudL5EndpointResolver)this.l5EndpointResolver).setCmdId(l5cmdId); + l5EndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolverImpl"); + this.l5EndpointResolver = (TencentCloudL5EndpointResolver) l5EndpointResolverClass.newInstance(); + this.l5EndpointResolver.setModId(l5modId); + this.l5EndpointResolver.setCmdId(l5cmdId); } catch (ClassNotFoundException e) { throw new IOException("The current version does not support L5 resolver.", e); } catch (InstantiationException e) { @@ -1435,7 +1435,7 @@ private Object callCOSClientWithRetry(X request) throws CosServiceException, if (useL5Id) { if( l5ErrorCodeRetryIndex>= this.l5UpdateMaxRetryTimes) { // L5上报,进行重试 - ((TencentCloudL5EndpointResolver)l5EndpointResolver).l5RouteResultUpdate(-1); + l5EndpointResolver.updateRouteResult(-1); l5ErrorCodeRetryIndex = 1; } else { l5ErrorCodeRetryIndex = l5ErrorCodeRetryIndex + 1; diff --git a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java index e7484768..6ddf472e 100644 --- a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java +++ b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolver.java @@ -1,104 +1,9 @@ package org.apache.hadoop.fs.cosn; import com.qcloud.cos.endpoint.EndpointResolver; -import com.tencent.jungle.lb2.L5API; -import com.tencent.jungle.lb2.L5APIException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.concurrent.ThreadLocalRandom; - -/** - * 这个类是专供于腾讯云 L5 解析方式 - */ -public class TencentCloudL5EndpointResolver implements EndpointResolver { - private static final Logger LOG = LoggerFactory.getLogger(TencentCloudL5EndpointResolver.class); - - private int modId; - private int cmdId; - private String l5IP; - private int l5Port; - private long l5Start; - - public TencentCloudL5EndpointResolver() { - this(-1, -1); - } - - public TencentCloudL5EndpointResolver(int modId, int cmdId) { - this.modId = modId; - this.cmdId = cmdId; - this.l5IP = null; - this.l5Port = -1; - this.l5Start = 0; - } - - public int getModId() { - return modId; - } - - public void setModId(int modId) { - this.modId = modId; - } - - public int getCmdId() { - return cmdId; - } - - public void setCmdId(int cmdId) { - this.cmdId = cmdId; - } - - public void l5RouteResultUpdate(int status) { - if (null != l5IP && l5Port > 0) { - L5API.L5QOSPacket packet = new L5API.L5QOSPacket(); - packet.ip = this.l5IP; - packet.port = l5Port; - packet.cmdid = this.cmdId; - packet.modid = this.modId; - packet.start = this.l5Start; - - for (int i = 0; i < 5; ++i) { - L5API.updateRoute(packet, status); - } - } else { - LOG.error("Update l5 modid: {} cmdid: {} ip: {} port {} failed.", - this.modId, this.cmdId, this.l5IP, this.l5Port); - } - } - - @Override - public String resolveGeneralApiEndpoint(String s) { - float timeout = 0.2F; - String cgiIpAddr = null; - L5API.L5QOSPacket packet = new L5API.L5QOSPacket(); - packet.modid = this.modId; - packet.cmdid = this.cmdId; - - - for (int i = 0; i < 5; ++i) { - try { - packet = L5API.getRoute(packet, timeout); - if (!packet.ip.isEmpty() && packet.port > 0) { - this.l5IP = packet.ip; - this.l5Port = packet.port; - this.l5Start = packet.start; - cgiIpAddr = String.format("%s:%d", packet.ip, packet.port); - break; - } - } catch (L5APIException e) { - LOG.error("Get l5 modid: {} cmdid: {} failed.", this.modId, this.cmdId); - try { - Thread.sleep(ThreadLocalRandom.current().nextLong(10L, 1000L)); - } catch (InterruptedException var) { - } - } - } - - return cgiIpAddr; - } - - @Override - public String resolveGetServiceApiEndpoint(String s) { - return "service.cos.myqcloud.com"; - } +public interface TencentCloudL5EndpointResolver extends EndpointResolver { + public void setModId(int modId); + public void setCmdId(int cmdId); + public void updateRouteResult(int status); } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java new file mode 100644 index 00000000..c9cf0e7e --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.fs.cosn; + +import com.tencent.jungle.lb2.L5API; +import com.tencent.jungle.lb2.L5APIException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * 这个类是专供于腾讯云 L5 解析方式 + */ +public class TencentCloudL5EndpointResolverImpl implements TencentCloudL5EndpointResolver { + private static final Logger LOG = LoggerFactory.getLogger(TencentCloudL5EndpointResolverImpl.class); + + private int modId; + private int cmdId; + private String l5IP; + private int l5Port; + private long l5Start; + + public TencentCloudL5EndpointResolverImpl() { + this(-1, -1); + } + + public TencentCloudL5EndpointResolverImpl(int modId, int cmdId) { + this.modId = modId; + this.cmdId = cmdId; + this.l5IP = null; + this.l5Port = -1; + this.l5Start = 0; + } + + public int getModId() { + return modId; + } + + @Override + public void setModId(int modId) { + this.modId = modId; + } + + public int getCmdId() { + return cmdId; + } + + @Override + public void setCmdId(int cmdId) { + this.cmdId = cmdId; + } + + @Override + public void updateRouteResult(int status) { + if (null != l5IP && l5Port > 0) { + L5API.L5QOSPacket packet = new L5API.L5QOSPacket(); + packet.ip = this.l5IP; + packet.port = l5Port; + packet.cmdid = this.cmdId; + packet.modid = this.modId; + packet.start = this.l5Start; + + for (int i = 0; i < 5; ++i) { + L5API.updateRoute(packet, status); + } + } else { + LOG.error("Update l5 modid: {} cmdid: {} ip: {} port {} failed.", + this.modId, this.cmdId, this.l5IP, this.l5Port); + } + } + + @Override + public String resolveGeneralApiEndpoint(String s) { + float timeout = 0.2F; + String cgiIpAddr = null; + L5API.L5QOSPacket packet = new L5API.L5QOSPacket(); + packet.modid = this.modId; + packet.cmdid = this.cmdId; + + + for (int i = 0; i < 5; ++i) { + try { + packet = L5API.getRoute(packet, timeout); + if (!packet.ip.isEmpty() && packet.port > 0) { + this.l5IP = packet.ip; + this.l5Port = packet.port; + this.l5Start = packet.start; + cgiIpAddr = String.format("%s:%d", packet.ip, packet.port); + break; + } + } catch (L5APIException e) { + LOG.error("Get l5 modid: {} cmdid: {} failed.", this.modId, this.cmdId); + try { + Thread.sleep(ThreadLocalRandom.current().nextLong(10L, 1000L)); + } catch (InterruptedException var) { + } + } + } + + return cgiIpAddr; + } + + @Override + public String resolveGetServiceApiEndpoint(String s) { + return "service.cos.myqcloud.com"; + } +}