[feature] Add arrow type for streamload #265

Merged 7 commits on Dec 27, 2023
4 changes: 3 additions & 1 deletion flink-doris-connector/build.sh
@@ -142,8 +142,10 @@ selectFlink() {
FLINK_VERSION=0
selectFlink
flinkVer=$?
FLINK_PYTHON_ID="flink-python"
if [ ${flinkVer} -eq 1 ]; then
FLINK_VERSION="1.15.0"
FLINK_PYTHON_ID="flink-python_2.12"
elif [ ${flinkVer} -eq 2 ]; then
FLINK_VERSION="1.16.0"
elif [ ${flinkVer} -eq 3 ]; then
@@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0
echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
echo_g " build starting..."

${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@"

EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
21 changes: 9 additions & 12 deletions flink-doris-connector/pom.xml
@@ -71,8 +71,9 @@ under the License.
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -84,7 +85,6 @@ under the License.
<spotless.version>2.4.2</spotless.version>
<httpcomponents.version>4.5.13</httpcomponents.version>
<commons-codec.version>1.15</commons-codec.version>
<netty.version>4.1.77.Final</netty.version>
<fasterxml.version>2.13.3</fasterxml.version>
<guava.version>31.1-jre</guava.version>
<slf4j.version>1.7.25</slf4j.version>
@@ -137,6 +137,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.python.id}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
@@ -195,19 +202,9 @@ under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
BatchRecordBuffer.java
@@ -57,7 +57,7 @@ public void insert(byte[] record) {
ensureCapacity(record.length);
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else {
} else if (lineDelimiter != null) {
this.buffer.put(this.lineDelimiter);
}
this.buffer.put(record);
@@ -67,7 +67,7 @@ public void insert(byte[] record) {

@VisibleForTesting
public void ensureCapacity(int length) {
int lineDelimiterSize = this.lineDelimiter.length;
int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
if (buffer.remaining() - lineDelimiterSize >= length) {
return;
}
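Note on the change above: when the stream-load format is arrow, the line delimiter is null and records are appended back-to-back. A minimal standalone sketch of that guard (the class name and fixed buffer size are hypothetical; the real buffer also grows via ensureCapacity):

import java.nio.ByteBuffer;

// Simplified restatement of the delimiter handling in the hunks above.
public class DelimitedBufferSketch {
    private final ByteBuffer buffer = ByteBuffer.allocate(1024); // fixed size for brevity
    private final byte[] lineDelimiter; // null when format=arrow
    private boolean loadBatchFirstRecord = true;

    public DelimitedBufferSketch(byte[] lineDelimiter) {
        this.lineDelimiter = lineDelimiter;
    }

    public void insert(byte[] record) {
        if (loadBatchFirstRecord) {
            loadBatchFirstRecord = false;
        } else if (lineDelimiter != null) {
            buffer.put(lineDelimiter); // separate rows only for text formats
        }
        buffer.put(record);
    }
}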
DorisBatchStreamLoad.java
@@ -61,6 +61,9 @@

import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

@@ -105,10 +108,15 @@ public DorisBatchStreamLoad(
this.password = dorisOptions.getPassword();
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
this.lineDelimiter =
EscapeHandler.escapeString(
loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
this.lineDelimiter = null;
} else {
this.lineDelimiter =
EscapeHandler.escapeString(
loadProps.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
DorisBatchWriter.java
@@ -77,14 +77,15 @@ public DorisBatchWriter(
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
serializer.initial();
}

public void initializeLoad() throws IOException {
this.batchStreamLoad =
new DorisBatchStreamLoad(
dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
// when uploading data in streaming mode,
// we need to regularly detect whether there areexceptions.
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
@@ -101,13 +102,24 @@ private void intervalFlush() {
@Override
public void write(IN in, Context context) throws IOException, InterruptedException {
checkFlushException();
String db = this.database;
String tbl = this.table;
DorisRecord record = serializer.serialize(in);
writeOneDorisRecord(serializer.serialize(in));
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
writeOneDorisRecord(serializer.flush());
LOG.info("checkpoint flush triggered.");
batchStreamLoad.flush(null, true);
}

public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}
String db = this.database;
String tbl = this.table;
// multi table load
if (record.getTableIdentifier() != null) {
db = record.getDatabase();
@@ -116,13 +128,6 @@ public void write(IN in, Context context) throws IOException, InterruptedException {
batchStreamLoad.writeRecord(db, tbl, record.getRow());
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
LOG.info("checkpoint flush triggered.");
batchStreamLoad.flush(null, true);
}

@Override
public void close() throws Exception {
LOG.info("DorisBatchWriter Close");
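With this refactor, both write() and flush(boolean) funnel through writeOneDorisRecord(), which drops records whose row is null (DDL events, or the DorisRecord.empty a non-buffering serializer returns from flush()). An illustrative, self-contained sketch of that guard (names are hypothetical, not the connector's classes):

// Mirrors the null-row guard in writeOneDorisRecord above.
public class WritePathSketch {
    static void writeOneRecord(byte[] row) {
        if (row == null) {
            return; // ddl or value is null: nothing to stream-load
        }
        System.out.println("would stream-load " + row.length + " bytes");
    }

    public static void main(String[] args) {
        writeOneRecord("1,alice".getBytes()); // normal write() path
        writeOneRecord(null); // flush() when the serializer buffered nothing
    }
}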
DorisStreamLoad.java
@@ -54,6 +54,9 @@
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

@@ -115,11 +118,15 @@ public DorisStreamLoad(
executionOptions.getBufferSize(),
executionOptions.getBufferCount(),
executionOptions.isUseCache());
lineDelimiter =
EscapeHandler.escapeString(
streamLoadProp.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
lineDelimiter = null;
} else {
lineDelimiter =
EscapeHandler.escapeString(
streamLoadProp.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
loadBatchFirstRecord = true;
}

@@ -157,8 +164,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
while (true) {
try {
// TODO: According to label abort txn. Currently,
// it can only be aborted based on txnid,
// TODO: According to label abort txn. Currently, it can only be aborted based on
// txnid,
// so we must first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
@@ -218,7 +225,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else {
} else if (lineDelimiter != null) {
recordStream.write(lineDelimiter);
}
recordStream.write(record);
DorisWriter.java
@@ -109,6 +109,7 @@ public DorisWriter(
this.globalLoading = false;

initializeLoad(state);
serializer.initial();
}

public void initializeLoad(Collection<DorisWriterState> state) {
@@ -123,8 +124,8 @@ public void initializeLoad(Collection<DorisWriterState> state) {
}
// get main work thread.
executorThread = Thread.currentThread();
// when uploading data in streaming mode,
// we need to regularly detect whether there are exceptions.
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -167,14 +168,23 @@ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredSt
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
String tableKey = dorisOptions.getTableIdentifier();
writeOneDorisRecord(serializer.serialize(in));
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
writeOneDorisRecord(serializer.flush());
}

public void writeOneDorisRecord(DorisRecord record) throws IOException {

DorisRecord record = serializer.serialize(in);
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}

// multi table load
String tableKey = dorisOptions.getTableIdentifier();
if (record.getTableIdentifier() != null) {
tableKey = record.getTableIdentifier();
}
@@ -191,11 +201,6 @@ public void write(IN in, Context context) throws IOException {
streamLoader.writeRecord(record.getRow());
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
// No action is triggered, everything is in the precommit method
}

@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
// Verify whether data is written during a checkpoint
@@ -369,5 +374,6 @@ public void close() throws Exception {
dorisStreamLoad.close();
}
}
serializer.close();
}
}
LoadConstants.java
@@ -27,6 +27,7 @@ public class LoadConstants {
public static final String FORMAT_KEY = "format";
public static final String JSON = "json";
public static final String CSV = "csv";
public static final String ARROW = "arrow";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
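With the new ARROW constant, switching a sink to arrow stream load only requires setting the format property. A hedged usage sketch; DorisExecutionOptions and its builder are assumed from the connector's existing configuration API:

import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.sink.writer.LoadConstants;

public class ArrowFormatConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "format" -> "arrow"; the writers above then skip line delimiters
        props.setProperty(LoadConstants.FORMAT_KEY, LoadConstants.ARROW);

        DorisExecutionOptions executionOptions =
                DorisExecutionOptions.builder().setStreamLoadProp(props).build();
    }
}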
DorisRecord.java
@@ -20,6 +20,9 @@
import java.io.Serializable;

public class DorisRecord implements Serializable {

public static DorisRecord empty = new DorisRecord();

private String database;
private String table;
private byte[] row;
DorisRecordSerializer.java
@@ -35,4 +35,12 @@ public interface DorisRecordSerializer<T> extends Serializable {
* @throws IOException
*/
DorisRecord serialize(T record) throws IOException;

default void initial() {}

default DorisRecord flush() {
return DorisRecord.empty;
}

default void close() throws Exception {}
}
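These defaults give a serializer a full lifecycle: initial() once at writer start, flush() at every checkpoint so buffered rows can be emitted, close() on shutdown; non-buffering serializers simply inherit the no-ops. A hedged sketch of a buffering implementation (the class is hypothetical, and the DorisRecord.of(byte[]) factory is assumed from the connector's API):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Buffers rows and emits them only when the writer calls flush() at a
// checkpoint, the shape an arrow-batching serializer needs.
public class BufferingSerializerSketch implements DorisRecordSerializer<String> {
    private final List<String> pending = new ArrayList<>();

    @Override
    public void initial() {
        pending.clear(); // called once when the writer starts
    }

    @Override
    public DorisRecord serialize(String record) throws IOException {
        pending.add(record);
        return DorisRecord.empty; // row is null, so the writer skips it for now
    }

    @Override
    public DorisRecord flush() {
        byte[] batch = String.join("\n", pending).getBytes(StandardCharsets.UTF_8);
        pending.clear();
        return DorisRecord.of(batch); // emit everything buffered at the checkpoint
    }

    @Override
    public void close() {
        pending.clear();
    }
}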