Skip to content

Commit

Permalink
update flush, add two return parameters
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Oct 8, 2022
1 parent a4f6dcc commit abd3517
Show file tree
Hide file tree
Showing 11 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Client interface {
// Insert column-based data into collection, returns id column values
Insert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error)
// Flush flush collection, specified
Flush(ctx context.Context, collName string, async bool) error
Flush(ctx context.Context, collName string, async bool) ([]int64, int64, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Search search with bool expression
Expand Down
16 changes: 9 additions & 7 deletions client/client_grpc_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,25 @@ func (c *grpcClient) Insert(ctx context.Context, collName string, partitionName

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *grpcClient) Flush(ctx context.Context, collName string, async bool) error {
// return all sealed segmentIds of the collection, seal time and error
// see https://github.com/milvus-io/milvus/issues/18887 for detail
func (c *grpcClient) Flush(ctx context.Context, collName string, async bool) ([]int64, int64, error) {
if c.service == nil {
return ErrClientNotReady
return nil, 0, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return err
return nil, 0, err
}
req := &server.FlushRequest{
DbName: "", // reserved,
CollectionNames: []string{collName},
}
resp, err := c.service.Flush(ctx, req)
if err != nil {
return err
return nil, 0, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return err
return nil, 0, err
}
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
Expand All @@ -158,14 +160,14 @@ func (c *grpcClient) Flush(ctx context.Context, collName string, async bool) err
// respect context deadline/cancel
select {
case <-ctx.Done():
return errors.New("deadline exceeded")
return nil, 0, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return nil
return resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
9 changes: 6 additions & 3 deletions client/client_grpc_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func TestGrpcClientFlush(t *testing.T) {
c := testClient(ctx, t)

t.Run("test async flush", func(t *testing.T) {
assert.Nil(t, c.Flush(ctx, testCollectionName, true))
_, _, err := c.Flush(ctx, testCollectionName, true)
assert.NoError(t, err)
})

t.Run("test sync flush", func(t *testing.T) {
Expand Down Expand Up @@ -165,14 +166,16 @@ func TestGrpcClientFlush(t *testing.T) {
resp.Status = s
return resp, err
})
assert.Nil(t, c.Flush(ctx, testCollectionName, false))
_, _, err := c.Flush(ctx, testCollectionName, false)
assert.NoError(t, err)
assert.True(t, flag)

start = time.Now()
flag = false
quickCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
assert.NotNil(t, c.Flush(quickCtx, testCollectionName, false))
_, _, err = c.Flush(quickCtx, testCollectionName, false)
assert.NotNil(t, err)
})
}

Expand Down
2 changes: 1 addition & 1 deletion client/client_grpc_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *grpcClient) CreateIndex(ctx context.Context, collName string, fieldName
return err
}

flushErr := c.Flush(ctx, collName, true)
_, _, flushErr := c.Flush(ctx, collName, true)
if flushErr != nil {
return flushErr
}
Expand Down
2 changes: 1 addition & 1 deletion examples/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func main() {
}
log.Println("insert completed")

err = c.Flush(ctx, collectionName, false)
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatal("failed to flush collection:", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/calcdistance/calc_distance.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func main() {
log.Println("insert completed")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*120)
defer cancel()
err = c.Flush(ctx, collectionName, false)
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatal("failed to flush collection:", err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/hello_milvus/hello_milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func main() {
if _, err := c.Insert(ctx, collectionName, "", idColData, randomColData, embeddingColData); err != nil {
log.Fatalf("failed to insert random data into `hello_milvus, err: %v", err)
}

if err := c.Flush(ctx, collectionName, false); err != nil {
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatalf("failed to flush data, err: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion examples/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {
log.Println("insert completed")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*120)
defer cancel()
err = c.Flush(ctx, collectionName, false)
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatal("failed to flush collection:", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {
log.Println("insert completed")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*120)
defer cancel()
err = c.Flush(ctx, collectionName, false)
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatal("failed to flush collection:", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func main() {
log.Println("insert completed")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*120)
defer cancel()
err = c.Flush(ctx, collectionName, false)
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatal("failed to flush collection:", err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion examples/varchar/varchar_pk.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func main() {
log.Fatalf("failed to insert random data into `hello_milvus, err: %v", err)
}

if err := c.Flush(ctx, collectionName, false); err != nil {
_, _, err = c.Flush(ctx, collectionName, false)
if err != nil {
log.Fatalf("failed to flush data, err: %v", err)
}

Expand Down

0 comments on commit abd3517

Please sign in to comment.