Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(minio): align expiry rules with pipeline-backend #721

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,26 @@ func main() {
defer timeseries.Close()

// Initialize Minio client
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, service.MetadataExpiryRules...)
retentionHandler := service.NewRetentionHandler()
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, retentionHandler.ListExpiryRules()...)
if err != nil {
logger.Fatal("failed to create minio client", zap.Error(err))
}

serv := service.NewService(repo, timeseries.WriteAPI(), mgmtPublicServiceClient, mgmtPrivateServiceClient,
artifactPrivateServiceClient, redisClient, temporalClient, rayService, &aclClient, minioClient, nil,
config.Config.Server.InstillCoreHost)
serv := service.NewService(
repo,
timeseries.WriteAPI(),
mgmtPublicServiceClient,
mgmtPrivateServiceClient,
artifactPrivateServiceClient,
redisClient,
temporalClient,
rayService,
&aclClient,
minioClient,
retentionHandler,
config.Config.Server.InstillCoreHost,
)

modelpb.RegisterModelPublicServiceServer(
publicGrpcS,
Expand Down
6 changes: 2 additions & 4 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"github.com/instill-ai/model-backend/pkg/datamodel"
"github.com/instill-ai/model-backend/pkg/ray"
"github.com/instill-ai/model-backend/pkg/repository"
"github.com/instill-ai/x/temporal"
"github.com/instill-ai/x/zapadapter"

database "github.com/instill-ai/model-backend/pkg/db"
customlogger "github.com/instill-ai/model-backend/pkg/logger"
customotel "github.com/instill-ai/model-backend/pkg/logger/otel"
modelWorker "github.com/instill-ai/model-backend/pkg/worker"

"github.com/instill-ai/x/temporal"
"github.com/instill-ai/x/zapadapter"

miniox "github.com/instill-ai/x/minio"
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c
github.com/instill-ai/usage-client v0.3.0-alpha
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2
github.com/jackc/pgx/v5 v5.6.0
github.com/knadh/koanf v1.5.0
github.com/lestrrat-go/jspointer v0.0.0-20181205001929-82fadba7561c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c h1:
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.3.0-alpha h1:yY5eNn5zINqy8wpOogiNmrVYzJKnd1KMnMxlYBpr7Tk=
github.com/instill-ai/usage-client v0.3.0-alpha/go.mod h1:8lvtZulkhQ7t8alttb2KkLKYoCp5u4oatzDbfFlEld0=
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91 h1:baD7UhjwpmbBkaykoxi+6qd9A97qb/fkvvkocmwSrFA=
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2/go.mod h1:qztbVw9eW69byjdQINhYc7dm9tmhR0zuP46GjV3hwlQ=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
Expand Down
34 changes: 22 additions & 12 deletions pkg/service/metadataretention.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,37 @@ import (
miniox "github.com/instill-ai/x/minio"
)

// MetadataRetentionHandler allows clients to access the object expiration rule
// associated to a namespace. This is used to set the expiration of objects,
// e.g. when uploading the data of a model run. The preferred way to set the
// expiration of an object is by attaching a tag to the object. The MinIO
// client should set the tag-ased expiration rules for the bucket when it is
// initialized.
type MetadataRetentionHandler interface {
GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error)
ListExpiryRules() []miniox.ExpiryRule
GetExpiryRuleByNamespace(_ context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error)
}

type metadataRetentionHandler struct{}

// NewRetentionHandler is the default implementation of
// MetadataRetentionHandler. It returns the same expiration rule for all
// namespaces.
func NewRetentionHandler() MetadataRetentionHandler {
return &metadataRetentionHandler{}
}

func (h metadataRetentionHandler) GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error) {
return defaultExpiryTag, nil
}

const (
defaultExpiryTag = "default-expiry"
var (
defaultExpiryRule = miniox.ExpiryRule{
Tag: "default-expiry",
ExpirationDays: 3,
}
)

var MetadataExpiryRules = []miniox.ExpiryRule{
{
Tag: defaultExpiryTag,
ExpirationDays: 3,
},
func (h *metadataRetentionHandler) ListExpiryRules() []miniox.ExpiryRule {
return []miniox.ExpiryRule{defaultExpiryRule}
}

func (h *metadataRetentionHandler) GetExpiryRuleByNamespace(_ context.Context, _ uuid.UUID) (miniox.ExpiryRule, error) {
return defaultExpiryRule, nil
}
24 changes: 11 additions & 13 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@ func NewService(
a acl.ACLClientInterface,
minioClient miniox.MinioI,
retentionHandler MetadataRetentionHandler,
h string) Service {
if retentionHandler == nil {
retentionHandler = NewRetentionHandler()
}
h string,
) Service {
return &service{
repository: r,
influxDBWriteClient: i,
Expand Down Expand Up @@ -193,9 +191,9 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode
}

requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

inputReferenceID := miniox.GenerateInputRefID("model-runs")
Expand All @@ -204,7 +202,7 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode
FilePath: inputReferenceID,
FileBytes: inputJSON,
FileMimeType: constant.ContentTypeJSON,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error("UploadBase64File for input failed", zap.String("inputReferenceID", inputReferenceID), zap.String("reqJSON", string(inputJSON)), zap.Error(err))
Expand Down Expand Up @@ -557,9 +555,9 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam
},
}

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

we, err := s.temporalClient.ExecuteWorkflow(
Expand All @@ -582,7 +580,7 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam
Hardware: dbModel.Hardware,
Visibility: dbModel.Visibility,
RunLog: runLog,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
Expand Down Expand Up @@ -671,9 +669,9 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc

userUID := uuid.FromStringOrNil(resource.GetRequestSingleHeader(ctx, constant.HeaderUserUIDKey))

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -705,7 +703,7 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc
Hardware: dbModel.Hardware,
Visibility: dbModel.Visibility,
RunLog: runLog,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
Expand Down
Loading