Skip to content

Commit

Permalink
chore(minio): align expiry rules with pipeline-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
jvallesm committed Dec 13, 2024
1 parent f27b240 commit eaaedb9
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 36 deletions.
20 changes: 16 additions & 4 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,26 @@ func main() {
defer timeseries.Close()

// Initialize Minio client
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, service.MetadataExpiryRules...)
retentionHandler := service.NewRetentionHandler()
minioClient, err := miniox.NewMinioClientAndInitBucket(ctx, &config.Config.Minio, logger, retentionHandler.ListExpiryRules()...)
if err != nil {
logger.Fatal("failed to create minio client", zap.Error(err))
}

serv := service.NewService(repo, timeseries.WriteAPI(), mgmtPublicServiceClient, mgmtPrivateServiceClient,
artifactPrivateServiceClient, redisClient, temporalClient, rayService, &aclClient, minioClient, nil,
config.Config.Server.InstillCoreHost)
serv := service.NewService(
repo,
timeseries.WriteAPI(),
mgmtPublicServiceClient,
mgmtPrivateServiceClient,
artifactPrivateServiceClient,
redisClient,
temporalClient,
rayService,
&aclClient,
minioClient,
retentionHandler,
config.Config.Server.InstillCoreHost,
)

modelpb.RegisterModelPublicServiceServer(
publicGrpcS,
Expand Down
6 changes: 2 additions & 4 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"github.com/instill-ai/model-backend/pkg/datamodel"
"github.com/instill-ai/model-backend/pkg/ray"
"github.com/instill-ai/model-backend/pkg/repository"
"github.com/instill-ai/x/temporal"
"github.com/instill-ai/x/zapadapter"

database "github.com/instill-ai/model-backend/pkg/db"
customlogger "github.com/instill-ai/model-backend/pkg/logger"
customotel "github.com/instill-ai/model-backend/pkg/logger/otel"
modelWorker "github.com/instill-ai/model-backend/pkg/worker"

"github.com/instill-ai/x/temporal"
"github.com/instill-ai/x/zapadapter"

miniox "github.com/instill-ai/x/minio"
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c
github.com/instill-ai/usage-client v0.3.0-alpha
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2
github.com/jackc/pgx/v5 v5.6.0
github.com/knadh/koanf v1.5.0
github.com/lestrrat-go/jspointer v0.0.0-20181205001929-82fadba7561c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c h1:
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241211175103-4f1558f81c9c/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.3.0-alpha h1:yY5eNn5zINqy8wpOogiNmrVYzJKnd1KMnMxlYBpr7Tk=
github.com/instill-ai/usage-client v0.3.0-alpha/go.mod h1:8lvtZulkhQ7t8alttb2KkLKYoCp5u4oatzDbfFlEld0=
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91 h1:baD7UhjwpmbBkaykoxi+6qd9A97qb/fkvvkocmwSrFA=
github.com/instill-ai/x v0.5.0-alpha.0.20241203111314-11f1aa4a3d91/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2/go.mod h1:qztbVw9eW69byjdQINhYc7dm9tmhR0zuP46GjV3hwlQ=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
Expand Down
34 changes: 22 additions & 12 deletions pkg/service/metadataretention.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,37 @@ import (
miniox "github.com/instill-ai/x/minio"
)

// MetadataRetentionHandler allows clients to access the object expiration rule
// associated to a namespace. This is used to set the expiration of objects,
// e.g. when uploading the data of a model run. The preferred way to set the
// expiration of an object is by attaching a tag to the object. The MinIO
// client should set the tag-ased expiration rules for the bucket when it is
// initialized.
type MetadataRetentionHandler interface {
GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error)
ListExpiryRules() []miniox.ExpiryRule
GetExpiryRuleByNamespace(_ context.Context, namespaceUID uuid.UUID) (miniox.ExpiryRule, error)
}

type metadataRetentionHandler struct{}

// NewRetentionHandler is the default implementation of
// MetadataRetentionHandler. It returns the same expiration rule for all
// namespaces.
func NewRetentionHandler() MetadataRetentionHandler {
return &metadataRetentionHandler{}
}

func (h metadataRetentionHandler) GetExpiryTagBySubscriptionPlan(ctx context.Context, requesterUID uuid.UUID) (string, error) {
return defaultExpiryTag, nil
}

const (
defaultExpiryTag = "default-expiry"
var (
defaultExpiryRule = miniox.ExpiryRule{
Tag: "default-expiry",
ExpirationDays: 3,
}
)

var MetadataExpiryRules = []miniox.ExpiryRule{
{
Tag: defaultExpiryTag,
ExpirationDays: 3,
},
func (h *metadataRetentionHandler) ListExpiryRules() []miniox.ExpiryRule {
return []miniox.ExpiryRule{defaultExpiryRule}
}

func (h *metadataRetentionHandler) GetExpiryRuleByNamespace(_ context.Context, _ uuid.UUID) (miniox.ExpiryRule, error) {
return defaultExpiryRule, nil
}
24 changes: 11 additions & 13 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@ func NewService(
a acl.ACLClientInterface,
minioClient miniox.MinioI,
retentionHandler MetadataRetentionHandler,
h string) Service {
if retentionHandler == nil {
retentionHandler = NewRetentionHandler()
}
h string,
) Service {
return &service{
repository: r,
influxDBWriteClient: i,
Expand Down Expand Up @@ -193,9 +191,9 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode
}

requesterUID, userUID := resourcex.GetRequesterUIDAndUserUID(ctx)
expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, requesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

inputReferenceID := miniox.GenerateInputRefID("model-runs")
Expand All @@ -204,7 +202,7 @@ func (s *service) CreateModelRun(ctx context.Context, triggerUID uuid.UUID, mode
FilePath: inputReferenceID,
FileBytes: inputJSON,
FileMimeType: constant.ContentTypeJSON,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error("UploadBase64File for input failed", zap.String("inputReferenceID", inputReferenceID), zap.String("reqJSON", string(inputJSON)), zap.Error(err))
Expand Down Expand Up @@ -557,9 +555,9 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam
},
}

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

we, err := s.temporalClient.ExecuteWorkflow(
Expand All @@ -582,7 +580,7 @@ func (s *service) TriggerNamespaceModelByID(ctx context.Context, ns resource.Nam
Hardware: dbModel.Hardware,
Visibility: dbModel.Visibility,
RunLog: runLog,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
Expand Down Expand Up @@ -671,9 +669,9 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc

userUID := uuid.FromStringOrNil(resource.GetRequestSingleHeader(ctx, constant.HeaderUserUIDKey))

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, runLog.RequesterUID)
expiryRule, err := s.retentionHandler.GetExpiryRuleByNamespace(ctx, runLog.RequesterUID)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching expiration rule: %w", err)
}

workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -705,7 +703,7 @@ func (s *service) TriggerAsyncNamespaceModelByID(ctx context.Context, ns resourc
Hardware: dbModel.Hardware,
Visibility: dbModel.Visibility,
RunLog: runLog,
ExpiryRuleTag: expiryRuleTag,
ExpiryRuleTag: expiryRule.Tag,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
Expand Down

0 comments on commit eaaedb9

Please sign in to comment.