Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add partition-loaded and make force-release selectable #256

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions models/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type CollectionLoaded struct {
Status LoadStatus
FieldIndexID map[int64]int64

// orignial etcd key
key string
// orignial etcd Key
Key string
Version string
}

Expand All @@ -46,7 +46,7 @@ func NewCollectionLoadedV2_1(info *querypb.CollectionInfo, key string) *Collecti
c.InMemoryPercentage = info.GetInMemoryPercentage()
c.ReplicaIDs = info.GetReplicaIds()
c.Version = LTEVersion2_1
c.key = key
c.Key = key

return c
}
Expand All @@ -57,7 +57,7 @@ func NewCollectionLoadedV2_2(info *querypbv2.CollectionLoadInfo, key string) *Co
c.Status = LoadStatus(info.GetStatus())
c.FieldIndexID = info.GetFieldIndexID()
c.Version = GTEVersion2_2
c.key = key
c.Key = key
return c
}

Expand Down
27 changes: 27 additions & 0 deletions models/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package models

import (
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
)

type PartitionLoaded struct {
CollectionID int64
PartitionID int64
ReplicaNumber int32

Status LoadStatus

Key string
Version string
}

func NewPartitionLoaded(info *querypbv2.PartitionLoadInfo, key string) *PartitionLoaded {
return &PartitionLoaded{
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
ReplicaNumber: info.GetReplicaNumber(),
Status: LoadStatus(info.GetStatus()),
Version: GTEVersion2_2,
Key: key,
}
}
3 changes: 0 additions & 3 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (s *embedEtcdMockState) SetupCommands() {
// inspect-pk
getInspectPKCmd(s.client, rootPath),

// force-release
getForceReleaseCmd(s.client, rootPath),

// for testing
etcd.RepairCommand(s.client, rootPath),

Expand Down
5 changes: 4 additions & 1 deletion states/etcd/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func (c *FileAuditKV) Get(ctx context.Context, key string, opts ...clientv3.OpOp

// Delete deletes a key, or optionally using WithRange(end), [key, end).
func (c *FileAuditKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
fmt.Println("audio delete", key)
fmt.Println("audit delete", key)
opts = append(opts, clientv3.WithPrevKV())
resp, err := c.cli.Delete(ctx, key, opts...)
if err != nil {
return resp, err
}
c.writeHeader(models.OpDel, int32(len(resp.PrevKvs)))
for _, kv := range resp.PrevKvs {
c.writeLogKV(kv)
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ const (
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
CollectionLoadPrefixV2 = "querycoord-collection-loadinfo"
CollectionLoadPrefixV2 = "querycoord-collection-loadinfo"
PartitionLoadedPrefixLegacy = "queryCoord-partitionMeta"
PartitionLoadedPrefix = "querycoord-partition-loadinfo"
)

var (
Expand Down
8 changes: 5 additions & 3 deletions states/etcd/common/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
)

// ListCollectionLoadedInfo returns collection loaded info with provided version.
func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(any) bool) ([]*models.CollectionLoaded, error) {
func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(cl *models.CollectionLoaded) bool) ([]*models.CollectionLoaded, error) {
switch version {
case models.LTEVersion2_1:
prefix := path.Join(basePath, CollectionLoadPrefix)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypb.CollectionInfo) bool {
cl := models.NewCollectionLoadedV2_1(info, "")
for _, filter := range filters {
if !filter(info) {
if !filter(cl) {
return false
}
}
Expand All @@ -34,8 +35,9 @@ func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath str
case models.GTEVersion2_2:
prefix := path.Join(basePath, CollectionLoadPrefixV2)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypbv2.CollectionLoadInfo) bool {
cl := models.NewCollectionLoadedV2_2(info, "")
for _, filter := range filters {
if !filter(info) {
if !filter(cl) {
return false
}
}
Expand Down
37 changes: 37 additions & 0 deletions states/etcd/common/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package common

import (
"context"
"errors"
"path"

"github.com/milvus-io/birdwatcher/models"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
)

func ListPartitionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.PartitionLoaded) bool) ([]*models.PartitionLoaded, error) {
switch version {
case models.GTEVersion2_2:
prefix := path.Join(basePath, PartitionLoadedPrefix)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypbv2.PartitionLoadInfo) bool {
pl := models.NewPartitionLoaded(info, "")
for _, filter := range filters {
if !filter(pl) {
return false
}
}
return true
})
if err != nil {
return nil, err
}

return lo.Map(infos, func(info querypbv2.PartitionLoadInfo, idx int) *models.PartitionLoaded {
return models.NewPartitionLoaded(&info, paths[idx])
}), nil
default:
return nil, errors.New("version not supported")
}
}
31 changes: 15 additions & 16 deletions states/etcd/show/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.0/internalpb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
Expand All @@ -37,22 +35,22 @@ func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointPara
VirtualName: channel.VirtualName,
},
}
var cp *internalpb.MsgPosition
var cp *models.MsgPosition
var segmentID int64
var err error

cp, err = c.getChannelCheckpoint(ctx, channel.VirtualName)
if err == nil {
checkpoint.Source = "Channel Checkpoint"
checkpoint.Checkpoint = models.NewMsgPosition(cp)
checkpoint.Checkpoint = cp
checkpoints = append(checkpoints, checkpoint)
continue
}

cp, segmentID, err = c.getCheckpointFromSegments(ctx, p.CollectionID, channel.VirtualName)
if err == nil {
checkpoint.Source = fmt.Sprintf("from segment id %d", segmentID)
checkpoint.Checkpoint = models.NewMsgPosition(cp)
checkpoint.Checkpoint = cp
checkpoints = append(checkpoints, checkpoint)
continue
}
Expand Down Expand Up @@ -93,9 +91,9 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string {
return ""
}

func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*internalpb.MsgPosition, error) {
func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) {
prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](context.Background(), c.client, prefix)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix)
if err != nil {
return nil, err
}
Expand All @@ -104,11 +102,12 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st
return nil, fmt.Errorf("expected 1 position but got %d", len(results))
}

