Skip to content

Commit

Permalink
refactor: remove the internal service resolver to a single components.
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Yu <[email protected]>
  • Loading branch information
yuyang733 committed Aug 19, 2024
1 parent 35ab76e commit 8e46f19
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 705 deletions.
71 changes: 0 additions & 71 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,77 +155,6 @@
</snapshotRepository>
</distributionManagement>

<profiles>
<profile>
<id>normal</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java</exclude>
<exclude>org/apache/hadoop/fs/cosn/TencentPolarisEndpointResolverImpl.java</exclude>
<exclude>org/apache/hadoop/fs/cosn/TencentPolarisSidecarEndpointResolverImpl.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>

<!-- Tencent cloud supports internal configurations, including L5 domain name resolution. -->
<profile>
<!-- NOTE: The following configuration is only used for the Tencent’s internal system. -->
<id>internal</id>
<repositories>
<repository>
<id>tencent_public_snapshots</id>
<name>tencent_public_snapshots</name>
<url>https://mirrors.tencent.com/repository/maven/tencent_public_snapshots</url>
<layout>default</layout>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>

<repository>
<id>tencent_public</id>
<name>tencent_public</name>
<url>https://mirrors.tencent.com/repository/maven/tencent_public/</url>
<layout>default</layout>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.tencent.jungle</groupId>
<artifactId>jungle-udp-l5</artifactId>
<version>1.0.0-20190917.073536-2</version>
</dependency>

<dependency>
<groupId>com.tencent.nameservice</groupId>
<artifactId>polaris-factory-shaded</artifactId>
<version>3.3.7</version>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
<resources>
<resource>
Expand Down
18 changes: 16 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,36 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String COSN_USE_HTTPS_KEY = "fs.cosn.useHttps";
public static final boolean DEFAULT_USE_HTTPS = true; // 现在 COS 强制使用 https 作为访问协议

// 这个腾讯内部 L5 负载均衡系统的配置,主要限于内部项目依赖使用,现在改用北极星了
// 这个腾讯内部 L5 负载均衡系统的配置,主要限于内部项目依赖使用,现在改用北极星了。
// 这些内部扩展(L5、北极星和北极星 SideCar)需要依赖 tencent-internal-extension 组件,这个只有腾讯内部仓库才有。坐标如下:
// <dependency>
// <groupId>com.qcloud.cos</groupId>
// <artifactId>tencent-internal-extension</artifactId>
// <version>${version}</version>
//</dependency>
@Deprecated
public static final String COSN_L5_KEY = "fs.cosn.bucket.l5";
public static final boolean DEFAULT_COSN_USE_L5_ENABLE = false;
public static final String COSN_USE_L5_ENABLE = "fs.cosn.use.l5.enable";
// 默认这个不要更需要更改。只有运行时依赖的组件确实发生了变动再更改
public static final String COSN_L5_RESOLVER_CLASS = "fs.cosn.l5.resolver.class";
public static final String DEFAULT_COSN_L5_RESOLVER_CLASS = "com.qcloud.cos.internal.endpoint.resolver.TencentCloudL5EndpointResolver";

public static final String COSN_USE_POLARIS_ENABLED = "fs.cosn.polaris.enabled";
public static final boolean DEFAULT_COSN_USE_POLARIS_ENABLED = false;
public static final String COSN_POLARIS_RESOLVER_CLASS = "fs.cosn.polaris.resolver.class";
public static final String DEFAULT_COSN_POLARIS_RESOLVER_CLASS = "com.qcloud.cos.internal.endpoint.resolver.TencentPolarisEndpointResolver";
public static final String COSN_POLARIS_NAMESPACE = "fs.cosn.polaris.namespace";
public static final String COSN_POLARIS_SERVICE = "fs.cosn.polaris.service";

public static final String COSN_L5_UPDATE_MAX_RETRIES_KEY = "fs.cosn.l5.update.maxRetries";
public static final int DEFAULT_COSN_L5_UPDATE_MAX_RETRIES = 5;

// 如果进程不能内嵌运行北极星,使用sidecar方式运行
public static final String COSN_USE_POLARIS_SIDECAR_ENABLED = "fs.cosn.polaris.sidecar.enabled";
public static final String COSN_POLARIS_SIDECAR_CLIENT_IMPL = "fs.cosn.polaris.sidecar.client.impl";
public static final String DEFAULT_COSN_POLARIS_SIDECAR_CLIENT_IMPL = "com.qcloud.cos.internal.endpoint.resolver.TencentPolarisSidecarClient";
public static final String COSN_POLARIS_SIDECAR_RESOLVER_CLASS = "fs.cosn.polaris.sidecar.resolver.class";
public static final String DEFAULT_COSN_POLARIS_SIDECAR_RESOLVER_CLASS = "com.qcloud.cos.internal.endpoint.resolver.TencentPolarisSidecarEndpointResolver";
public static final boolean DEFAULT_COSN_USE_POLARIS_SIDECAR_ENABLED = false;
public static final String COSN_POLARIS_SIDECAR_ADDRESS = "fs.cosn.polaris.sidecar.address";

