Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support catalog table read by arrowflightsql #530

Merged
merged 3 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> {
private final String database;
private final String table;

private final String beAddress;
private final Set<Long> tabletIds;
private final String queryPlan;
Expand All @@ -42,6 +41,10 @@ public PartitionDefinition(
this.queryPlan = queryPlan;
}

/**
 * Creates a placeholder partition that carries only a table name.
 *
 * <p>Every other constructor argument is passed as an empty string, and the tablet-id set is a
 * new, empty {@link HashSet}.
 *
 * @param table the table name to store in the placeholder
 * @return a new partition definition without tablets
 */
public static PartitionDefinition emptyPartition(String table) {
    final Set<Long> noTablets = new HashSet<>();
    return new PartitionDefinition("", table, "", noTablets, "");
}

/**
 * Returns the BE address stored in this partition definition.
 *
 * @return the value of the {@code beAddress} field
 */
public String getBeAddress() {
    return this.beAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class RestService implements Serializable {
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";

/**
Expand Down Expand Up @@ -234,38 +236,6 @@ public static String parseResponse(HttpURLConnection connection, Logger logger)
}
}

/**
 * Builds the SELECT statement used to read a partition through Arrow Flight SQL.
 *
 * <p>The statement selects the configured read fields (or {@code *} when none are configured)
 * from the configured table identifier, optionally restricted to the partition's tablets and to
 * the configured filter query.
 *
 * @param readOptions supplies the read fields and the optional filter query
 * @param options supplies the table identifier
 * @param partition supplies the tablet ids to read; may carry no tablets
 * @param logger receives the generated SQL at INFO level
 * @return the generated SQL text
 * @throws IllegalArgumentException if the table identifier is rejected by {@code
 *     parseIdentifier}
 */
@VisibleForTesting
public static String parseFlightSql(
        DorisReadOptions readOptions,
        DorisOptions options,
        PartitionDefinition partition,
        Logger logger)
        throws IllegalArgumentException {
    String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
    String readFields =
            StringUtils.isBlank(readOptions.getReadFields())
                    ? "*"
                    : readOptions.getReadFields();

    // parseIdentifier admits both db.table and catalog.db.table, so quote and join every part
    // instead of assuming exactly two. For db.table this yields the same text as before.
    StringBuilder queryTable = new StringBuilder();
    for (int i = 0; i < tableIdentifiers.length; i++) {
        if (i > 0) {
            queryTable.append('.');
        }
        queryTable.append('`').append(tableIdentifiers[i]).append('`');
    }
    String sql = "select " + readFields + " from " + queryTable;

    // A partition without tablets (e.g. PartitionDefinition.emptyPartition) must not produce an
    // empty "TABLET()" clause, so the clause is only added when there is something to list.
    if (partition.getTabletIds() != null && !partition.getTabletIds().isEmpty()) {
        String tablet =
                partition.getTabletIds().stream()
                        .map(Object::toString)
                        .collect(Collectors.joining(","));
        sql += " TABLET(" + tablet + ") ";
    }
    if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
        sql += " where " + readOptions.getFilterQuery();
    }
    logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
    return sql;
}

/**
* parse table identifier to array.
*
Expand All @@ -275,15 +245,16 @@ public static String parseFlightSql(
* @throws IllegalArgumentException table identifier is illegal
*/
@VisibleForTesting
static String[] parseIdentifier(String tableIdentifier, Logger logger)
public static String[] parseIdentifier(String tableIdentifier, Logger logger)
throws IllegalArgumentException {
logger.trace("Parse identifier '{}'.", tableIdentifier);
if (StringUtils.isEmpty(tableIdentifier)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
String[] identifier = tableIdentifier.split("\\.");
if (identifier.length != 2) {
// db.table or catalog.db.table
if (identifier.length != 2 && identifier.length != 3) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
Expand Down Expand Up @@ -426,12 +397,26 @@ public static Schema getSchema(
throws DorisException {
logger.trace("Finding schema.");
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
String tableSchemaUri;
if (tableIdentifier.length == 2) {
tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
} else if (tableIdentifier.length == 3) {
tableSchemaUri =
String.format(
CATALOG_TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1],
tableIdentifier[2]);
} else {
throw new IllegalArgumentException(
"table identifier is illegal, should be db.table or catalog.db.table");
}
HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
Expand Down Expand Up @@ -561,6 +546,8 @@ public static List<PartitionDefinition> findPartitions(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
Preconditions.checkArgument(
tableIdentifiers.length == 2, "table identifier is illegal, should be db.table");
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
Expand All @@ -578,6 +565,7 @@ public static List<PartitionDefinition> findPartitions(
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);

String queryPlanUri =
String.format(
QUERY_PLAN_API,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class DorisSource<OUT>
ResultTypeQueryable<OUT> {

private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class);
private static final String SINGLE_SPLIT = "SingleSplit";

private final DorisOptions options;
private final DorisReadOptions readOptions;
Expand Down Expand Up @@ -95,13 +97,27 @@ public SourceReader<OUT, DorisSourceSplit> createReader(SourceReaderContext read
public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<DorisSourceSplit> context) throws Exception {
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
PartitionDefinition partitionDef = partitions.get(index);
String splitId = partitionDef.getBeAddress() + "_" + index;
dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
String[] tableIdentifiers = RestService.parseIdentifier(options.getTableIdentifier(), LOG);

if (tableIdentifiers.length == 2) {
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
PartitionDefinition partitionDef = partitions.get(index);
String splitId = partitionDef.getBeAddress() + "_" + index;
dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
}
} else {
Preconditions.checkArgument(
readOptions.getUseFlightSql(),
"UseFlightSql must be true when table.identifier is catalog.db.table");
// catalog query or customer query
dorisSourceSplits.add(
new DorisSourceSplit(
SINGLE_SPLIT,
PartitionDefinition.emptyPartition(options.getTableIdentifier())));
}

DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits);
return new DorisSourceEnumerator(context, splitAssigner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
Expand All @@ -39,17 +41,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

public class DorisFlightValueReader extends ValueReader implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DorisFlightValueReader.class);
private static final String PREFIX = "/* ApplicationName=Flink ArrowFlightSQL Query */";

protected AdbcConnection client;
protected Lock clientLock = new ReentrantLock();

Expand All @@ -64,24 +70,20 @@ public class DorisFlightValueReader extends ValueReader implements AutoCloseable
protected AtomicBoolean eos = new AtomicBoolean(false);

public DorisFlightValueReader(
PartitionDefinition partition,
DorisOptions options,
DorisReadOptions readOptions,
Schema schema) {
PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
this.partition = partition;
this.options = options;
this.readOptions = readOptions;
initSchema();
this.client = openConnection();
this.schema = schema;
init();
}

private void init() {
clientLock.lock();
try {
this.statement = this.client.createStatement();
this.statement.setSqlQuery(
RestService.parseFlightSql(readOptions, options, partition, LOG));
this.statement.setSqlQuery(parseFlightSql(readOptions, options, partition, LOG));
this.queryResult = statement.executeQuery();
this.arrowReader = queryResult.getReader();
} catch (AdbcException | DorisException e) {
Expand All @@ -92,6 +94,48 @@ private void init() {
LOG.debug("Open scan result is, schema: {}.", schema);
}

/**
 * Loads the table schema through {@code RestService.getSchema} and stores it in {@code schema}.
 *
 * <p>Any exception raised by the lookup is rethrown wrapped in a {@link RuntimeException} with
 * the original exception as its cause.
 */
private void initSchema() {
    try {
        schema = RestService.getSchema(options, readOptions, LOG);
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}

/**
 * Assembles the SQL text that is sent to Doris for this reader.
 *
 * <p>The statement starts with {@code PREFIX}, selects the configured read fields (or {@code *}
 * when they are blank) from the back-quoted, dot-joined table identifier, adds a {@code
 * TABLET(...)} clause when the partition has tablet ids, and appends the filter query as a
 * {@code WHERE} clause when one is configured.
 *
 * @param readOptions supplies the read fields and the optional filter query
 * @param options supplies the table identifier
 * @param partition supplies the tablet ids, if any
 * @param logger receives the generated SQL at INFO level
 * @return the generated SQL text
 * @throws IllegalArgumentException if the table identifier is rejected by {@code
 *     RestService.parseIdentifier}
 */
private String parseFlightSql(
        DorisReadOptions readOptions,
        DorisOptions options,
        PartitionDefinition partition,
        Logger logger)
        throws IllegalArgumentException {
    String[] nameParts = RestService.parseIdentifier(options.getTableIdentifier(), logger);

    String projection = readOptions.getReadFields();
    if (StringUtils.isBlank(projection)) {
        projection = "*";
    }

    StringBuilder query = new StringBuilder(PREFIX);
    query.append(" SELECT ").append(projection).append(" FROM ");

    // Back-quote each identifier part and join the parts with dots.
    for (int i = 0; i < nameParts.length; i++) {
        if (i > 0) {
            query.append('.');
        }
        query.append('`').append(nameParts[i]).append('`');
    }

    // The tablet clause is only emitted when the partition actually lists tablets.
    if (CollectionUtils.isNotEmpty(partition.getTabletIds())) {
        query.append(" TABLET(");
        String separator = "";
        for (Object tabletId : partition.getTabletIds()) {
            query.append(separator).append(tabletId.toString());
            separator = ",";
        }
        query.append(") ");
    }

    String filter = readOptions.getFilterQuery();
    if (StringUtils.isNotEmpty(filter)) {
        query.append(" WHERE ").append(filter);
    }

    String sql = query.toString();
    logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
    return sql;
}

private AdbcConnection openConnection() {
final Map<String, Object> parameters = new HashMap<>();
RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
import org.slf4j.Logger;

import java.util.List;
Expand All @@ -32,15 +30,10 @@ public static ValueReader createReader(
PartitionDefinition partition,
DorisOptions options,
DorisReadOptions readOptions,
Logger logger)
throws DorisException {
Logger logger) {
logger.info("create reader for partition: {}", partition.toStringWithoutPlan());
if (readOptions.getUseFlightSql()) {
return new DorisFlightValueReader(
partition,
options,
readOptions,
RestService.getSchema(options, readOptions, logger));
return new DorisFlightValueReader(partition, options, readOptions);
} else {
return new DorisValueReader(partition, options, readOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PartitionDefinition getPartitionDefinition() {
@Override
public String toString() {
return String.format(
"DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s",
"DorisSourceSplit: database=%s,table=%s,id=%s,be=%s,tablets=%s",
partitionDefinition.getDatabase(),
partitionDefinition.getTable(),
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public void testParseIdentifierIllegalEmpty() throws IllegalArgumentException {
@Test
public void testParseIdentifierIllegal() throws Exception {
String invalidIdentifier3 = "a.b.c";
RestService.parseIdentifier(invalidIdentifier3, logger);

invalidIdentifier3 = "a.b.c.d.e";
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"argument 'table.identifier' is illegal, value is '" + invalidIdentifier3 + "'.");
Expand Down
Loading