Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support splitting blobs when stored as OCI layer #1140

Merged
merged 16 commits into from
Dec 20, 2024
11 changes: 11 additions & 0 deletions api/credentials/identity/hostpath/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string {
}
return strings.TrimPrefix(id[ID_PATHPREFIX], "/")
}

func HostPort(id cpi.ConsumerIdentity) string {
if id == nil {
return ""
}
host := id[ID_HOSTNAME]
if port, ok := id[ID_PORT]; ok {
return host + ":" + port
}
return host
}
1 change: 1 addition & 0 deletions api/oci/extensions/repositories/ocireg/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo)
spec: spec,
info: info,
}
i.logger.Debug("created repository")
return cpi.NewRepository(i), nil
}

Expand Down
18 changes: 18 additions & 0 deletions api/ocm/cpi/repocpi/bridge_r.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ type RepositoryImpl interface {
io.Closer
}

// Chunked is an optional interface, which
// may be implemented to accept a blob limit for mapping
// local blobs to an external storage system.
type Chunked interface {
Skarlso marked this conversation as resolved.
Show resolved Hide resolved
// SetBlobLimit sets the blob limit if possible.
// It returns true, if this was successful.
SetBlobLimit(s int64) bool
}

// SetBlobLimit tries to set a blob limit for a repository
// implementation. It returns true, if this was possible.
func SetBlobLimit(i RepositoryImpl, s int64) bool {
if c, ok := i.(Chunked); ok {
return c.SetBlobLimit(s)
}
return false
}

type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]

type repositoryBridge struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package genericocireg

import (
"bytes"
"io"
"os"
"strings"
"sync"

"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/oci"
Expand Down Expand Up @@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
_, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
refs := strings.Split(m.spec.LocalReference, ",")

var (
data blobaccess.DataAccess
err error
)
if len(refs) < 2 {
_, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
}
} else {
data = &composedBlock{m, refs}
}
m.data = data
return m.data, err
Expand All @@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}

////////////////////////////////////////////////////////////////////////////////

type composedBlock struct {
m *localBlobAccessMethod
refs []string
}

var _ blobaccess.DataAccess = (*composedBlock)(nil)

func (c *composedBlock) Get() ([]byte, error) {
buf := bytes.NewBuffer(nil)
for _, ref := range c.refs {
var finalize finalizer.Finalizer

_, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return nil, err
}
finalize.Close(data)
r, err := data.Reader()
if err != nil {
return nil, err
}
Skarlso marked this conversation as resolved.
Show resolved Hide resolved
finalize.Close(r)
_, err = io.Copy(buf, r)
if err != nil {
return nil, err
}
err = finalize.Finalize()
if err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}
fabianburth marked this conversation as resolved.
Show resolved Hide resolved

func (c *composedBlock) Reader() (io.ReadCloser, error) {
return &composedReader{
m: c.m,
refs: c.refs,
}, nil
}

func (c *composedBlock) Close() error {
return nil
}

type composedReader struct {
lock sync.Mutex
m *localBlobAccessMethod
refs []string
reader io.ReadCloser
data blobaccess.DataAccess
}
jakobmoellerdev marked this conversation as resolved.
Show resolved Hide resolved

func (c *composedReader) Read(p []byte) (n int, err error) {
fabianburth marked this conversation as resolved.
Show resolved Hide resolved
c.lock.Lock()
defer c.lock.Unlock()

for {
if c.reader != nil {
n, err := c.reader.Read(p)

if err == io.EOF {
c.reader.Close()
c.data.Close()
c.refs = c.refs[1:]
c.reader = nil
c.data = nil
// start new layer and return partial (>0) read before next layer is started
err = nil
}
// return partial read (even a zero read if layer is not yet finished) or error
if c.reader != nil || err != nil || n > 0 {
return n, err
}
// otherwise, we can use the given buffer for the next layer

// now, we have to check for a next succeeding layer.
// This means to finish with the actual reader and continue
// with the next one.
}

// If no more layers are available, report EOF.
if len(c.refs) == 0 {
return 0, io.EOF
}

ref := strings.TrimSpace(c.refs[0])
_, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return 0, err
}
c.reader, err = c.data.Reader()
if err != nil {
return 0, err
}
}
}

func (c *composedReader) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.reader == nil && c.refs == nil {
return os.ErrClosed
}
if c.reader != nil {
c.reader.Close()
c.data.Close()
c.reader = nil
c.refs = nil
Skarlso marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
53 changes: 53 additions & 0 deletions api/ocm/extensions/repositories/genericocireg/bloblimits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package genericocireg

import (
"sync"

configctx "ocm.software/ocm/api/config"
"ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config"
)

var (
defaultBlobLimits config.BlobLimits
lock sync.Mutex
)

const (
KB = int64(1000)
MB = 1000 * KB
GB = 1000 * MB
)

func init() {
defaultBlobLimits = config.BlobLimits{}

// Add limits for known OCI repositories, here,
// or provide init functions in specialized packages
// by calling AddDefaultBlobLimit.
AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429
}

// AddDefaultBlobLimit can be used to set default blob limits
// for known repositories.
// Those limits will be overwritten, by blob limits
// given by a configuration object and the repository
// specification.
func AddDefaultBlobLimit(name string, limit int64) {
lock.Lock()
defer lock.Unlock()

defaultBlobLimits[name] = limit
}

func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) {
if target != nil {
lock.Lock()
defer lock.Unlock()

target.ConfigureBlobLimits(defaultBlobLimits)

if ctx != nil {
ctx.ConfigContext().ApplyTo(0, target)
}
}
}
Loading
Loading