diff --git a/cmd/main/main.go b/cmd/main/main.go
index fe81d8361..8531f87df 100644
--- a/cmd/main/main.go
+++ b/cmd/main/main.go
@@ -255,7 +255,8 @@ func main() {
 	ms := memory.NewMemoryStore()
 
 	// Initialize Minio client
-	minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, service.MetadataExpiryRules...)
+	retentionHandler := service.NewRetentionHandler()
+	minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, retentionHandler.ListExpiryRules()...)
 	if err != nil {
 		logger.Fatal("failed to create minio client", zap.Error(err))
 	}
@@ -303,7 +304,7 @@ func main() {
 		compStore,
 		ms,
 		workerUID,
-		nil,
+		retentionHandler,
 		binaryFetcher,
 		artifactPublicServiceClient,
 		artifactPrivateServiceClient,
diff --git a/go.mod b/go.mod
index 8770765be..705819712 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,7 @@ require (
 	github.com/influxdata/influxdb-client-go/v2 v2.12.3
 	github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498
 	github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
-	github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8
+	github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2
 	github.com/itchyny/gojq v0.12.14
 	github.com/jackc/pgx/v5 v5.5.5
 	github.com/jmoiron/sqlx v1.4.0
@@ -264,7 +264,7 @@ require (
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
 	go.opentelemetry.io/otel/metric v1.24.0 // indirect
-	golang.org/x/sync v0.8.0 // indirect
+	golang.org/x/sync v0.10.0 // indirect
 	golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
 	golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
 	google.golang.org/appengine v1.6.8 // indirect
@@ -277,9 +277,9 @@ require (
 	go.opentelemetry.io/proto/otlp v1.1.0 // indirect
 	go.uber.org/atomic v1.10.0 // indirect
 	go.uber.org/multierr v1.10.0 // indirect
-	golang.org/x/crypto v0.26.0
-	golang.org/x/sys v0.24.0 // indirect
-	golang.org/x/text v0.17.0
+	golang.org/x/crypto v0.31.0
+	golang.org/x/sys v0.28.0 // indirect
+	golang.org/x/text v0.21.0
 	golang.org/x/time v0.5.0 // indirect
 	google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect
 	gopkg.in/yaml.v3 v3.0.1
diff --git a/go.sum b/go.sum
index 4b7ddd0cd..acd0bebb1 100644
--- a/go.sum
+++ b/go.sum
@@ -1289,8 +1289,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:
 github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
-github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so=
-github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
+github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA=
+github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2/go.mod h1:qztbVw9eW69byjdQINhYc7dm9tmhR0zuP46GjV3hwlQ=
 github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ=
 github.com/itchyny/gojq v0.12.14 h1:6k8vVtsrhQSYgSGg827AD+PVVaB1NLXEdX+dda2oZCc=
 github.com/itchyny/gojq v0.12.14/go.mod h1:y1G7oO7XkcR1LPZO59KyoCRy08T3j9vDYRV0GgYSS+s=
@@ -2124,8 +2124,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
 golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
 golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
 golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
-golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
-golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
+golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
+golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
 golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -2321,8 +2321,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
-golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
+golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -2476,8 +2476,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
-golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
+golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -2508,8 +2508,8 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
-golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
-golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
+golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
+golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/pkg/recipe/variable.go b/pkg/recipe/variable.go
index ef0978e85..8b26bf7cc 100644
--- a/pkg/recipe/variable.go
+++ b/pkg/recipe/variable.go
@@ -10,16 +10,19 @@ import (
 	"github.com/instill-ai/pipeline-backend/config"
 	"github.com/instill-ai/pipeline-backend/pkg/constant"
 	"github.com/instill-ai/pipeline-backend/pkg/resource"
+
+	miniox "github.com/instill-ai/x/minio"
 )
 
 // SystemVariables contain information about a pipeline trigger.
+// TODO jvallesm: we should remove the __ prefix from the fields as it's an
+// outdated convention.
 type SystemVariables struct {
 	PipelineTriggerID  string    `json:"__PIPELINE_TRIGGER_ID"`
 	PipelineID         string    `json:"__PIPELINE_ID"`
 	PipelineUID        uuid.UUID `json:"__PIPELINE_UID"`
 	PipelineReleaseID  string    `json:"__PIPELINE_RELEASE_ID"`
 	PipelineReleaseUID uuid.UUID `json:"__PIPELINE_RELEASE_UID"`
-	ExpiryRuleTag      string    `json:"__EXPIRY_RULE_TAG"`
 
 	// PipelineOwner represents the namespace that owns the pipeline. This is typically
 	// the namespace where the pipeline was created and is stored.
@@ -30,12 +33,16 @@ type SystemVariables struct {
 	// PipelineRequesterID is the ID of the entity (user or organization)
 	// that initiated the pipeline execution. This may differ from PipelineUserUID
 	// when the pipeline is triggered by on behalf of an organization.
+	// TODO: we should use resource.Namespace for PipelineRequester
 	PipelineRequesterID string `json:"__PIPELINE_REQUESTER_ID"`
 	// PipelineRequesterUID is the unique identifier of the entity (user or organization)
 	// that initiated the pipeline execution. This may differ from PipelineUserUID
 	// when the pipeline is triggered by on behalf of an organization.
 	PipelineRequesterUID uuid.UUID `json:"__PIPELINE_REQUESTER_UID"`
-	// TODO: we should use resource.Namespace for PipelineOwner and PipelineRequester
+
+	// ExpiryRule defines the tag and object expiration for the blob storage
+	// associated with the pipeline run data (e.g. recipe, input, output).
+	ExpiryRule miniox.ExpiryRule `json:"__EXPIRY_RULE"`
 
 	HeaderAuthorization string `json:"__PIPELINE_HEADER_AUTHORIZATION"`
 	ModelBackend        string `json:"__MODEL_BACKEND"`
diff --git a/pkg/service/blobstorage.go b/pkg/service/blobstorage.go
index c1323d40c..6102361c9 100644
--- a/pkg/service/blobstorage.go
+++ b/pkg/service/blobstorage.go
@@ -22,16 +22,15 @@ import (
 	"github.com/instill-ai/pipeline-backend/pkg/utils"
 
 	artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
-	resourcex "github.com/instill-ai/x/resource"
+	miniox "github.com/instill-ai/x/minio"
 )
 
-func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.Namespace, data string) (string, error) {
+func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, data string, ns resource.Namespace, expiryRule miniox.ExpiryRule) (string, error) {
 	mimeType, err := getMimeType(data)
 	if err != nil {
 		return "", fmt.Errorf("get mime type: %w", err)
 	}
 
 	artifactClient := s.artifactPublicServiceClient
-	requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)
 
 	vars, err := recipe.GenerateSystemVariables(ctx, recipe.SystemVariables{})
@@ -42,14 +41,12 @@ func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.N
 	ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(vars))
 
 	timestamp := time.Now().Format(time.RFC3339)
-	objectName := fmt.Sprintf("%s-%s%s", requesterUID.String(), timestamp, getFileExtension(mimeType))
+	objectName := fmt.Sprintf("%s/%s%s", ns.NsUID.String(), timestamp, getFileExtension(mimeType))
 
-	// TODO: We will need to add the expiry days for the blob data.
-	// This will be addressed in ins-6857
 	resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{
 		NamespaceId:      ns.NsID,
 		ObjectName:       objectName,
-		ObjectExpireDays: 0,
+		ObjectExpireDays: int32(expiryRule.ExpirationDays),
 	})
 
 	if err != nil {
diff --git a/pkg/service/main.go b/pkg/service/main.go
index 8d0aa0da7..6e7d9d13b 100644
--- a/pkg/service/main.go
+++ b/pkg/service/main.go
@@ -136,9 +136,6 @@ func NewService(
 	artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient,
 ) Service {
 	zapLogger, _ := logger.GetZapLogger(context.Background())
-	if retentionHandler == nil {
-		retentionHandler = NewRetentionHandler()
-	}
 
 	return &service{
 		repository: repository,
diff --git a/pkg/service/metadataretention.go b/pkg/service/metadataretention.go
index df7d1f86a..042841996 100644
--- a/pkg/service/metadataretention.go
+++ b/pkg/service/metadataretention.go
@@ -3,30 +3,42 @@ package service
 import (
 	"context"
 
+	"github.com/gofrs/uuid"
+
 	miniox "github.com/instill-ai/x/minio"
 )
 
+// MetadataRetentionHandler allows clients to access the object expiration rule
+// associated with a namespace. This is used to set the expiration of objects,
+// e.g. when uploading the pipeline run data of a trigger. The preferred way to
+// set the expiration of an object is by attaching a tag to the object. The
+// MinIO client should set the tag-based expiration rules for the bucket when
+// it is initialized.
 type MetadataRetentionHandler interface {
-	GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID string) (string, error)
+	ListExpiryRules() []miniox.ExpiryRule
+	GetExpiryRuleByNamespace(_ context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error)
 }
 
 type metadataRetentionHandler struct{}
 
+// NewRetentionHandler is the default implementation of
+// MetadataRetentionHandler. It returns the same expiration rule for all
+// namespaces.
 func NewRetentionHandler() MetadataRetentionHandler {
 	return &metadataRetentionHandler{}
 }
 
-func (r metadataRetentionHandler) GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID string) (string, error) {
-	return defaultExpiryTag, nil
-}
-
-const (
-	defaultExpiryTag = "default-expiry"
+var (
+	defaultExpiryRule = miniox.ExpiryRule{
+		Tag:            "default-expiry",
+		ExpirationDays: 3,
+	}
 )
 
-var MetadataExpiryRules = []miniox.ExpiryRule{
-	{
-		Tag:            defaultExpiryTag,
-		ExpirationDays: 3,
-	},
+func (h *metadataRetentionHandler) ListExpiryRules() []miniox.ExpiryRule {
+	return []miniox.ExpiryRule{defaultExpiryRule}
+}
+
+func (h *metadataRetentionHandler) GetExpiryRuleByNamespace(_ context.Context, _ uuid.UUID) (miniox.ExpiryRule, error) {
+	return defaultExpiryRule, nil
 }
diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go
index 93ce3c7f6..5591b8834 100644
--- a/pkg/service/pipeline.go
+++ b/pkg/service/pipeline.go
@@ -37,6 +37,7 @@ import (
 	"github.com/instill-ai/pipeline-backend/pkg/utils"
 	"github.com/instill-ai/pipeline-backend/pkg/worker"
 	"github.com/instill-ai/x/errmsg"
+	miniox "github.com/instill-ai/x/minio"
 
 	errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"
 	mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
@@ -834,15 +835,26 @@ func (s *service) UpdateNamespacePipelineIDByID(ctx context.Context, ns resource
 // preTriggerPipeline does the following:
 // 1. Upload pipeline input data to minio if the data is blob data.
 // 2. New workflow memory.
-// 2-1. Set the default values for the variables for memory data and uploading pipeline data.
-// 2-2. Set the data with data.Value for the memory data, which will be used for pipeline running.
-// 2-3. Upload "uploading pipeline data" to minio for pipeline run logger.
+//	a. Set the default values for the variables for memory data and
+//	   uploading pipeline data.
+//	b. Set the data with data.Value for the memory data, which will be
+//	   used for pipeline running.
+//	c. Upload "uploading pipeline data" to minio for pipeline run logger.
 // 3. Map the settings in recipe to the format in workflow memory.
 // 4. Enable the streaming mode when the header contains "text/event-stream"
 //
-// We upload User Input Data by `uploadBlobAndGetDownloadURL`, which exposes the public URL because it will be used by `console` & external users.
-// We upload Pipeline Input Data by `uploadPipelineRunInputsToMinio`, which does not expose the public URL. The URL will be used by pipeline run logger.
-func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, r *datamodel.Recipe, pipelineTriggerID string, pipelineData []*pipelinepb.TriggerData) error {
+// We upload User Input Data by `uploadBlobAndGetDownloadURL`, which exposes
+// the public URL because it will be used by `console` & external users.
+// We upload Pipeline Input Data by `uploadPipelineRunInputsToMinio`, which
+// does not expose the public URL. The URL will be used by pipeline run logger.
+func (s *service) preTriggerPipeline(
+	ctx context.Context,
+	requester resource.Namespace,
+	r *datamodel.Recipe,
+	pipelineTriggerID string,
+	pipelineData []*pipelinepb.TriggerData,
+	expiryRule miniox.ExpiryRule,
+) error {
 	batchSize := len(pipelineData)
 	if batchSize > constant.MaxBatchSize {
 		return ErrExceedMaxBatchSize
 	}
@@ -872,17 +884,15 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace,
 			}
 
 			m := i.(map[string]any)
-
-			// TODO: remove these conversions after the blob storage is fully rolled out
 			for k := range m {
 				switch str := m[k].(type) {
 				case string:
 					if isUnstructuredFormat(formatMap[k]) {
-						// Skip the base64 decoding if the string is a URL
+						// Skip the base64 decoding if the string is a URL.
 						if strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://") {
 							continue
 						}
-						downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, ns, str)
+						downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, str, requester, expiryRule)
 						if err != nil {
 							return fmt.Errorf("upload blob and get download url: %w", err)
 						}
@@ -895,7 +905,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace,
 						if strings.HasPrefix(str[idx], "http://") || strings.HasPrefix(str[idx], "https://") {
 							continue
 						}
-						downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, ns, str[idx])
+						downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, str[idx], requester, expiryRule)
 						if err != nil {
 							return fmt.Errorf("upload blob and get download url: %w", err)
 						}
@@ -1373,16 +1383,9 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace,
 		}
 	}
 
-	requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)
-
-	expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String())
-	if err != nil {
-		return fmt.Errorf("get expiry rule tag: %w", err)
-	}
-
 	err = s.uploadPipelineRunInputsToMinio(ctx, uploadPipelineRunInputsToMinioParam{
 		pipelineTriggerID: pipelineTriggerID,
-		expiryRuleTag:     expiryRuleTag,
+		expiryRule:        expiryRule,
 		pipelineData:      uploadingPipelineData,
 	})
 	if err != nil {
@@ -1660,7 +1663,7 @@ func (s *service) RestoreNamespacePipelineReleaseByID(ctx context.Context, ns re
 	existingPipeline.Recipe = dbPipelineRelease.Recipe
 
 	if err := s.repository.UpdateNamespacePipelineByUID(ctx, existingPipeline.UID, existingPipeline); err != nil {
-		return err
+		return fmt.Errorf("updating pipeline: %w", err)
 	}
 
 	return nil
@@ -1696,11 +1699,6 @@ func (s *service) triggerPipeline(
 		return nil, nil, err
 	}
 
-	expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, triggerParams.requesterUID.String())
-	if err != nil {
-		return nil, nil, err
-	}
-
 	we, err := s.temporalClient.ExecuteWorkflow(
 		ctx,
 		workflowOptions,
@@ -1718,7 +1716,7 @@ func (s *service) triggerPipeline(
 				PipelineRequesterUID: requester.NsUID,
 				PipelineRequesterID:  requester.NsID,
 				HeaderAuthorization:  resource.GetRequestSingleHeader(ctx, "authorization"),
-				ExpiryRuleTag:        expiryRuleTag,
+				ExpiryRule:           triggerParams.expiryRule,
 			},
 			Mode:      mgmtpb.Mode_MODE_SYNC,
 			WorkerUID: s.workerUID,
@@ -1755,6 +1753,7 @@ type triggerParams struct {
 	pipelineTriggerID string
 	requesterUID      uuid.UUID
 	userUID           uuid.UUID
+	expiryRule        miniox.ExpiryRule
 }
 
 func (s *service) triggerAsyncPipeline(ctx context.Context, params triggerParams) (*longrunningpb.Operation, error) {
@@ -1787,11 +1786,6 @@ func (s *service) triggerAsyncPipeline(ctx context.Context, params triggerParams
 		return nil, err
 	}
 
-	expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, params.requesterUID.String())
-	if err != nil {
-		return nil, err
-	}
-
 	we, err := s.temporalClient.ExecuteWorkflow(
 		ctx,
 		workflowOptions,
@@ -1808,7 +1802,7 @@ func (s *service) triggerAsyncPipeline(ctx context.Context, params triggerParams
 				PipelineRequesterUID: requester.NsUID,
 				PipelineRequesterID:  requester.NsID,
 				HeaderAuthorization:  resource.GetRequestSingleHeader(ctx, "authorization"),
-				ExpiryRuleTag:        expiryRuleTag,
+				ExpiryRule:           params.expiryRule,
 			},
 			Mode:           mgmtpb.Mode_MODE_ASYNC,
 			TriggerFromAPI: true,
@@ -1976,6 +1970,10 @@ func (s *service) TriggerNamespacePipelineByID(ctx context.Context, ns resource.
 	pipelineUID := dbPipeline.UID
 
 	requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
+	requester, err := s.GetNamespaceByUID(ctx, requesterUID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("fetching requester namespace: %w", err)
+	}
 
 	pipelineRun := s.logPipelineRunStart(ctx, logPipelineRunStartParams{
 		pipelineTriggerID: pipelineTriggerID,
@@ -1994,7 +1992,12 @@ func (s *service) TriggerNamespacePipelineByID(ctx context.Context, ns resource.
 		return nil, nil, fmt.Errorf("check trigger permission error: %w", err)
 	}
 
-	err = s.preTriggerPipeline(ctx, ns, dbPipeline.Recipe, pipelineTriggerID, data)
+	expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("accessing expiry rule: %w", err)
+	}
+
+	err = s.preTriggerPipeline(ctx, requester, dbPipeline.Recipe, pipelineTriggerID, data, expiryRule)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -2006,6 +2009,7 @@ func (s *service) TriggerNamespacePipelineByID(ctx context.Context, ns resource.
 		pipelineTriggerID: pipelineTriggerID,
 		requesterUID:      requesterUID,
 		userUID:           userUID,
+		expiryRule:        expiryRule,
 	}, returnTraces)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -2023,6 +2027,10 @@ func (s *service) TriggerAsyncNamespacePipelineByID(ctx context.Context, ns reso
 	}
 
 	requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
+	requester, err := s.GetNamespaceByUID(ctx, requesterUID)
+	if err != nil {
+		return nil, fmt.Errorf("fetching requester namespace: %w", err)
+	}
 
 	pipelineRun := s.logPipelineRunStart(ctx, logPipelineRunStartParams{
 		pipelineTriggerID: pipelineTriggerID,
@@ -2041,7 +2049,12 @@ func (s *service) TriggerAsyncNamespacePipelineByID(ctx context.Context, ns reso
 		return nil, err
 	}
 
-	err = s.preTriggerPipeline(ctx, ns, dbPipeline.Recipe, pipelineTriggerID, data)
+	expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
+	if err != nil {
+		return nil, fmt.Errorf("accessing expiry rule: %w", err)
+	}
+
+	err = s.preTriggerPipeline(ctx, requester, dbPipeline.Recipe, pipelineTriggerID, data, expiryRule)
 	if err != nil {
 		return nil, err
 	}
@@ -2052,6 +2065,7 @@ func (s *service) TriggerAsyncNamespacePipelineByID(ctx context.Context, ns reso
 		pipelineTriggerID: pipelineTriggerID,
 		requesterUID:      requesterUID,
 		userUID:           userUID,
+		expiryRule:        expiryRule,
 	})
 	if err != nil {
 		return nil, err
 	}
@@ -2069,6 +2083,10 @@ func (s *service) TriggerNamespacePipelineReleaseByID(ctx context.Context, ns re
 		return nil, nil, errdomain.ErrNotFound
 	}
 	requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
+	requester, err := s.GetNamespaceByUID(ctx, requesterUID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("fetching requester namespace: %w", err)
+	}
 
 	dbPipelineRelease, err := s.repository.GetNamespacePipelineReleaseByID(ctx, ownerPermalink, pipelineUID, id, false)
 	if err != nil {
@@ -2092,7 +2110,12 @@ func (s *service) TriggerNamespacePipelineReleaseByID(ctx context.Context, ns re
 		return nil, nil, err
 	}
 
-	err = s.preTriggerPipeline(ctx, ns, dbPipelineRelease.Recipe, pipelineTriggerID, data)
+	expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("accessing expiry rule: %w", err)
+	}
+
+	err = s.preTriggerPipeline(ctx, requester, dbPipelineRelease.Recipe, pipelineTriggerID, data, expiryRule)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -2106,6 +2129,7 @@ func (s *service) TriggerNamespacePipelineReleaseByID(ctx context.Context, ns re
 		pipelineTriggerID: pipelineTriggerID,
 		requesterUID:      requesterUID,
 		userUID:           userUID,
+		expiryRule:        expiryRule,
 	}, returnTraces)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -2123,6 +2147,10 @@ func (s *service) TriggerAsyncNamespacePipelineReleaseByID(ctx context.Context,
 	}
 
 	requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
+	requester, err := s.GetNamespaceByUID(ctx, requesterUID)
+	if err != nil {
+		return nil, fmt.Errorf("fetching requester namespace: %w", err)
+	}
 
 	dbPipelineRelease, err := s.repository.GetNamespacePipelineReleaseByID(ctx, ownerPermalink, pipelineUID, id, false)
 	if err != nil {
@@ -2146,7 +2174,12 @@ func (s *service) TriggerAsyncNamespacePipelineReleaseByID(ctx context.Context,
 		return nil, err
 	}
 
-	err = s.preTriggerPipeline(ctx, ns, dbPipelineRelease.Recipe, pipelineTriggerID, data)
+	expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
+	if err != nil {
+		return nil, fmt.Errorf("accessing expiry rule: %w", err)
+	}
+
+	err = s.preTriggerPipeline(ctx, requester, dbPipelineRelease.Recipe, pipelineTriggerID, data, expiryRule)
 	if err != nil {
 		return nil, err
 	}
@@ -2159,6 +2192,7 @@ func (s *service) TriggerAsyncNamespacePipelineReleaseByID(ctx context.Context,
 		pipelineTriggerID: pipelineTriggerID,
 		requesterUID:      requesterUID,
 		userUID:           userUID,
+		expiryRule:        expiryRule,
 	})
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/service/pipelinerun.go b/pkg/service/pipelinerun.go
index a10e42c1e..3e4edc088 100644
--- a/pkg/service/pipelinerun.go
+++ b/pkg/service/pipelinerun.go
@@ -385,7 +385,7 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP
 type uploadPipelineRunInputsToMinioParam struct {
 	pipelineTriggerID string
-	expiryRuleTag     string
+	expiryRule        miniox.ExpiryRule
 	pipelineData      []map[string]any
 }
 
@@ -403,7 +403,7 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo
 		FilePath:      objectName,
 		FileContent:   param.pipelineData,
 		FileMimeType:  constant.ContentTypeJSON,
-		ExpiryRuleTag: param.expiryRuleTag,
+		ExpiryRuleTag: param.expiryRule.Tag,
 	})
 	if err != nil {
 		return fmt.Errorf("upload pipeline run inputs to minio: %w", err)
 	}
diff --git a/pkg/service/webhook.go b/pkg/service/webhook.go
index 02ef34a6c..2f2f4a655 100644
--- a/pkg/service/webhook.go
+++ b/pkg/service/webhook.go
@@ -162,6 +162,11 @@ func (s *service) DispatchPipelineWebhookEvent(ctx context.Context, params Dispa
 		return DispatchPipelineWebhookEventResult{}, err
 	}
 
+	expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, loadPipelineResult.ns.NsUID)
+	if err != nil {
+		return DispatchPipelineWebhookEventResult{}, fmt.Errorf("accessing expiry rule: %w", err)
+	}
+
 	if runOn.ReleaseUID == uuid.Nil {
 		pipelineRun := s.logPipelineRunStart(ctx, logPipelineRunStartParams{
 			pipelineTriggerID: pipelineTriggerID.String(),
@@ -182,6 +187,7 @@ func (s *service) DispatchPipelineWebhookEvent(ctx context.Context, params Dispa
 			userUID:           loadPipelineResult.ns.NsUID,
 			requesterUID:      loadPipelineResult.ns.NsUID,
 			pipelineTriggerID: pipelineTriggerID.String(),
+			expiryRule:        expiryRule,
 		})
 		if err != nil {
 			return DispatchPipelineWebhookEventResult{}, err
 		}
@@ -208,6 +214,7 @@ func (s *service) DispatchPipelineWebhookEvent(ctx context.Context, params Dispa
 			userUID:           loadPipelineResult.ns.NsUID,
 			requesterUID:      loadPipelineResult.ns.NsUID,
 			pipelineTriggerID: pipelineTriggerID.String(),
+			expiryRule:        expiryRule,
 		})
 	}
 }
diff --git a/pkg/utils/blobstorage.go b/pkg/utils/blobstorage.go
index 293f8468b..14b364d11 100644
--- a/pkg/utils/blobstorage.go
+++ b/pkg/utils/blobstorage.go
@@ -23,34 +23,27 @@ import (
 	"github.com/instill-ai/x/blobstorage"
 
 	artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
+	miniox "github.com/instill-ai/x/minio"
 )
 
-// UploadBlobDataAndReplaceWithURLsParams is the parameters for the UploadBlobDataAndReplaceWithURLs function.
-type UploadBlobDataAndReplaceWithURLsParams struct {
-	// NamespaceID is the namespace ID.
-	NamespaceID string
-	// RequesterUID is the requester UID.
-	RequesterUID uuid.UUID
-	// DataStructs are the data structs to be uploaded.
-	DataStructs []*structpb.Struct
-	// Logger is the logger.
-	Logger *zap.Logger
-	// ArtifactClient is the artifact public service client.
+// UploadBlobParams contains the information and dependencies to upload blob
+// data owned by a namespace and obtain a download URL.
+type UploadBlobParams struct {
+	NamespaceID    string
+	NamespaceUID   uuid.UUID
+	ExpiryRule     miniox.ExpiryRule
+	Logger         *zap.Logger
 	ArtifactClient *artifactpb.ArtifactPublicServiceClient
 }
 
-// UploadBlobDataAndReplaceWithURL uploads the unstructured data in the structs to minio and replaces the data with the URL.
-// Before calling this function, ctx should have been set with the request metadata.
-func UploadBlobDataAndReplaceWithURLs(ctx context.Context, params UploadBlobDataAndReplaceWithURLsParams) ([]*structpb.Struct, error) {
-	updatedDataStructs := make([]*structpb.Struct, len(params.DataStructs))
-	for i, dataStruct := range params.DataStructs {
-		updatedDataStruct, err := uploadBlobDataAndReplaceWithURL(ctx, uploadBlobDataAndReplaceWithURLParams{
-			namespaceID:    params.NamespaceID,
-			requesterUID:   params.RequesterUID,
-			dataStruct:     dataStruct,
-			logger:         params.Logger,
-			artifactClient: params.ArtifactClient,
-		})
+// UploadBlobDataAndReplaceWithURLs uploads the unstructured data in the
+// structs to minio and replaces the data with the URL.
+// Before calling this function, ctx should have been set with the request
+// metadata.
+func UploadBlobDataAndReplaceWithURLs(ctx context.Context, dataStructs []*structpb.Struct, params UploadBlobParams) ([]*structpb.Struct, error) {
+	updatedDataStructs := make([]*structpb.Struct, len(dataStructs))
+	for i, dataStruct := range dataStructs {
+		updatedDataStruct, err := uploadBlobDataAndReplaceWithURL(ctx, dataStruct, params)
 		if err != nil {
 			// Note: we don't want to fail the whole process if one of the data structs fails to upload.
 			updatedDataStructs[i] = dataStruct
@@ -61,17 +54,7 @@ func UploadBlobDataAndReplaceWithURLs(ctx context.Context, params UploadBlobData
 	return updatedDataStructs, nil
 }
 
-type uploadBlobDataAndReplaceWithURLParams struct {
-	namespaceID    string
-	requesterUID   uuid.UUID
-	dataStruct     *structpb.Struct
-	logger         *zap.Logger
-	artifactClient *artifactpb.ArtifactPublicServiceClient
-}
-
-func uploadBlobDataAndReplaceWithURL(ctx context.Context, params uploadBlobDataAndReplaceWithURLParams) (*structpb.Struct, error) {
-
-	dataStruct := params.dataStruct
+func uploadBlobDataAndReplaceWithURL(ctx context.Context, dataStruct *structpb.Struct, params UploadBlobParams) (*structpb.Struct, error) {
 	for key, value := range dataStruct.GetFields() {
 		updatedValue, err := processValue(ctx, params, value)
 		if err == nil {
@@ -82,18 +65,11 @@ func uploadBlobDataAndReplaceWithURL(ctx context.Context, params uploadBlobDataA
 	return dataStruct, nil
 }
 
-func processValue(ctx context.Context, params uploadBlobDataAndReplaceWithURLParams, value *structpb.Value) (*structpb.Value, error) {
-
+func processValue(ctx context.Context, params UploadBlobParams, value *structpb.Value) (*structpb.Value, error) {
 	switch v := value.GetKind().(type) {
 	case *structpb.Value_StringValue:
 		if isUnstructuredData(v.StringValue) {
-			downloadURL, err := UploadBlobAndGetDownloadURL(ctx, UploadBlobAndGetDownloadURLParams{
-				NamespaceID:    params.namespaceID,
-				RequesterUID:   params.requesterUID,
-				Data:           v.StringValue,
-				Logger:         params.logger,
-				ArtifactClient: params.artifactClient,
-			})
+			downloadURL, err := uploadBlobAndGetDownloadURL(ctx, v.StringValue, params)
 			if err != nil {
 				return nil, err
 			}
@@ -108,9 +84,7 @@ func processValue(ctx context.Context, params uploadBlobDataAndReplaceWithURLPar
 	case *structpb.Value_StructValue:
 		for _, item := range v.StructValue.GetFields() {
 			structData := item.GetStructValue()
-			newParams := params
-			newParams.dataStruct = structData
-			updatedStructData, err := uploadBlobDataAndReplaceWithURL(ctx, newParams)
+			updatedStructData, err := uploadBlobDataAndReplaceWithURL(ctx, structData, params)
 			// Note: we don't want to fail the whole process if one of the data structs fails to upload.
 			if err == nil {
 				return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: updatedStructData}}, nil
@@ -121,8 +95,7 @@ func processValue(ctx context.Context, params uploadBlobDataAndReplaceWithURLPar
 	return value, nil
 }
 
-func processList(ctx context.Context, params uploadBlobDataAndReplaceWithURLParams, list *structpb.ListValue) (*structpb.ListValue, error) {
-
+func processList(ctx context.Context, params UploadBlobParams, list *structpb.ListValue) (*structpb.ListValue, error) {
 	for i, item := range list.Values {
 		updatedItem, err := processValue(ctx, params, item)
 		if err == nil {
@@ -137,42 +110,24 @@ func isUnstructuredData(data string) bool {
 	return strings.HasPrefix(data, "data:") && strings.Contains(data, ";base64,")
 }
 
-// UploadBlobAndGetDownloadURLParams is the parameters for the UploadBlobAndGetDownloadURL function.
-type UploadBlobAndGetDownloadURLParams struct {
-	// NamespaceID is the namespace ID.
-	NamespaceID string
-	// RequesterUID is the requester UID.
-	RequesterUID uuid.UUID
-	// Data is the data to be uploaded.
-	Data string
-	// Logger is the logger.
-	Logger *zap.Logger
-	// ArtifactClient is the artifact public service client.
-	ArtifactClient *artifactpb.ArtifactPublicServiceClient
-}
-
-// UploadBlobAndGetDownloadURL uploads the unstructured data to minio and returns the public download URL.
-func UploadBlobAndGetDownloadURL(ctx context.Context, params UploadBlobAndGetDownloadURLParams) (string, error) {
-	mimeType, err := getMimeType(params.Data)
+func uploadBlobAndGetDownloadURL(ctx context.Context, data string, params UploadBlobParams) (string, error) {
+	mimeType, err := getMimeType(data)
 	if err != nil {
 		return "", fmt.Errorf("get mime type: %w", err)
 	}
 
 	uid, err := uuid.NewV4()
-
 	if err != nil {
 		return "", fmt.Errorf("generate uuid: %w", err)
 	}
 
-	objectName := fmt.Sprintf("%s/%s%s", params.RequesterUID.String(), uid.String(), getFileExtension(mimeType))
-	artifactClient := *params.ArtifactClient
+	objectName := fmt.Sprintf("%s/%s%s", params.NamespaceUID.String(), uid.String(), getFileExtension(mimeType))
 
-	// TODO: We will need to add the expiry days for the blob data.
-	// This will be addressed in ins-6857
+	artifactClient := *params.ArtifactClient
 	resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{
 		NamespaceId:      params.NamespaceID,
 		ObjectName:       objectName,
-		ObjectExpireDays: 0,
+		ObjectExpireDays: int32(params.ExpiryRule.ExpirationDays),
 	})
 
 	if err != nil {
@@ -180,7 +135,7 @@ func UploadBlobAndGetDownloadURL(ctx context.Context, params UploadBlobAndGetDow
 	}
 
 	uploadURL := resp.GetUploadUrl()
-	data := removePrefix(params.Data)
+	data = removePrefix(data)
 	b, err := base64.StdEncoding.DecodeString(data)
 	if err != nil {
 		return "", fmt.Errorf("decode base64 string: %w", err)
 	}
diff --git a/pkg/worker/blobstorage.go b/pkg/worker/blobstorage.go
index 81513f95d..d64f6128a 100644
--- a/pkg/worker/blobstorage.go
+++ b/pkg/worker/blobstorage.go
@@ -59,14 +59,12 @@ func (w *worker) uploadBlobDataAndGetDownloadURL(ctx context.Context, param *Com
 	ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON))
 
-	objectName := fmt.Sprintf("%s/%s", requesterID, value.Filename())
+	objectName := fmt.Sprintf("%s/%s", param.SystemVariables.PipelineRequesterUID.String(), value.Filename())
 
-	// TODO: We will need to add the expiry days for the blob data.
-	// This will be addressed in ins-6857
 	resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{
 		NamespaceId:      requesterID,
 		ObjectName:       objectName,
-		ObjectExpireDays: 0,
+		ObjectExpireDays: int32(param.SystemVariables.ExpiryRule.ExpirationDays),
 	})
 
 	if err != nil {
diff --git a/pkg/worker/minioactivity.go b/pkg/worker/minioactivity.go
index 3f6ecc82b..6c1e2b7af 100644
--- a/pkg/worker/minioactivity.go
+++ b/pkg/worker/minioactivity.go
@@ -154,15 +154,15 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo
 	ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON))
 
-	paramsForUpload := utils.UploadBlobDataAndReplaceWithURLsParams{
-		NamespaceID:    param.SystemVariables.PipelineOwner.NsID,
-		RequesterUID:   param.SystemVariables.PipelineRequesterUID,
-		DataStructs:    compInputs,
+	paramsForUpload := utils.UploadBlobParams{
+		NamespaceID:    param.SystemVariables.PipelineRequesterID,
+		NamespaceUID:   param.SystemVariables.PipelineRequesterUID,
+		ExpiryRule:     param.SystemVariables.ExpiryRule,
 		Logger:         log,
 		ArtifactClient: &w.artifactPublicServiceClient,
 	}
 
-	compInputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, paramsForUpload)
+	compInputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, compInputs, paramsForUpload)
 	if err != nil {
 		return err
 	}
@@ -173,7 +173,7 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo
 		FilePath:      objectName,
 		FileContent:   compInputs,
 		FileMimeType:  constant.ContentTypeJSON,
-		ExpiryRuleTag: param.SystemVariables.ExpiryRuleTag,
+		ExpiryRuleTag: param.SystemVariables.ExpiryRule.Tag,
 	})
 	if err != nil {
 		log.Error("failed to upload component run inputs to minio", zap.Error(err))
@@ -225,15 +225,15 @@ func (w *worker) UploadComponentOutputsActivity(ctx context.Context, param *Comp
 	sysVarJSON := utils.StructToMap(param.SystemVariables, "json")
 	ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON))
 
-	paramsForUpload := utils.UploadBlobDataAndReplaceWithURLsParams{
-		NamespaceID:    param.SystemVariables.PipelineOwner.NsID,
-		RequesterUID:   param.SystemVariables.PipelineRequesterUID,
-		DataStructs:    compOutputs,
+	paramsForUpload := utils.UploadBlobParams{
+		NamespaceID:    param.SystemVariables.PipelineRequesterID,
+		NamespaceUID:   param.SystemVariables.PipelineRequesterUID,
+		ExpiryRule:     param.SystemVariables.ExpiryRule,
 		Logger:         log,
 		ArtifactClient: &w.artifactPublicServiceClient,
 	}
 
-	compOutputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, paramsForUpload)
+	compOutputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, compOutputs, paramsForUpload)
 	if err != nil {
 		return err
 	}
@@ -242,7 +242,7 @@ func (w *worker) UploadComponentOutputsActivity(ctx context.Context, param *Comp
 		FilePath:      objectName,
 		FileContent:   compOutputs,
 		FileMimeType:  constant.ContentTypeJSON,
-		ExpiryRuleTag: param.SystemVariables.ExpiryRuleTag,
+		ExpiryRuleTag: param.SystemVariables.ExpiryRule.Tag,
 	})
 	if err != nil {
 		log.Error("failed to upload component run outputs to minio", zap.Error(err))
diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go
index ca3b050d8..f91724b70 100644
--- a/pkg/worker/workflow.go
+++ b/pkg/worker/workflow.go
@@ -246,7 +246,7 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
 		}
 		err := workflow.ExecuteActivity(minioCtx, w.UploadRecipeToMinioActivity, &UploadRecipeToMinioActivityParam{
 			PipelineTriggerID: param.SystemVariables.PipelineTriggerID,
-			ExpiryRuleTag:     param.SystemVariables.ExpiryRuleTag,
+			ExpiryRuleTag:     param.SystemVariables.ExpiryRule.Tag,
 		}).Get(ctx, nil)
 		if err != nil {
 			logger.Error("Failed to upload pipeline run recipe", zap.Error(err))
 		}
@@ -379,10 +379,8 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
 				OutputElements:     comp.OutputElements,
 				SystemVariables:    param.SystemVariables,
 			}).Get(ctx, nil); err != nil {
-				if err != nil {
-					errs = append(errs, err)
-					continue
-				}
+				errs = append(errs, err)
+				continue
 			}
 		}
 	}
@@ -421,7 +419,7 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
 	if err := workflow.ExecuteActivity(minioCtx, w.UploadOutputsToMinioActivity, &UploadOutputsToMinioActivityParam{
 		PipelineTriggerID: workflowID,
-		ExpiryRuleTag:     param.SystemVariables.ExpiryRuleTag,
+		ExpiryRuleTag:     param.SystemVariables.ExpiryRule.Tag,
 	}).Get(ctx, nil); err != nil {
 		return err
 	}
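
Usage note (illustrative, not part of the diff): with this change, MetadataRetentionHandler becomes the single place where blob retention is decided, and the default handler above applies one rule to every namespace. A deployment that needs per-plan retention could swap in its own implementation. The sketch below is a minimal example of that idea; only the interface, miniox.ExpiryRule, and the gofrs/uuid dependency come from this PR, while planLookup, the tag names, and the retention windows are hypothetical.

package service

import (
	"context"

	"github.com/gofrs/uuid"

	miniox "github.com/instill-ai/x/minio"
)

// planBasedRetentionHandler is a hypothetical MetadataRetentionHandler that
// selects the expiry rule from the requester namespace's subscription plan.
type planBasedRetentionHandler struct {
	// planLookup is an illustrative dependency, not part of this PR.
	planLookup func(ctx context.Context, namespaceUID uuid.UUID) (string, error)
}

var (
	freeExpiryRule = miniox.ExpiryRule{Tag: "free-expiry", ExpirationDays: 3}
	paidExpiryRule = miniox.ExpiryRule{Tag: "paid-expiry", ExpirationDays: 30}
)

// ListExpiryRules returns every rule the MinIO client must register as a
// tag-based lifecycle rule when the bucket is initialized.
func (h *planBasedRetentionHandler) ListExpiryRules() []miniox.ExpiryRule {
	return []miniox.ExpiryRule{freeExpiryRule, paidExpiryRule}
}

// GetExpiryRuleByNamespace resolves the rule applied to a single namespace.
func (h *planBasedRetentionHandler) GetExpiryRuleByNamespace(ctx context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error) {
	plan, err := h.planLookup(ctx, namespaceUID)
	if err != nil {
		return miniox.ExpiryRule{}, err
	}
	if plan == "free" {
		return freeExpiryRule, nil
	}
	return paidExpiryRule, nil
}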