diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index b6610b2..4f189bb 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -40,13 +40,14 @@ public class NebulaClientOptions implements Serializable { private final SelfSignParams selfSignParams; + private final String version; private NebulaClientOptions(String metaAddress, String graphAddress, String username, String password, int timeout, int connectRetry, boolean enableGraphSSL, boolean enableMetaSSL, boolean enableStorageSSL, SSLSignType sslSignType, CASignParams caSignParams, - SelfSignParams selfSignParams) { + SelfSignParams selfSignParams, String version) { this.metaAddress = metaAddress; this.graphAddress = graphAddress; this.username = username; @@ -59,6 +60,7 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user this.sslSignType = sslSignType; this.caSignParams = caSignParams; this.selfSignParams = selfSignParams; + this.version = version; } public List getMetaAddress() { @@ -118,6 +120,10 @@ public SelfSignParams getSelfSignParam() { return selfSignParams; } + public String getVersion() { + return version; + } + /** * Builder for {@link NebulaClientOptions} */ @@ -136,6 +142,7 @@ public static class NebulaClientOptionsBuilder { private SSLSignType sslSignType = null; private CASignParams caSignParams = null; private SelfSignParams selfSignParams = null; + private String version = null; public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) { this.metaAddress = metaAddress; @@ -200,6 +207,11 @@ public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String ke return this; } + public NebulaClientOptionsBuilder setVersion(String version) { + this.version = version; + return this; + } + public NebulaClientOptions build() { if (metaAddress == null || metaAddress.trim().isEmpty()) { throw new IllegalArgumentException("meta address can not be empty."); @@ -246,7 +258,8 @@ public NebulaClientOptions build() { enableStorageSSL, sslSignType, caSignParams, - selfSignParams); + selfSignParams, + version); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java index f05d102..15244da 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java @@ -5,7 +5,6 @@ package org.apache.flink.connector.nebula.connection; - import com.vesoft.nebula.client.graph.NebulaPoolConfig; import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; @@ -43,9 +42,9 @@ public NebulaPool getNebulaPool() throws UnknownHostException { } Collections.shuffle(addresses); - NebulaPool nebulaPool = new NebulaPool(); NebulaPoolConfig poolConfig = new NebulaPoolConfig(); poolConfig.setTimeout(nebulaClientOptions.getTimeout()); + poolConfig.setVersion(nebulaClientOptions.getVersion()); if (nebulaClientOptions.isEnableGraphSSL()) { poolConfig.setEnableSsl(true); switch (nebulaClientOptions.getSSLSignType()) { @@ -67,8 +66,12 @@ public NebulaPool getNebulaPool() throws UnknownHostException { throw new IllegalArgumentException("ssl sign type is not supported."); } } - nebulaPool.init(addresses, poolConfig); - return nebulaPool; + NebulaPool nebulaPool = new NebulaPool(); + if (nebulaPool.init(addresses, poolConfig)) { + return nebulaPool; + } else { + throw new RuntimeException("NebulaPool init failed."); + } } /** diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java index 8347e90..502aa53 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java @@ -65,6 +65,7 @@ public MetaClient getMetaClient() throws TException, ClientServerIncompatibleExc metaClient = new MetaClient(addresses, timeout, retry, retry); } + metaClient.setVersion(nebulaClientOptions.getVersion()); metaClient.connect(); return metaClient; } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java index 3273693..aa82568 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java @@ -56,6 +56,7 @@ public StorageClient getStorageClient() throws Exception { storageClient = new StorageClient(addresses, timeout); } + storageClient.setVersion(nebulaClientOptions.getVersion()); if (!storageClient.connect()) { throw new Exception("failed to connect storaged."); } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java index 19e6293..84b5062 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java @@ -40,6 +40,7 @@ public void getNebulaPool() { .setPassword("nebula") .setConnectRetry(1) .setTimeout(1000) + .setVersion("test") .build(); NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); @@ -52,10 +53,37 @@ public void getNebulaPool() { } } + @Test + public void getNebulaPoolWithWrongVersion() { + NebulaClientOptions nebulaClientOptions = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setGraphAddress("127.0.0.1:9669") + .setMetaAddress("127.0.0.1:9559") + .setUsername("root") + .setPassword("nebula") + .setConnectRetry(1) + .setTimeout(1000) + .setVersion("INVALID_VERSION") + .build(); + NebulaGraphConnectionProvider graphConnectionProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + try { + NebulaPool nebulaPool = graphConnectionProvider.getNebulaPool(); + nebulaPool.getSession("root", "nebula", true); + } catch (Exception e) { + LOG.info("get session failed", e); + if (e.getMessage().contains("NebulaPool init failed.")) { + assert true; + } else { + assert false; + } + } + } + /** * nebula server does not enable ssl, the connection cannot be established correctly. */ - @Test(expected = NotValidConnectionException.class) + @Test(expected = RuntimeException.class) public void getSessionWithSsl() throws NotValidConnectionException { NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() @@ -81,7 +109,7 @@ public void getSessionWithSsl() throws NotValidConnectionException { NebulaPool pool = graphConnectionProvider.getNebulaPool(); pool.getSession("root", "nebula", true); } catch (UnknownHostException | IOErrorException | AuthFailedException - | ClientServerIncompatibleException e) { + | ClientServerIncompatibleException e) { LOG.error("get session failed", e); assert (false); } diff --git a/connector/src/test/resources/docker-compose.yaml b/connector/src/test/resources/docker-compose.yaml index 05c63b1..62af5c7 100644 --- a/connector/src/test/resources/docker-compose.yaml +++ b/connector/src/test/resources/docker-compose.yaml @@ -16,6 +16,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --expired_time_factor=2 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.1:11000/status"] interval: 30s @@ -52,6 +54,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --expired_time_factor=2 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.2:11000/status"] interval: 30s @@ -88,6 +92,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --expired_time_factor=2 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.3:11000/status"] interval: 30s @@ -242,6 +248,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --timezone_name=+08:00:00 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test depends_on: - metad0 - metad1 @@ -279,6 +287,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --timezone_name=+08:00:00 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test depends_on: - metad0 - metad1 @@ -316,6 +326,8 @@ services: - --minloglevel=0 - --heartbeat_interval_secs=2 - --timezone_name=+08:00:00 + - --enable_client_white_list=true + - --client_white_list=3.0.0:test depends_on: - metad0 - metad1