Spark 3.3: Support micro timestamp
Veiasai authored and pan3793 committed May 10, 2024
1 parent 21a0d3b commit a1d562f
Showing 2 changed files with 9 additions and 4 deletions.
ClickHouseBinaryReader.scala

@@ -24,6 +24,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import xenon.clickhouse.exception.CHClientException
 import xenon.clickhouse.read.{ClickHouseInputPartition, ClickHouseReader, ScanJobDescription}
 
+import java.time.ZoneOffset
+import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 
 class ClickHouseBinaryReader(
@@ -62,7 +64,9 @@ class ClickHouseBinaryReader(
       case FloatType => value.asFloat
       case DoubleType => value.asDouble
       case d: DecimalType => Decimal(value.asBigDecimal(d.scale))
-      case TimestampType => value.asZonedDateTime.toEpochSecond * 1000 * 1000 // TODO consider scanJob.tz
+      case TimestampType =>
+        var _instant = value.asZonedDateTime.withZoneSameInstant(ZoneOffset.UTC)
+        TimeUnit.SECONDS.toMicros(_instant.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(_instant.getNano())
       case StringType if value.isInstanceOf[ClickHouseStringValue] => UTF8String.fromBytes(value.asBinary)
       case StringType => UTF8String.fromString(value.asString)
       case DateType => value.asDate.toEpochDay.toInt
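The old conversion, value.asZonedDateTime.toEpochSecond * 1000 * 1000, scales whole epoch seconds to microseconds and therefore drops the sub-second part of a DateTime64 value, even though Spark's TimestampType stores microseconds since the epoch. The new code adds the nanosecond field, converted to microseconds, on top of the seconds. A standalone sketch of the before/after behaviour (plain java.time only, no Spark or ClickHouse client types; the example instant is made up for illustration):

import java.time.{ZoneOffset, ZonedDateTime}
import java.util.concurrent.TimeUnit

object MicroTimestampSketch extends App {
  // A timestamp with sub-second precision, e.g. DateTime64(6): 2024-05-10 12:34:56.789012 UTC
  val zdt = ZonedDateTime.of(2024, 5, 10, 12, 34, 56, 789012000, ZoneOffset.UTC)

  // Old conversion: whole seconds scaled to micros, the .789012 fraction is silently lost
  val oldMicros = zdt.toEpochSecond * 1000 * 1000

  // New conversion: normalize to UTC, then express seconds and the nano field in micros
  val utc = zdt.withZoneSameInstant(ZoneOffset.UTC)
  val newMicros = TimeUnit.SECONDS.toMicros(utc.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(utc.getNano)

  println(s"old: $oldMicros") // ends in 000000
  println(s"new: $newMicros") // ends in 789012
}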
ClickHouseJsonReader.scala

@@ -27,6 +27,7 @@ import xenon.clickhouse.read.{ClickHouseInputPartition, ClickHouseReader, ScanJobDescription}
 
 import java.math.{MathContext, RoundingMode => RM}
 import java.time.{LocalDate, ZoneOffset, ZonedDateTime}
+import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.math.BigDecimal.RoundingMode
 
@@ -79,9 +80,9 @@ class ClickHouseJsonReader(
       case d: DecimalType =>
         Decimal(BigDecimal(jsonNode.textValue, new MathContext(d.precision)), d.precision, d.scale)
       case TimestampType =>
-        ZonedDateTime.parse(jsonNode.asText, dateTimeFmt.withZone(scanJob.tz))
-          .withZoneSameInstant(ZoneOffset.UTC)
-          .toEpochSecond * 1000 * 1000
+        var _instant =
+          ZonedDateTime.parse(jsonNode.asText, dateTimeFmt.withZone(scanJob.tz)).withZoneSameInstant(ZoneOffset.UTC)
+        TimeUnit.SECONDS.toMicros(_instant.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(_instant.getNano())
       case StringType => UTF8String.fromString(jsonNode.asText)
       case DateType => LocalDate.parse(jsonNode.asText, dateFmt).toEpochDay.toInt
       case BinaryType => jsonNode.binaryValue
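The JSON reader applies the same conversion after parsing the textual value in the scan job's time zone and normalizing it to UTC. A sketch of that path under assumed inputs (the yyyy-MM-dd HH:mm:ss.SSSSSS pattern and the Asia/Shanghai zone stand in for the reader's dateTimeFmt and scanJob.tz):

import java.time.{ZoneId, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit

object JsonTimestampSketch extends App {
  // Assumed for illustration; the actual reader uses its own dateTimeFmt and scanJob.tz
  val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
  val tz  = ZoneId.of("Asia/Shanghai")

  // Parse in the session zone, then shift to UTC before computing epoch micros
  val text = "2024-05-10 12:34:56.789012"
  val utc  = ZonedDateTime.parse(text, fmt.withZone(tz)).withZoneSameInstant(ZoneOffset.UTC)

  val micros = TimeUnit.SECONDS.toMicros(utc.toEpochSecond) + TimeUnit.NANOSECONDS.toMicros(utc.getNano)
  println(micros) // epoch microseconds with the .789012 fraction preserved
}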
