Skip to content

Commit

Permalink
fix position serde
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Nov 30, 2023
1 parent 3e24f79 commit af8aa51
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]entity.MsgPosition, error)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]string, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down
20 changes: 12 additions & 8 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package client

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) err

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]entity.MsgPosition, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]string, error) {
if c.Service == nil {
return nil, nil, 0, nil, ErrClientNotReady
}
Expand Down Expand Up @@ -245,18 +246,21 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) (
}
}
}
channelCPEntities := make(map[string]entity.MsgPosition, len(channelCPs))
channelCPEntities := make(map[string]string, len(channelCPs))
for k, v := range channelCPs {
channelCPEntities[k] = entity.MsgPosition{
ChannelName: v.GetChannelName(),
MsgID: v.GetMsgID(),
MsgGroup: v.GetMsgGroup(),
Timestamp: v.GetTimestamp(),
}
channelCPEntities[k] = Base64MsgPosition(v)
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil
}

func Base64MsgPosition(position *milvuspb.MsgPosition) string {
positionByte, err := proto.Marshal(position)
if err != nil {
return ""
}
return base64.StdEncoding.EncodeToString(positionByte)
}

// DeleteByPks deletes entries related to provided primary keys
func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error {
if c.Service == nil {
Expand Down

0 comments on commit af8aa51

Please sign in to comment.