Skip to content

Commit

Permalink
add validation
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed Nov 13, 2024
1 parent eaeb53a commit 30f08dd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final String removeRecordOnSequenceGroup;

private final Set<Integer> sequenceGroupPartialDelete;
private Set<Integer> sequenceGroupPartialDelete;

private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
Expand All @@ -303,6 +303,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {

List<String> fieldNames = rowType.getFieldNames();
this.fieldSeqComparators = new HashMap<>();
Map<String, Integer> sequenceGroupMap = new HashMap<>();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
Expand Down Expand Up @@ -341,13 +342,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
fieldName -> {
int index = fieldNames.indexOf(fieldName);
fieldSeqComparators.put(index, userDefinedSeqComparator);
if (removeRecordOnSequenceGroup != null
&& Arrays.asList(
removeRecordOnSequenceGroup.split(
FIELDS_SEPARATOR))
.contains(fieldName)) {
sequenceGroupPartialDelete.add(index);
}
sequenceGroupMap.put(fieldName, index);
});
}
}
Expand All @@ -370,6 +365,21 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
String.format(
"sequence group and %s have conflicting behavior so should not be enabled at the same time.",
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));

if (removeRecordOnSequenceGroup != null) {
String[] sequenceGroupArr = removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR);
Preconditions.checkState(
sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)),
String.format(
"field '%s' defined in '%s' option must be part of sequence groups",
removeRecordOnSequenceGroup,
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
sequenceGroupPartialDelete =
Arrays.stream(sequenceGroupArr)
.filter(sequenceGroupMap::containsKey)
.map(sequenceGroupMap::get)
.collect(Collectors.toSet());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,21 @@ public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception {
options.set("partial-update.remove-record-on-sequence-group", "seq2");
},
rowType);
FileStoreTable wrongTable =
createFileStoreTable(
options -> {
options.set("merge-engine", "partial-update");
options.set("fields.seq1.sequence-group", "b");
options.set("fields.seq2.sequence-group", "c,d");
options.set("partial-update.remove-record-on-sequence-group", "b");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);

assertThatThrownBy(() -> wrongTable.newWrite(""))
.hasMessageContaining(
"field 'b' defined in 'partial-update.remove-record-on-sequence-group' option must be part of sequence groups");

SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
Expand Down

0 comments on commit 30f08dd

Please sign in to comment.