Skip to content

Commit

Permalink
chore(migration): add migration script for start operator
Browse files Browse the repository at this point in the history
  • Loading branch information
donch1989 committed Nov 9, 2023
1 parent ed68178 commit eda0b59
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 4 deletions.
157 changes: 157 additions & 0 deletions cmd/migration/000007_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package main

import (
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/instill-ai/pipeline-backend/pkg/datamodel"
"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"

database "github.com/instill-ai/pipeline-backend/pkg/db"
)

type pipeline07 struct {
datamodel.BaseDynamic
ID string
Owner string
Description sql.NullString
Recipe *recipe07 `gorm:"type:jsonb"`
}

func (pipeline07) TableName() string {
return "pipeline"
}

// Recipe is the data model of the pipeline recipe
type recipe07 struct {
Version string `json:"version,omitempty"`
Components []*component07 `json:"components,omitempty"`
}

type component07 struct {
ID string `json:"id"`
DefinitionName string `json:"definition_name"`
ResourceName string `json:"resource_name"`
Configuration *structpb.Struct `json:"configuration"`
}

// Scan function for custom GORM type Recipe
func (r *recipe07) Scan(value interface{}) error {
bytes, ok := value.([]byte)
if !ok {
return errors.New(fmt.Sprint("Failed to unmarshal Recipe value:", value))
}

if err := json.Unmarshal(bytes, &r); err != nil {
return err
}

return nil
}

// Value function for custom GORM type Recipe
func (r *recipe07) Value() (driver.Value, error) {
valueString, err := json.Marshal(r)
return string(valueString), err
}

func migratePipelineRecipeUp000007() error {
db := database.GetConnection()
defer database.Close(db)

var items []pipeline07
result := db.Model(&pipeline07{})
if result.Error != nil {
return result.Error
}

rows, err := result.Rows()
if err != nil {
return err
}

defer rows.Close()

for rows.Next() {
var item pipeline07
if err = db.ScanRows(rows, &item); err != nil {
return err
}
items = append(items, item)

}

for idx := range items {
fmt.Printf("migrate %s\n", items[idx].UID)

updateErr := db.Transaction(func(tx *gorm.DB) error {
for idx := range items {

for compIdx := range items[idx].Recipe.Components {
if items[idx].Recipe.Components[compIdx].ID == "start" {
if metadata, ok := items[idx].Recipe.Components[compIdx].Configuration.Fields["metadata"]; ok {
for k := range metadata.GetStructValue().Fields {
vType := metadata.GetStructValue().Fields[k].GetStructValue().Fields["type"].GetStringValue()
instillFormat := ""
switch vType {
case "number", "integer", "boolean", "string":
instillFormat = vType
case "text":
instillFormat = "string"
vType = "string"
case "audio", "image", "video":
instillFormat = fmt.Sprintf("%s/*", vType)
vType = "string"

case "text_array":
instillFormat = "array:string"
items := &structpb.Struct{Fields: map[string]*structpb.Value{}}
items.Fields["type"] = structpb.NewStringValue("string")
metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items)
vType = "array"

case "audio_array", "image_array", "video_array":
instillFormat = fmt.Sprintf("array:%s/*", strings.Split(vType, "_")[0])
items := &structpb.Struct{Fields: map[string]*structpb.Value{}}
items.Fields["type"] = structpb.NewStringValue("string")
metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items)
vType = "array"

case "number_array", "integer_array", "boolean_array":
instillFormat = fmt.Sprintf("array:%s", strings.Split(vType, "_")[0])
items := &structpb.Struct{Fields: map[string]*structpb.Value{}}
items.Fields["type"] = structpb.NewStringValue(strings.Split(vType, "_")[0])
metadata.GetStructValue().Fields[k].GetStructValue().Fields["items"] = structpb.NewStructValue(items)
vType = "array"

}
metadata.GetStructValue().Fields[k].GetStructValue().Fields["type"] = structpb.NewStringValue(vType)
metadata.GetStructValue().Fields[k].GetStructValue().Fields["instillFormat"], _ = structpb.NewValue(instillFormat)

}
items[idx].Recipe.Components[compIdx].Configuration.Fields["metadata"] = metadata
continue
}
}
}

result := tx.Unscoped().Model(&datamodel.Pipeline{}).Where("uid = ?", items[idx].UID).Update("recipe", &items[idx].Recipe)
if result.Error != nil {
return result.Error
}
}
return nil
})
if updateErr != nil {
return updateErr
}

}

return nil
}
5 changes: 5 additions & 0 deletions cmd/migration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func main() {
panic(err)
}
}
if step == 6 {
if err := migratePipelineRecipeUp000007(); err != nil {
panic(err)
}
}

fmt.Printf("Step up to version %d\n", step+1)
if err := m.Steps(1); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ database:
host: pg-sql
port: 5432
name: pipeline
version: 6
version: 7
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4
github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2
github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f
github.com/instill-ai/usage-client v0.2.4-alpha.0.20231019203021-70410a0a8061
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1114,8 +1114,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4 h1:hMjRL2SiXg3/GJ/viBnKwQW/Ig/Ot03Cv2/NmFb8YW4=
github.com/instill-ai/component v0.6.1-alpha.0.20231108162809-21ba1d24cef4/go.mod h1:JDVDbZby6njJS2ioy2ptD3RhC4CuKWJKsWBg0PtQWK4=
github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2 h1:g3ZS+wKQRrrBYZN0K5CR9gRQA3tiSiwNK38IyRGcSrY=
github.com/instill-ai/component v0.6.1-alpha.0.20231109103317-39c671d1a3d2/go.mod h1:JDVDbZby6njJS2ioy2ptD3RhC4CuKWJKsWBg0PtQWK4=
github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0 h1:pUkYm8Qzpo/5AYfRHvucnsJprl8+36/LhgkcYNc5GYA=
github.com/instill-ai/operator v0.3.0-alpha.0.20231108164009-b0f39b45d5b0/go.mod h1:OSIInKwk10tO5kVssuUnIQjVKlb1PmNiyDTdrazl/eM=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231019202606-71607ddcd93f h1:hweU93u6qsg8GH/YSogOfa+wOZEnkilGsijcy1xKX7E=
Expand Down
Empty file.
Empty file.
6 changes: 6 additions & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []
if err != nil {
return err
}
err = component.CompileInstillFormat(schStruct)
if err != nil {
return err
}

Check warning on line 583 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L580-L583

Added lines #L580 - L583 were not covered by tests
metadata, err = protojson.Marshal(schStruct)
if err != nil {
return err
Expand All @@ -586,6 +590,8 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []

c := jsonschema.NewCompiler()
c.RegisterExtension("instillAcceptFormats", component.InstillAcceptFormatsMeta, component.InstillAcceptFormatsCompiler{})
c.RegisterExtension("instillFormat", component.InstillFormatMeta, component.InstillFormatCompiler{})

Check warning on line 594 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L593-L594

Added lines #L593 - L594 were not covered by tests
if err := c.AddResource("schema.json", strings.NewReader(string(metadata))); err != nil {
return err
}
Expand Down

0 comments on commit eda0b59

Please sign in to comment.