# refactor!: refactor shard version logic (#1286)

## Rationale
For details, see: apache/incubator-horaedb-meta#263

## Detailed Changes
* Modify the return value of `CreateTableOnShard` & `DropTableOnShard` to return the latest shard version (a minimal sketch of the new contract is shown below).
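
For readers skimming the diff, here is a minimal, self-contained sketch of the new contract. Only the switch from returning `()` to returning the latest `ShardVersion` (and handing that version back in the `CreateTableOnShard`/`DropTableOnShard` responses) is taken from this commit; the stub type names, the synchronous signature, and the bump-by-one rule below are illustrative assumptions.

```rust
// Sketch only: stub types stand in for the real ShardOperator and context.
type ShardVersion = u64;
type Error = Box<dyn std::error::Error>;

struct CreateTableContext; // placeholder for the real per-request context

struct ShardOperatorStub {
    current_version: ShardVersion,
}

impl ShardOperatorStub {
    // Before this commit the method effectively returned `Result<()>`; now the
    // latest shard version is handed back to the caller. (The real method is
    // `async`; this stub is synchronous so the sketch stays self-contained.)
    fn create_table(&mut self, _ctx: CreateTableContext) -> Result<ShardVersion, Error> {
        self.current_version += 1;
        Ok(self.current_version)
    }
}

fn main() -> Result<(), Error> {
    let mut operator = ShardOperatorStub { current_version: 0 };

    // The server-side handler for `CreateTableOnShard` can now place this
    // value in its gRPC response (the concrete protobuf field is defined by
    // ceresdbproto 1.0.22 and is not shown here).
    let latest_shard_version = operator.create_table(CreateTableContext)?;
    println!("latest shard version: {latest_shard_version}");
    Ok(())
}
```

The same pattern applies to `DropTableOnShard`.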

## Test Plan
Pass all unit tests and integration tests.

---------

Co-authored-by: xikai.wxk <[email protected]>
Co-authored-by: WEI Xikai <[email protected]>
3 people authored Nov 9, 2023
1 parent c33ab01 commit 642934a
Showing 8 changed files with 243 additions and 120 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
@@ -93,9 +93,8 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.21"
+ceresdbproto = "1.0.22"
 codec = { path = "components/codec" }
-notifier = { path = "components/notifier" }
 chrono = "0.4"
 clap = "3.0"
 clru = "0.6.1"
@@ -114,21 +113,22 @@ generic_error = { path = "components/generic_error" }
 hash_ext = { path = "components/hash_ext" }
 hex = "0.4.3"
 hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
-lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
-lazy_static = "1.4.0"
-logger = { path = "components/logger" }
-lru = "0.7.6"
 id_allocator = { path = "components/id_allocator" }
 influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
 influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
 influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
 influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
+lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
+lazy_static = "1.4.0"
+logger = { path = "components/logger" }
+lru = "0.7.6"
 macros = { path = "components/macros" }
 message_queue = { path = "components/message_queue" }
 meta_client = { path = "meta_client" }
 metric_ext = { path = "components/metric_ext" }
+notifier = { path = "components/notifier" }
 object_store = { path = "components/object_store" }
 panic_ext = { path = "components/panic_ext" }
 partitioned_lock = { path = "components/partitioned_lock" }
16 changes: 4 additions & 12 deletions catalog_impls/src/volatile.rs
@@ -326,20 +326,12 @@ impl Schema for SchemaImpl {
         // Do real create table.
         // Partition table is not stored in ShardTableManager.
         if request.params.partition_info.is_none() {
-            let shard =
-                self.shard_set
-                    .get(request.shard_id)
-                    .with_context(|| schema::CreateTable {
-                        request: request.clone(),
-                        msg: "shard not found".to_string(),
-                    })?;
-
-            // TODO: seems unnecessary?
-            let _ = shard
-                .find_table(&request.params.schema_name, &request.params.table_name)
+            let _ = self
+                .shard_set
+                .get(request.shard_id)
                 .with_context(|| schema::CreateTable {
                     request: request.clone(),
-                    msg: "table not found in shard".to_string(),
+                    msg: "shard not found".to_string(),
                 })?;
         }
         let request = request.into_engine_create_request(None, self.schema_id);
154 changes: 94 additions & 60 deletions cluster/src/shard_operator.rs
@@ -21,6 +21,7 @@ use catalog::{
     },
     table_operator::TableOperator,
 };
+use common_types::table::ShardVersion;
 use generic_error::BoxError;
 use logger::info;
 use snafu::ResultExt;
@@ -219,24 +220,16 @@ impl ShardOperator {
         Ok(())
     }
 
-    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator create table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having created table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| CreateTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
         // Create the table by operator afterwards.
         let (table_engine, partition_info) = match table_info.partition_info.clone() {
             Some(v) => (ctx.partition_table_engine.clone(), Some(v)),
@@ -275,30 +268,37 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is created by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
         );
 
-        Ok(())
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_create_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CreateTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator create table sequentially finish, shard_id:{}, shard_version:{}, table:{}",
+            shard_info.id, shard_info.version, table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
-    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator drop table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having dropped table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| DropTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
         // Drop the table by operator afterwards.
         let drop_table_request = DropTableRequest {
             catalog_name: ctx.catalog,
@@ -319,31 +319,41 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is dropped, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
        );
 
-        Ok(())
+        // Update the shard info after the table is dropped.
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_drop_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| DropTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator drop table sequentially finish, latest_version:{latest_version}, shard_id:{}, old_shard_version:{}, table:{}",
+            shard_info.id,
+            shard_info.version,
+            table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
     pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator open table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having opened table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| OpenTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
-        // Open the table by operator afterwards.
+        // Open the table by operator.
         let open_table_request = OpenTableRequest {
             catalog_name: ctx.catalog,
             schema_name: table_info.schema_name.clone(),
@@ -366,28 +376,34 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is opened by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
        );
 
-        Ok(())
-    }
-
-    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
-
-        info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}");
-
-        // FIXME: maybe should remove table from cluster after having closed table.
+        // Update the shard info after the table is opened.
         {
             let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
+            data.try_open_table(ctx.updated_table_info.clone())
                 .box_err()
-                .with_context(|| CloseTableWithCause {
+                .with_context(|| OpenTableWithCause {
                     msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
                 })?;
         }
 
+        info!(
+            "ShardOperator open table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        Ok(())
+    }
+
+    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
+
+        info!("ShardOperator close table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", shard_info.id, table_info.name);
+
         // Close the table by catalog manager afterwards.
         let close_table_request = CloseTableRequest {
             catalog_name: ctx.catalog,
@@ -409,7 +425,25 @@ impl ShardOperator {
                     msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
                 })?;
 
-        info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}");
+        info!(
+            "ShardOperator table is closed by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        // Update the shard info after the table is closed.
+        {
+            let mut data = self.data.write().unwrap();
+            data.try_close_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CloseTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?;
+        }
+
+        info!(
+            "ShardOperator close table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
 
         Ok(())
     }
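
To make the shard-version bookkeeping concrete, the sketch below shows what the `try_create_table`/`try_drop_table` calls used in the diff above are expected to do: mutate the shard's table set and return the latest `ShardVersion`. The real implementations live in the cluster shard data structures and are not part of this diff; the struct layout, the error type, and the exact bump-by-one rule here are assumptions for illustration only.

```rust
// Minimal, self-contained sketch; not the real cluster shard implementation.
type ShardVersion = u64;

#[derive(Debug)]
struct TableInfo {
    name: String,
}

#[derive(Debug, Default)]
struct ShardData {
    version: ShardVersion,
    tables: Vec<TableInfo>,
}

impl ShardData {
    /// Registers the table on the shard and returns the latest shard version.
    fn try_create_table(&mut self, table: TableInfo) -> Result<ShardVersion, String> {
        if self.tables.iter().any(|t| t.name == table.name) {
            return Err(format!("table {} already exists in shard", table.name));
        }
        self.tables.push(table);
        // Every successful table change advances the shard version.
        self.version += 1;
        Ok(self.version)
    }

    /// Removes the table from the shard and returns the latest shard version.
    fn try_drop_table(&mut self, table_name: &str) -> Result<ShardVersion, String> {
        let idx = self
            .tables
            .iter()
            .position(|t| t.name == table_name)
            .ok_or_else(|| format!("table {table_name} not found in shard"))?;
        self.tables.remove(idx);
        self.version += 1;
        Ok(self.version)
    }
}

fn main() {
    let mut shard = ShardData::default();
    let v1 = shard
        .try_create_table(TableInfo { name: "t1".to_string() })
        .unwrap();
    let v2 = shard.try_drop_table("t1").unwrap();
    assert!(v2 > v1);
    println!("shard version after create: {v1}, after drop: {v2}");
}
```

Note the ordering change visible in the diff: the table is now created, dropped, opened, or closed through the table operator first, and only afterwards is the shard's in-memory state (and, for create/drop, its version) updated, replacing the old update-shard-state-first flow that the removed FIXME comments questioned.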