Expand Down
67 changes: 45 additions & 22 deletions src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.qcloud.cos.COSClient;
import com.qcloud.cos.COSEncryptionClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.endpoint.EndpointResolver;
import com.qcloud.cos.endpoint.SuffixEndpointBuilder;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.exception.ResponseNotCompleteException;
import com.qcloud.cos.http.HandlerAfterProcess;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.internal.SkipMd5CheckStrategy;
import com.qcloud.cos.internal.crypto.CryptoConfiguration;
Expand Down Expand Up @@ -62,10 +64,7 @@
import org.apache.hadoop.fs.cosn.Constants;
import org.apache.hadoop.fs.cosn.CustomerDomainEndpointResolver;
import org.apache.hadoop.fs.cosn.ResettableFileInputStream;
import org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolver;
import org.apache.hadoop.fs.cosn.CosNPartListing;
import org.apache.hadoop.fs.cosn.TencentPolarisEndpointResolver;
import org.apache.hadoop.fs.cosn.TencentPolarisSidecarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,10 +117,10 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore {
private RangerCredentialsClient rangerCredentialsClient = null;
private boolean useL5Id = false;
private int l5UpdateMaxRetryTimes;
private TencentCloudL5EndpointResolver tencentL5EndpointResolver;
private EndpointResolver tencentL5EndpointResolver;

private boolean usePolaris = false;
private TencentPolarisEndpointResolver tencentPolarisEndpointResolver;
private EndpointResolver tencentPolarisEndpointResolver;

private void initCOSClient(URI uri, Configuration conf) throws IOException {
COSCredentialProviderList cosCredentialProviderList =
Expand Down Expand Up @@ -192,17 +191,20 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {

Class<?> l5EndpointResolverClass;
try {
l5EndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolverImpl");
this.tencentL5EndpointResolver = (TencentCloudL5EndpointResolver) l5EndpointResolverClass.newInstance();
this.tencentL5EndpointResolver.setModId(l5modId);
this.tencentL5EndpointResolver.setCmdId(l5cmdId);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
l5EndpointResolverClass = Class.forName(conf.get(CosNConfigKeys.COSN_L5_RESOLVER_CLASS,
CosNConfigKeys.DEFAULT_COSN_L5_RESOLVER_CLASS));
Constructor<?> constructor = l5EndpointResolverClass.getConstructor(int.class, int.class, int.class);
this.tencentL5EndpointResolver = (EndpointResolver) constructor.newInstance(l5modId, l5cmdId, this.l5UpdateMaxRetryTimes);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
throw new IOException("The current version does not support L5 resolver.", e);
}
config.setEndpointResolver(tencentL5EndpointResolver);
// used by cos java sdk to handle
config.turnOnRefreshEndpointAddrSwitch();
config.setHandlerAfterProcess(tencentL5EndpointResolver);
if (this.tencentL5EndpointResolver instanceof HandlerAfterProcess) {
config.setHandlerAfterProcess((HandlerAfterProcess) tencentL5EndpointResolver);
}
}
}

Expand All @@ -214,37 +216,54 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {
String namespace = conf.get(CosNConfigKeys.COSN_POLARIS_NAMESPACE);
String service = conf.get(CosNConfigKeys.COSN_POLARIS_SERVICE);
try {
Class<?> polarisEndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentPolarisEndpointResolverImpl");
Constructor<?> constructor = polarisEndpointResolverClass.getConstructor(String.class, String.class);
this.tencentPolarisEndpointResolver = (TencentPolarisEndpointResolver) constructor.newInstance(namespace, service);
Class<?> polarisEndpointResolverClass = Class.forName(conf.get(CosNConfigKeys.COSN_POLARIS_RESOLVER_CLASS,
CosNConfigKeys.DEFAULT_COSN_POLARIS_RESOLVER_CLASS));
Constructor<?> constructor = polarisEndpointResolverClass.getConstructor(String.class, String.class, int.class);
this.tencentPolarisEndpointResolver = (EndpointResolver) constructor.newInstance(namespace, service, this.l5UpdateMaxRetryTimes);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
throw new IOException("The current version does not support Polaris resolver.", e);
}

config.setEndpointResolver(this.tencentPolarisEndpointResolver);
config.turnOnRefreshEndpointAddrSwitch();
config.setHandlerAfterProcess(this.tencentPolarisEndpointResolver);
if (this.tencentPolarisEndpointResolver instanceof HandlerAfterProcess) {
config.setHandlerAfterProcess((HandlerAfterProcess) this.tencentPolarisEndpointResolver);
}
}

// 使用北极星 sidecar 初始化
boolean usePolarisSidecar = conf.getBoolean(CosNConfigKeys.COSN_USE_POLARIS_SIDECAR_ENABLED,
CosNConfigKeys.DEFAULT_COSN_USE_POLARIS_SIDECAR_ENABLED);
if( usePolarisSidecar ){
String namespace = conf.get(CosNConfigKeys.COSN_POLARIS_NAMESPACE);
String service = conf.get(CosNConfigKeys.COSN_POLARIS_SERVICE);
String address = conf.get(CosNConfigKeys.COSN_POLARIS_SIDECAR_ADDRESS);
TencentPolarisSidecarClient polarisSideCarClient = new TencentPolarisSidecarClient(address);
Class<?> polarisSideCarClientClass;
Object polarisSideCarClient;
try {
polarisSideCarClientClass = Class.forName(conf.get(CosNConfigKeys.COSN_POLARIS_SIDECAR_CLIENT_IMPL,
CosNConfigKeys.DEFAULT_COSN_POLARIS_SIDECAR_CLIENT_IMPL));
Constructor<?> constructor = polarisSideCarClientClass.getConstructor(String.class);
polarisSideCarClient = constructor.newInstance(address);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
throw new IOException("The current version does not support Polaris sidecar resolver.", e);
}
try {
Class<?> polarisEndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentPolarisSidecarEndpointResolverImpl");
Constructor<?> constructor = polarisEndpointResolverClass.getConstructor(TencentPolarisSidecarClient.class, String.class, String.class);
this.tencentPolarisEndpointResolver = (TencentPolarisEndpointResolver) constructor.newInstance(polarisSideCarClient, namespace, service);
Class<?> polarisEndpointResolverClass = Class.forName(conf.get(CosNConfigKeys.COSN_POLARIS_SIDECAR_RESOLVER_CLASS,
CosNConfigKeys.DEFAULT_COSN_POLARIS_SIDECAR_RESOLVER_CLASS));
Constructor<?> constructor = polarisEndpointResolverClass.getConstructor(polarisSideCarClientClass, String.class, String.class, int.class);
this.tencentPolarisEndpointResolver = (EndpointResolver) constructor.newInstance(polarisSideCarClient, namespace, service, this.l5UpdateMaxRetryTimes);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
throw new IOException("The current version does not support Polaris sidecar resolver.", e);
}
config.setEndpointResolver(this.tencentPolarisEndpointResolver);
config.turnOnRefreshEndpointAddrSwitch();
config.setHandlerAfterProcess(this.tencentPolarisEndpointResolver);
if (this.tencentL5EndpointResolver instanceof HandlerAfterProcess) {
config.setHandlerAfterProcess((HandlerAfterProcess) this.tencentPolarisEndpointResolver);
}
}
} else {
config = new ClientConfig(new Region(""));
Expand Down Expand Up @@ -1888,7 +1907,9 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
if (useL5Id) {
if (l5ErrorCodeRetryIndex >= this.l5UpdateMaxRetryTimes) {
// L5上报,进行重试
tencentL5EndpointResolver.handle(-1, 0);
if (this.tencentL5EndpointResolver instanceof HandlerAfterProcess) {
((HandlerAfterProcess) this.tencentL5EndpointResolver).handle(-1, 0);
}
l5ErrorCodeRetryIndex = 1;
} else {
l5ErrorCodeRetryIndex = l5ErrorCodeRetryIndex + 1;
Expand All @@ -1898,7 +1919,9 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
if (this.usePolaris) {
if (l5ErrorCodeRetryIndex >= this.l5UpdateMaxRetryTimes) {
// Polaris上报,进行重试
this.tencentPolarisEndpointResolver.handle(-1, 0);
if (this.tencentPolarisEndpointResolver instanceof HandlerAfterProcess) {
((HandlerAfterProcess) this.tencentPolarisEndpointResolver).handle(-1, 0);
}
l5ErrorCodeRetryIndex = 1;
} else {
l5ErrorCodeRetryIndex = l5ErrorCodeRetryIndex + 1;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 8e46f19

Please sign in to comment.