From a1d562f07b542321972850834c5812d4c2507b2b Mon Sep 17 00:00:00 2001 From: Jinfeng Liu Date: Fri, 10 May 2024 14:46:16 +0800 Subject: [PATCH] Spark 3.3: Support micro timestamp --- .../clickhouse/read/format/ClickHouseBinaryReader.scala | 6 +++++- .../clickhouse/read/format/ClickHouseJsonReader.scala | 7 ++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseBinaryReader.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseBinaryReader.scala index acc0db6a..7c36cbef 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseBinaryReader.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseBinaryReader.scala @@ -24,6 +24,8 @@ import org.apache.spark.unsafe.types.UTF8String import xenon.clickhouse.exception.CHClientException import xenon.clickhouse.read.{ClickHouseInputPartition, ClickHouseReader, ScanJobDescription} +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ class ClickHouseBinaryReader( @@ -62,7 +64,9 @@ class ClickHouseBinaryReader( case FloatType => value.asFloat case DoubleType => value.asDouble case d: DecimalType => Decimal(value.asBigDecimal(d.scale)) - case TimestampType => value.asZonedDateTime.toEpochSecond * 1000 * 1000 // TODO consider scanJob.tz + case TimestampType => + var _instant = value.asZonedDateTime.withZoneSameInstant(ZoneOffset.UTC) + TimeUnit.SECONDS.toMicros(_instant.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(_instant.getNano()) case StringType if value.isInstanceOf[ClickHouseStringValue] => UTF8String.fromBytes(value.asBinary) case StringType => UTF8String.fromString(value.asString) case DateType => value.asDate.toEpochDay.toInt diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseJsonReader.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseJsonReader.scala index b55af6ad..0e703857 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseJsonReader.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/format/ClickHouseJsonReader.scala @@ -27,6 +27,7 @@ import xenon.clickhouse.read.{ClickHouseInputPartition, ClickHouseReader, ScanJo import java.math.{MathContext, RoundingMode => RM} import java.time.{LocalDate, ZoneOffset, ZonedDateTime} +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.math.BigDecimal.RoundingMode @@ -79,9 +80,9 @@ class ClickHouseJsonReader( case d: DecimalType => Decimal(BigDecimal(jsonNode.textValue, new MathContext(d.precision)), d.precision, d.scale) case TimestampType => - ZonedDateTime.parse(jsonNode.asText, dateTimeFmt.withZone(scanJob.tz)) - .withZoneSameInstant(ZoneOffset.UTC) - .toEpochSecond * 1000 * 1000 + var _instant = + ZonedDateTime.parse(jsonNode.asText, dateTimeFmt.withZone(scanJob.tz)).withZoneSameInstant(ZoneOffset.UTC) + TimeUnit.SECONDS.toMicros(_instant.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(_instant.getNano()) case StringType => UTF8String.fromString(jsonNode.asText) case DateType => LocalDate.parse(jsonNode.asText, dateFmt).toEpochDay.toInt case BinaryType => jsonNode.binaryValue