Skip to content

Commit

Permalink
[Feature][chunjun-core] Supports capturing dirty data from the source…
Browse files Browse the repository at this point in the history
… and when the source sends it downstream DTStack#1901
  • Loading branch information
gaoliang committed Jun 25, 2024
1 parent 6fdd77a commit fa3edfd
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package com.dtstack.chunjun.source;

import com.dtstack.chunjun.constants.Metrics;
import com.dtstack.chunjun.dirty.manager.DirtyManager;
import com.dtstack.chunjun.metrics.AccumulatorCollector;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.util.ExceptionUtil;
Expand Down Expand Up @@ -112,19 +115,28 @@ public void run(SourceContext<OUT> ctx) throws Exception {
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}

OUT nextElement = serializer.createInstance();
while (isRunning) {
format.open(splitIterator.next());

AccumulatorCollector accumulatorCollector =
((BaseRichInputFormat) format).getAccumulatorCollector();
DirtyManager dirtyManager = ((BaseRichInputFormat) format).getDirtyManager();
// for each element we also check if cancel
// was called by checking the isRunning flag

while (isRunning && !format.reachedEnd()) {
synchronized (ctx.getCheckpointLock()) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
try {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
}
} catch (Exception e) {
// 脏数据总数应是所有slot的脏数据总数,而不是单个的
long globalErrors =
accumulatorCollector.getAccumulatorValue(
Metrics.NUM_ERRORS, false);
dirtyManager.collect(nextElement, e, null, globalErrors);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,11 @@ public void openInputFormat() throws IOException {
}

@Override
public RowData nextRecord(RowData rowData) {
public RowData nextRecord(RowData rowData) throws ReadRecordException {
if (byteRateLimiter != null) {
byteRateLimiter.acquire();
}
RowData internalRow = null;
try {
internalRow = nextRecordInternal(rowData);
} catch (ReadRecordException e) {
// 脏数据总数应是所有slot的脏数据总数,而不是单个的
long globalErrors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS, false);
dirtyManager.collect(e.getRowData(), e, null, globalErrors);
}
RowData internalRow = nextRecordInternal(rowData);
if (internalRow != null) {
updateDuration();
if (numReadCounter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package com.dtstack.chunjun.throwable;

import java.io.IOException;

/** The Exception describing errors when read a record */
public class ReadRecordException extends Exception {
public class ReadRecordException extends IOException {

private static final long serialVersionUID = 453087894656079820L;
private final int colIndex;
Expand Down

0 comments on commit fa3edfd

Please sign in to comment.