diff --git a/go.mod b/go.mod index 6f69d85ae..8770765be 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,12 @@ require ( gorm.io/plugin/dbresolver v1.5.1 ) +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/oapi-codegen/runtime v1.1.1 // indirect + github.com/pinecone-io/go-pinecone v1.1.1 +) + require ( github.com/machinebox/graphql v0.2.2 github.com/matryer/is v1.4.1 // indirect diff --git a/go.sum b/go.sum index 15c7e9ffa..4b7ddd0cd 100644 --- a/go.sum +++ b/go.sum @@ -481,6 +481,7 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/advancedlogic/GoOse v0.0.0-20191112112754-e742535969c1 h1:d0Ct1dZwgwMO0Llf81Eu+Lyj6kwqXdqHP/WsSkEria0= github.com/advancedlogic/GoOse v0.0.0-20191112112754-e742535969c1/go.mod h1:f3HCSN1fBWjcpGtXyM119MJgeQl838v6so/PQOqvE1w= @@ -516,6 +517,8 @@ github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2q github.com/apache/arrow/go/arrow v0.0.0-20211013220434-5962184e7a30/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/araddon/dateparse v0.0.0-20180729174819-cfd92a431d0e/go.mod h1:SLqhdZcd+dF3TEVL2RMoob5bBP5R1P1qkox+HtCBgGI= github.com/araddon/dateparse v0.0.0-20200409225146-d820a6159ab1 h1:TEBmxO80TM04L8IuMWk77SGL1HomBmKTdzdJLLWznxI= github.com/araddon/dateparse v0.0.0-20200409225146-d820a6159ab1/go.mod h1:SLqhdZcd+dF3TEVL2RMoob5bBP5R1P1qkox+HtCBgGI= @@ -579,6 +582,7 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= @@ -1391,6 +1395,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -1604,6 +1609,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= +github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -1692,6 +1699,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pinecone-io/go-pinecone v1.1.1 h1:pKoIiYcBIbrR7gaq0JXPiVnNEtevFYeq/AYL7T0NbbE= +github.com/pinecone-io/go-pinecone v1.1.1/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -1843,6 +1852,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= diff --git a/pkg/component/data/pinecone/v0/README.mdx b/pkg/component/data/pinecone/v0/README.mdx index 778b87091..d2f7461a7 100644 --- a/pkg/component/data/pinecone/v0/README.mdx +++ b/pkg/component/data/pinecone/v0/README.mdx @@ -9,6 +9,7 @@ The Pinecone component is a data component that allows users to build and search It can carry out the following tasks: - [Query](#query) - [Upsert](#upsert) +- [Batch Upsert](#batch-upsert) - [Rerank](#rerank) @@ -102,7 +103,7 @@ Retrieve the ids of the most similar items in a namespace, along with their simi ### Upsert -Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. +Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. This task will be soon replaced by `TASK_BATCH_UPSERT`, which extends its functionality.
@@ -120,6 +121,47 @@ Writes vectors into a namespace. If a new value is upserted for an existing vect +
+ +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Upserted Count | `upserted-count` | integer | Number of records modified or added. | +
+ + +### Batch Upsert + +Writes vectors into a namespace. If a new value is upserted for an existing vector ID, it will overwrite the previous value. + +
+ +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_BATCH_UPSERT` | +| [Vectors](#batch-upsert-vectors) (required) | `vectors` | array[object] | Array of vectors to upsert | +| Namespace | `namespace` | string | The namespace to query. | +
+ + +
+ Input Objects in Batch Upsert + +

Vectors

+ +Array of vectors to upsert + +
+ +| Field | Field ID | Type | Note | +| :--- | :--- | :--- | :--- | +| ID | `id` | string | The unique ID of the vector. | +| Metadata | `metadata` | object | The vector metadata. This is a set of key-value pairs that can be used to store additional information about the vector. The values can have the following types: string, number, boolean, or array of strings. | +| Values | `values` | array | An array of dimensions for the vector to be saved. | +
+
+ + +
| Output | ID | Type | Description | diff --git a/pkg/component/data/pinecone/v0/component_test.go b/pkg/component/data/pinecone/v0/component_test.go index ab08fa516..ccac2d918 100644 --- a/pkg/component/data/pinecone/v0/component_test.go +++ b/pkg/component/data/pinecone/v0/component_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "testing" + "github.com/pinecone-io/go-pinecone/pinecone" "google.golang.org/protobuf/types/known/structpb" qt "github.com/frankban/quicktest" @@ -16,6 +17,8 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" + "github.com/instill-ai/pipeline-backend/pkg/data" + "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/x/errmsg" ) @@ -24,7 +27,7 @@ const ( namespace = "pantone" threshold = 0.9 - upsertOK = `{"upsertedCount": 1}` + upsertOK = `{"upsertedCount": 2}` queryOK = ` { @@ -46,23 +49,29 @@ const ( }` errResp = ` -{ - "code": 3, - "message": "Cannot provide both ID and vector at the same time.", - "details": [] -}` + + { + "code": 3, + "message": "Cannot provide both ID and vector at the same time.", + "details": [] + }` ) +func newValue(in any) format.Value { + v, _ := data.NewValue(in) + return v +} + var ( vectorA = vector{ ID: "A", - Values: []float64{2.23}, - Metadata: map[string]any{"color": "pumpkin"}, + Values: []float32{2.23}, + Metadata: map[string]format.Value{"color": newValue("pumpkin")}, } vectorB = vector{ ID: "B", - Values: []float64{3.32}, - Metadata: map[string]any{"color": "cerulean"}, + Values: []float32{3.32}, + Metadata: map[string]format.Value{"color": newValue("cerulean")}, } queryByVector = queryInput{ Namespace: "color-schemes", @@ -94,6 +103,11 @@ func TestComponent_Execute(t *testing.T) { c := qt.New(t) ctx := context.Background() + pvA, err := vectorA.toPinecone() + c.Assert(err, qt.IsNil) + pvB, err := vectorB.toPinecone() + c.Assert(err, qt.IsNil) + testcases := []struct { name string @@ -108,15 +122,15 @@ func TestComponent_Execute(t *testing.T) { { name: "ok - upsert", - task: taskUpsert, - execIn: upsertInput{ - vector: vectorA, + task: taskBatchUpsert, + execIn: taskBatchUpsertInput{ + Vectors: []vector{vectorA, vectorB}, Namespace: namespace, }, - wantExec: upsertOutput{RecordsUpserted: 1}, + wantExec: taskUpsertOutput{UpsertedCount: 2}, wantClientPath: upsertPath, - wantClientReq: upsertReq{Vectors: []vector{vectorA}, Namespace: namespace}, + wantClientReq: upsertReq{Vectors: []*pinecone.Vector{pvA, pvB}, Namespace: namespace}, clientResp: upsertOK, }, { @@ -128,11 +142,11 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, { - vector: vectorB, + Vector: pvB, Score: 0.87, }, }, @@ -151,7 +165,7 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, }, @@ -170,11 +184,11 @@ func TestComponent_Execute(t *testing.T) { Namespace: "color-schemes", Matches: []match{ { - vector: vectorA, + Vector: pvA, Score: 0.99, }, { - vector: vectorB, + Vector: pvB, Score: 0.87, }, }, @@ -234,22 +248,39 @@ func TestComponent_Execute(t *testing.T) { }) c.Assert(err, qt.IsNil) - pbIn, err := base.ConvertToStructpb(tc.execIn) - c.Assert(err, qt.IsNil) + wantJSON, err := json.Marshal(tc.wantExec) ir, ow, eh, job := mock.GenerateMockJob(c) - ir.ReadMock.Return(pbIn, nil) - ow.WriteMock.Optional().Set(func(ctx context.Context, output *structpb.Struct) (err error) { - wantJSON, err := json.Marshal(tc.wantExec) + c.Assert(err, qt.IsNil) + + switch tc.task { + case taskBatchUpsert: + ir.ReadDataMock.Set(func(ctx context.Context, in any) error { + switch in := in.(type) { + case *taskBatchUpsertInput: + *in = tc.execIn.(taskBatchUpsertInput) + } + return nil + }) + + ow.WriteDataMock.Optional().Set(func(ctx context.Context, output any) error { + c.Check(wantJSON, qt.JSONEquals, output) + return nil + }) + default: + pbIn, err := base.ConvertToStructpb(tc.execIn) c.Assert(err, qt.IsNil) - c.Check(wantJSON, qt.JSONEquals, output.AsMap()) - return nil - }) - eh.ErrorMock.Optional() + ir.ReadMock.Return(pbIn, nil) + ow.WriteMock.Optional().Set(func(ctx context.Context, output *structpb.Struct) (err error) { + c.Check(wantJSON, qt.JSONEquals, output.AsMap()) + return nil + }) + } + + eh.ErrorMock.Optional() err = exec.Execute(ctx, []*base.Job{job}) c.Assert(err, qt.IsNil) - }) } @@ -270,7 +301,7 @@ func TestComponent_Execute(t *testing.T) { exec, err := cmp.CreateExecution(base.ComponentExecution{ Component: cmp, Setup: setup, - Task: taskUpsert, + Task: taskQuery, }) c.Assert(err, qt.IsNil) @@ -296,7 +327,7 @@ func TestComponent_Execute(t *testing.T) { exec, err := cmp.CreateExecution(base.ComponentExecution{ Component: cmp, Setup: setup, - Task: taskUpsert, + Task: taskQuery, }) c.Assert(err, qt.IsNil) diff --git a/pkg/component/data/pinecone/v0/config/definition.json b/pkg/component/data/pinecone/v0/config/definition.json index 947638a77..a35bd3595 100644 --- a/pkg/component/data/pinecone/v0/config/definition.json +++ b/pkg/component/data/pinecone/v0/config/definition.json @@ -2,6 +2,7 @@ "availableTasks": [ "TASK_QUERY", "TASK_UPSERT", + "TASK_BATCH_UPSERT", "TASK_RERANK" ], "custom": false, diff --git a/pkg/component/data/pinecone/v0/config/tasks.json b/pkg/component/data/pinecone/v0/config/tasks.json index d5fb839b6..f9500621a 100644 --- a/pkg/component/data/pinecone/v0/config/tasks.json +++ b/pkg/component/data/pinecone/v0/config/tasks.json @@ -201,6 +201,7 @@ }, "TASK_UPSERT": { "instillShortDescription": "Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.", + "description": "Writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value. This task will be soon replaced by `TASK_BATCH_UPSERT`, which extends its functionality.", "input": { "instillUIOrder": 0, "properties": { @@ -292,6 +293,113 @@ "type": "object" } }, + "TASK_BATCH_UPSERT": { + "instillShortDescription": "Writes vectors into a namespace. If a new value is upserted for an existing vector ID, it will overwrite the previous value.", + "input": { + "instillUIOrder": 0, + "properties": { + "vectors": { + "description": "Array of vectors to upsert", + "instillUIOrder": 0, + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "description": "The unique ID of the vector.", + "instillUIOrder": 0, + "instillAcceptFormats": [ + "string" + ], + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "ID", + "type": "string" + }, + "metadata": { + "description": "The vector metadata. This is a set of key-value pairs that can be used to store additional information about the vector. The values can have the following types: string, number, boolean, or array of strings.", + "instillUIOrder": 1, + "instillAcceptFormats": [ + "semi-structured/object" + ], + "instillShortDescription": "The vector metadata", + "instillUpstreamTypes": [ + "reference" + ], + "required": [], + "title": "Metadata", + "type": "object" + }, + "values": { + "description": "An array of dimensions for the vector to be saved.", + "instillUIOrder": 2, + "instillAcceptFormats": [ + "array:number", + "array:integer" + ], + "instillUpstreamTypes": [ + "reference" + ], + "items": { + "description": "A dimension of the vector.", + "example": 0.8167237, + "type": "number" + }, + "minItems": 1, + "title": "Values", + "type": "array" + } + }, + "required": [ + "id", + "values" + ] + }, + "minItems": 1, + "title": "Vectors" + }, + "namespace": { + "description": "The namespace to query.", + "instillAcceptFormats": [ + "string" + ], + "instillUIOrder": 1, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Namespace", + "type": "string" + } + }, + "required": [ + "vectors" + ], + "title": "Input", + "type": "object" + }, + "output": { + "instillUIOrder": 0, + "properties": { + "upserted-count": { + "description": "Number of records modified or added.", + "instillFormat": "integer", + "instillUIOrder": 0, + "title": "Upserted Count", + "type": "integer" + } + }, + "required": [ + "upserted-count" + ], + "title": "Output", + "type": "object" + } + }, "TASK_RERANK": { "instillShortDescription": "Rerank documents, such as text passages, according to their relevance to a query.", "description": "Rerank documents, such as text passages, according to their relevance to a query. The input is a list of documents and a query. The output is a list of documents, sorted by relevance to the query.", diff --git a/pkg/component/data/pinecone/v0/io.go b/pkg/component/data/pinecone/v0/io.go new file mode 100644 index 000000000..90a4676c7 --- /dev/null +++ b/pkg/component/data/pinecone/v0/io.go @@ -0,0 +1,74 @@ +// TODO: TASK_QUERY and TASK_RERANK are not refactored yet, they will be +// addressed in INS-7102. +package pinecone + +import ( + "fmt" + + "github.com/pinecone-io/go-pinecone/pinecone" + + "github.com/instill-ai/pipeline-backend/pkg/data" +) + +type vector struct { + ID string `instill:"id"` + Values []float32 `instill:"values"` + Metadata data.Map `instill:"metadata"` +} + +func (v vector) toPinecone() (*pinecone.Vector, error) { + metadata, err := v.Metadata.ToStructValue() + if err != nil { + return nil, fmt.Errorf("converting input metadata to request: %w", err) + } + + return &pinecone.Vector{ + Id: v.ID, + Values: v.Values, + Metadata: metadata.GetStructValue(), + }, nil +} + +type taskUpsertInput struct { + vector + Namespace string `instill:"namespace"` +} + +func (in *taskUpsertInput) asRequest() (*upsertReq, error) { + pv, err := in.vector.toPinecone() + if err != nil { + return nil, err + } + + return &upsertReq{ + Vectors: []*pinecone.Vector{pv}, + Namespace: in.Namespace, + }, nil +} + +type taskBatchUpsertInput struct { + Vectors []vector `instill:"vectors"` + Namespace string `instill:"namespace"` +} + +func (in *taskBatchUpsertInput) asRequest() (*upsertReq, error) { + req := &upsertReq{ + Vectors: make([]*pinecone.Vector, 0, len(in.Vectors)), + Namespace: in.Namespace, + } + + for _, v := range in.Vectors { + pv, err := v.toPinecone() + if err != nil { + return nil, err + } + + req.Vectors = append(req.Vectors, pv) + } + + return req, nil +} + +type taskUpsertOutput struct { + UpsertedCount int64 `instill:"upserted-count"` +} diff --git a/pkg/component/data/pinecone/v0/main.go b/pkg/component/data/pinecone/v0/main.go index 929e0c54a..9a594a7e4 100644 --- a/pkg/component/data/pinecone/v0/main.go +++ b/pkg/component/data/pinecone/v0/main.go @@ -15,9 +15,10 @@ import ( ) const ( - taskQuery = "TASK_QUERY" - taskUpsert = "TASK_UPSERT" - taskRerank = "TASK_RERANK" + taskQuery = "TASK_QUERY" + taskUpsert = "TASK_UPSERT" + taskRerank = "TASK_RERANK" + taskBatchUpsert = "TASK_BATCH_UPSERT" upsertPath = "/vectors/upsert" queryPath = "/query" @@ -42,8 +43,11 @@ type component struct { type execution struct { base.ComponentExecution + + execute func(context.Context, *base.Job) error } +// Init initializes the component and loads the definition, setup, and tasks. func Init(bc base.Component) *component { once.Do(func() { comp = &component{Component: bc} @@ -55,10 +59,25 @@ func Init(bc base.Component) *component { return comp } +// CreateExecution creates a new execution for the component. func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) { - return &execution{ + e := &execution{ ComponentExecution: x, - }, nil + } + + switch x.Task { + case taskQuery: + e.execute = e.query + case taskRerank: + e.execute = e.rerank + // Now, only upsert task is refactored, the rest will be addressed in ins-7102 + case taskUpsert: + e.execute = e.upsert + case taskBatchUpsert: + e.execute = e.batchUpsert + } + + return e, nil } // newIndexClient creates a new httpclient.Client with the index URL provided in setup @@ -100,6 +119,10 @@ func getURL(setup *structpb.Struct) string { } func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { + // TODO: We will need to migrate other tasks to use the new logic. + if e.Task == taskUpsert || e.Task == taskBatchUpsert { + return base.ConcurrentExecutor(ctx, jobs, e.execute) + } for _, job := range jobs { input, err := job.Input.Read(ctx) @@ -141,32 +164,6 @@ func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { job.Error.Error(ctx, err) continue } - case taskUpsert: - req := newIndexClient(e.Setup, e.GetLogger()).R() - - v := upsertInput{} - err := base.ConvertFromStructpb(input, &v) - if err != nil { - job.Error.Error(ctx, err) - continue - } - - resp := upsertResp{} - req.SetResult(&resp).SetBody(upsertReq{ - Vectors: []vector{v.vector}, - Namespace: v.Namespace, - }) - - if _, err := req.Post(upsertPath); err != nil { - job.Error.Error(ctx, httpclient.WrapURLError(err)) - continue - } - - output, err = base.ConvertToStructpb(upsertOutput(resp)) - if err != nil { - job.Error.Error(ctx, err) - continue - } case taskRerank: // rerank task does not need index URL, so using the base client with the default pinecone API URL. req := newBaseClient(e.Setup, e.GetLogger()).R() diff --git a/pkg/component/data/pinecone/v0/structs.go b/pkg/component/data/pinecone/v0/structs.go index 97b0006c9..64a6e02d6 100644 --- a/pkg/component/data/pinecone/v0/structs.go +++ b/pkg/component/data/pinecone/v0/structs.go @@ -1,9 +1,11 @@ package pinecone +import "github.com/pinecone-io/go-pinecone/pinecone" + type queryInput struct { Namespace string `json:"namespace"` TopK int64 `json:"top-k"` - Vector []float64 `json:"vector"` + Vector []float32 `json:"vector"` IncludeValues bool `json:"include-values"` IncludeMetadata bool `json:"include-metadata"` ID string `json:"id"` @@ -14,7 +16,7 @@ type queryInput struct { type queryReq struct { Namespace string `json:"namespace"` TopK int64 `json:"topK"` - Vector []float64 `json:"vector,omitempty"` + Vector []float32 `json:"vector,omitempty"` IncludeValues bool `json:"includeValues"` IncludeMetadata bool `json:"includeMetadata"` ID string `json:"id,omitempty"` @@ -55,34 +57,10 @@ func (r queryResp) filterOutBelowThreshold(th float64) queryResp { } type match struct { - vector + *pinecone.Vector Score float64 `json:"score"` } -type upsertReq struct { - Vectors []vector `json:"vectors"` - Namespace string `json:"namespace,omitempty"` -} - -type upsertInput struct { - vector - Namespace string `json:"namespace"` -} - -type vector struct { - ID string `json:"id"` - Values []float64 `json:"values,omitempty"` - Metadata interface{} `json:"metadata,omitempty"` -} - -type upsertResp struct { - RecordsUpserted int64 `json:"upsertedCount"` -} - -type upsertOutput struct { - RecordsUpserted int64 `json:"upserted-count"` -} - type Document struct { Text string `json:"text"` } diff --git a/pkg/component/data/pinecone/v0/task_batch_upsert.go b/pkg/component/data/pinecone/v0/task_batch_upsert.go new file mode 100644 index 000000000..d66c8a2e0 --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_batch_upsert.go @@ -0,0 +1,47 @@ +package pinecone + +import ( + "context" + "fmt" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" + "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" +) + +func (e *execution) batchUpsert(ctx context.Context, job *base.Job) error { + input := taskBatchUpsertInput{} + if err := job.Input.ReadData(ctx, &input); err != nil { + err = fmt.Errorf("reading input data: %w", err) + job.Error.Error(ctx, err) + return err + } + + resp := upsertResp{} + body, err := input.asRequest() + if err != nil { + job.Error.Error(ctx, err) + return err + } + + _, err = newIndexClient(e.Setup, e.GetLogger()). + R(). + SetResult(&resp). + SetBody(body). + Post(upsertPath) + if err != nil { + err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) + job.Error.Error(ctx, err) + return err + } + + if err := job.Output.WriteData(ctx, &taskUpsertOutput{ + UpsertedCount: resp.RecordsUpserted, + }); err != nil { + + err = fmt.Errorf("writing output data: %w", err) + job.Error.Error(ctx, err) + return err + } + + return nil +} diff --git a/pkg/component/data/pinecone/v0/task_query.go b/pkg/component/data/pinecone/v0/task_query.go new file mode 100644 index 000000000..5b71e68cf --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_query.go @@ -0,0 +1,12 @@ +// TODO: Implement the query task, it will be addressed in ins-7102 +package pinecone + +import ( + "context" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) query(ctx context.Context, job *base.Job) error { + return nil +} diff --git a/pkg/component/data/pinecone/v0/task_rerank.go b/pkg/component/data/pinecone/v0/task_rerank.go new file mode 100644 index 000000000..4d959deeb --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_rerank.go @@ -0,0 +1,12 @@ +// TODO: Implement the rerank task, it will be addressed in ins-7102 +package pinecone + +import ( + "context" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" +) + +func (e *execution) rerank(ctx context.Context, job *base.Job) error { + return nil +} diff --git a/pkg/component/data/pinecone/v0/task_upsert.go b/pkg/component/data/pinecone/v0/task_upsert.go new file mode 100644 index 000000000..419c58c99 --- /dev/null +++ b/pkg/component/data/pinecone/v0/task_upsert.go @@ -0,0 +1,57 @@ +package pinecone + +import ( + "context" + "fmt" + + "github.com/pinecone-io/go-pinecone/pinecone" + + "github.com/instill-ai/pipeline-backend/pkg/component/base" + "github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient" +) + +type upsertReq struct { + Vectors []*pinecone.Vector `json:"vectors"` + Namespace string `json:"namespace,omitempty"` +} + +type upsertResp struct { + RecordsUpserted int64 `json:"upsertedCount"` +} + +func (e *execution) upsert(ctx context.Context, job *base.Job) error { + input := taskUpsertInput{} + if err := job.Input.ReadData(ctx, &input); err != nil { + err = fmt.Errorf("reading input data: %w", err) + job.Error.Error(ctx, err) + return err + } + + resp := upsertResp{} + body, err := input.asRequest() + if err != nil { + job.Error.Error(ctx, err) + return err + } + _, err = newIndexClient(e.Setup, e.GetLogger()). + R(). + SetResult(&resp). + SetBody(body). + Post(upsertPath) + if err != nil { + err = httpclient.WrapURLError(fmt.Errorf("upserting vectors: %w", err)) + job.Error.Error(ctx, err) + return err + } + + if err := job.Output.WriteData(ctx, &taskUpsertOutput{ + UpsertedCount: resp.RecordsUpserted, + }); err != nil { + + err = fmt.Errorf("writing output data: %w", err) + job.Error.Error(ctx, err) + return err + } + + return nil +}