Skip to content

Commit

Permalink
refactor(pipeline): update start operator flows (#290)
Browse files Browse the repository at this point in the history
Because

- we'd like to refactor start operator so that it has a JSON-schema
structure similar to other components.
- we should use json encoded string in form-data

This commit

- refactor start operator to use `instillAcceptFormats`
- update corresponding OpenAPI generator
  • Loading branch information
donch1989 authored Nov 6, 2023
1 parent f8dac09 commit 205296e
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 239 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/component v0.6.1-alpha.0.20231106092458-e4a7b537a08e
github.com/instill-ai/operator v0.3.0-alpha.0.20231106092646-9d0caa7af524
github.com/instill-ai/component v0.6.1-alpha.0.20231106153938-6032b6ce48ed
github.com/instill-ai/operator v0.3.0-alpha.0.20231106160006-74980c141828
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f
github.com/instill-ai/usage-client v0.2.4-alpha.0.20231019203021-70410a0a8061
github.com/instill-ai/x v0.3.0-alpha
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1112,10 +1112,10 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/component v0.6.1-alpha.0.20231106092458-e4a7b537a08e h1:fEKsQ9ZCGoWLkLm6BBArFUj4zcMU6ngTFj3eNywSLrI=
github.com/instill-ai/component v0.6.1-alpha.0.20231106092458-e4a7b537a08e/go.mod h1:aQfeKtOn74cRQJOxvua1mQGpecasDaD+l46C60ILgdA=
github.com/instill-ai/operator v0.3.0-alpha.0.20231106092646-9d0caa7af524 h1:eOA47/Glmy6LO+ULgswfmoQ1IZ6Y4wV7GagDTL4v9lA=
github.com/instill-ai/operator v0.3.0-alpha.0.20231106092646-9d0caa7af524/go.mod h1:s2cWvyAzgrxj5m2S5A3p152wjtfrxhEeDoy7FI9fmoA=
github.com/instill-ai/component v0.6.1-alpha.0.20231106153938-6032b6ce48ed h1:cZfbReRzNeNdMEIFNs7oW+6ewxH87GtHGlEkLdDAEqU=
github.com/instill-ai/component v0.6.1-alpha.0.20231106153938-6032b6ce48ed/go.mod h1:aQfeKtOn74cRQJOxvua1mQGpecasDaD+l46C60ILgdA=
github.com/instill-ai/operator v0.3.0-alpha.0.20231106160006-74980c141828 h1:IUK+jDlVvAN6rcr3bmZ5/QtM4NdQZMA/3uT2k2WtQmE=
github.com/instill-ai/operator v0.3.0-alpha.0.20231106160006-74980c141828/go.mod h1:tRa5+YWM4Cl1yryETvehUN3s4AiFQRbZJQigp0Fk9Rs=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f h1:hweU93u6qsg8GH/YSogOfa+wOZEnkilGsijcy1xKX7E=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f/go.mod h1:q/YL5TZXD9nvmJ7Rih4gY3/B2HT2+GiFdxeZp9D+yE4=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20231019203021-70410a0a8061 h1:lOp2fORCj76/gfPuLqB3TEN2cvFRJHUQOxwFSl4qxEw=
Expand Down
9 changes: 6 additions & 3 deletions integration-test/const.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ export const simpleRecipe = {
metadata: {
input: {
title: "Input",
type: "text"
type: "string",
instillAcceptFormats: ["string"]
}
}
}
Expand Down Expand Up @@ -150,7 +151,8 @@ export const simpleRecipeWithoutCSV = {
metadata: {
input: {
title: "Input",
type: "text"
type: "string",
instillAcceptFormats: ["string"]
}
}
}
Expand Down Expand Up @@ -184,7 +186,8 @@ export const simpleRecipeDupId = {
metadata: {
input: {
title: "Input",
type: "text"
type: "string",
instillAcceptFormats: ["string"]
}
}
}
Expand Down
28 changes: 19 additions & 9 deletions pkg/handler/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"context"
"encoding/base64"
"encoding/json"
"io"
"net/http"
"strconv"
Expand Down Expand Up @@ -71,10 +72,19 @@ func convertFormData(ctx context.Context, mux *runtime.ServeMux, req *http.Reque
if _, ok := inputsMap[inputIdx][key]; !ok {
inputsMap[inputIdx][key] = map[int]interface{}{}
}

inputsMap[inputIdx][key].(map[int]interface{})[keyIdx] = v[0]
var b interface{}
unmarshalErr := json.Unmarshal([]byte(v[0]), &b)
if unmarshalErr != nil {
return nil, unmarshalErr
}
inputsMap[inputIdx][key].(map[int]interface{})[keyIdx] = b
} else {
inputsMap[inputIdx][key] = v[0]
var b interface{}
unmarshalErr := json.Unmarshal([]byte(v[0]), &b)
if unmarshalErr != nil {
return nil, unmarshalErr
}
inputsMap[inputIdx][key] = b
}

}
Expand Down Expand Up @@ -144,12 +154,6 @@ func convertFormData(ctx context.Context, mux *runtime.ServeMux, req *http.Reque
for key, value := range inputValue {

switch value := value.(type) {
case string:
structVal, err := structpb.NewValue(value)
if err != nil {
return nil, err
}
inputs[inputIdx].GetFields()[key] = structVal
case map[int]interface{}:
maxItemIdx := 0
for itemIdx := range value {
Expand All @@ -169,6 +173,12 @@ func convertFormData(ctx context.Context, mux *runtime.ServeMux, req *http.Reque

inputs[inputIdx].GetFields()[key] = structpb.NewListValue(structVal)

default:
structVal, err := structpb.NewValue(value)
if err != nil {
return nil, err
}
inputs[inputIdx].GetFields()[key] = structVal
}

}
Expand Down
46 changes: 1 addition & 45 deletions pkg/service/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,51 +165,7 @@ func (s *service) GenerateOpenApiSpec(startCompOrigin *pipelinePB.Component, end

startComp := proto.Clone(startCompOrigin).(*pipelinePB.Component)
for k, v := range startComp.Configuration.Fields["metadata"].GetStructValue().Fields {
var m *structpb.Value
attrType := ""
arrType := ""
switch t := v.GetStructValue().Fields["type"].GetStringValue(); t {
case "integer", "number", "boolean":
attrType = t
case "text", "image", "audio", "video":
attrType = "string"
default:
attrType = "array"
switch t {
case "integer_array", "number_array", "boolean_array":
arrType = strings.Split(t, "_")[0]
case "text_array", "image_array", "audio_array", "video_array":
arrType = "string"
}

}
if attrType != "array" {
m, err = structpb.NewValue(map[string]interface{}{
"title": v.GetStructValue().Fields["title"].GetStringValue(),
"description": v.GetStructValue().Fields["description"].GetStringValue(),
"type": attrType,
"instillFormat": v.GetStructValue().Fields["type"].GetStringValue(),
})
if err != nil {
success = false
}
} else {
m, err = structpb.NewValue(map[string]interface{}{
"title": v.GetStructValue().Fields["title"].GetStringValue(),
"description": v.GetStructValue().Fields["description"].GetStringValue(),
"type": attrType,
"items": map[string]interface{}{
"type": arrType,
"instillFormat": strings.Split(v.GetStructValue().Fields["type"].GetStringValue(), "_")[0],
},
})
if err != nil {
success = false
}
}

openApiInput.Fields["properties"].GetStructValue().Fields[k] = m

openApiInput.Fields["properties"].GetStructValue().Fields[k] = v
}

templateWalk = template.GetFields()["paths"]
Expand Down
155 changes: 25 additions & 130 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
"time"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/gofrs/uuid"
"github.com/gogo/status"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/santhosh-tekuri/jsonschema/v5"
"go.einride.tech/aip/filtering"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
Expand Down Expand Up @@ -566,143 +566,38 @@ func (s *service) UpdateUserPipelineIDByID(ctx context.Context, ns resource.Name

func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []*structpb.Struct) error {

typeMap := map[string]string{}
var metadata []byte
var err error
for _, comp := range recipe.Components {
if comp.DefinitionName == "operator-definitions/op-start" {
for key, value := range comp.Configuration.Fields["metadata"].GetStructValue().Fields {
typeMap[key] = value.GetStructValue().Fields["type"].GetStringValue()
schStruct := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
schStruct.Fields["type"] = structpb.NewStringValue("object")
schStruct.Fields["properties"] = structpb.NewStructValue(comp.Configuration.Fields["metadata"].GetStructValue())
metadata, err = protojson.Marshal(schStruct)
if err != nil {
return err
}
}
}
for idx := range pipelineInputs {
for key, val := range pipelineInputs[idx].Fields {
switch typeMap[key] {
case "integer":
switch val.AsInterface().(type) {
case string:
v, err := strconv.ParseInt(val.GetStringValue(), 10, 64)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewNumberValue(float64(v))
default:
pipelineInputs[idx].Fields[key] = structpb.NewNumberValue(val.GetNumberValue())
}

case "number":
switch val.AsInterface().(type) {
case string:
v, err := strconv.ParseFloat(val.GetStringValue(), 64)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewNumberValue(v)
default:
pipelineInputs[idx].Fields[key] = structpb.NewNumberValue(val.GetNumberValue())
}

case "boolean":
switch val.AsInterface().(type) {
case string:
v, err := strconv.ParseBool(val.GetStringValue())
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewBoolValue(v)
default:
pipelineInputs[idx].Fields[key] = structpb.NewBoolValue(val.GetBoolValue())
}

case "text":
case "image", "audio", "video":
dataSplits := strings.Split(pipelineInputs[idx].Fields[key].GetStringValue(), ",")
if len(dataSplits) > 1 {
pipelineInputs[idx].Fields[key] = structpb.NewStringValue(dataSplits[1])
}

case "integer_array", "number_array", "boolean_array", "text_array", "image_array", "audio_array", "video_array":
if val.GetListValue() == nil {
return fmt.Errorf("%s should be a array", key)
}
sch, err := jsonschema.CompileString("", string(metadata))
sch.Location = ""
if err != nil {
return err
}

switch typeMap[key] {
case "integer_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
switch val := val.(type) {
case string:
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
}
structVal, err := structpb.NewList(vals)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewListValue(structVal)

case "number_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
switch val := val.(type) {
case string:
n, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
}
structVal, err := structpb.NewList(vals)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewListValue(structVal)
case "boolean_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
switch val := val.(type) {
case string:
n, err := strconv.ParseBool(val)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
}
structVal, err := structpb.NewList(vals)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewListValue(structVal)

case "image_array", "audio_array", "video_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
dataSplits := strings.Split(val.(string), ",")
if len(dataSplits) > 1 {
vals = append(vals, dataSplits[1])
} else {
vals = append(vals, dataSplits[0])
}
}
structVal, err := structpb.NewList(vals)
if err != nil {
return err
}
pipelineInputs[idx].Fields[key] = structpb.NewListValue(structVal)
}
}
for _, pipelineInput := range pipelineInputs {
b, err := protojson.Marshal(pipelineInput)
if err != nil {
return err
}
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}

if err := sch.Validate(v); err != nil {
return err
}
}

Expand Down
Loading

0 comments on commit 205296e

Please sign in to comment.