Skip to content

Commit

Permalink
Impl Bulkload APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Aug 25, 2022
1 parent cb2e834 commit ffccdd3
Show file tree
Hide file tree
Showing 8 changed files with 6,361 additions and 11,181 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "internal/milvus-proto"]
path = internal/milvus-proto
url = [email protected]:milvus-io/milvus-proto.git
url = [email protected]:wayblink/milvus-proto.git
7 changes: 7 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ type Client interface {
GetCompactionState(ctx context.Context, id int64) (entity.CompactionState, error)
// GetCompactionStateWithPlans get compaction state with plans of provided compaction id
GetCompactionStateWithPlans(ctx context.Context, id int64) (entity.CompactionState, []entity.CompactionPlan, error)

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
Import(ctx context.Context, collName string, partitionName string, channelNames []string, rowBased bool, files []string, options map[string]string) ([]int64, error)
// GetImportState checks import task state
GetImportState(ctx context.Context, taskId int64) (*entity.ImportTaskState, error)
// ListImportTasks list state of all import tasks
ListImportTasks(ctx context.Context, collName string, limit int64) ([]*entity.ImportTaskState, error)
}

// SearchResult contains the result from Search api of client
Expand Down
84 changes: 84 additions & 0 deletions client/client_grpc_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,90 @@ func (c *grpcClient) CalcDistance(ctx context.Context, collName string, partitio
return nil, errors.New("distance field not supported")
}

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *grpcClient) Import(ctx context.Context, collName string, partitionName string, channelNames []string, rowBased bool, files []string, options map[string]string) ([]int64, error) {
if c.service == nil {
return []int64{}, ErrClientNotReady
}
req := &server.ImportRequest{
CollectionName: collName,
PartitionName: partitionName,
ChannelNames: channelNames,
RowBased: rowBased,
Files: files,
Options: entity.MapKvPairs(options),
}
resp, err := c.service.Import(ctx, req)
if err != nil {
return []int64{}, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return []int64{}, err
}

return resp.Tasks, nil
}

// GetImportState checks import task state
func (c *grpcClient) GetImportState(ctx context.Context, taskId int64) (*entity.ImportTaskState, error) {
if c.service == nil {
return nil, ErrClientNotReady
}
req := &server.GetImportStateRequest{
Task: taskId,
}
resp, err := c.service.GetImportState(ctx, req)
if err != nil {
return nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}

return &entity.ImportTaskState{
Id: resp.GetId(),
State: entity.ImportState(resp.GetState()),
RowCount: resp.GetRowCount(),
IdList: resp.GetIdList(),
Infos: entity.KvPairsMap(resp.GetInfos()),
// CollectionId int64
// SegmentIds []int64
}, nil
}

// ListImportTasks list state of all import tasks
func (c *grpcClient) ListImportTasks(ctx context.Context, collName string, limit int64) ([]*entity.ImportTaskState, error) {
if c.service == nil {
return nil, ErrClientNotReady
}
req := &server.ListImportTasksRequest{
CollectionName: collName,
Limit: limit,
}
resp, err := c.service.ListImportTasks(ctx, req)
if err != nil {
return nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}

tasks := make([]*entity.ImportTaskState, 0)
for _, task := range resp.GetTasks() {
tasks = append(tasks, &entity.ImportTaskState{
Id: task.GetId(),
State: entity.ImportState(task.GetState()),
RowCount: task.GetRowCount(),
IdList: task.GetIdList(),
Infos: entity.KvPairsMap(task.GetInfos()),
CollectionId: task.GetCollectionId(),
SegmentIds: task.GetSegmentIds(),
})
}

return tasks, nil
}

func columnToVectorsArray(collName string, partitions []string, column entity.Column) *server.VectorsArray {
result := &server.VectorsArray{}
switch column.Type() {
Expand Down
24 changes: 24 additions & 0 deletions entity/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package entity

type ImportState int32

const (
ImportState_ImportPending ImportState = 0
ImportState_ImportFailed ImportState = 1
ImportState_ImportStarted ImportState = 2
ImportState_ImportDownloaded ImportState = 3
ImportState_ImportParsed ImportState = 4
ImportState_ImportPersisted ImportState = 5
ImportState_ImportCompleted ImportState = 6
ImportState_ImportAllocSegment ImportState = 10
)

type ImportTaskState struct {
Id int64
State ImportState
RowCount int64
IdList []int64
Infos map[string]string
CollectionId int64
SegmentIds []int64
}
2 changes: 1 addition & 1 deletion internal/milvus-proto
Loading

0 comments on commit ffccdd3

Please sign in to comment.