From 30f08ddbe4baca640049be9d58122f5a1175c3ca Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Wed, 13 Nov 2024 23:24:23 +0800 Subject: [PATCH] add validation --- .../compact/PartialUpdateMergeFunction.java | 26 +++++++++++++------ .../table/PrimaryKeyFileStoreTableTest.java | 14 ++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 422b79c428de..ab25794129ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -291,7 +291,7 @@ private static class Factory implements MergeFunctionFactory { private final String removeRecordOnSequenceGroup; - private final Set sequenceGroupPartialDelete; + private Set sequenceGroupPartialDelete; private Factory(Options options, RowType rowType, List primaryKeys) { this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE); @@ -303,6 +303,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { List fieldNames = rowType.getFieldNames(); this.fieldSeqComparators = new HashMap<>(); + Map sequenceGroupMap = new HashMap<>(); for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); @@ -341,13 +342,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { fieldName -> { int index = fieldNames.indexOf(fieldName); fieldSeqComparators.put(index, userDefinedSeqComparator); - if (removeRecordOnSequenceGroup != null - && Arrays.asList( - removeRecordOnSequenceGroup.split( - FIELDS_SEPARATOR)) - .contains(fieldName)) { - sequenceGroupPartialDelete.add(index); - } + sequenceGroupMap.put(fieldName, index); }); } } @@ -370,6 +365,21 @@ private Factory(Options options, RowType rowType, List primaryKeys) { String.format( "sequence group and %s have conflicting behavior so should not be enabled at the same time.", PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)); + + if (removeRecordOnSequenceGroup != null) { + String[] sequenceGroupArr = removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR); + Preconditions.checkState( + sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)), + String.format( + "field '%s' defined in '%s' option must be part of sequence groups", + removeRecordOnSequenceGroup, + PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key())); + sequenceGroupPartialDelete = + Arrays.stream(sequenceGroupArr) + .filter(sequenceGroupMap::containsKey) + .map(sequenceGroupMap::get) + .collect(Collectors.toSet()); + } } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 94d6686af9e9..133913c487cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1236,7 +1236,21 @@ public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception { options.set("partial-update.remove-record-on-sequence-group", "seq2"); }, rowType); + FileStoreTable wrongTable = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("fields.seq1.sequence-group", "b"); + options.set("fields.seq2.sequence-group", "c,d"); + options.set("partial-update.remove-record-on-sequence-group", "b"); + }, + rowType); Function rowToString = row -> internalRowToString(row, rowType); + + assertThatThrownBy(() -> wrongTable.newWrite("")) + .hasMessageContaining( + "field 'b' defined in 'partial-update.remove-record-on-sequence-group' option must be part of sequence groups"); + SnapshotReader snapshotReader = table.newSnapshotReader(); TableRead read = table.newRead(); StreamTableWrite write = table.newWrite("");