Skip to content

Commit

Permalink
Fix: Add a TTL on build containers for cleanup (#819)
Browse files Browse the repository at this point in the history
Added a key event manager to image service with a imageContainerTTL key
to identified expired build containers
  • Loading branch information
jsun-m authored Jan 7, 2025
1 parent f7eb7ca commit b5f5bb7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 14 deletions.
28 changes: 27 additions & 1 deletion pkg/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Builder struct {
registry *common.ImageRegistry
containerRepo repository.ContainerRepository
tailscale *network.Tailscale
rdb *common.RedisClient
}

type BuildStep struct {
Expand Down Expand Up @@ -130,13 +131,14 @@ func (o *BuildOpts) addPythonRequirements() {
o.PythonPackages = append(filteredPythonPackages, baseRequirementsSlice...)
}

func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository) (*Builder, error) {
func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) {
return &Builder{
config: config,
scheduler: scheduler,
tailscale: tailscale,
registry: registry,
containerRepo: containerRepo,
rdb: rdb,
}, nil
}

Expand Down Expand Up @@ -285,6 +287,14 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return err
}

err = b.rdb.Set(ctx, Keys.imageBuildContainerTTL(containerId), "1", time.Duration(imageContainerTtlS)*time.Second).Err()
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
return err
}

go b.keepAlive(ctx, containerId, ctx.Done())

conn, err := network.ConnectToHost(ctx, hostname, time.Second*30, b.tailscale, b.config.Tailscale)
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
Expand Down Expand Up @@ -439,6 +449,22 @@ func (b *Builder) Exists(ctx context.Context, imageId string) bool {
return b.registry.Exists(ctx, imageId)
}

func (b *Builder) keepAlive(ctx context.Context, containerId string, done <-chan struct{}) {
ticker := time.NewTicker(time.Duration(buildContainerKeepAliveIntervalS) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-ticker.C:
b.rdb.Set(ctx, Keys.imageBuildContainerTTL(containerId), "1", time.Duration(imageContainerTtlS)*time.Second).Err()
}
}
}

var imageNamePattern = regexp.MustCompile(
`^` + // Assert position at the start of the string
`(?:(?P<Registry>(?:(?:localhost|[\w.-]+(?:\.[\w.-]+)+)(?::\d+)?)|[\w]+:\d+)\/)?` + // Optional registry, which can be localhost, a domain with optional port, or a simple registry with port
Expand Down
81 changes: 72 additions & 9 deletions pkg/abstractions/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package image
import (
"context"
"fmt"
"strings"

"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/common"
Expand All @@ -23,9 +24,12 @@ type ImageService interface {

type RuncImageService struct {
pb.UnimplementedImageServiceServer
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
rdb *common.RedisClient
keyEventChan chan common.KeyEvent
keyEventManager *common.KeyEventManager
}

type ImageServiceOpts struct {
Expand All @@ -34,8 +38,12 @@ type ImageServiceOpts struct {
BackendRepo repository.BackendRepository
Scheduler *scheduler.Scheduler
Tailscale *network.Tailscale
RedisClient *common.RedisClient
}

const buildContainerKeepAliveIntervalS int = 10
const imageContainerTtlS int = 60

func NewRuncImageService(
ctx context.Context,
opts ImageServiceOpts,
Expand All @@ -45,16 +53,30 @@ func NewRuncImageService(
return nil, err
}

builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo)
builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo, opts.RedisClient)
if err != nil {
return nil, err
}

return &RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
}, nil
keyEventManager, err := common.NewKeyEventManager(opts.RedisClient)
if err != nil {
return nil, err
}

is := RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
keyEventChan: make(chan common.KeyEvent),
keyEventManager: keyEventManager,
rdb: opts.RedisClient,
}

go is.monitorImageContainers(ctx)
go is.keyEventManager.ListenForPattern(ctx, Keys.imageBuildContainerTTL("*"), is.keyEventChan)
go is.keyEventManager.ListenForPattern(ctx, common.RedisKeys.SchedulerContainerState("*"), is.keyEventChan)

return &is, nil
}

func (is *RuncImageService) VerifyImageBuild(ctx context.Context, in *pb.VerifyImageBuildRequest) (*pb.VerifyImageBuildResponse, error) {
Expand Down Expand Up @@ -184,6 +206,35 @@ func (is *RuncImageService) retrieveBuildSecrets(ctx context.Context, secrets []
return buildSecrets, nil
}

func (is *RuncImageService) monitorImageContainers(ctx context.Context) {
for {
select {
case event := <-is.keyEventChan:
switch event.Operation {
case common.KeyOperationSet:
if strings.Contains(event.Key, common.RedisKeys.SchedulerContainerState("")) {
containerId := strings.TrimPrefix(is.keyEventManager.TrimKeyspacePrefix(event.Key), common.RedisKeys.SchedulerContainerState(""))

if is.rdb.Exists(ctx, Keys.imageBuildContainerTTL(containerId)).Val() == 0 {
is.builder.scheduler.Stop(&types.StopContainerArgs{
ContainerId: containerId,
Force: true,
})
}
}
case common.KeyOperationExpired:
containerId := strings.TrimPrefix(is.keyEventManager.TrimKeyspacePrefix(event.Key), Keys.imageBuildContainerTTL(""))
is.builder.scheduler.Stop(&types.StopContainerArgs{
ContainerId: containerId,
Force: true,
})
}
case <-ctx.Done():
return
}
}
}

func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
steps := make([]BuildStep, len(buildSteps))
for i, s := range buildSteps {
Expand All @@ -194,3 +245,15 @@ func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
}
return steps
}

var (
imageBuildContainerTTL string = "image:build_container_ttl:%s"
)

var Keys = &keys{}

type keys struct{}

func (k *keys) imageBuildContainerTTL(containerId string) string {
return fmt.Sprintf(imageBuildContainerTTL, containerId)
}
8 changes: 4 additions & 4 deletions pkg/common/key_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func NewKeyEventManager(rdb *RedisClient) (*KeyEventManager, error) {
return &KeyEventManager{rdb: rdb}, nil
}

func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, error) {
pattern := fmt.Sprintf("%s*", patternPrefix)

Expand All @@ -49,10 +53,6 @@ func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, e
return trimmedKeys, nil
}

func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) ListenForPattern(ctx context.Context, patternPrefix string, keyEventChan chan KeyEvent) error {
existingKeys, err := kem.fetchExistingKeys(patternPrefix)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (g *Gateway) registerServices() error {
Scheduler: g.Scheduler,
Tailscale: g.Tailscale,
BackendRepo: g.BackendRepo,
RedisClient: g.RedisClient,
})
if err != nil {
return err
Expand Down

0 comments on commit b5f5bb7

Please sign in to comment.