Commit 7afb295

refactor!: refactor shard version logic (#264)
## Rationale
Refer to this issue: #263

## Detailed Changes
* Rework the create/drop table flow so that the shard version update is supplied by CeresDB instead of being computed inside ceresmeta (see the sketch after the test plan)

## Test Plan
Pass existing unit tests and integration tests.
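
To make the new contract concrete, here is a minimal, self-contained Go sketch of the request shape this change introduces. The field names match the diff below; the type definitions, package layout, and values are simplified stand-ins for illustration, not the actual ceresmeta code.

```go
package main

import "fmt"

// Simplified stand-ins for the ceresmeta types touched by this commit
// (assumed shapes; the real definitions live under server/storage and
// server/cluster/metadata).
type (
	ShardID      uint32
	ShardVersion uint64
)

// After this change, drop/create requests carry the shard and the latest
// shard version reported by CeresDB; ceresmeta no longer computes the
// version update itself.
type DropTableRequest struct {
	SchemaName    string
	TableName     string
	ShardID       ShardID
	LatestVersion ShardVersion
}

func main() {
	// Hypothetical values: the caller resolves the shard and its current
	// version first, then hands both to the metadata layer.
	req := DropTableRequest{
		SchemaName:    "public",
		TableName:     "demo_table",
		ShardID:       1,
		LatestVersion: 7,
	}
	fmt.Printf("drop table request: %+v\n", req)
}
```

The point of the refactor is visible in the struct itself: `LatestVersion` is an input supplied by the caller, not an output computed and returned by ceresmeta.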
ZuLiangWang authored Nov 8, 2023
1 parent b4402f7 commit 7afb295
Showing 26 changed files with 233 additions and 262 deletions.
.github/workflows/check.yml (2 changes: 0 additions & 2 deletions)

```diff
@@ -46,8 +46,6 @@ jobs:
       - run: |
           make install-tools
           make test
-      - name: Upload coverage to Codecov
-        uses: codecov/codecov-action@v3

   integration-test:
     runs-on: ubuntu-latest
```
go.mod (2 changes: 1 addition & 1 deletion)

```diff
@@ -3,7 +3,7 @@ module github.com/CeresDB/ceresmeta
 go 1.21

 require (
-	github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d
+	github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a
 	github.com/caarlos0/env/v6 v6.10.1
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/looplab/fsm v0.3.0
```
go.sum (6 changes: 4 additions & 2 deletions)

```diff
@@ -16,8 +16,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d h1:xjRsXHTX++lOhAwHStjsTgUMsqaMazYMZm9T3EKt+2E=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a h1:YxgWd9tpn7IOkGrFQjGbJqa9wnVR1FkdRFvv3ZLqllw=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -324,6 +324,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de h1:AMfL1AEmlDt+gvePve1U8ly52BELslBVmea0gk20B/Y=
+github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
 go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
```
server/cluster/manager.go (36 changes: 35 additions & 1 deletion)

```diff
@@ -290,13 +290,47 @@ func (m *managerImpl) GetTablesByShardIDs(clusterName, _ string, shardIDs []stor
 	return shardTables, nil
 }

+// DropTable is only used for the HTTP interface.
+// It only deletes the table data in ETCD and does not initiate a table deletion request to CeresDB.
 func (m *managerImpl) DropTable(ctx context.Context, clusterName, schemaName, tableName string) error {
 	cluster, err := m.getCluster(clusterName)
 	if err != nil {
 		return errors.WithMessage(err, "get cluster")
 	}

-	_, err = cluster.metadata.DropTable(ctx, schemaName, tableName)
+	table, ok, err := cluster.metadata.GetTable(schemaName, tableName)
+	if !ok {
+		return metadata.ErrTableNotFound
+	}
+	if err != nil {
+		return errors.WithMessage(err, "get table")
+	}
+
+	getShardNodeResult, err := cluster.metadata.GetShardNodeByTableIDs([]storage.TableID{table.ID})
+	if err != nil {
+		return errors.WithMessage(err, "get shard node by tableID")
+	}
+
+	if _, ok := getShardNodeResult.ShardNodes[table.ID]; !ok {
+		return metadata.ErrShardNotFound
+	}
+
+	if len(getShardNodeResult.ShardNodes[table.ID]) != 1 || len(getShardNodeResult.Version) != 1 {
+		return metadata.ErrShardNotFound
+	}
+
+	shardID := getShardNodeResult.ShardNodes[table.ID][0].ID
+	version, ok := getShardNodeResult.Version[shardID]
+	if !ok {
+		return metadata.ErrVersionNotFound
+	}
+
+	err = cluster.metadata.DropTable(ctx, metadata.DropTableRequest{
+		SchemaName:    schemaName,
+		TableName:     tableName,
+		ShardID:       shardID,
+		LatestVersion: version,
+	})
 	if err != nil {
 		return errors.WithMessage(err, "cluster drop table")
 	}
```
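For readers tracing the lookups in `DropTable` above, this compilable sketch shows the result shape the code consumes, inferred from the accesses `getShardNodeResult.ShardNodes[table.ID]` and `getShardNodeResult.Version[shardID]`. The type names, field types, and values here are assumptions; the real ceresmeta definitions may differ.

```go
package main

import "fmt"

// Assumed, simplified shapes inferred from the field accesses in
// manager.DropTable above.
type (
	TableID      uint64
	ShardID      uint32
	ShardVersion uint64
)

type ShardNode struct{ ID ShardID }

type GetShardNodeResult struct {
	ShardNodes map[TableID][]ShardNode  // table -> shards it is opened on
	Version    map[ShardID]ShardVersion // shard -> latest known version
}

func main() {
	res := GetShardNodeResult{
		ShardNodes: map[TableID][]ShardNode{42: {{ID: 1}}},
		Version:    map[ShardID]ShardVersion{1: 7},
	}
	// DropTable requires the table to be mapped to exactly one shard.
	nodes := res.ShardNodes[42]
	if len(nodes) != 1 {
		fmt.Println("expected exactly one shard mapping")
		return
	}
	shardID := nodes[0].ID
	fmt.Printf("shard=%d version=%d\n", shardID, res.Version[shardID])
}
```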
server/cluster/manager_test.go (1 change: 1 addition & 0 deletions)

```diff
@@ -177,6 +177,7 @@ func testCreateTable(ctx context.Context, re *require.Assertions, manager cluste
 	re.NoError(err)
 	_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
 		ShardID:       shardID,
+		LatestVersion: 0,
 		SchemaName:    schema,
 		TableName:     tableName,
 		PartitionInfo: storage.PartitionInfo{Info: nil},
```
server/cluster/metadata/cluster_metadata.go (57 changes: 26 additions & 31 deletions)

```diff
@@ -191,41 +191,37 @@ func (c *ClusterMetadata) GetShardTables(shardIDs []storage.ShardID) map[storage

 // DropTable will drop table metadata and all mapping of this table.
 // If the table to be dropped has been opened multiple times, all its mapping will be dropped.
-func (c *ClusterMetadata) DropTable(ctx context.Context, schemaName, tableName string) (DropTableResult, error) {
-	c.logger.Info("drop table start", zap.String("cluster", c.Name()), zap.String("schemaName", schemaName), zap.String("tableName", tableName))
+func (c *ClusterMetadata) DropTable(ctx context.Context, request DropTableRequest) error {
+	c.logger.Info("drop table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

-	var dropRes DropTableResult
 	if !c.ensureClusterStable() {
-		return dropRes, errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
+		return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
 	}

-	table, ok, err := c.tableManager.GetTable(schemaName, tableName)
+	table, ok, err := c.tableManager.GetTable(request.SchemaName, request.TableName)
 	if err != nil {
-		return dropRes, errors.WithMessage(err, "get table")
+		return errors.WithMessage(err, "get table")
 	}

 	if !ok {
-		return dropRes, ErrTableNotFound
+		return ErrTableNotFound
 	}

 	// Drop table.
-	err = c.tableManager.DropTable(ctx, schemaName, tableName)
+	err = c.tableManager.DropTable(ctx, request.SchemaName, request.TableName)
 	if err != nil {
-		return dropRes, errors.WithMessage(err, "table manager drop table")
+		return errors.WithMessage(err, "table manager drop table")
 	}

 	// Remove dropped table in shard view.
-	updateVersions, err := c.topologyManager.EvictTable(ctx, table.ID)
+	err = c.topologyManager.RemoveTable(ctx, request.ShardID, request.LatestVersion, []storage.TableID{table.ID})
 	if err != nil {
-		return dropRes, errors.WithMessage(err, "topology manager remove table")
+		return errors.WithMessage(err, "topology manager remove table")
 	}

-	dropRes = DropTableResult{
-		ShardVersionUpdate: updateVersions,
-	}
-	c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", schemaName), zap.String("tableName", tableName), zap.String("dropResult", fmt.Sprintf("%+v", dropRes)))
+	c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

-	return dropRes, nil
+	return nil
 }

 // MigrateTable used to migrate tables from old shard to new shard.
@@ -256,12 +252,12 @@ func (c *ClusterMetadata) MigrateTable(ctx context.Context, request MigrateTable
 		tableIDs = append(tableIDs, table.ID)
 	}

-	if _, err := c.topologyManager.RemoveTable(ctx, request.OldShardID, tableIDs); err != nil {
+	if err := c.topologyManager.RemoveTable(ctx, request.OldShardID, request.latestOldShardVersion, tableIDs); err != nil {
 		c.logger.Error("remove table from topology")
 		return err
 	}

-	if _, err := c.topologyManager.AddTable(ctx, request.NewShardID, tables); err != nil {
+	if err := c.topologyManager.AddTable(ctx, request.NewShardID, request.latestNewShardVersion, tables); err != nil {
 		c.logger.Error("add table from topology")
 		return err
 	}
@@ -310,25 +306,21 @@ func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request Creat
 	return res, nil
 }

-func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardID storage.ShardID, table storage.Table) (CreateTableResult, error) {
+func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardVersionUpdate ShardVersionUpdate, table storage.Table) error {
 	c.logger.Info("add table topology start", zap.String("cluster", c.Name()), zap.String("tableName", table.Name))

 	if !c.ensureClusterStable() {
-		return CreateTableResult{}, errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
+		return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
 	}

 	// Add table to topology manager.
-	result, err := c.topologyManager.AddTable(ctx, shardID, []storage.Table{table})
+	err := c.topologyManager.AddTable(ctx, shardVersionUpdate.ShardID, shardVersionUpdate.LatestVersion, []storage.Table{table})
 	if err != nil {
-		return CreateTableResult{}, errors.WithMessage(err, "topology manager add table")
+		return errors.WithMessage(err, "topology manager add table")
 	}

-	ret := CreateTableResult{
-		Table:              table,
-		ShardVersionUpdate: result,
-	}
-	c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret)))
-	return ret, nil
+	c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("table", fmt.Sprintf("%+v", table)), zap.String("shardVersionUpdate", fmt.Sprintf("%+v", shardVersionUpdate)))
+	return nil
 }

 func (c *ClusterMetadata) DropTableMetadata(ctx context.Context, schemaName, tableName string) (DropTableMetadataResult, error) {
@@ -381,14 +373,17 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe
 	}

 	// Add table to topology manager.
-	result, err := c.topologyManager.AddTable(ctx, request.ShardID, []storage.Table{table})
+	err = c.topologyManager.AddTable(ctx, request.ShardID, request.LatestVersion, []storage.Table{table})
 	if err != nil {
 		return CreateTableResult{}, errors.WithMessage(err, "topology manager add table")
 	}

 	ret := CreateTableResult{
-		Table:              table,
-		ShardVersionUpdate: result,
+		Table: table,
+		ShardVersionUpdate: ShardVersionUpdate{
+			ShardID:       request.ShardID,
+			LatestVersion: request.LatestVersion,
+		},
 	}
 	c.logger.Info("create table succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret)))
 	return ret, nil
```
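The `topologyManager` side of `RemoveTable`/`AddTable` is not part of this diff, so purely as intuition: a hypothetical in-memory sketch of what accepting `latestVersion` from the caller implies. The version written to the shard view is the one CeresDB reports; the staleness guard shown here is an assumption about how such a value could be validated, not the actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

type (
	ShardID      uint32
	ShardVersion uint64
)

var errStaleShardVersion = errors.New("stale shard version")

// toyTopology is a hypothetical in-memory stand-in for ceresmeta's
// topology manager, used only to illustrate the new contract: the caller
// (ultimately CeresDB) supplies the latest shard version.
type toyTopology struct {
	versions map[ShardID]ShardVersion
	tables   map[ShardID][]string
}

// RemoveTable drops tables from a shard view and persists the version the
// caller reports, instead of bumping a counter owned by ceresmeta.
func (t *toyTopology) RemoveTable(shard ShardID, latestVersion ShardVersion, names []string) error {
	// Assumed guard: reject callers whose view lags the stored version.
	if latestVersion < t.versions[shard] {
		return errStaleShardVersion
	}
	kept := t.tables[shard][:0]
	for _, table := range t.tables[shard] {
		remove := false
		for _, n := range names {
			if n == table {
				remove = true
				break
			}
		}
		if !remove {
			kept = append(kept, table)
		}
	}
	t.tables[shard] = kept
	t.versions[shard] = latestVersion
	return nil
}

func main() {
	topo := &toyTopology{
		versions: map[ShardID]ShardVersion{1: 3},
		tables:   map[ShardID][]string{1: {"table_a", "table_b"}},
	}
	if err := topo.RemoveTable(1, 4, []string{"table_a"}); err != nil {
		fmt.Println("remove failed:", err)
		return
	}
	fmt.Printf("tables=%v version=%d\n", topo.tables[1], topo.versions[1])
	// Prints: tables=[table_b] version=4
}
```

Compare this with the old `EvictTable`/`AddTable` signatures in the hunks above, which returned a `ShardVersionUpdate` computed inside ceresmeta.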
server/cluster/metadata/cluster_metadata_test.go (9 changes: 7 additions & 2 deletions)

```diff
@@ -149,6 +149,7 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata
 	// Test create table.
 	createResult, err := m.CreateTable(ctx, metadata.CreateTableRequest{
 		ShardID:       0,
+		LatestVersion: 0,
 		SchemaName:    testSchema,
 		TableName:     testTableName,
 		PartitionInfo: storage.PartitionInfo{Info: nil},
@@ -177,9 +178,13 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata
 	re.Equal(storage.ShardID(1), routeResult.RouteEntries[testTableName].NodeShards[0].ShardInfo.ID)

 	// Drop table already created.
-	dropResult, err := m.DropTable(ctx, testSchema, testTableName)
+	err = m.DropTable(ctx, metadata.DropTableRequest{
+		SchemaName:    testSchema,
+		TableName:     testTableName,
+		ShardID:       storage.ShardID(1),
+		LatestVersion: 0,
+	})
 	re.NoError(err)
-	re.Equal(storage.ShardID(1), dropResult.ShardVersionUpdate[0].ShardID)
 }

 func testShardOperation(ctx context.Context, re *require.Assertions, m *metadata.ClusterMetadata) {
```
server/cluster/metadata/error.go (1 change: 1 addition & 0 deletions)

```diff
@@ -28,6 +28,7 @@ var (
 	ErrSchemaNotFound     = coderr.NewCodeError(coderr.NotFound, "schema not found")
 	ErrTableNotFound      = coderr.NewCodeError(coderr.NotFound, "table not found")
 	ErrShardNotFound      = coderr.NewCodeError(coderr.NotFound, "shard not found")
+	ErrVersionNotFound    = coderr.NewCodeError(coderr.NotFound, "version not found")
 	ErrNodeNotFound       = coderr.NewCodeError(coderr.NotFound, "NodeName not found")
 	ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists")
 	ErrOpenTable          = coderr.NewCodeError(coderr.Internal, "open table")
```