Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(blob): add retention when uploading pipeline trigger data #937

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ func main() {
ms := memory.NewMemoryStore()

// Initialize Minio client
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, service.MetadataExpiryRules...)
retentionHandler := service.NewRetentionHandler()
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, retentionHandler.ListExpiryRules()...)
if err != nil {
logger.Fatal("failed to create minio client", zap.Error(err))
}
Expand Down Expand Up @@ -303,7 +304,7 @@ func main() {
compStore,
ms,
workerUID,
nil,
retentionHandler,
binaryFetcher,
artifactPublicServiceClient,
artifactPrivateServiceClient,
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2
github.com/itchyny/gojq v0.12.14
github.com/jackc/pgx/v5 v5.5.5
github.com/jmoiron/sqlx v1.4.0
Expand Down Expand Up @@ -264,7 +264,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
Expand All @@ -277,9 +277,9 @@ require (
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.26.0
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0
golang.org/x/crypto v0.31.0
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect
gopkg.in/yaml.v3 v3.0.1
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1289,8 +1289,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so=
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2/go.mod h1:qztbVw9eW69byjdQINhYc7dm9tmhR0zuP46GjV3hwlQ=
github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ=
github.com/itchyny/gojq v0.12.14 h1:6k8vVtsrhQSYgSGg827AD+PVVaB1NLXEdX+dda2oZCc=
github.com/itchyny/gojq v0.12.14/go.mod h1:y1G7oO7XkcR1LPZO59KyoCRy08T3j9vDYRV0GgYSS+s=
Expand Down Expand Up @@ -2124,8 +2124,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -2321,8 +2321,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -2476,8 +2476,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -2508,8 +2508,8 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
11 changes: 9 additions & 2 deletions pkg/recipe/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
"github.com/instill-ai/pipeline-backend/config"
"github.com/instill-ai/pipeline-backend/pkg/constant"
"github.com/instill-ai/pipeline-backend/pkg/resource"

miniox "github.com/instill-ai/x/minio"
)

// SystemVariables contain information about a pipeline trigger.
// TODO jvallesm: we should remove the __ prefix from the fields as it's an
// outdated convention.
type SystemVariables struct {
PipelineTriggerID string `json:"__PIPELINE_TRIGGER_ID"`
PipelineID string `json:"__PIPELINE_ID"`
PipelineUID uuid.UUID `json:"__PIPELINE_UID"`
PipelineReleaseID string `json:"__PIPELINE_RELEASE_ID"`
PipelineReleaseUID uuid.UUID `json:"__PIPELINE_RELEASE_UID"`
ExpiryRuleTag string `json:"__EXPIRY_RULE_TAG"`

// PipelineOwner represents the namespace that owns the pipeline. This is typically
// the namespace where the pipeline was created and is stored.
Expand All @@ -30,12 +33,16 @@ type SystemVariables struct {
// PipelineRequesterID is the ID of the entity (user or organization)
// that initiated the pipeline execution. This may differ from PipelineUserUID
// when the pipeline is triggered on behalf of an organization.
// TODO: we should use resource.Namespace for PipelineRequester
PipelineRequesterID string `json:"__PIPELINE_REQUESTER_ID"`
// PipelineRequesterUID is the unique identifier of the entity (user or organization)
// that initiated the pipeline execution. This may differ from PipelineUserUID
// when the pipeline is triggered on behalf of an organization.
PipelineRequesterUID uuid.UUID `json:"__PIPELINE_REQUESTER_UID"`
// TODO: we should use resource.Namespace for PipelineOwner and PipelineRequester

// ExpiryRule defines the tag and object expiration for the blob storage
// associated to the pipeline run data (e.g. recipe, input, output).
ExpiryRule miniox.ExpiryRule `json:"__EXPIRY_RULE"`

HeaderAuthorization string `json:"__PIPELINE_HEADER_AUTHORIZATION"`
ModelBackend string `json:"__MODEL_BACKEND"`
Expand Down
11 changes: 4 additions & 7 deletions pkg/service/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/utils"

artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
resourcex "github.com/instill-ai/x/resource"
miniox "github.com/instill-ai/x/minio"
)

func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.Namespace, data string) (string, error) {
func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, data string, ns resource.Namespace, expiryRule miniox.ExpiryRule) (string, error) {
mimeType, err := getMimeType(data)
if err != nil {
return "", fmt.Errorf("get mime type: %w", err)
}
artifactClient := s.artifactPublicServiceClient
requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)

vars, err := recipe.GenerateSystemVariables(ctx, recipe.SystemVariables{})

Expand All @@ -42,14 +41,12 @@ func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.N
ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(vars))

timestamp := time.Now().Format(time.RFC3339)
objectName := fmt.Sprintf("%s-%s%s", requesterUID.String(), timestamp, getFileExtension(mimeType))
objectName := fmt.Sprintf("%s/%s%s", ns.NsUID.String(), timestamp, getFileExtension(mimeType))

// TODO: We will need to add the expiry days for the blob data.
// This will be addressed in ins-6857
resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{
NamespaceId: ns.NsID,
ObjectName: objectName,
ObjectExpireDays: 0,
ObjectExpireDays: int32(expiryRule.ExpirationDays),
})

if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ func NewService(
artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient,
) Service {
zapLogger, _ := logger.GetZapLogger(context.Background())
if retentionHandler == nil {
retentionHandler = NewRetentionHandler()
}

return &service{
repository: repository,
Expand Down
36 changes: 24 additions & 12 deletions pkg/service/metadataretention.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,42 @@ package service
import (
"context"

"github.com/gofrs/uuid"

miniox "github.com/instill-ai/x/minio"
)

// MetadataRetentionHandler allows clients to access the object expiration rule
// associated to a namespace. This is used to set the expiration of objects,
// e.g. when uploading the pipeline run data of a trigger. The preferred way to
// set the expiration of an object is by attaching a tag to the object. The
// MinIO client should set the tag-based expiration rules for the bucket when it
// is initialized.
type MetadataRetentionHandler interface {
GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID string) (string, error)
ListExpiryRules() []miniox.ExpiryRule
GetExpiryRuleByNamespace(_ context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error)
}

type metadataRetentionHandler struct{}

// NewRetentionHandler returns the default MetadataRetentionHandler
// implementation, which uses the same expiration rule for all namespaces.
func NewRetentionHandler() MetadataRetentionHandler {
return &metadataRetentionHandler{}
}

func (r metadataRetentionHandler) GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID string) (string, error) {
return defaultExpiryTag, nil
}

const (
defaultExpiryTag = "default-expiry"
var (
defaultExpiryRule = miniox.ExpiryRule{
Tag: "default-expiry",
ExpirationDays: 3,
}
)

var MetadataExpiryRules = []miniox.ExpiryRule{
{
Tag: defaultExpiryTag,
ExpirationDays: 3,
},
// ListExpiryRules returns the expiry rules exposed by the default handler: a
// newly built single-element slice holding defaultExpiryRule.
func (h *metadataRetentionHandler) ListExpiryRules() []miniox.ExpiryRule {
return []miniox.ExpiryRule{defaultExpiryRule}
}

// GetExpiryRuleByNamespace returns defaultExpiryRule and a nil error. Both the
// context and the namespace UID are ignored, so every namespace gets the same
// rule.
func (h *metadataRetentionHandler) GetExpiryRuleByNamespace(_ context.Context, _ uuid.UUID) (miniox.ExpiryRule, error) {
return defaultExpiryRule, nil
}
Loading
Loading