From ba0ee5411fb408efe484a05f0cac6ba1f97a664f Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 13:08:41 +0000 Subject: [PATCH 01/11] feat: support batch upsert --- .../data/pinecone/v0/component_test.go | 16 ---- pkg/component/data/pinecone/v0/io.go | 13 +++ pkg/component/data/pinecone/v0/main.go | 50 +++++------ pkg/component/data/pinecone/v0/structs.go | 9 -- pkg/component/data/pinecone/v0/task_query.go | 12 +++ pkg/component/data/pinecone/v0/task_rerank.go | 12 +++ pkg/component/data/pinecone/v0/task_upsert.go | 86 +++++++++++++++++++ 7 files changed, 145 insertions(+), 53 deletions(-) create mode 100644 pkg/component/data/pinecone/v0/io.go create mode 100644 pkg/component/data/pinecone/v0/task_query.go create mode 100644 pkg/component/data/pinecone/v0/task_rerank.go create mode 100644 pkg/component/data/pinecone/v0/task_upsert.go diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index ab08fa516..9c53a34ff 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -24,8 +24,6 @@ const ( namespace = "pantone" threshold = 0.9 - upsertOK = `{"upsertedCount": 1}` - queryOK = ` { "namespace": "color-schemes", @@ -105,20 +103,6 @@ func TestComponent_Execute(t *testing.T) { wantClientReq any clientResp string }{ - { - name: "ok - upsert", - - task: taskUpsert, - execIn: upsertInput{ - vector: vectorA, - Namespace: namespace, - }, - wantExec: upsertOutput{RecordsUpserted: 1}, - - wantClientPath: upsertPath, - wantClientReq: upsertReq{Vectors: []vector{vectorA}, Namespace: namespace}, - clientResp: upsertOK, - }, { name: "ok - query by vector", diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go new file mode 100644 index 000000000..7d1e0a873 --- /dev/null +++ b/pkg/component/data/pinecone/v0/io.go @@ -0,0 +1,13 @@ +// TASK_QUERY and TASK_RERANK are not refactored yet, they will be addressed in ins-7102 +package pinecone + +type taskUpsertInput struct { + id string `instill:"id"` + metadata string `instill:"metadata"` + values []float64 `instill:"values"` + namespace string `instill:"namespace"` +} + +type taskUpsertOutput struct { + upsertedCount int64 `instill:"upserted-count"` +} diff --git a/pkg/component/data/pinecone/v0/main.go b/pkg/component/data/pinecone/v0/main.go index 929e0c54a..b5ab289ce 100644 --- a/pkg/component/data/pinecone/v0/main.go +++ b/pkg/component/data/pinecone/v0/main.go @@ -42,8 +42,11 @@ type component struct { type execution struct { base.ComponentExecution + + execute func(context.Context, *base.Job) error } +// Init initializes the component and loads the definition, setup, and tasks. func Init(bc base.Component) *component { once.Do(func() { comp = &component{Component: bc} @@ -55,10 +58,23 @@ func Init(bc base.Component) *component { return comp } +// CreateExecution creates a new execution for the component. func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) { - return &execution{ + e := &execution{ ComponentExecution: x, - }, nil + } + + switch x.Task { + case taskQuery: + e.execute = e.query + case taskRerank: + e.execute = e.rerank + // Now, only upsert task is refactored, the rest will be addressed in ins-7102 + case taskUpsert: + e.execute = e.upsert + } + + return e, nil } // newIndexClient creates a new httpclient.Client with the index URL provided in setup @@ -100,6 +116,10 @@ func getURL(setup *structpb.Struct) string { } func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { + // TODO: We will need to migrate other tasks to use the new logic. + if e.Task == taskUpsert { + return base.ConcurrentExecutor(ctx, jobs, e.execute) + } for _, job := range jobs { input, err := job.Input.Read(ctx) @@ -141,32 +161,6 @@ func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { job.Error.Error(ctx, err) continue } - case taskUpsert: - req := newIndexClient(e.Setup, e.GetLogger()).R() - - v := upsertInput{} - err := base.ConvertFromStructpb(input, &v) - if err != nil { - job.Error.Error(ctx, err) - continue - } - - resp := upsertResp{} - req.SetResult(&resp).SetBody(upsertReq{ - Vectors: []vector{v.vector}, - Namespace: v.Namespace, - }) - - if _, err := req.Post(upsertPath); err != nil { - job.Error.Error(ctx, httpclient.WrapURLError(err)) - continue - } - - output, err = base.ConvertToStructpb(upsertOutput(resp)) - if err != nil { - job.Error.Error(ctx, err) - continue - } case taskRerank: // rerank task does not need index URL, so using the base client with the default pinecone API URL. req := newBaseClient(e.Setup, e.GetLogger()).R() diff --git a/pkg/component/data/pinecone/v0/structs.go b/pkg/component/data/pinecone/v0/structs.go index 97b0006c9..ecfccbc8f 100644 --- a/pkg/component/data/pinecone/v0/structs.go +++ b/pkg/component/data/pinecone/v0/structs.go @@ -64,11 +64,6 @@ type upsertReq struct { Namespace string `json:"namespace,omitempty"` } -type upsertInput struct { - vector - Namespace string `json:"namespace"` -} - type vector struct { ID string `json:"id"` Values []float64 `json:"values,omitempty"` @@ -79,10 +74,6 @@ type upsertResp struct { RecordsUpserted int64 `json:"upsertedCount"` } -type upsertOutput struct { - RecordsUpserted int64 `json:"upserted-count"` -} - type Document struct { Text string `json:"text"` } diff --git a/pkg/component/data/pinecone/v0/task_query.go b/pkg/component/data/pinecone/v0/task_query.go new file mode 100644 index 000000000..5b71e68cf --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_query.go @@ -0,0 +1,12 @@ +// TODO: Implement the query task, it will be addressed in ins-7102 +package pinecone + +import ( + "context" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) query(ctx context.Context, job *base.Job) error { + return nil +} diff --git a/pkg/component/data/pinecone/v0/task_rerank.go b/pkg/component/data/pinecone/v0/task_rerank.go new file mode 100644 index 000000000..4d959deeb --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_rerank.go @@ -0,0 +1,12 @@ +// TODO: Implement the rerank task, it will be addressed in ins-7102 +package pinecone + +import ( + "context" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) rerank(ctx context.Context, job *base.Job) error { + return nil +} diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go new file mode 100644 index 000000000..3f70f015b --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -0,0 +1,86 @@ +package pinecone + +import ( + "context" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) upsert(ctx context.Context, job *base.Job) error { + + input := &taskUpsertInput{} + if err := job.Input.ReadData(ctx, input); err != nil { + return err + } + + req := newIndexClient(e.Setup, e.GetLogger()).R() + + upsertReq := convertInput(input) + + resp := upsertResp{} + + req.SetResult(&resp).SetBody(upsertReq) + + if _, err := req.Post(upsertPath); err != nil { + return err + } + + output := convertOutput(resp) + + if err := job.Output.WriteData(ctx, output); err != nil { + return err + } + + return nil +} + +func convertInput(input *taskUpsertInput) upsertReq { + + upsertReq := upsertReq{ + Vectors: []vector{}, + Namespace: input.namespace, + } + + for _, v := range input.values { + upsertReq.Vectors = append(upsertReq.Vectors, vector{ + ID: input.id, + Values: []float64{v}, + Metadata: input.metadata, + }) + } + + return upsertReq +} + +func convertOutput(resp upsertResp) *taskUpsertOutput { + return &taskUpsertOutput{ + upsertedCount: resp.RecordsUpserted, + } +} + +// case taskUpsert: +// req := newIndexClient(e.Setup, e.GetLogger()).R() + +// v := upsertInput{} +// err := base.ConvertFromStructpb(input, &v) +// if err != nil { +// job.Error.Error(ctx, err) +// continue +// } + +// resp := upsertResp{} +// req.SetResult(&resp).SetBody(upsertReq{ +// Vectors: []vector{v.vector}, +// Namespace: v.Namespace, +// }) + +// if _, err := req.Post(upsertPath); err != nil { +// job.Error.Error(ctx, httpclient.WrapURLError(err)) +// continue +// } + +// output, err = base.ConvertToStructpb(upsertOutput(resp)) +// if err != nil { +// job.Error.Error(ctx, err) +// continue +// } From ae78f39b7ba8f52eca16eae90f2033034edaca04 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 13:09:00 +0000 Subject: [PATCH 02/11] feat: support batch upsert --- pkg/component/data/pinecone/v0/task_upsert.go | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go index 3f70f015b..c5bf7344d 100644 --- a/pkg/component/data/pinecone/v0/task_upsert.go +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -57,30 +57,3 @@ func convertOutput(resp upsertResp) *taskUpsertOutput { upsertedCount: resp.RecordsUpserted, } } - -// case taskUpsert: -// req := newIndexClient(e.Setup, e.GetLogger()).R() - -// v := upsertInput{} -// err := base.ConvertFromStructpb(input, &v) -// if err != nil { -// job.Error.Error(ctx, err) -// continue -// } - -// resp := upsertResp{} -// req.SetResult(&resp).SetBody(upsertReq{ -// Vectors: []vector{v.vector}, -// Namespace: v.Namespace, -// }) - -// if _, err := req.Post(upsertPath); err != nil { -// job.Error.Error(ctx, httpclient.WrapURLError(err)) -// continue -// } - -// output, err = base.ConvertToStructpb(upsertOutput(resp)) -// if err != nil { -// job.Error.Error(ctx, err) -// continue -// } From 8619c0e0edbfcae361f52c4a4549999ebd2aa86b Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 13:30:32 +0000 Subject: [PATCH 03/11] fix: fix bug --- pkg/component/data/pinecone/v0/io.go | 10 +++--- pkg/component/data/pinecone/v0/task_upsert.go | 33 ++++++++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index 7d1e0a873..7e16cc785 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -2,12 +2,12 @@ package pinecone type taskUpsertInput struct { - id string `instill:"id"` - metadata string `instill:"metadata"` - values []float64 `instill:"values"` - namespace string `instill:"namespace"` + ID string `instill:"id"` + Metadata string `instill:"metadata"` + Values []float64 `instill:"values"` + Namespace string `instill:"namespace"` } type taskUpsertOutput struct { - upsertedCount int64 `instill:"upserted-count"` + UpsertedCount int64 `instill:"upserted-count"` } diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go index c5bf7344d..1fc5a4e02 100644 --- a/pkg/component/data/pinecone/v0/task_upsert.go +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -2,14 +2,17 @@ package pinecone import ( "context" + "fmt" "github.com/instill-ai/pipeline-backend/pkg/component/base" ) func (e *execution) upsert(ctx context.Context, job *base.Job) error { - input := &taskUpsertInput{} - if err := job.Input.ReadData(ctx, input); err != nil { + input := taskUpsertInput{} + if err := job.Input.ReadData(ctx, &input); err != nil { + err = fmt.Errorf("reading input data: %w", err) + job.Error.Error(ctx, err) return err } @@ -20,40 +23,46 @@ func (e *execution) upsert(ctx context.Context, job *base.Job) error { resp := upsertResp{} req.SetResult(&resp).SetBody(upsertReq) - if _, err := req.Post(upsertPath); err != nil { + err = fmt.Errorf("upserting vectors: %w", err) + job.Error.Error(ctx, err) return err } output := convertOutput(resp) if err := job.Output.WriteData(ctx, output); err != nil { + err = fmt.Errorf("writing output data: %w", err) + job.Error.Error(ctx, err) return err } return nil } -func convertInput(input *taskUpsertInput) upsertReq { +func convertInput(input taskUpsertInput) upsertReq { upsertReq := upsertReq{ Vectors: []vector{}, - Namespace: input.namespace, + Namespace: input.Namespace, + } + + vector := vector{ + ID: input.ID, + Values: input.Values, } - for _, v := range input.values { - upsertReq.Vectors = append(upsertReq.Vectors, vector{ - ID: input.id, - Values: []float64{v}, - Metadata: input.metadata, - }) + if input.Metadata != "" { + vector.Metadata = input.Metadata } + upsertReq.Vectors = append(upsertReq.Vectors, vector) + return upsertReq } func convertOutput(resp upsertResp) *taskUpsertOutput { return &taskUpsertOutput{ - upsertedCount: resp.RecordsUpserted, + UpsertedCount: resp.RecordsUpserted, } } From dd647ad34a145865171225de776bb8f86ee1b8fe Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 23:08:35 +0000 Subject: [PATCH 04/11] feat: add batch upsert function for pinecone --- pkg/component/data/pinecone/v0/README.mdx | 29 ++++++ .../data/pinecone/v0/config/definition.json | 1 + .../data/pinecone/v0/config/tasks.json | 99 +++++++++++++++++++ 3 files changed, 129 insertions(+) diff --git a/pkg/component/data/pinecone/v0/README.mdx b/pkg/component/data/pinecone/v0/README.mdx index 778b87091..10b13f7a3 100644 --- a/pkg/component/data/pinecone/v0/README.mdx +++ b/pkg/component/data/pinecone/v0/README.mdx @@ -9,6 +9,7 @@ The Pinecone component is a data component that allows users to build and search It can carry out the following tasks: - [Query](#query) - [Upsert](#upsert) +- [Batch Upsert](#batch-upsert) - [Rerank](#rerank) @@ -120,6 +121,34 @@ Writes vectors into a namespace. If a new value is upserted for an existing vect +
+ +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Upserted Count | `upserted-count` | integer | Number of records modified or added. | +
+ + +### Batch Upsert + +Writes vectors into a namespace in batch. If a new value is upserted for an existing vector id, it will overwrite the previous value. + +
+ +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_BATCH_UPSERT` | +| IDs (required) | `ids` | array[string] | The ids of the vectors to upsert. | +| [Array of Metadata](#batch-upsert-array-of-metadata) | `array-metadata` | array[object] | The metadata of the vectors to upsert. | +| Array of Values (required) | `array-values` | array[array] | The values of the vectors to upsert. The type will be array of array of number. | +| Namespace | `namespace` | string | The namespace to upsert the vectors into. | +
+ + + + + +
| Output | ID | Type | Description | diff --git a/pkg/component/data/pinecone/v0/config/definition.json b/pkg/component/data/pinecone/v0/config/definition.json index 947638a77..a35bd3595 100644 --- a/pkg/component/data/pinecone/v0/config/definition.json +++ b/pkg/component/data/pinecone/v0/config/definition.json @@ -2,6 +2,7 @@ "availableTasks": [ "TASK_QUERY", "TASK_UPSERT", + "TASK_BATCH_UPSERT", "TASK_RERANK" ], "custom": false, diff --git a/pkg/component/data/pinecone/v0/config/tasks.json b/pkg/component/data/pinecone/v0/config/tasks.json index d5fb839b6..c6906aecb 100644 --- a/pkg/component/data/pinecone/v0/config/tasks.json +++ b/pkg/component/data/pinecone/v0/config/tasks.json @@ -292,6 +292,105 @@ "type": "object" } }, + "TASK_BATCH_UPSERT": { + "instillShortDescription": "Writes vectors into a namespace in batch. If a new value is upserted for an existing vector id, it will overwrite the previous value.", + "input": { + "instillUIOrder": 0, + "properties": { + "ids": { + "description": "The ids of the vectors to upsert.", + "instillUIOrder": 0, + "instillFormat": "array:string", + "instillUpstreamTypes": [ + "value", + "reference" + ], + "items": { + "type": "string" + }, + "minItems": 1, + "title": "IDs", + "type": "array" + }, + "array-metadata": { + "instillShortDescription": "The metadata of the vectors to upsert", + "description": "The metadata of the vectors to upsert.", + "instillUIOrder": 1, + "instillUpstreamTypes": [ + "reference", + "value" + ], + "items": { + "type": "object" + }, + "instillFormat": "array:semi-structured/object", + "minItems": 1, + "title": "Array of Metadata", + "type": "array" + }, + "array-values": { + "description": "The values of the vectors to upsert. The type will be array of array of number.", + "instillUIOrder": 2, + "instillUpstreamTypes": [ + "reference" + ], + "items": { + "instillFormat": "array:number", + "instillAcceptFormats": [ + "array:number", + "array:integer" + ], + "type": "array" + }, + "instillFormat": "array:array:number", + "instillAcceptFormats": [ + "array:array:number", + "array:array:integer" + ], + "minItems": 1, + "title": "Array of Values", + "type": "array" + }, + "namespace": { + "description": "The namespace to upsert the vectors into.", + "instillAcceptFormats": [ + "string" + ], + "instillUIOrder": 3, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Namespace", + "type": "string" + } + }, + "required": [ + "ids", + "array-values" + ], + "title": "Input", + "type": "object" + }, + "output": { + "instillUIOrder": 1, + "properties": { + "upserted-count": { + "description": "Number of records modified or added.", + "instillFormat": "integer", + "instillUIOrder": 0, + "title": "Upserted Count", + "type": "integer" + } + }, + "required": [ + "upserted-count" + ], + "title": "Output", + "type": "object" + } + }, "TASK_RERANK": { "instillShortDescription": "Rerank documents, such as text passages, according to their relevance to a query.", "description": "Rerank documents, such as text passages, according to their relevance to a query. The input is a list of documents and a query. The output is a list of documents, sorted by relevance to the query.", From 8dae95fd69de7d0cc6bd9240b8e94406c8f00c6b Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 23:15:42 +0000 Subject: [PATCH 05/11] feat: implement batch upsert function --- pkg/component/data/pinecone/v0/io.go | 19 ++++- pkg/component/data/pinecone/v0/main.go | 11 ++- .../data/pinecone/v0/task_batch_upsert.go | 76 +++++++++++++++++++ 3 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 pkg/component/data/pinecone/v0/task_batch_upsert.go diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index 7e16cc785..46e355f9c 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -2,12 +2,23 @@ package pinecone type taskUpsertInput struct { - ID string `instill:"id"` - Metadata string `instill:"metadata"` - Values []float64 `instill:"values"` - Namespace string `instill:"namespace"` + ID string `instill:"id"` + Metadata interface{} `instill:"metadata"` + Values []float64 `instill:"values"` + Namespace string `instill:"namespace"` } type taskUpsertOutput struct { UpsertedCount int64 `instill:"upserted-count"` } + +type taskBatchUpsertInput struct { + IDs []string `instill:"ids"` + ArrayMetadata []interface{} `instill:"array-metadata"` + ArrayValues [][]float64 `instill:"array-values"` + Namespace string `instill:"namespace"` +} + +type taskBatchUpsertOutput struct { + UpsertedCount int64 `instill:"upserted-count"` +} diff --git a/pkg/component/data/pinecone/v0/main.go b/pkg/component/data/pinecone/v0/main.go index b5ab289ce..9a594a7e4 100644 --- a/pkg/component/data/pinecone/v0/main.go +++ b/pkg/component/data/pinecone/v0/main.go @@ -15,9 +15,10 @@ import ( ) const ( - taskQuery = "TASK_QUERY" - taskUpsert = "TASK_UPSERT" - taskRerank = "TASK_RERANK" + taskQuery = "TASK_QUERY" + taskUpsert = "TASK_UPSERT" + taskRerank = "TASK_RERANK" + taskBatchUpsert = "TASK_BATCH_UPSERT" upsertPath = "/vectors/upsert" queryPath = "/query" @@ -72,6 +73,8 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, // Now, only upsert task is refactored, the rest will be addressed in ins-7102 case taskUpsert: e.execute = e.upsert + case taskBatchUpsert: + e.execute = e.batchUpsert } return e, nil @@ -117,7 +120,7 @@ func getURL(setup *structpb.Struct) string { func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { // TODO: We will need to migrate other tasks to use the new logic. - if e.Task == taskUpsert { + if e.Task == taskUpsert || e.Task == taskBatchUpsert { return base.ConcurrentExecutor(ctx, jobs, e.execute) } diff --git a/pkg/component/data/pinecone/v0/task_batch_upsert.go b/pkg/component/data/pinecone/v0/task_batch_upsert.go new file mode 100644 index 000000000..6e575bd8f --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_batch_upsert.go @@ -0,0 +1,76 @@ +package pinecone + +import ( + "context" + "fmt" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { + + input := taskBatchUpsertInput{} + if err := job.Input.ReadData(ctx, &input); err != nil { + err = fmt.Errorf("reading input data: %w", err) + job.Error.Error(ctx, err) + return err + } + + if err := valid(input); err != nil { + err = fmt.Errorf("validate input: %w", err) + job.Error.Error(ctx, err) + return err + } + + req := newIndexClient(e.Setup, e.GetLogger()).R() + + upsertReq := convertBatchUpsertInput(input) + + resp := upsertResp{} + + req.SetResult(&resp).SetBody(upsertReq) + if _, err := req.Post(upsertPath); err != nil { + err = fmt.Errorf("upserting vectors: %w", err) + job.Error.Error(ctx, err) + return err + } + + output := taskBatchUpsertOutput{ + UpsertedCount: resp.RecordsUpserted, + } + + if err := job.Output.WriteData(ctx, output); err != nil { + err = fmt.Errorf("writing output data: %w", err) + job.Error.Error(ctx, err) + return err + } + + return nil +} + +func valid(input taskBatchUpsertInput) error { + if len(input.IDs) != len(input.ArrayValues) { + return fmt.Errorf("ids and array-values must have the same length") + } + if len(input.ArrayValues) == 0 { + return fmt.Errorf("array-values must not be empty") + } + return nil +} + +func convertBatchUpsertInput(input taskBatchUpsertInput) upsertReq { + + vectors := make([]vector, len(input.IDs)) + for i, id := range input.IDs { + vectors[i] = vector{ + ID: id, + Values: input.ArrayValues[i], + Metadata: input.ArrayMetadata[i], + } + } + + return upsertReq{ + Vectors: vectors, + Namespace: input.Namespace, + } +} From ec551973a59671a47ec204023be0687719f056bf Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 9 Dec 2024 23:44:23 +0000 Subject: [PATCH 06/11] chore: remove test code --- .../data/pinecone/v0/component_test.go | 68 ------------------- 1 file changed, 68 deletions(-) diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index 9c53a34ff..51ca583a7 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -16,7 +16,6 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" - "github.com/instill-ai/x/errmsg" ) const ( @@ -42,13 +41,6 @@ const ( } ] }` - - errResp = ` -{ - "code": 3, - "message": "Cannot provide both ID and vector at the same time.", - "details": [] -}` ) var ( @@ -237,64 +229,4 @@ func TestComponent_Execute(t *testing.T) { }) } - c.Run("nok - 400", func(c *qt.C) { - h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusBadRequest) - fmt.Fprintln(w, errResp) - }) - - pineconeServer := httptest.NewServer(h) - c.Cleanup(pineconeServer.Close) - - setup, _ := structpb.NewStruct(map[string]any{ - "url": pineconeServer.URL, - }) - - exec, err := cmp.CreateExecution(base.ComponentExecution{ - Component: cmp, - Setup: setup, - Task: taskUpsert, - }) - c.Assert(err, qt.IsNil) - - pbIn := new(structpb.Struct) - ir, ow, eh, job := mock.GenerateMockJob(c) - ir.ReadMock.Return(pbIn, nil) - ow.WriteMock.Optional().Return(nil) - eh.ErrorMock.Optional().Set(func(ctx context.Context, err error) { - want := "Pinecone responded with a 400 status code. Cannot provide both ID and vector at the same time." - c.Check(errmsg.Message(err), qt.Equals, want) - }) - - err = exec.Execute(ctx, []*base.Job{job}) - c.Check(err, qt.IsNil) - - }) - - c.Run("nok - URL misconfiguration", func(c *qt.C) { - setup, _ := structpb.NewStruct(map[string]any{ - "url": "http://no-such.host", - }) - - exec, err := cmp.CreateExecution(base.ComponentExecution{ - Component: cmp, - Setup: setup, - Task: taskUpsert, - }) - c.Assert(err, qt.IsNil) - - pbIn := new(structpb.Struct) - ir, ow, eh, job := mock.GenerateMockJob(c) - ir.ReadMock.Return(pbIn, nil) - ow.WriteMock.Optional().Return(nil) - eh.ErrorMock.Optional().Set(func(ctx context.Context, err error) { - want := "Failed to call http://no-such.host/.*. Please check that the component configuration is correct." - c.Check(errmsg.Message(err), qt.Matches, want) - }) - - err = exec.Execute(ctx, []*base.Job{job}) - c.Check(err, qt.IsNil) - - }) } From d4f3d0a08113e31e1f9776c4ef7258ab4b5ab331 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Tue, 10 Dec 2024 11:02:32 +0000 Subject: [PATCH 07/11] chore: revert test code and add comment --- .../data/pinecone/v0/component_test.go | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index 51ca583a7..3243e0f9f 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -16,6 +16,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" + "github.com/instill-ai/x/errmsg" ) const ( @@ -23,6 +24,8 @@ const ( namespace = "pantone" threshold = 0.9 + // upsertOK = `{"upsertedCount": 1}` + queryOK = ` { "namespace": "color-schemes", @@ -41,6 +44,14 @@ const ( } ] }` + + errResp = ` + + { + "code": 3, + "message": "Cannot provide both ID and vector at the same time.", + "details": [] + }` ) var ( @@ -95,6 +106,21 @@ func TestComponent_Execute(t *testing.T) { wantClientReq any clientResp string }{ + // TODO: #927 removed the `taskUpsert` testcase. Reintroduce it in INS-7102. + // { + // name: "ok - upsert", + + // task: taskUpsert, + // execIn: upsertInput{ + // vector: vectorA, + // Namespace: namespace, + // }, + // wantExec: upsertOutput{RecordsUpserted: 1}, + + // wantClientPath: upsertPath, + // wantClientReq: upsertReq{Vectors: []vector{vectorA}, Namespace: namespace}, + // clientResp: upsertOK, + // }, { name: "ok - query by vector", @@ -229,4 +255,64 @@ func TestComponent_Execute(t *testing.T) { }) } + c.Run("nok - 400", func(c *qt.C) { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, errResp) + }) + + pineconeServer := httptest.NewServer(h) + c.Cleanup(pineconeServer.Close) + + setup, _ := structpb.NewStruct(map[string]any{ + "url": pineconeServer.URL, + }) + + exec, err := cmp.CreateExecution(base.ComponentExecution{ + Component: cmp, + Setup: setup, + Task: taskQuery, + }) + c.Assert(err, qt.IsNil) + + pbIn := new(structpb.Struct) + ir, ow, eh, job := mock.GenerateMockJob(c) + ir.ReadMock.Return(pbIn, nil) + ow.WriteMock.Optional().Return(nil) + eh.ErrorMock.Optional().Set(func(ctx context.Context, err error) { + want := "Pinecone responded with a 400 status code. Cannot provide both ID and vector at the same time." + c.Check(errmsg.Message(err), qt.Equals, want) + }) + + err = exec.Execute(ctx, []*base.Job{job}) + c.Check(err, qt.IsNil) + + }) + + c.Run("nok - URL misconfiguration", func(c *qt.C) { + setup, _ := structpb.NewStruct(map[string]any{ + "url": "http://no-such.host", + }) + + exec, err := cmp.CreateExecution(base.ComponentExecution{ + Component: cmp, + Setup: setup, + Task: taskQuery, + }) + c.Assert(err, qt.IsNil) + + pbIn := new(structpb.Struct) + ir, ow, eh, job := mock.GenerateMockJob(c) + ir.ReadMock.Return(pbIn, nil) + ow.WriteMock.Optional().Return(nil) + eh.ErrorMock.Optional().Set(func(ctx context.Context, err error) { + want := "Failed to call http://no-such.host/.*. Please check that the component configuration is correct." + c.Check(errmsg.Message(err), qt.Matches, want) + }) + + err = exec.Execute(ctx, []*base.Job{job}) + c.Check(err, qt.IsNil) + + }) } From df6ba0c1e4b324ef1ede461c0cb2c0e06889a401 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Tue, 10 Dec 2024 11:11:12 +0000 Subject: [PATCH 08/11] chore: clean the code --- pkg/component/data/pinecone/v0/README.mdx | 2 +- pkg/component/data/pinecone/v0/config/tasks.json | 2 +- pkg/component/data/pinecone/v0/io.go | 6 ++++++ pkg/component/data/pinecone/v0/structs.go | 6 ------ 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/component/data/pinecone/v0/README.mdx b/pkg/component/data/pinecone/v0/README.mdx index 10b13f7a3..f4dea13b0 100644 --- a/pkg/component/data/pinecone/v0/README.mdx +++ b/pkg/component/data/pinecone/v0/README.mdx @@ -138,7 +138,7 @@ Writes vectors into a namespace in batch. If a new value is upserted for an exis | Input | ID | Type | Description | | :--- | :--- | :--- | :--- | | Task ID (required) | `task` | string | `TASK_BATCH_UPSERT` | -| IDs (required) | `ids` | array[string] | The ids of the vectors to upsert. | +| IDs (required) | `ids` | array[string] | The IDs of the vectors to upsert. | | [Array of Metadata](#batch-upsert-array-of-metadata) | `array-metadata` | array[object] | The metadata of the vectors to upsert. | | Array of Values (required) | `array-values` | array[array] | The values of the vectors to upsert. The type will be array of array of number. | | Namespace | `namespace` | string | The namespace to upsert the vectors into. | diff --git a/pkg/component/data/pinecone/v0/config/tasks.json b/pkg/component/data/pinecone/v0/config/tasks.json index c6906aecb..caccc0116 100644 --- a/pkg/component/data/pinecone/v0/config/tasks.json +++ b/pkg/component/data/pinecone/v0/config/tasks.json @@ -298,7 +298,7 @@ "instillUIOrder": 0, "properties": { "ids": { - "description": "The ids of the vectors to upsert.", + "description": "The IDs of the vectors to upsert.", "instillUIOrder": 0, "instillFormat": "array:string", "instillUpstreamTypes": [ diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index 46e355f9c..167ee4c75 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -22,3 +22,9 @@ type taskBatchUpsertInput struct { type taskBatchUpsertOutput struct { UpsertedCount int64 `instill:"upserted-count"` } + +type vector struct { + ID string `instill:"id" json:"id"` + Values []float64 `instill:"values" json:"values,omitempty"` + Metadata interface{} `instill:"metadata" json:"metadata,omitempty"` +} diff --git a/pkg/component/data/pinecone/v0/structs.go b/pkg/component/data/pinecone/v0/structs.go index ecfccbc8f..bd41ad214 100644 --- a/pkg/component/data/pinecone/v0/structs.go +++ b/pkg/component/data/pinecone/v0/structs.go @@ -64,12 +64,6 @@ type upsertReq struct { Namespace string `json:"namespace,omitempty"` } -type vector struct { - ID string `json:"id"` - Values []float64 `json:"values,omitempty"` - Metadata interface{} `json:"metadata,omitempty"` -} - type upsertResp struct { RecordsUpserted int64 `json:"upsertedCount"` } From 08842a88fa087d60ac89e15b431c8c7668dfdb1d Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Tue, 10 Dec 2024 11:14:41 +0000 Subject: [PATCH 09/11] chore: clean the code --- pkg/component/data/pinecone/v0/io.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index 167ee4c75..fe9d3bf32 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -2,10 +2,8 @@ package pinecone type taskUpsertInput struct { - ID string `instill:"id"` - Metadata interface{} `instill:"metadata"` - Values []float64 `instill:"values"` - Namespace string `instill:"namespace"` + vector + Namespace string `instill:"namespace"` } type taskUpsertOutput struct { From c74b05ac214729bd179cca9708e33789664aa25b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Vall=C3=A9s?= Date: Tue, 10 Dec 2024 19:54:32 +0100 Subject: [PATCH 10/11] feat: rewrite batch upsert input as an array of vectors --- pkg/component/data/pinecone/v0/README.mdx | 25 +++- .../data/pinecone/v0/component_test.go | 72 ++++++----- .../data/pinecone/v0/config/tasks.json | 119 ++++++++++-------- pkg/component/data/pinecone/v0/io.go | 27 ++-- .../data/pinecone/v0/task_batch_upsert.go | 58 ++------- pkg/component/data/pinecone/v0/task_upsert.go | 44 ++----- 6 files changed, 160 insertions(+), 185 deletions(-) diff --git a/pkg/component/data/pinecone/v0/README.mdx b/pkg/component/data/pinecone/v0/README.mdx index f4dea13b0..d2f7461a7 100644 --- a/pkg/component/data/pinecone/v0/README.mdx +++ b/pkg/component/data/pinecone/v0/README.mdx @@ -103,7 +103,7 @@ Retrieve the ids of the most similar items in a namespace, along with their simi ### Upsert -Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. +Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. This task will be soon replaced by `TASK_BATCH_UPSERT`, which extends its functionality.
@@ -131,21 +131,34 @@ Writes vectors into a namespace. If a new value is upserted for an existing vect ### Batch Upsert -Writes vectors into a namespace in batch. If a new value is upserted for an existing vector id, it will overwrite the previous value. +Writes vectors into a namespace. If a new value is upserted for an existing vector ID, it will overwrite the previous value.
| Input | ID | Type | Description | | :--- | :--- | :--- | :--- | | Task ID (required) | `task` | string | `TASK_BATCH_UPSERT` | -| IDs (required) | `ids` | array[string] | The IDs of the vectors to upsert. | -| [Array of Metadata](#batch-upsert-array-of-metadata) | `array-metadata` | array[object] | The metadata of the vectors to upsert. | -| Array of Values (required) | `array-values` | array[array] | The values of the vectors to upsert. The type will be array of array of number. | -| Namespace | `namespace` | string | The namespace to upsert the vectors into. | +| [Vectors](#batch-upsert-vectors) (required) | `vectors` | array[object] | Array of vectors to upsert | +| Namespace | `namespace` | string | The namespace to query. |
+
+ Input Objects in Batch Upsert + +

Vectors

+ +Array of vectors to upsert + +
+| Field | Field ID | Type | Note | +| :--- | :--- | :--- | :--- | +| ID | `id` | string | The unique ID of the vector. | +| Metadata | `metadata` | object | The vector metadata. This is a set of key-value pairs that can be used to store additional information about the vector. The values can have the following types: string, number, boolean, or array of strings. | +| Values | `values` | array | An array of dimensions for the vector to be saved. | +
+
diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index 3243e0f9f..b0d9c3504 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -24,7 +24,7 @@ const ( namespace = "pantone" threshold = 0.9 - // upsertOK = `{"upsertedCount": 1}` + upsertOK = `{"upsertedCount": 2}` queryOK = ` { @@ -58,12 +58,12 @@ var ( vectorA = vector{ ID: "A", Values: []float64{2.23}, - Metadata: map[string]any{"color": "pumpkin"}, + Metadata: map[string]string{"color": "pumpkin"}, } vectorB = vector{ ID: "B", Values: []float64{3.32}, - Metadata: map[string]any{"color": "cerulean"}, + Metadata: map[string]string{"color": "cerulean"}, } queryByVector = queryInput{ Namespace: "color-schemes", @@ -106,21 +106,20 @@ func TestComponent_Execute(t *testing.T) { wantClientReq any clientResp string }{ - // TODO: #927 removed the `taskUpsert` testcase. Reintroduce it in INS-7102. - // { - // name: "ok - upsert", - - // task: taskUpsert, - // execIn: upsertInput{ - // vector: vectorA, - // Namespace: namespace, - // }, - // wantExec: upsertOutput{RecordsUpserted: 1}, - - // wantClientPath: upsertPath, - // wantClientReq: upsertReq{Vectors: []vector{vectorA}, Namespace: namespace}, - // clientResp: upsertOK, - // }, + { + name: "ok - upsert", + + task: taskBatchUpsert, + execIn: taskBatchUpsertInput{ + Vectors: []vector{vectorA, vectorB}, + Namespace: namespace, + }, + wantExec: taskUpsertOutput{UpsertedCount: 2}, + + wantClientPath: upsertPath, + wantClientReq: upsertReq{Vectors: []vector{vectorA, vectorB}, Namespace: namespace}, + clientResp: upsertOK, + }, { name: "ok - query by vector", @@ -236,22 +235,39 @@ func TestComponent_Execute(t *testing.T) { }) c.Assert(err, qt.IsNil) - pbIn, err := base.ConvertToStructpb(tc.execIn) - c.Assert(err, qt.IsNil) + wantJSON, err := json.Marshal(tc.wantExec) ir, ow, eh, job := mock.GenerateMockJob(c) - ir.ReadMock.Return(pbIn, nil) - ow.WriteMock.Optional().Set(func(ctx context.Context, output *structpb.Struct) (err error) { - wantJSON, err := json.Marshal(tc.wantExec) + c.Assert(err, qt.IsNil) + + switch tc.task { + case taskBatchUpsert: + ir.ReadDataMock.Set(func(ctx context.Context, in any) error { + switch in := in.(type) { + case *taskBatchUpsertInput: + *in = tc.execIn.(taskBatchUpsertInput) + } + return nil + }) + + ow.WriteDataMock.Optional().Set(func(ctx context.Context, output any) error { + c.Check(wantJSON, qt.JSONEquals, output) + return nil + }) + default: + pbIn, err := base.ConvertToStructpb(tc.execIn) c.Assert(err, qt.IsNil) - c.Check(wantJSON, qt.JSONEquals, output.AsMap()) - return nil - }) - eh.ErrorMock.Optional() + ir.ReadMock.Return(pbIn, nil) + ow.WriteMock.Optional().Set(func(ctx context.Context, output *structpb.Struct) (err error) { + c.Check(wantJSON, qt.JSONEquals, output.AsMap()) + return nil + }) + } + + eh.ErrorMock.Optional() err = exec.Execute(ctx, []*base.Job{job}) c.Assert(err, qt.IsNil) - }) } diff --git a/pkg/component/data/pinecone/v0/config/tasks.json b/pkg/component/data/pinecone/v0/config/tasks.json index caccc0116..f9500621a 100644 --- a/pkg/component/data/pinecone/v0/config/tasks.json +++ b/pkg/component/data/pinecone/v0/config/tasks.json @@ -201,6 +201,7 @@ }, "TASK_UPSERT": { "instillShortDescription": "Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.", + "description": "Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. This task will be soon replaced by `TASK_BATCH_UPSERT`, which extends its functionality.", "input": { "instillUIOrder": 0, "properties": { @@ -293,70 +294,79 @@ } }, "TASK_BATCH_UPSERT": { - "instillShortDescription": "Writes vectors into a namespace in batch. If a new value is upserted for an existing vector id, it will overwrite the previous value.", + "instillShortDescription": "Writes vectors into a namespace. If a new value is upserted for an existing vector ID, it will overwrite the previous value.", "input": { "instillUIOrder": 0, "properties": { - "ids": { - "description": "The IDs of the vectors to upsert.", + "vectors": { + "description": "Array of vectors to upsert", "instillUIOrder": 0, - "instillFormat": "array:string", - "instillUpstreamTypes": [ - "value", - "reference" - ], - "items": { - "type": "string" - }, - "minItems": 1, - "title": "IDs", - "type": "array" - }, - "array-metadata": { - "instillShortDescription": "The metadata of the vectors to upsert", - "description": "The metadata of the vectors to upsert.", - "instillUIOrder": 1, - "instillUpstreamTypes": [ - "reference", - "value" - ], - "items": { - "type": "object" - }, - "instillFormat": "array:semi-structured/object", - "minItems": 1, - "title": "Array of Metadata", - "type": "array" - }, - "array-values": { - "description": "The values of the vectors to upsert. The type will be array of array of number.", - "instillUIOrder": 2, - "instillUpstreamTypes": [ - "reference" - ], + "type": "array", "items": { - "instillFormat": "array:number", - "instillAcceptFormats": [ - "array:number", - "array:integer" - ], - "type": "array" + "type": "object", + "properties": { + "id": { + "description": "The unique ID of the vector.", + "instillUIOrder": 0, + "instillAcceptFormats": [ + "string" + ], + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "ID", + "type": "string" + }, + "metadata": { + "description": "The vector metadata. This is a set of key-value pairs that can be used to store additional information about the vector. The values can have the following types: string, number, boolean, or array of strings.", + "instillUIOrder": 1, + "instillAcceptFormats": [ + "semi-structured/object" + ], + "instillShortDescription": "The vector metadata", + "instillUpstreamTypes": [ + "reference" + ], + "required": [], + "title": "Metadata", + "type": "object" + }, + "values": { + "description": "An array of dimensions for the vector to be saved.", + "instillUIOrder": 2, + "instillAcceptFormats": [ + "array:number", + "array:integer" + ], + "instillUpstreamTypes": [ + "reference" + ], + "items": { + "description": "A dimension of the vector.", + "example": 0.8167237, + "type": "number" + }, + "minItems": 1, + "title": "Values", + "type": "array" + } + }, + "required": [ + "id", + "values" + ] }, - "instillFormat": "array:array:number", - "instillAcceptFormats": [ - "array:array:number", - "array:array:integer" - ], "minItems": 1, - "title": "Array of Values", - "type": "array" + "title": "Vectors" }, "namespace": { - "description": "The namespace to upsert the vectors into.", + "description": "The namespace to query.", "instillAcceptFormats": [ "string" ], - "instillUIOrder": 3, + "instillUIOrder": 1, "instillUpstreamTypes": [ "value", "reference", @@ -367,14 +377,13 @@ } }, "required": [ - "ids", - "array-values" + "vectors" ], "title": "Input", "type": "object" }, "output": { - "instillUIOrder": 1, + "instillUIOrder": 0, "properties": { "upserted-count": { "description": "Number of records modified or added.", diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index fe9d3bf32..99287424c 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -1,28 +1,23 @@ -// TASK_QUERY and TASK_RERANK are not refactored yet, they will be addressed in ins-7102 +// TODO: TASK_QUERY and TASK_RERANK are not refactored yet, they will be +// addressed in INS-7102. package pinecone +type vector struct { + ID string `json:"id" instill:"id"` + Values []float64 `json:"values,omitempty" instill:"values"` + Metadata map[string]string `json:"metadata,omitempty" instill:"metadata"` +} + type taskUpsertInput struct { vector Namespace string `instill:"namespace"` } -type taskUpsertOutput struct { - UpsertedCount int64 `instill:"upserted-count"` -} - type taskBatchUpsertInput struct { - IDs []string `instill:"ids"` - ArrayMetadata []interface{} `instill:"array-metadata"` - ArrayValues [][]float64 `instill:"array-values"` - Namespace string `instill:"namespace"` + Vectors []vector `instill:"vectors"` + Namespace string `instill:"namespace"` } -type taskBatchUpsertOutput struct { +type taskUpsertOutput struct { UpsertedCount int64 `instill:"upserted-count"` } - -type vector struct { - ID string `instill:"id" json:"id"` - Values []float64 `instill:"values" json:"values,omitempty"` - Metadata interface{} `instill:"metadata" json:"metadata,omitempty"` -} diff --git a/pkg/component/data/pinecone/v0/task_batch_upsert.go b/pkg/component/data/pinecone/v0/task_batch_upsert.go index 6e575bd8f..390f5c46b 100644 --- a/pkg/component/data/pinecone/v0/task_batch_upsert.go +++ b/pkg/component/data/pinecone/v0/task_batch_upsert.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/instill-ai/pipeline-backend/pkg/component/base" + "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" ) func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { - input := taskBatchUpsertInput{} if err := job.Input.ReadData(ctx, &input); err != nil { err = fmt.Errorf("reading input data: %w", err) @@ -16,30 +16,25 @@ func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { return err } - if err := valid(input); err != nil { - err = fmt.Errorf("validate input: %w", err) - job.Error.Error(ctx, err) - return err - } - - req := newIndexClient(e.Setup, e.GetLogger()).R() - - upsertReq := convertBatchUpsertInput(input) - resp := upsertResp{} - - req.SetResult(&resp).SetBody(upsertReq) - if _, err := req.Post(upsertPath); err != nil { - err = fmt.Errorf("upserting vectors: %w", err) + _, err := newIndexClient(e.Setup, e.GetLogger()). + R(). + SetResult(&resp). + SetBody(upsertReq{ + Vectors: input.Vectors, + Namespace: input.Namespace, + }). + Post(upsertPath) + if err != nil { + err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) job.Error.Error(ctx, err) return err } - output := taskBatchUpsertOutput{ + if err := job.Output.WriteData(ctx, &taskUpsertOutput{ UpsertedCount: resp.RecordsUpserted, - } + }); err != nil { - if err := job.Output.WriteData(ctx, output); err != nil { err = fmt.Errorf("writing output data: %w", err) job.Error.Error(ctx, err) return err @@ -47,30 +42,3 @@ func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { return nil } - -func valid(input taskBatchUpsertInput) error { - if len(input.IDs) != len(input.ArrayValues) { - return fmt.Errorf("ids and array-values must have the same length") - } - if len(input.ArrayValues) == 0 { - return fmt.Errorf("array-values must not be empty") - } - return nil -} - -func convertBatchUpsertInput(input taskBatchUpsertInput) upsertReq { - - vectors := make([]vector, len(input.IDs)) - for i, id := range input.IDs { - vectors[i] = vector{ - ID: id, - Values: input.ArrayValues[i], - Metadata: input.ArrayMetadata[i], - } - } - - return upsertReq{ - Vectors: vectors, - Namespace: input.Namespace, - } -} diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go index 1fc5a4e02..bdf7058ad 100644 --- a/pkg/component/data/pinecone/v0/task_upsert.go +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/instill-ai/pipeline-backend/pkg/component/base" + "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" ) func (e *execution) upsert(ctx context.Context, job *base.Job) error { - input := taskUpsertInput{} if err := job.Input.ReadData(ctx, &input); err != nil { err = fmt.Errorf("reading input data: %w", err) @@ -18,20 +18,21 @@ func (e *execution) upsert(ctx context.Context, job *base.Job) error { req := newIndexClient(e.Setup, e.GetLogger()).R() - upsertReq := convertInput(input) - resp := upsertResp{} - - req.SetResult(&resp).SetBody(upsertReq) + req.SetResult(&resp).SetBody(upsertReq{ + Vectors: []vector{input.vector}, + Namespace: input.Namespace, + }) if _, err := req.Post(upsertPath); err != nil { - err = fmt.Errorf("upserting vectors: %w", err) + err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) job.Error.Error(ctx, err) return err } - output := convertOutput(resp) + if err := job.Output.WriteData(ctx, &taskUpsertOutput{ + UpsertedCount: resp.RecordsUpserted, + }); err != nil { - if err := job.Output.WriteData(ctx, output); err != nil { err = fmt.Errorf("writing output data: %w", err) job.Error.Error(ctx, err) return err @@ -39,30 +40,3 @@ func (e *execution) upsert(ctx context.Context, job *base.Job) error { return nil } - -func convertInput(input taskUpsertInput) upsertReq { - - upsertReq := upsertReq{ - Vectors: []vector{}, - Namespace: input.Namespace, - } - - vector := vector{ - ID: input.ID, - Values: input.Values, - } - - if input.Metadata != "" { - vector.Metadata = input.Metadata - } - - upsertReq.Vectors = append(upsertReq.Vectors, vector) - - return upsertReq -} - -func convertOutput(resp upsertResp) *taskUpsertOutput { - return &taskUpsertOutput{ - UpsertedCount: resp.RecordsUpserted, - } -} From 9e871252b92542bdc6b9cce1293968b6cd9e8e6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Vall=C3=A9s?= Date: Wed, 11 Dec 2024 09:34:37 +0100 Subject: [PATCH 11/11] fix: take any type in vector metadata --- go.mod | 6 ++ go.sum | 10 ++++ .../data/pinecone/v0/component_test.go | 33 +++++++---- pkg/component/data/pinecone/v0/io.go | 57 ++++++++++++++++++- pkg/component/data/pinecone/v0/structs.go | 17 ++---- .../data/pinecone/v0/task_batch_upsert.go | 13 +++-- pkg/component/data/pinecone/v0/task_upsert.go | 29 +++++++--- 7 files changed, 128 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 6f69d85ae..8770765be 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,12 @@ require ( gorm.io/plugin/dbresolver v1.5.1 ) +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/oapi-codegen/runtime v1.1.1 // indirect + github.com/pinecone-io/go-pinecone v1.1.1 +) + require ( github.com/machinebox/graphql v0.2.2 github.com/matryer/is v1.4.1 // indirect diff --git a/go.sum b/go.sum index 15c7e9ffa..4b7ddd0cd 100644 --- a/go.sum +++ b/go.sum @@ -481,6 +481,7 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/advancedlogic/GoOse v0.0.0-20191112112754-e742535969c1 h1:d0Ct1dZwgwMO0Llf81Eu+Lyj6kwqXdqHP/WsSkEria0= github.com/advancedlogic/GoOse v0.0.0-20191112112754-e742535969c1/go.mod h1:f3HCSN1fBWjcpGtXyM119MJgeQl838v6so/PQOqvE1w= @@ -516,6 +517,8 @@ github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2q github.com/apache/arrow/go/arrow v0.0.0-20211013220434-5962184e7a30/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/araddon/dateparse v0.0.0-20180729174819-cfd92a431d0e/go.mod h1:SLqhdZcd+dF3TEVL2RMoob5bBP5R1P1qkox+HtCBgGI= github.com/araddon/dateparse v0.0.0-20200409225146-d820a6159ab1 h1:TEBmxO80TM04L8IuMWk77SGL1HomBmKTdzdJLLWznxI= github.com/araddon/dateparse v0.0.0-20200409225146-d820a6159ab1/go.mod h1:SLqhdZcd+dF3TEVL2RMoob5bBP5R1P1qkox+HtCBgGI= @@ -579,6 +582,7 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= @@ -1391,6 +1395,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -1604,6 +1609,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= +github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -1692,6 +1699,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pinecone-io/go-pinecone v1.1.1 h1:pKoIiYcBIbrR7gaq0JXPiVnNEtevFYeq/AYL7T0NbbE= +github.com/pinecone-io/go-pinecone v1.1.1/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -1843,6 +1852,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index b0d9c3504..ccac2d918 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "testing" + "github.com/pinecone-io/go-pinecone/pinecone" "google.golang.org/protobuf/types/known/structpb" qt "github.com/frankban/quicktest" @@ -16,6 +17,8 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" + "github.com/instill-ai/pipeline-backend/pkg/data" + "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/x/errmsg" ) @@ -54,16 +57,21 @@ const ( }` ) +func newValue(in any) format.Value { + v, _ := data.NewValue(in) + return v +} + var ( vectorA = vector{ ID: "A", - Values: []float64{2.23}, - Metadata: map[string]string{"color": "pumpkin"}, + Values: []float32{2.23}, + Metadata: map[string]format.Value{"color": newValue("pumpkin")}, } vectorB = vector{ ID: "B", - Values: []float64{3.32}, - Metadata: map[string]string{"color": "cerulean"}, + Values: []float32{3.32}, + Metadata: map[string]format.Value{"color": newValue("cerulean")}, } queryByVector = queryInput{ Namespace: "color-schemes", @@ -95,6 +103,11 @@ func TestComponent_Execute(t *testing.T) { c := qt.New(t) ctx := context.Background() + pvA, err := vectorA.toPinecone() + c.Assert(err, qt.IsNil) + pvB, err := vectorB.toPinecone() + c.Assert(err, qt.IsNil) + testcases := []struct { name string @@ -117,7 +130,7 @@ func TestComponent_Execute(t *testing.T) { wantExec: taskUpsertOutput{UpsertedCount: 2}, wantClientPath: upsertPath, - wantClientReq: upsertReq{Vectors: []vector{vectorA, vectorB}, Namespace: namespace}, + wantClientReq: upsertReq{Vectors: []*pinecone.Vector{pvA, pvB}, Namespace: namespace}, clientResp: upsertOK, }, { @@ -129,11 +142,11 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, { - vector: vectorB, + Vector: pvB, Score: 0.87, }, }, @@ -152,7 +165,7 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, }, @@ -171,11 +184,11 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, { - vector: vectorB, + Vector: pvB, Score: 0.87, }, }, diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go index 99287424c..90a4676c7 100644 --- a/pkg/component/data/pinecone/v0/io.go +++ b/pkg/component/data/pinecone/v0/io.go @@ -2,10 +2,31 @@ // addressed in INS-7102. package pinecone +import ( + "fmt" + + "github.com/pinecone-io/go-pinecone/pinecone" + + "github.com/instill-ai/pipeline-backend/pkg/data" +) + type vector struct { - ID string `json:"id" instill:"id"` - Values []float64 `json:"values,omitempty" instill:"values"` - Metadata map[string]string `json:"metadata,omitempty" instill:"metadata"` + ID string `instill:"id"` + Values []float32 `instill:"values"` + Metadata data.Map `instill:"metadata"` +} + +func (v vector) toPinecone() (*pinecone.Vector, error) { + metadata, err := v.Metadata.ToStructValue() + if err != nil { + return nil, fmt.Errorf("converting input metadata to request: %w", err) + } + + return &pinecone.Vector{ + Id: v.ID, + Values: v.Values, + Metadata: metadata.GetStructValue(), + }, nil } type taskUpsertInput struct { @@ -13,11 +34,41 @@ type taskUpsertInput struct { Namespace string `instill:"namespace"` } +func (in *taskUpsertInput) asRequest() (*upsertReq, error) { + pv, err := in.vector.toPinecone() + if err != nil { + return nil, err + } + + return &upsertReq{ + Vectors: []*pinecone.Vector{pv}, + Namespace: in.Namespace, + }, nil +} + type taskBatchUpsertInput struct { Vectors []vector `instill:"vectors"` Namespace string `instill:"namespace"` } +func (in *taskBatchUpsertInput) asRequest() (*upsertReq, error) { + req := &upsertReq{ + Vectors: make([]*pinecone.Vector, 0, len(in.Vectors)), + Namespace: in.Namespace, + } + + for _, v := range in.Vectors { + pv, err := v.toPinecone() + if err != nil { + return nil, err + } + + req.Vectors = append(req.Vectors, pv) + } + + return req, nil +} + type taskUpsertOutput struct { UpsertedCount int64 `instill:"upserted-count"` } diff --git a/pkg/component/data/pinecone/v0/structs.go b/pkg/component/data/pinecone/v0/structs.go index bd41ad214..64a6e02d6 100644 --- a/pkg/component/data/pinecone/v0/structs.go +++ b/pkg/component/data/pinecone/v0/structs.go @@ -1,9 +1,11 @@ package pinecone +import "github.com/pinecone-io/go-pinecone/pinecone" + type queryInput struct { Namespace string `json:"namespace"` TopK int64 `json:"top-k"` - Vector []float64 `json:"vector"` + Vector []float32 `json:"vector"` IncludeValues bool `json:"include-values"` IncludeMetadata bool `json:"include-metadata"` ID string `json:"id"` @@ -14,7 +16,7 @@ type queryInput struct { type queryReq struct { Namespace string `json:"namespace"` TopK int64 `json:"topK"` - Vector []float64 `json:"vector,omitempty"` + Vector []float32 `json:"vector,omitempty"` IncludeValues bool `json:"includeValues"` IncludeMetadata bool `json:"includeMetadata"` ID string `json:"id,omitempty"` @@ -55,19 +57,10 @@ func (r queryResp) filterOutBelowThreshold(th float64) queryResp { } type match struct { - vector + *pinecone.Vector Score float64 `json:"score"` } -type upsertReq struct { - Vectors []vector `json:"vectors"` - Namespace string `json:"namespace,omitempty"` -} - -type upsertResp struct { - RecordsUpserted int64 `json:"upsertedCount"` -} - type Document struct { Text string `json:"text"` } diff --git a/pkg/component/data/pinecone/v0/task_batch_upsert.go b/pkg/component/data/pinecone/v0/task_batch_upsert.go index 390f5c46b..d66c8a2e0 100644 --- a/pkg/component/data/pinecone/v0/task_batch_upsert.go +++ b/pkg/component/data/pinecone/v0/task_batch_upsert.go @@ -17,13 +17,16 @@ func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { } resp := upsertResp{} - _, err := newIndexClient(e.Setup, e.GetLogger()). + body, err := input.asRequest() + if err != nil { + job.Error.Error(ctx, err) + return err + } + + _, err = newIndexClient(e.Setup, e.GetLogger()). R(). SetResult(&resp). - SetBody(upsertReq{ - Vectors: input.Vectors, - Namespace: input.Namespace, - }). + SetBody(body). Post(upsertPath) if err != nil { err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go index bdf7058ad..419c58c99 100644 --- a/pkg/component/data/pinecone/v0/task_upsert.go +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -4,10 +4,21 @@ import ( "context" "fmt" + "github.com/pinecone-io/go-pinecone/pinecone" + "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" ) +type upsertReq struct { + Vectors []*pinecone.Vector `json:"vectors"` + Namespace string `json:"namespace,omitempty"` +} + +type upsertResp struct { + RecordsUpserted int64 `json:"upsertedCount"` +} + func (e *execution) upsert(ctx context.Context, job *base.Job) error { input := taskUpsertInput{} if err := job.Input.ReadData(ctx, &input); err != nil { @@ -16,14 +27,18 @@ func (e *execution) upsert(ctx context.Context, job *base.Job) error { return err } - req := newIndexClient(e.Setup, e.GetLogger()).R() - resp := upsertResp{} - req.SetResult(&resp).SetBody(upsertReq{ - Vectors: []vector{input.vector}, - Namespace: input.Namespace, - }) - if _, err := req.Post(upsertPath); err != nil { + body, err := input.asRequest() + if err != nil { + job.Error.Error(ctx, err) + return err + } + _, err = newIndexClient(e.Setup, e.GetLogger()). + R(). + SetResult(&resp). + SetBody(body). + Post(upsertPath) + if err != nil { err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) job.Error.Error(ctx, err) return err