From eda0b599798f06551da86fb8918eeed89b6120c4 Mon Sep 17 00:00:00 2001 From: "Chang, Hui-Tang" Date: Thu, 9 Nov 2023 18:50:27 +0800 Subject: [PATCH] chore(migration): add migration script for start operator --- cmd/migration/000007_migrate.go | 157 ++++++++++++++++++++++++++ cmd/migration/main.go | 5 + config/config.yaml | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/db/migration/000007_init.down.sql | 0 pkg/db/migration/000007_init.up.sql | 0 pkg/service/service.go | 6 + 8 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 cmd/migration/000007_migrate.go create mode 100644 pkg/db/migration/000007_init.down.sql create mode 100644 pkg/db/migration/000007_init.up.sql diff --git a/cmd/migration/000007_migrate.go b/cmd/migration/000007_migrate.go new file mode 100644 index 000000000..cfe1b7bb3 --- /dev/null +++ b/cmd/migration/000007_migrate.go @@ -0,0 +1,157 @@ +package main + +import ( + "database/sql" + "database/sql/driver" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/instill-ai/pipeline-backend/pkg/datamodel" + "google.golang.org/protobuf/types/known/structpb" + "gorm.io/gorm" + + database "github.com/instill-ai/pipeline-backend/pkg/db" +) + +type pipeline07 struct { + datamodel.BaseDynamic + ID string + Owner string + Description sql.NullString + Recipe *recipe07 `gorm:"type:jsonb"` +} + +func (pipeline07) TableName() string { + return "pipeline" +} + +// Recipe is the data model of the pipeline recipe +type recipe07 struct { + Version string `json:"version,omitempty"` + Components []*component07 `json:"components,omitempty"` +} + +type component07 struct { + ID string `json:"id"` + DefinitionName string `json:"definition_name"` + ResourceName string `json:"resource_name"` + Configuration *structpb.Struct `json:"configuration"` +} + +// Scan function for custom GORM type Recipe +func (r *recipe07) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New(fmt.Sprint("Failed to unmarshal Recipe value:", value)) + } + + if err := json.Unmarshal(bytes, &r); err != nil { + return err + } + + return nil +} + +// Value function for custom GORM type Recipe +func (r *recipe07) Value() (driver.Value, error) { + valueString, err := json.Marshal(r) + return string(valueString), err +} + +func migratePipelineRecipeUp000007() error { + db := database.GetConnection() + defer database.Close(db) + + var items []pipeline07 + result := db.Model(&pipeline07{}) + if result.Error != nil { + return result.Error + } + + rows, err := result.Rows() + if err != nil { + return err + } + + defer rows.Close() + + for rows.Next() { + var item pipeline07 + if err = db.ScanRows(rows, &item); err != nil { + return err + } + items = append(items, item) + + } + + for idx := range items { + fmt.Printf("migrate %s\n", items[idx].UID) + + updateErr := db.Transaction(func(tx *gorm.DB) error { + for idx := range items { + + for compIdx := range items[idx].Recipe.Components { + if items[idx].Recipe.Components[compIdx].ID == "start" { + if metadata, ok := items[idx].Recipe.Components[compIdx].Configuration.Fields["metadata"]; ok { + for k := range metadata.GetStructValue().Fields { + vType := metadata.GetStructValue().Fields[k].GetStructValue().Fields["type"].GetStringValue() + instillFormat := "" + switch vType { + case "number", "integer", "boolean", "string": + instillFormat = vType + case "text": + instillFormat = "string" + vType = "string" + case "audio", "image", "video": + instillFormat = fmt.Sprintf("%s/*", vType) + vType = "string" + + case "text_array": + instillFormat = "array:string" + items := &structpb.Struct{Fields: map[string]*structpb.Value{}} + items.Fields["type"] = structpb.NewStringValue("string") + metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items) + vType = "array" + + case "audio_array", "image_array", "video_array": + instillFormat = fmt.Sprintf("array:%s/*", strings.Split(vType, "_")[0]) + items := &structpb.Struct{Fields: map[string]*structpb.Value{}} + items.Fields["type"] = structpb.NewStringValue("string") + metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items) + vType = "array" + + case "number_array", "integer_array", "boolean_array": + instillFormat = fmt.Sprintf("array:%s", strings.Split(vType, "_")[0]) + items := &structpb.Struct{Fields: map[string]*structpb.Value{}} + items.Fields["type"] = structpb.NewStringValue(strings.Split(vType, "_")[0]) + metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items) + vType = "array" + + } + metadata.GetStructValue().Fields[k].GetStructValue().Fields["type"] = structpb.NewStringValue(vType) + metadata.GetStructValue().Fields[k].GetStructValue().Fields["instillFormat"], _ = structpb.NewValue(instillFormat) + + } + items[idx].Recipe.Components[compIdx].Configuration.Fields["metadata"] = metadata + continue + } + } + } + + result := tx.Unscoped().Model(&datamodel.Pipeline{}).Where("uid = ?", items[idx].UID).Update("recipe", &items[idx].Recipe) + if result.Error != nil { + return result.Error + } + } + return nil + }) + if updateErr != nil { + return updateErr + } + + } + + return nil +} diff --git a/cmd/migration/main.go b/cmd/migration/main.go index e7adb9802..1666facad 100644 --- a/cmd/migration/main.go +++ b/cmd/migration/main.go @@ -135,6 +135,11 @@ func main() { panic(err) } } + if step == 6 { + if err := migratePipelineRecipeUp000007(); err != nil { + panic(err) + } + } fmt.Printf("Step up to version %d\n", step+1) if err := m.Steps(1); err != nil { diff --git a/config/config.yaml b/config/config.yaml index f72f1f14f..0cea54bab 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -22,7 +22,7 @@ database: host: pg-sql port: 5432 name: pipeline - version: 6 + version: 7 timezone: Etc/UTC pool: idleconnections: 5 diff --git a/go.mod b/go.mod index 6f9507888..f4a649709 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4 + github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2 github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0 github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f github.com/instill-ai/usage-client v0.2.4-alpha.0.20231019203021-70410a0a8061 diff --git a/go.sum b/go.sum index c6863ee71..ea0e97cbf 100644 --- a/go.sum +++ b/go.sum @@ -1114,8 +1114,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4 h1:hMjRL2SiXg3/GJ/viBnKwQW/Ig/Ot03Cv2/NmFb8YW4= -github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4/go.mod h1:JDVDbZby6njJS2ioy2ptD3RhC4CuKWJKsWBg0PtQWK4= +github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2 h1:g3ZS+wKQRrrBYZN0K5CR9gRQA3tiSiwNK38IyRGcSrY= +github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2/go.mod h1:JDVDbZby6njJS2ioy2ptD3RhC4CuKWJKsWBg0PtQWK4= github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0 h1:pUkYm8Qzpo/5AYfRHvucnsJprl8+36/LhgkcYNc5GYA= github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0/go.mod h1:OSIInKwk10tO5kVssuUnIQjVKlb1PmNiyDTdrazl/eM= github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f h1:hweU93u6qsg8GH/YSogOfa+wOZEnkilGsijcy1xKX7E= diff --git a/pkg/db/migration/000007_init.down.sql b/pkg/db/migration/000007_init.down.sql new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/db/migration/000007_init.up.sql b/pkg/db/migration/000007_init.up.sql new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/service/service.go b/pkg/service/service.go index 370f1fa93..a7ec4d32b 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -577,6 +577,10 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs [] if err != nil { return err } + err = component.CompileInstillFormat(schStruct) + if err != nil { + return err + } metadata, err = protojson.Marshal(schStruct) if err != nil { return err @@ -586,6 +590,8 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs [] c := jsonschema.NewCompiler() c.RegisterExtension("instillAcceptFormats", component.InstillAcceptFormatsMeta, component.InstillAcceptFormatsCompiler{}) + c.RegisterExtension("instillFormat", component.InstillFormatMeta, component.InstillFormatCompiler{}) + if err := c.AddResource("schema.json", strings.NewReader(string(metadata))); err != nil { return err }