diff --git a/debezium-core/src/main/java/io/debezium/processors/reselect/ReselectColumnsPostProcessor.java b/debezium-core/src/main/java/io/debezium/processors/reselect/ReselectColumnsPostProcessor.java index 1686b9cb993..a858f975a50 100644 --- a/debezium-core/src/main/java/io/debezium/processors/reselect/ReselectColumnsPostProcessor.java +++ b/debezium-core/src/main/java/io/debezium/processors/reselect/ReselectColumnsPostProcessor.java @@ -53,10 +53,12 @@ public class ReselectColumnsPostProcessor implements PostProcessor, BeanRegistry private static final String RESELECT_COLUMNS_EXCLUDE_LIST = "reselect.columns.exclude.list"; private static final String RESELECT_UNAVAILABLE_VALUES = "reselect.unavailable.values"; private static final String RESELECT_NULL_VALUES = "reselect.null.values"; + private static final String RESELECT_USE_EVENT_KEY = "reselect.use.event.key"; private Predicate selector; private boolean reselectUnavailableValues; private boolean reselectNullValues; + private boolean reselectUseEventKeyFields; private JdbcConnection jdbcConnection; private ValueConverterProvider valueConverterProvider; private String unavailableValuePlaceholder; @@ -68,6 +70,7 @@ public void configure(Map properties) { final Configuration config = Configuration.from(properties); this.reselectUnavailableValues = config.getBoolean(RESELECT_UNAVAILABLE_VALUES, true); this.reselectNullValues = config.getBoolean(RESELECT_NULL_VALUES, true); + this.reselectUseEventKeyFields = config.getBoolean(RESELECT_USE_EVENT_KEY, false); this.selector = new ReselectColumnsPredicateBuilder() .includeColumns(config.getString(RESELECT_COLUMNS_INCLUDE_LIST)) .excludeColumns(config.getString(RESELECT_COLUMNS_EXCLUDE_LIST)) @@ -127,9 +130,17 @@ public void apply(Object messageKey, Struct value) { final List keyColumns = new ArrayList<>(); final List keyValues = new ArrayList<>(); - for (org.apache.kafka.connect.data.Field field : key.schema().fields()) { - keyColumns.add(field.name()); - keyValues.add(key.get(field)); + if (reselectUseEventKeyFields) { + for (org.apache.kafka.connect.data.Field field : key.schema().fields()) { + keyColumns.add(field.name()); + keyValues.add(key.get(field)); + } + } + else { + for (Column column : table.primaryKeyColumns()) { + keyColumns.add(column.name()); + keyValues.add(key.get(key.schema().field(column.name()))); + } } Map selections; diff --git a/documentation/modules/ROOT/pages/post-processors/reselect-columns.adoc b/documentation/modules/ROOT/pages/post-processors/reselect-columns.adoc index 87c0f8ccc48..05c5d806fb1 100644 --- a/documentation/modules/ROOT/pages/post-processors/reselect-columns.adoc +++ b/documentation/modules/ROOT/pages/post-processors/reselect-columns.adoc @@ -26,6 +26,20 @@ You can configure the post processor to re-select the following column types: Configuring a `PostProcessor` is similar to configuring a `CustomConverter` or `Transformation`. +== Keyless tables + +The `ReselectColumnsPostProcessor` requires that the table have some unique combination of columns that can be used to generate a re-select query that returns a single row. +By default, the `PostProcessor` will use the relational table model to construct a where-clause based on the table's primary key columns or the unique index that is defined on the table. +However, if a table has no primary key or unique index, effectively keyless, then you can use the `message.key.columns` configuration to define a combination of columns that uniquely identifies a single row. +When using `message.key.columns` for keyless tables, it is important to set the `reselect.use.event.key` configuration property to `true` that the event's key fields are used as the basis for the selection criteria since the relational table model would have no primary key columns. + +[NOTE] +==== +The `ReselectColumnsPostProcessor` tolerates a re-select query that returns more than one row. +In such circumstances, only the first row will be used and that entry is entirely random and database driven. +It's recommended that if you use `reselect.use.event.key` set to `true`, your connector configuration and data model guarantees that the columns that participate in the event's key uniquely identify a single database row so that the re-select is always deterministic. +==== + == Configuration example Configure a `PostProcessor` much in the same way that you would configure a `CustomConverter` or `Transformation`. @@ -37,12 +51,14 @@ To enable the connector to use the `ReselectColumnsPostProcessor`, add the follo "reselector.reselect.columns.include.list": ".:,.:", // <3> "reselector.reselect.unavailable.values": "true", // <4> "reselector.reselect.null.values": "true" // <5> + "reselector.reselect.use.event.key": "false" // <6> ---- <1> Comma-separated list of post-processor prefixes. <2> The fully-qualified class type name for the post-processor. <3> Comma-separated list of column names specified by using the following format: `.:`. <4> Enables or disables the re-selection of columns that contain the `unavailable.value.placeholder` sentinel value. <5> Enables or disables the re-selection of columns that are `null`. +<6> Enables or disables the re-selection based event key field names. == Configuration options @@ -77,4 +93,11 @@ Do not set this property if you set the `reselect.columns.include.list` property |`true` |Specifies whether the post processor reselects a column that matches the `reselect.columns.include.list` filter if the column value is `null`. +|[[reselect-columns-post-processor-property-reselect-use-event-key]]<> +|`false` +|Specifies whether the post processor reselects based on the event's key field names or uses the relational table's primary key column names. + + + +By default, the reselect is based on the relational table's primary key columns or unique key index. +Setting this to `true` can be useful if the table has no primary key and the connector is configured to use `message.key.columns` to create events with a key. +This will then use the key field names as the primary key in the SQL reselection query. |===