return &results[0], nil
pos := models.NewMsgPosition(&results[0])
return pos, nil
}

func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) {
segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool {
func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool {
return info.CollectionID == collID && info.InsertChannel == vchannel
})
if err != nil {
Expand All @@ -117,18 +116,18 @@ func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID in
}
fmt.Printf("find segments to list checkpoint for %s, segment found %d\n", vchannel, len(segments))
var segmentID int64
var pos *internalpb.MsgPosition
var pos *models.MsgPosition
for _, segment := range segments {
if segment.State != commonpb.SegmentState_Flushed &&
segment.State != commonpb.SegmentState_Growing &&
segment.State != commonpb.SegmentState_Flushing {
if segment.State != models.SegmentStateFlushed &&
segment.State != models.SegmentStateGrowing &&
segment.State != models.SegmentStateFlushing {
continue
}
// skip all empty segment
if segment.GetDmlPosition() == nil && segment.GetStartPosition() == nil {
continue
}
var segPos *internalpb.MsgPosition
var segPos *models.MsgPosition

if segment.GetDmlPosition() != nil {
segPos = segment.GetDmlPosition()
Expand All @@ -138,7 +137,7 @@ func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID in

if pos == nil || segPos.GetTimestamp() < pos.GetTimestamp() {
pos = segPos
segmentID = segment.GetID()
segmentID = segment.ID
}
}

Expand Down
3 changes: 2 additions & 1 deletion states/etcd/show/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara
if p.DatabaseID > -1 && coll.DBID != p.DatabaseID {
return false
}
if p.State != "" && strings.ToLower(p.State) != strings.ToLower(coll.State.String()) {
if p.State != "" && !strings.EqualFold(p.State, coll.State.String()) {
return false
}

total++
return true
})
Expand Down
11 changes: 2 additions & 9 deletions states/etcd/show/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/samber/lo"
)

const (
Expand All @@ -25,20 +24,14 @@ type CollectionLoadedParam struct {
// CollectionLoadedCommand return show collection-loaded command.
func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) {
var total int
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(_ any) bool {
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
total++
return true
return p.CollectionID == 0 || p.CollectionID == info.CollectionID
})
if err != nil {
return nil, errors.Wrap(err, "failed to list collection load info")
}

if p.CollectionID > 0 {
infos = lo.Filter(infos, func(info *models.CollectionLoaded, _ int) bool {
return info.CollectionID == p.CollectionID
})
}

return framework.NewListResult[CollectionsLoaded](infos), nil
}

Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func printIndex(index IndexInfoV1) {
)
fmt.Printf("Index Params: %s\n", common.GetKVPair(index.info.GetIndexParams(), "params"))
for _, v := range indexParams {
fmt.Printf("%s:%s", v.GetKey(), v.GetValue())
fmt.Printf("%s:%s\n", v.GetKey(), v.GetValue())
}
fmt.Println("==================================================================")
}
Expand All @@ -101,7 +101,7 @@ func printIndexV2(index indexpbv2.FieldIndex) {
)
fmt.Printf("Index Params: %s\n", common.GetKVPair(index.GetIndexInfo().GetUserIndexParams(), "params"))
for _, v := range indexParams {
fmt.Printf("%s:%s", v.GetKey(), v.GetValue())
fmt.Printf("%s:%s\n", v.GetKey(), v.GetValue())
}
fmt.Println("==================================================================")
}
57 changes: 57 additions & 0 deletions states/etcd/show/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package show

import (
"context"
"fmt"
"strings"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

type PartitionLoadedParam struct {
framework.ParamBase `use:"show partition-loaded" desc:"display the information of loaded partition(s) from querycoord meta"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
PartitionID int64 `name:"partition" default:"0" desc:"partition id to filter with"`
}

func (c *ComponentShow) PartitionLoadedCommand(ctx context.Context, p *PartitionLoadedParam) (*PartitionsLoaded, error) {
partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool {
return (p.CollectionID == 0 || p.CollectionID == pl.CollectionID) &&
(p.PartitionID == 0 || p.PartitionID == pl.PartitionID)
})
if err != nil {
return nil, err
}
return framework.NewListResult[PartitionsLoaded](partitions), nil
}

type PartitionsLoaded struct {
framework.ListResultSet[*models.PartitionLoaded]
}

func (rs *PartitionsLoaded) PrintAs(format framework.Format) string {
switch format {
case framework.FormatDefault, framework.FormatPlain:
sb := &strings.Builder{}
for _, info := range rs.Data {
rs.printPartitionLoaded(sb, info)
}
fmt.Fprintf(sb, "--- Partitions Loaded: %d\n", len(rs.Data))
return sb.String()
default:
}
return ""
}

func (rs *PartitionsLoaded) printPartitionLoaded(sb *strings.Builder, info *models.PartitionLoaded) {
fmt.Fprintf(sb, "CollectionID: %d\tPartitionID: %d\n", info.CollectionID, info.PartitionID)
fmt.Fprintf(sb, "ReplicaNumber: %d", info.ReplicaNumber)
switch info.Version {
case models.LTEVersion2_1:
case models.GTEVersion2_2:
fmt.Fprintf(sb, "\tLoadStatus: %s\n", info.Status.String())
}
}
Loading
Loading