diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index a51671650..71cf37c14 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -40,7 +40,7 @@ uuid = { workspace = true, features = ["v4"] }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
regex = "1.10.5"
-sqlx = { version = "0.8.0", features = [
+sqlx = { version = "0.8.1", features = [
"tls-rustls",
"runtime-tokio",
"any",
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 1556614dd..f92f47429 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
- Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
- TableIdent,
+ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow};
use sqlx::{Any, AnyPool, Row, Transaction};
@@ -588,8 +587,8 @@ impl Catalog for SqlCatalog {
&format!(
"DELETE FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
- AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
@@ -769,23 +768,86 @@ impl Catalog for SqlCatalog {
Ok(())
}
- async fn update_table(&self, _commit: TableCommit) -> Result
{
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Updating a table is not supported yet",
- ))
+ async fn update_table(&self, mut commit: TableCommit) -> Result {
+ let identifier = commit.identifier().clone();
+ if !self.table_exists(&identifier).await? {
+ return no_such_table_err(&identifier);
+ }
+
+ // ReplaceSortOrder is currently not supported, so ignore the requirement here.
+ let _requirements = commit.take_requirements();
+ let table_updates = commit.take_updates();
+
+ let table = self.load_table(&identifier).await?;
+ let mut update_table_metadata_builder =
+ TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None);
+
+ for table_update in table_updates {
+ update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?;
+ }
+
+ for table_requirement in _requirements {
+ table_requirement.check(Some(table.metadata()))?;
+ }
+
+ let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4());
+ let file = self.fileio.new_output(&new_table_meta_location)?;
+ let update_table_metadata = update_table_metadata_builder.build()?;
+ file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into())
+ .await?;
+
+ let update = format!(
+ "UPDATE {CATALOG_TABLE_NAME}
+ SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ );
+
+ let namespace_name = identifier.namespace().join(".");
+ let args: Vec