From ee6f65e17e93cd9038fd79ef30dfd275e6fc2286 Mon Sep 17 00:00:00 2001 From: "J. Victor Martins" Date: Thu, 4 Apr 2024 18:52:46 -0700 Subject: [PATCH] updater: Support max entry size in offline vuln load Signed-off-by: J. Victor Martins --- libvuln/jsonblob/jsonblob.go | 28 +++++++++++++--- libvuln/jsonblob/jsonblob_test.go | 56 +++++++++++++++++++++++++++++++ libvuln/updates.go | 4 +-- 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index a4e5a2213..4a3bea8d4 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -44,12 +44,26 @@ type Store struct { latest map[driver.UpdateKind]uuid.UUID } +// LoadOpt is an option to configure the loader. +type LoadOpt func(l *Loader) + +// MaxEntrySize sets a limit on the combined number of vulnerabilities and enrichments +// each entry can have. Values of zero or less mean no limit, which is the default. +func MaxEntrySize(sz int) LoadOpt { + return func(l *Loader) { + l.maxEntrySize = sz + } +} + // Load reads in all the records serialized in the provided [io.Reader]. -func Load(ctx context.Context, r io.Reader) (*Loader, error) { +func Load(ctx context.Context, r io.Reader, opts ...LoadOpt) (*Loader, error) { l := Loader{ dec: json.NewDecoder(r), cur: uuid.Nil, } + for _, o := range opts { + o(&l) + } return &l, nil } @@ -65,6 +79,8 @@ type Loader struct { next *Entry de diskEntry cur uuid.UUID + + maxEntrySize int } // Next reports whether there's an [Entry] to be processed. @@ -75,8 +91,12 @@ func (l *Loader) Next() bool { for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) { id := l.de.Ref - // If we just hit a new Entry, promote the current one. - if id != l.cur { + // If we just hit a new Entry, or if the pending Entry has reached the size limit, then promote it. 
+ promote := id != l.cur || + (l.next != nil && + l.maxEntrySize > 0 && + len(l.next.Vuln)+len(l.next.Enrichment) >= l.maxEntrySize) + if promote { l.e = l.next l.next = &Entry{} l.next.Updater = l.de.Updater @@ -100,7 +120,7 @@ func (l *Loader) Next() bool { l.next.Enrichment = append(l.next.Enrichment, en) } // If this was an initial diskEntry, promote the ref. - if id != l.cur { + if promote { l.cur = id // If we have an Entry ready, report that. if l.e != nil { diff --git a/libvuln/jsonblob/jsonblob_test.go b/libvuln/jsonblob/jsonblob_test.go index 141c6a7db..d867849d9 100644 --- a/libvuln/jsonblob/jsonblob_test.go +++ b/libvuln/jsonblob/jsonblob_test.go @@ -98,6 +98,62 @@ func TestRoundtrip(t *testing.T) { } } +func TestMaxSize(t *testing.T) { + ctx := context.Background() + a, err := New() + if err != nil { + t.Fatal(err) + } + + var want, got struct { + V []*claircore.Vulnerability + E []driver.EnrichmentRecord + } + + want.V = test.GenUniqueVulnerabilities(20, "test") + ref, err := a.UpdateVulnerabilities(ctx, "test", "", want.V) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + var buf bytes.Buffer + defer func() { + t.Logf("wrote:\n%s", buf.String()) + }() + r, w := io.Pipe() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { defer w.Close(); return a.Store(w) }) + eg.Go(func() error { + l, err := Load(ctx, io.TeeReader(r, &buf), MaxEntrySize(10)) + if err != nil { + return err + } + for l.Next() { + e := l.Entry() + if e.Vuln != nil && e.Enrichment != nil { + t.Error("expecting entry to have either vulnerability or enrichment, got both") + } + if len(e.Vuln) != 10 { + t.Errorf("expected 10 vulns, found %d", len(e.Vuln)) + } + if e.Vuln != nil { + got.V = append(got.V, l.Entry().Vuln...) 
+ } + } + if err := l.Err(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + t.Error(err) + } + if !cmp.Equal(got, want) { + t.Error(cmp.Diff(got, want)) + } +} + func TestEnrichments(t *testing.T) { s, err := New() if err != nil { diff --git a/libvuln/updates.go b/libvuln/updates.go index 49d6d4639..d2a324013 100644 --- a/libvuln/updates.go +++ b/libvuln/updates.go @@ -19,14 +19,14 @@ import ( // // The format provided on "in" should be the same output from [jsonblob.Store], with // any compression undone. -func OfflineImport(ctx context.Context, pool *pgxpool.Pool, in io.Reader) error { +func OfflineImport(ctx context.Context, pool *pgxpool.Pool, in io.Reader, opt ...jsonblob.LoadOpt) error { // BUG(hank) The OfflineImport function is a wart, needed to work around // some package namespacing issues. It should get refactored if claircore // gets merged into clair. ctx = zlog.ContextWithValues(ctx, "component", "libvuln/OfflineImporter") s := postgres.NewMatcherStore(pool) - l, err := jsonblob.Load(ctx, in) + l, err := jsonblob.Load(ctx, in, opt...) if err != nil { return err }