diff --git a/cmd/main/main.go b/cmd/main/main.go index b0056b2e..feb6a249 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -232,14 +232,26 @@ func main() { defer timeseries.Close() // Initialize Minio client - minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, service.MetadataExpiryRules...) + retentionHandler := service.NewRetentionHandler() + minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, retentionHandler.ListExpiryRules()...) if err != nil { logger.Fatal("failed to create minio client", zap.Error(err)) } - serv := service.NewService(repo, timeseries.WriteAPI(), mgmtPublicServiceClient, mgmtPrivateServiceClient, - artifactPrivateServiceClient, redisClient, temporalClient, rayService, &aclClient, minioClient, nil, - config.Config.Server.InstillCoreHost) + serv := service.NewService( + repo, + timeseries.WriteAPI(), + mgmtPublicServiceClient, + mgmtPrivateServiceClient, + artifactPrivateServiceClient, + redisClient, + temporalClient, + rayService, + &aclClient, + minioClient, + retentionHandler, + config.Config.Server.InstillCoreHost, + ) modelpb.RegisterModelPublicServiceServer( publicGrpcS, diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 6d9d7789..3ba1b953 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -24,15 +24,13 @@ import ( "github.com/instill-ai/model-backend/pkg/datamodel" "github.com/instill-ai/model-backend/pkg/ray" "github.com/instill-ai/model-backend/pkg/repository" + "github.com/instill-ai/x/temporal" + "github.com/instill-ai/x/zapadapter" database "github.com/instill-ai/model-backend/pkg/db" customlogger "github.com/instill-ai/model-backend/pkg/logger" customotel "github.com/instill-ai/model-backend/pkg/logger/otel" modelWorker "github.com/instill-ai/model-backend/pkg/worker" - - "github.com/instill-ai/x/temporal" - "github.com/instill-ai/x/zapadapter" - miniox "github.com/instill-ai/x/minio" ) diff --git a/go.mod b/go.mod index 3aee70ff..d2f9812d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c github.com/instill-ai/usage-client v0.3.0-alpha - github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91 + github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 github.com/jackc/pgx/v5 v5.6.0 github.com/knadh/koanf v1.5.0 github.com/lestrrat-go/jspointer v0.0.0-20181205001929-82fadba7561c diff --git a/go.sum b/go.sum index c004042b..b41d309e 100644 --- a/go.sum +++ b/go.sum @@ -241,8 +241,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c h1: github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.3.0-alpha h1:yY5eNn5zINqy8wpOogiNmrVYzJKnd1KMnMxlYBpr7Tk= github.com/instill-ai/usage-client v0.3.0-alpha/go.mod h1:8lvtZulkhQ7t8alttb2KkLKYoCp5u4oatzDbfFlEld0= -github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91 h1:baD7UhjwpmbBkaykoxi+6qd9A97qb/fkvvkocmwSrFA= -github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8= +github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA= +github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2/go.mod h1:qztbVw9eW69byjdQINhYc7dm9tmhR0zuP46GjV3hwlQ= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/pkg/service/metadataretention.go b/pkg/service/metadataretention.go index 984ed23e..95a39919 100644 --- a/pkg/service/metadataretention.go +++ b/pkg/service/metadataretention.go @@ -8,27 +8,37 @@ import ( miniox "github.com/instill-ai/x/minio" ) +// MetadataRetentionHandler allows clients to access the object expiration rule +// associated to a namespace. This is used to set the expiration of objects, +// e.g. when uploading the data of a model run. The preferred way to set the +// expiration of an object is by attaching a tag to the object. The MinIO +// client should set the tag-ased expiration rules for the bucket when it is +// initialized. type MetadataRetentionHandler interface { - GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error) + ListExpiryRules() []miniox.ExpiryRule + GetExpiryRuleByNamespace(_ context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error) } type metadataRetentionHandler struct{} +// NewRetentionHandler is the default implementation of +// MetadataRetentionHandler. It returns the same expiration rule for all +// namespaces. func NewRetentionHandler() MetadataRetentionHandler { return &metadataRetentionHandler{} } -func (h metadataRetentionHandler) GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error) { - return defaultExpiryTag, nil -} - -const ( - defaultExpiryTag = "default-expiry" +var ( + defaultExpiryRule = miniox.ExpiryRule{ + Tag: "default-expiry", + ExpirationDays: 3, + } ) -var MetadataExpiryRules = []miniox.ExpiryRule{ - { - Tag: defaultExpiryTag, - ExpirationDays: 3, - }, +func (h *metadataRetentionHandler) ListExpiryRules() []miniox.ExpiryRule { + return []miniox.ExpiryRule{defaultExpiryRule} +} + +func (h *metadataRetentionHandler) GetExpiryRuleByNamespace(_ context.Context, _ uuid.UUID) (miniox.ExpiryRule, error) { + return defaultExpiryRule, nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 935999cb..c57732ba 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -134,10 +134,8 @@ func NewService( a acl.ACLClientInterface, minioClient miniox.MinioI, retentionHandler MetadataRetentionHandler, - h string) Service { - if retentionHandler == nil { - retentionHandler = NewRetentionHandler() - } + h string, +) Service { return &service{ repository: r, influxDBWriteClient: i, @@ -193,9 +191,9 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode } requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx) - expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID) + expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID) if err != nil { - return nil, err + return nil, fmt.Errorf("fetching expiration rule: %w", err) } inputReferenceID := miniox.GenerateInputRefID("model-runs") @@ -204,7 +202,7 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode FilePath: inputReferenceID, FileBytes: inputJSON, FileMimeType: constant.ContentTypeJSON, - ExpiryRuleTag: expiryRuleTag, + ExpiryRuleTag: expiryRule.Tag, }) if err != nil { logger.Error("UploadBase64File for input failed", zap.String("inputReferenceID", inputReferenceID), zap.String("reqJSON", string(inputJSON)), zap.Error(err)) @@ -557,9 +555,9 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam }, } - expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID) + expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID) if err != nil { - return nil, err + return nil, fmt.Errorf("fetching expiration rule: %w", err) } we, err := s.temporalClient.ExecuteWorkflow( @@ -582,7 +580,7 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam Hardware: dbModel.Hardware, Visibility: dbModel.Visibility, RunLog: runLog, - ExpiryRuleTag: expiryRuleTag, + ExpiryRuleTag: expiryRule.Tag, }) if err != nil { logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error())) @@ -671,9 +669,9 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc userUID := uuid.FromStringOrNil(resource.GetRequestSingleHeader(ctx, constant.HeaderUserUIDKey)) - expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID) + expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID) if err != nil { - return nil, err + return nil, fmt.Errorf("fetching expiration rule: %w", err) } workflowOptions := client.StartWorkflowOptions{ @@ -705,7 +703,7 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc Hardware: dbModel.Hardware, Visibility: dbModel.Visibility, RunLog: runLog, - ExpiryRuleTag: expiryRuleTag, + ExpiryRuleTag: expiryRule.Tag, }) if err != nil { logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))