Skip to content

Commit

Permalink
Add fieldName in index
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Nov 29, 2023
1 parent 18efb15 commit 3e24f79
Show file tree
Hide file tree
Showing 20 changed files with 3,161 additions and 683 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]entity.MsgPosition, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down
2 changes: 1 addition & 1 deletion client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *GrpcClient) NewCollection(ctx context.Context, collName string, dimensi
return err
}

idx := entity.NewGenericIndex("", "", map[string]string{
idx := entity.NewGenericIndex("", "", "", map[string]string{
"metric_type": string(opt.MetricsType),
})

Expand Down
3 changes: 2 additions & 1 deletion client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ func (c *GrpcClient) DescribeIndex(ctx context.Context, collName string, fieldNa
params := entity.KvPairsMap(info.Params)
it := params["index_type"] // TODO change to const
idx := entity.NewGenericIndex(
info.IndexName,
info.GetIndexName(),
entity.IndexType(it),
info.GetFieldName(),
params,
)
indexes = append(indexes, idx)
Expand Down
30 changes: 22 additions & 8 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,30 +191,31 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error {
_, _, _, err := c.FlushV2(ctx, collName, async)
_, _, _, _, err := c.FlushV2(ctx, collName, async)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]entity.MsgPosition, error) {
if c.Service == nil {
return nil, nil, 0, ErrClientNotReady
return nil, nil, 0, nil, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
CollectionNames: []string{collName},
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
channelCPs := resp.GetChannelCps()
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
ids := segmentIDs.GetData()
Expand All @@ -223,24 +224,37 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) (
resp, err := c.Service.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: ids,
})

if err != nil {
// TODO max retry
return false
}
if !resp.GetFlushed() {
channelCPs = resp.GetChannelCps()
}
return resp.GetFlushed()
}
for !flushed() {
// respect context deadline/cancel
select {
case <-ctx.Done():
return nil, nil, 0, errors.New("deadline exceeded")
return nil, nil, 0, nil, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
channelCPEntities := make(map[string]entity.MsgPosition, len(channelCPs))
for k, v := range channelCPs {
channelCPEntities[k] = entity.MsgPosition{
ChannelName: v.GetChannelName(),
MsgID: v.GetMsgID(),
MsgGroup: v.GetMsgGroup(),
Timestamp: v.GetTimestamp(),
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
8 changes: 8 additions & 0 deletions entity/MsgPosition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package entity

type MsgPosition struct {
ChannelName string
MsgID []byte
MsgGroup string
Timestamp uint64
}
54 changes: 35 additions & 19 deletions entity/genidx/genidx.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build ignore
// +build ignore

// Copyright (C) 2019-2021 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
Expand Down Expand Up @@ -55,6 +52,11 @@ func(i *Index{{.IdxName}}) IndexType() IndexType {
return IndexType("{{.IdxType}}")
}
// FieldName returns FieldName, implementing Index interface
func(i *Index{{.IdxName}}) FieldName() string {
return "{{.FieldName}}"
}
// SupportBinary returns whether index type support binary vector
func(i *Index{{.IdxName}}) SupportBinary() bool {
return {{.VectorSupport}} & 2 > 0
Expand Down Expand Up @@ -223,6 +225,7 @@ func TestIndex{{.IdxName}}SearchParam(t *testing.T) {
type idxDef struct {
IdxName string
IdxType entity.IndexType
FieldName string
VectorSupport int8
ConstructParams []idxParam
SearchParams []idxParam
Expand Down Expand Up @@ -329,6 +332,7 @@ func main() {
{
IdxName: "Flat",
IdxType: entity.Flat,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{},
ValidExamples: []string{
Expand All @@ -344,6 +348,7 @@ func main() {
{
IdxName: "BinFlat",
IdxType: entity.BinFlat,
FieldName: "vec_field",
VectorSupport: int8(binaryVectorSupport),
ConstructParams: []idxParam{
{
Expand Down Expand Up @@ -374,8 +379,9 @@ func main() {
},
// IVF_FLAT
{
IdxName: "IvfFlat",
IdxType: entity.IvfFlat,
IdxName: "IvfFlat",
IdxType: entity.IvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -407,6 +413,7 @@ func main() {
{
IdxName: "BinIvfFlat",
IdxType: entity.BinIvfFlat,
FieldName: "vec_field",
VectorSupport: int8(binaryVectorSupport),
ConstructParams: []idxParam{
{
Expand Down Expand Up @@ -437,8 +444,9 @@ func main() {
},
// IVF_SQ8
{
IdxName: "IvfSQ8",
IdxType: entity.IvfSQ8,
IdxName: "IvfSQ8",
IdxType: entity.IvfSQ8,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -468,8 +476,9 @@ func main() {
},
// IVF_PQ
{
IdxName: "IvfPQ",
IdxType: entity.IvfPQ,
IdxName: "IvfPQ",
IdxType: entity.IvfPQ,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -509,8 +518,9 @@ func main() {
},
// HNSW
{
IdxName: "HNSW",
IdxType: entity.HNSW,
IdxName: "HNSW",
IdxType: entity.HNSW,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "M",
Expand Down Expand Up @@ -546,8 +556,9 @@ func main() {
},
// IVF_HNSW
{
IdxName: "IvfHNSW",
IdxType: entity.IvfHNSW,
IdxName: "IvfHNSW",
IdxType: entity.IvfHNSW,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -596,6 +607,7 @@ func main() {
{
IdxName: "DISKANN",
IdxType: entity.DISKANN,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{
{
Expand All @@ -618,6 +630,7 @@ func main() {
{
IdxName: "AUTOINDEX",
IdxType: entity.AUTOINDEX,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{
{
Expand All @@ -639,8 +652,9 @@ func main() {
},
},
{
IdxName: "GPUIvfFlat",
IdxType: entity.GPUIvfFlat,
IdxName: "GPUIvfFlat",
IdxType: entity.GPUIvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -669,8 +683,9 @@ func main() {
},
},
{
IdxName: "GPUIvfPQ",
IdxType: entity.GPUIvfPQ,
IdxName: "GPUIvfPQ",
IdxType: entity.GPUIvfPQ,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -709,8 +724,9 @@ func main() {
},
},
{
IdxName: "SCANN",
IdxType: entity.SCANN,
IdxName: "SCANN",
IdxType: entity.IvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down
18 changes: 13 additions & 5 deletions entity/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Index interface {
Name() string
IndexType() IndexType
Params() map[string]string
FieldName() string
}

// SearchParam interface for index related search param
Expand Down Expand Up @@ -104,8 +105,9 @@ func newBaseSearchParams() baseSearchParams {
}

type baseIndex struct {
it IndexType
name string
it IndexType
name string
fieldName string
}

// Name implements Index
Expand All @@ -118,6 +120,11 @@ func (b baseIndex) IndexType() IndexType {
return b.it
}

// FieldName implements Index
func (b baseIndex) FieldName() string {
return b.fieldName
}

// GenericIndex index struct for general usage
// no constraint for index is applied
type GenericIndex struct {
Expand All @@ -138,11 +145,12 @@ func (gi GenericIndex) Params() map[string]string {
}

// NewGenericIndex create generic index instance
func NewGenericIndex(name string, it IndexType, params map[string]string) Index {
func NewGenericIndex(name string, it IndexType, fieldName string, params map[string]string) Index {
return GenericIndex{
baseIndex: baseIndex{
it: it,
name: name,
it: it,
name: name,
fieldName: fieldName,
},
params: params,
}
Expand Down
2 changes: 1 addition & 1 deletion entity/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestGenericIndex(t *testing.T) {
rand.Seed(time.Now().UnixNano())
name := fmt.Sprintf("generic_index_%d", rand.Int())
gi := NewGenericIndex(name, IvfFlat, map[string]string{
gi := NewGenericIndex(name, IvfFlat, "field", map[string]string{
tMetricType: string(IP),
})
assert.Equal(t, name, gi.Name())
Expand Down
Loading

0 comments on commit 3e24f79

Please sign in to comment.