Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support specifying fe protocol #494

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public class RestService implements Serializable {
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema";
private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan";

/**
* send request to Doris FE and get response json string.
Expand Down Expand Up @@ -131,6 +131,7 @@ private static String send(
RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout)
.setRedirectsEnabled(true)
.build();

request.setConfig(requestConfig);
Expand Down Expand Up @@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger)
Collections.shuffle(nodes);
for (String feNode : nodes) {
String host = feNode.trim();
if (!host.startsWith("http://") && !host.startsWith("https://")) {
host = "http://" + host;
}
if (BackendUtil.tryHttpConnection(host)) {
return host;
}
Expand Down Expand Up @@ -359,7 +363,10 @@ public static List<BackendRowV2> getBackendsV2(

for (String feNode : feNodeList) {
try {
String beUrl = "http://" + feNode + BACKENDS_V2;
if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) {
feNode = "http://" + feNode;
}
String beUrl = feNode + BACKENDS_V2;
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions, httpGet, logger);
logger.info("Backend Info:{}", response);
Expand Down Expand Up @@ -387,8 +394,7 @@ public static List<BackendRowV2> getBackendsV2(
private static List<BackendRowV2> convert(List<String> feNodeList) {
List<BackendRowV2> nodeList = new ArrayList<>();
for (String node : feNodeList) {
String[] split = node.split(":");
nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
nodeList.add(BackendRowV2.ofUrl(node, true));
}
return nodeList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public String getIp() {
}

public void setIp(String ip) {
if (!ip.startsWith("http://") && !ip.startsWith("https://")) {
ip = "http://" + ip;
}
this.ip = ip;
}

Expand Down Expand Up @@ -82,5 +85,13 @@ public static BackendRowV2 of(String ip, int httpPort, boolean alive) {
rowV2.setAlive(alive);
return rowV2;
}

public static BackendRowV2 ofUrl(String url, boolean alive) {
int lastColon = url.lastIndexOf(":");
return BackendRowV2.of(
url.substring(0, lastColon),
Integer.valueOf(url.substring(lastColon + 1)),
alive);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
if (tryHttpConnection(node)) {
LOG.info("{} backend http connection success.", node);
node = node.trim();
String[] ipAndPort = node.split(":");
BackendRowV2 backendRowV2 = new BackendRowV2();
backendRowV2.setIp(ipAndPort[0]);
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
backendRowV2.setAlive(true);
backends.add(backendRowV2);
backends.add(BackendRowV2.ofUrl(node, true));
}
});
return backends;
Expand Down Expand Up @@ -98,8 +93,10 @@ public String getAvailableBackend(int subtaskId) {

public static boolean tryHttpConnection(String host) {
try {
if (!host.startsWith("http://") && !host.startsWith("https://")) {
host = "http://" + host;
}
LOG.debug("try to connect host {}", host);
host = "http://" + host;
URL url = new URL(host);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DorisCommittable implements DorisAbstractCommittable {
private final long txnID;

public DorisCommittable(String hostPort, String db, long txnID) {
this.hostPort = hostPort;
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
this.db = db;
this.txnID = txnID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class DorisBatchStreamLoad implements Serializable {
private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load";
private String loadUrl;
private String hostPort;
private final String username;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
/** The committer to commit transaction. */
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
private static final String commitPattern = "%s/api/%s/_stream_load_2pc";
private final CloseableHttpClient httpClient;
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class BatchStageLoad implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(BatchStageLoad.class);
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
private static final String LINE_DELIMITER_KEY_WITH_PRETIX = "file.line_delimiter";
private String uploadUrl;
private String hostPort;
Expand Down Expand Up @@ -96,7 +96,9 @@ public BatchStageLoad(
this.password = dorisOptions.getPassword();
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
this.hostPort = dorisOptions.getFenodes();
Copy link
Author

@vlada-dudr vlada-dudr Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one seems particularly odd - isn't fenodes comma separated string? I did this. thoug, as it is later being prepended to UPLOAD_URL_PATTERN

this.hostPort =
(dorisOptions.getFenodes().startsWith("http") ? "" : "http://")
+ dorisOptions.getFenodes();
this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
this.fileNum = new AtomicInteger();
this.lineDelimiter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DorisCopyCommittable implements DorisAbstractCommittable {
private final String copySQL;

public DorisCopyCommittable(String hostPort, String copySQL) {
this.hostPort = hostPort;
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
this.copySQL = copySQL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

public class DorisCopyCommitter implements Committer<DorisCopyCommittable>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(DorisCopyCommitter.class);
private static final String commitPattern = "http://%s/copy/query";
private static final String commitPattern = "%s/copy/query";
private static final int SUCCESS = 0;
private static final String FAIL = "1";
private ObjectMapper objectMapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
private static final String CHECK_SCHEMA_CHANGE_API =
"http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s";
private ObjectMapper objectMapper = new ObjectMapper();
private DorisOptions dorisOptions;
private String charsetEncoding = "UTF-8";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public class DorisStreamLoad implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "%s/api/%s/_stream_load_2pc";
public static final String JOB_EXIST_FINISHED = "FINISHED";

private String loadUrlStr;
Expand Down Expand Up @@ -102,7 +102,7 @@ public DorisStreamLoad(
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator,
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
this.table = tableInfo[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ public void testParseIdentifierIllegal() throws Exception {

@Test
public void testChoiceFe() throws Exception {
String validFes = "1,2,3";
String validFes = "1,http://2,https://3";
String fe = RestService.randomEndpoint(validFes, logger);
List<String> feNodes = new ArrayList<>(3);
feNodes.add("1");
feNodes.add("2");
feNodes.add("3");
feNodes.add("http://1");
feNodes.add("http://2");
feNodes.add("https://3");
Assert.assertTrue(feNodes.contains(fe));

String emptyFes = "";
Expand Down Expand Up @@ -416,7 +416,7 @@ public void testParseBackendV2Error() throws Exception {
public void testGetBackendsV2() {
DorisOptions options =
DorisOptions.builder()
.setFenodes("127.0.0.1:1,127.0.0.1:2")
.setFenodes("https://127.0.0.1:1,http://127.0.0.1:2,127.0.0.1:3")
.setAutoRedirect(false)
.build();
DorisReadOptions readOptions = DorisReadOptions.defaults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testPrepareCommit() throws Exception {

Assert.assertEquals(1, committableList.size());
DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0];
Assert.assertEquals("127.0.0.1:8030", committable.getHostPort());
Assert.assertEquals("http://127.0.0.1:8030", committable.getHostPort());

Pattern copySql =
Pattern.compile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testPrepareCommit() throws Exception {
Collection<DorisCommittable> committableList = dorisWriter.prepareCommit();
Assert.assertEquals(1, committableList.size());
DorisCommittable dorisCommittable = committableList.stream().findFirst().get();
Assert.assertEquals("local:8040", dorisCommittable.getHostPort());
Assert.assertEquals("http://local:8040", dorisCommittable.getHostPort());
Assert.assertEquals("db", dorisCommittable.getDb());
Assert.assertEquals(2, dorisCommittable.getTxnID());
Assert.assertFalse(dorisWriter.isLoading());
Expand Down
Loading