Skip to content

Commit

Permalink
fix: create table like using mixed_hive
Browse files Browse the repository at this point in the history
  • Loading branch information
Aireed committed Jan 2, 2025
1 parent 907973e commit 49829de
Showing 1 changed file with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.command.CreateTableLikeCommand

import org.apache.amoro.spark.{MixedFormatSparkCatalog, MixedFormatSparkSessionCatalog}
import org.apache.amoro.spark.{MixedFormatSparkCatalog, MixedFormatSparkSessionCatalog, SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
import org.apache.amoro.spark.mixed.MixedSessionCatalogBase
import org.apache.amoro.spark.sql.MixedFormatExtensionUtils.buildCatalogAndIdentifier
import org.apache.amoro.spark.sql.catalyst.plans.{AlterMixedFormatTableDropPartition, TruncateMixedFormatTable}
Expand All @@ -51,6 +51,10 @@ case class RewriteMixedFormatCommand(sparkSession: SparkSession) extends Rule[Lo
case _: MixedFormatSparkSessionCatalog[_] =>
provider.isDefined && MixedSessionCatalogBase.SUPPORTED_PROVIDERS.contains(
provider.get.toLowerCase)
case _: SparkUnifiedCatalog => true
case _: SparkUnifiedSessionCatalog[_] =>
provider.isDefined && MixedSessionCatalogBase.SUPPORTED_PROVIDERS.contains(
provider.get.toLowerCase)
case _ => false
}
}
Expand Down Expand Up @@ -82,35 +86,48 @@ case class RewriteMixedFormatCommand(sparkSession: SparkSession) extends Rule[Lo
optionsMap += (WriteMode.WRITE_MODE_KEY -> WriteMode.OVERWRITE_DYNAMIC.mode)
val newTableSpec = tableSpec.copy(properties = propertiesMap)
c.copy(tableSpec = newTableSpec, writeOptions = optionsMap)
case CreateTableLikeCommand(targetTable, sourceTable, _, provider, properties, ifNotExists)
if isCreateMixedFormatTableLikeCommand(targetTable, provider) =>
val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
val table = sourceCatalog.loadTable(sourceIdentifier)
var targetProperties = properties
targetProperties += ("provider" -> "arctic")
table match {
case keyedTable: MixedSparkTable =>
keyedTable.table() match {
case table: KeyedTable =>
targetProperties += ("primary.keys" -> String.join(
",",
table.primaryKeySpec().fieldNames()))
case CreateTableLikeCommand(targetTable, sourceTable, _, provider, properties, ifNotExists) =>
isCreateMixedFormatTableLikeCommand(targetTable, provider) match {
case true =>
val (sourceCatalog, sourceIdentifier) =
buildCatalogAndIdentifier(sparkSession, sourceTable)
val (targetCatalog, targetIdentifier) =
buildCatalogAndIdentifier(sparkSession, targetTable)
val table = sourceCatalog.loadTable(sourceIdentifier)
var targetProperties = properties
targetProperties += ("provider" -> "mixed_hive")
table match {
case keyedTable: MixedSparkTable =>
keyedTable.table() match {
case table: KeyedTable =>
targetProperties += ("primary.keys" -> String.join(
",",
table.primaryKeySpec().fieldNames()))
case _ =>
}
case _ =>
}
val tableSpec = TableSpec(
properties = targetProperties.toMap,
provider = provider,
options = Map.empty,
location = None,
comment = None,
serde = None,
external = false)
val seq: Seq[String] = Seq(
targetTable.database.getOrElse(sparkSession.catalog.currentDatabase),
targetTable.identifier)
val name = ResolvedDBObjectName(targetCatalog, seq)
CreateTable(name, table.schema(), table.partitioning(), tableSpec, ifNotExists)
case _ =>
provider.isDefined match {
case true =>
throw new UnsupportedOperationException(
s"format ${provider.get} does not support create table like command!!!")
case false => plan
}
}
val tableSpec = TableSpec(
properties = targetProperties.toMap,
provider = provider,
options = Map.empty,
location = None,
comment = None,
serde = None,
external = false)
val seq: Seq[String] = Seq(targetTable.database.get, targetTable.identifier)
val name = ResolvedDBObjectName(targetCatalog, seq)
CreateTable(name, table.schema(), table.partitioning(), tableSpec, ifNotExists)
case _ => plan
}
}
Expand Down

0 comments on commit 49829de

Please sign in to comment.