DBZ-7773 Implement time.precision.mode for adaptive_time_microseconds and connect

twthorn authored and jpechane committed Apr 18, 2024
1 parent a88fcf6 commit 98dc94a
Showing 11 changed files with 651 additions and 17 deletions.
src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
@@ -32,6 +32,7 @@
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;

@@ -390,6 +391,29 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'precise' represents values as precise (Java's 'BigDecimal') values;"
+ "'long' represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");

public static final Field TIME_PRECISION_MODE = RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 26))
.withValidation(VitessConnectorConfig::validateTimePrecisionMode)
.withDescription("Time, date and timestamps can be represented with different kinds of precisions, including: "
+ "'adaptive_time_microseconds': the precision of date and timestamp values is based the database column's precision; but time fields always use microseconds precision; "
+ "'connect': always represents time, date and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision.");

private static int validateTimePrecisionMode(Configuration config, Field field, ValidationOutput problems) {
if (config.hasKey(TIME_PRECISION_MODE.name())) {
final String timePrecisionMode = config.getString(TIME_PRECISION_MODE.name());
if (TemporalPrecisionMode.ADAPTIVE.getValue().equals(timePrecisionMode)) {
// reject it: the 'adaptive' mode is no longer supported
problems.accept(TIME_PRECISION_MODE, timePrecisionMode, "The 'adaptive' time.precision.mode is no longer supported");
return 1;
}
}

// Everything checks out ok.
return 0;
}

public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(VitessSourceInfoStructMaker.class.getName());

@@ -418,8 +442,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.events(
INCLUDE_UNKNOWN_DATATYPES,
SOURCE_INFO_STRUCT_MAKER)
-        .connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE)
-        .excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
+        .connector(
+                SNAPSHOT_MODE,
+                BIGINT_UNSIGNED_HANDLING_MODE,
+                TIME_PRECISION_MODE)
+        .excluding(
+                SCHEMA_EXCLUDE_LIST,
+                SCHEMA_INCLUDE_LIST,
+                RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE)
.create();

// tasks.max is defined in org.apache.kafka.connect.runtime.ConnectorConfig
@@ -473,6 +503,11 @@ public String getConnectorName() {
return Module.name();
}

@Override
public TemporalPrecisionMode getTemporalPrecisionMode() {
return TemporalPrecisionMode.parse(getConfig().getString(TIME_PRECISION_MODE));
}

@Override
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
// Assume V2 is used because it is the default version
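The new option is exposed under the standard Debezium key time.precision.mode. Below is a minimal sketch of how a Vitess connector configuration might set it, assuming Debezium's io.debezium.config.Configuration builder API; the connector class and property keys mirror the fields above, while the keyspace value is made up.

import io.debezium.config.Configuration;

public class TimePrecisionModeExample {
    public static void main(String[] args) {
        // Hypothetical connector properties for illustration only.
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.vitess.VitessConnector")
                .with("vitess.keyspace", "commerce")        // hypothetical keyspace name
                .with("time.precision.mode", "connect")     // default is adaptive_time_microseconds
                .build();

        // Per validateTimePrecisionMode() above, "adaptive" is reported as a configuration
        // problem, while "connect" and "adaptive_time_microseconds" are accepted.
        System.out.println(config.getString("time.precision.mode"));
    }
}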
25 changes: 22 additions & 3 deletions src/main/java/io/debezium/connector/vitess/VitessType.java
@@ -10,6 +10,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import io.vitess.proto.Query;

@@ -22,15 +23,25 @@ public class VitessType {
private final int jdbcId;
// permitted enum values
private final List<String> enumValues;
private Optional<Integer> precision;

public VitessType(String name, int jdbcId) {
this(name, jdbcId, Collections.emptyList());
}

public VitessType(String name, int jdbcId, List<String> enumValues) {
this(name, jdbcId, enumValues, Optional.empty());
}

public VitessType(String name, int jdbcId, Integer precision) {
this(name, jdbcId, Collections.emptyList(), Optional.of(precision));
}

public VitessType(String name, int jdbcId, List<String> enumValues, Optional<Integer> precision) {
this.name = name;
this.jdbcId = jdbcId;
this.enumValues = Collections.unmodifiableList(enumValues);
this.precision = precision;
}

public String getName() {
@@ -87,6 +98,7 @@ public static VitessType resolve(Query.Field field) {
case "INT24":
case "UINT24":
case "INT32":
case "YEAR":
return new VitessType(type, Types.INTEGER);
case "ENUM":
return new VitessType(type, Types.INTEGER, resolveEnumAndSetValues(field.getColumnType()));
@@ -109,12 +121,15 @@ public static VitessType resolve(Query.Field field) {
case "TEXT":
case "JSON":
case "DECIMAL":
+            return new VitessType(type, Types.VARCHAR);
        case "TIME":
+            return new VitessType(type, Types.TIME, field.getDecimals());
        case "DATE":
-        case "DATETIME":
+            return new VitessType(type, Types.DATE);
        case "TIMESTAMP":
-        case "YEAR":
-            return new VitessType(type, Types.VARCHAR);
+            return new VitessType(type, Types.TIMESTAMP_WITH_TIMEZONE, field.getDecimals());
+        case "DATETIME":
+            return new VitessType(type, Types.TIMESTAMP, field.getDecimals());
case "FLOAT32":
return new VitessType(type, Types.FLOAT);
case "FLOAT64":
@@ -161,4 +176,8 @@ else if (startCollecting) {
}
return values;
}

public Optional<Integer> getPrecision() {
return this.precision;
}
}
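A short sketch, using only the constructors and accessor shown above, of how the optional precision travels with a VitessType; it assumes the class runs in (or is imported from) the io.debezium.connector.vitess package, and the precision values are made up.

import java.sql.Types;

public class VitessTypePrecisionExample {
    public static void main(String[] args) {
        // TIME(3): field.getDecimals() would be 3, carried as the optional precision.
        VitessType time = new VitessType("TIME", Types.TIME, 3);
        time.getPrecision().ifPresent(p -> System.out.println("precision = " + p)); // precision = 3

        // Types without fractional seconds keep an empty Optional.
        VitessType varchar = new VitessType("VARCHAR", Types.VARCHAR);
        System.out.println(varchar.getPrecision().isPresent()); // false
    }
}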
108 changes: 108 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessValueConverter.java
@@ -6,31 +6,47 @@
package io.debezium.connector.vitess;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.data.Json;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.ValueConverter;
import io.debezium.time.Year;
import io.debezium.util.Strings;
import io.vitess.proto.Query;

/** Used by {@link RelationalChangeRecordEmitter} to convert Java value to Connect value */
public class VitessValueConverter extends JdbcValueConverters {

private static final Logger LOGGER = LoggerFactory.getLogger(VitessValueConverter.class);
private static final BigDecimal BIGINT_MAX_VALUE = new BigDecimal("18446744073709551615");
private static final BigDecimal BIGINT_CORRECTION = BIGINT_MAX_VALUE.add(BigDecimal.ONE);

private final boolean includeUnknownDatatypes;
private final VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode;

private static final Pattern DATE_FIELD_PATTERN = Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*)");
private static final Pattern TIME_FIELD_PATTERN = Pattern.compile("(\\-?[0-9]*):([0-9]*)(:([0-9]*))?(\\.([0-9]*))?");

public VitessValueConverter(
DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
@@ -56,6 +72,9 @@ public SchemaBuilder schemaBuilder(Column column) {
if (matches(typeName, Query.Type.SET.name())) {
return io.debezium.data.EnumSet.builder(column.enumValues());
}
if (matches(typeName, Query.Type.YEAR.name())) {
return Year.builder();
}

if (matches(typeName, Query.Type.UINT64.name())) {
switch (bigIntUnsignedHandlingMode) {
@@ -106,6 +125,15 @@ public ValueConverter converter(Column column, Field fieldDefn) {
}
}

switch (column.jdbcType()) {
case Types.TIME:
if (adaptiveTimeMicrosecondsPrecisionMode) {
return data -> convertDurationToMicroseconds(column, fieldDefn, data);
}
case Types.TIMESTAMP:
return ((ValueConverter) (data -> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
}

final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
if (jdbcConverter == null) {
return includeUnknownDatatypes
@@ -117,6 +145,29 @@ public ValueConverter converter(Column column, Field fieldDefn) {
}
}

protected static Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
if (data == null && !fieldDefn.schema().isOptional()) {
return null;
}
if (!(data instanceof Timestamp)) {
return data;
}

return ((Timestamp) data).toLocalDateTime();
}

protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, 0L, (r) -> {
try {
if (data instanceof Duration) {
r.deliver(((Duration) data).toNanos() / 1_000);
}
}
catch (IllegalArgumentException e) {
}
});
}

/**
* Convert original value insertion of type 'BIGINT' into the correct BIGINT UNSIGNED representation
* Note: Unsigned BIGINT (64-bit) is represented in 'BigDecimal' data type. Reference: https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
@@ -254,4 +305,61 @@ protected String convertSetValue(Column column, long indexes, List<String> optio
}
return sb.toString();
}

public static Duration stringToDuration(String timeString) {
Matcher matcher = TIME_FIELD_PATTERN.matcher(timeString);
if (!matcher.matches()) {
throw new DebeziumException("Unexpected format for TIME column: " + timeString);
}

boolean isNegative = !timeString.isBlank() && timeString.charAt(0) == '-';

final long hours = Long.parseLong(matcher.group(1));
final long minutes = Long.parseLong(matcher.group(2));
final String secondsGroup = matcher.group(4);
long seconds = 0;
long nanoSeconds = 0;

if (secondsGroup != null) {
seconds = Long.parseLong(secondsGroup);
String microSecondsString = matcher.group(6);
if (microSecondsString != null) {
nanoSeconds = Long.parseLong(Strings.justifyLeft(microSecondsString, 9, '0'));
}
}

final Duration duration = hours >= 0
? Duration
.ofHours(hours)
.plusMinutes(minutes)
.plusSeconds(seconds)
.plusNanos(nanoSeconds)
: Duration
.ofHours(hours)
.minusMinutes(minutes)
.minusSeconds(seconds)
.minusNanos(nanoSeconds);
return isNegative && !duration.isNegative() ? duration.negated() : duration;
}

public static LocalDate stringToLocalDate(String dateString) {
final Matcher matcher = DATE_FIELD_PATTERN.matcher(dateString);
if (!matcher.matches()) {
throw new RuntimeException("Unexpected format for DATE column: " + dateString);
}

final int year = Integer.parseInt(matcher.group(1));
final int month = Integer.parseInt(matcher.group(2));
final int day = Integer.parseInt(matcher.group(3));

if (year == 0 || month == 0 || day == 0) {
LOGGER.warn("Invalid value '{}' stored in column converted to empty value", dateString);
return null;
}
return LocalDate.of(year, month, day);
}

public static Timestamp stringToTimestamp(String datetimeString) {
return Timestamp.valueOf(datetimeString);
}
}
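A few illustrative calls to the new parsing helpers above; the input strings are made up, and the expected results follow directly from the code shown (assuming the example sits in, or imports from, the same package).

import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;

public class VitessTemporalParsingExample {
    public static void main(String[] args) {
        // TIME values arrive as strings and may be negative; the fraction is right-padded to nanoseconds.
        Duration d = VitessValueConverter.stringToDuration("-01:02:03.500");
        System.out.println(d); // PT-1H-2M-3.5S

        // DATE: a zero year, month, or day yields null (with a warning) instead of an invalid LocalDate.
        LocalDate ok = VitessValueConverter.stringToLocalDate("2024-04-18");
        LocalDate zero = VitessValueConverter.stringToLocalDate("0000-00-00");
        System.out.println(ok + " / " + zero); // 2024-04-18 / null

        // DATETIME strings are parsed by java.sql.Timestamp.valueOf.
        Timestamp ts = VitessValueConverter.stringToTimestamp("2024-04-18 10:15:30.123456");
        System.out.println(ts);
    }
}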
src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java
@@ -8,6 +8,7 @@
import java.sql.Types;

import io.debezium.connector.vitess.VitessType;
import io.debezium.connector.vitess.VitessValueConverter;

/** Resolve raw column value to Java value */
public class ReplicationMessageColumnValueResolver {
@@ -28,12 +29,19 @@ public static Object resolveValue(
case Types.BLOB:
case Types.BINARY:
return value.asBytes();
case Types.TIMESTAMP_WITH_TIMEZONE: // This is the case for TIMESTAMP which is simply treated as string
case Types.VARCHAR:
return value.asString();
case Types.FLOAT:
return value.asFloat();
case Types.DOUBLE:
return value.asDouble();
case Types.TIME:
return VitessValueConverter.stringToDuration(value.asString());
case Types.TIMESTAMP: // This is a misnomer and is the case for DATETIME
return VitessValueConverter.stringToTimestamp(value.asString());
case Types.DATE:
return VitessValueConverter.stringToLocalDate(value.asString());
default:
break;
}
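Taken together with VitessValueConverter, the resolver change above means temporal columns now flow through two steps: the raw Vitess string becomes a Java temporal object here, and the converter then maps it onto the configured Kafka Connect representation. A rough sketch for a TIME column under adaptive_time_microseconds, with an illustrative value:

import java.time.Duration;

public class TimeColumnPipelineSketch {
    public static void main(String[] args) {
        // Step 1: what resolveValue() does for Types.TIME, parsing the raw string.
        Duration raw = VitessValueConverter.stringToDuration("10:15:30");

        // Step 2: what convertDurationToMicroseconds() delivers when
        // time.precision.mode=adaptive_time_microseconds.
        long micros = raw.toNanos() / 1_000;
        System.out.println(micros); // 36930000000
    }
}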
@@ -389,8 +389,12 @@ private Table resolveTable(String shard, String schemaName, String tableName, Li
.type(columnMetaData.getVitessType().getName())
.jdbcType(columnMetaData.getVitessType().getJdbcId())
.optional(columnMetaData.isOptional());
-        if (columnMetaData.getVitessType().isEnum()) {
-            editor = editor.enumValues(columnMetaData.getVitessType().getEnumValues());
+        VitessType vitessType = columnMetaData.getVitessType();
+        if (vitessType.getPrecision().isPresent()) {
+            editor = editor.length(vitessType.getPrecision().get());
+        }
+        if (vitessType.isEnum()) {
+            editor = editor.enumValues(vitessType.getEnumValues());
}
cols.add(editor.create());

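The resolveTable() change above is what makes the precision visible to the converters: it is stored as the Debezium column length, which the adaptive temporal logic consults when choosing a schema. Below is a sketch of the resulting column, assuming Debezium's relational Column/ColumnEditor API; the column name and precision are made up.

import java.sql.Types;

import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;

public class TimeColumnEditorSketch {
    public static void main(String[] args) {
        // Mirrors what resolveTable() now does for a TIME(3) column named "created_time".
        ColumnEditor editor = Column.editor()
                .name("created_time")
                .type("TIME")
                .jdbcType(Types.TIME)
                .optional(true)
                .length(3); // vitessType.getPrecision().get()
        Column column = editor.create();
        System.out.println(column.length()); // 3
    }
}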