From e08f25710b1bd2f364adbf8529904aa96ddacca3 Mon Sep 17 00:00:00 2001 From: "J. Victor Martins" Date: Thu, 18 Apr 2024 15:56:27 -0700 Subject: [PATCH] matcher: Add iterator-based DB update APIs --- datastore/enrichment.go | 9 ++ datastore/postgres/enrichment.go | 39 ++++- datastore/postgres/updatevulnerabilities.go | 67 ++++++-- datastore/updater.go | 9 ++ libvuln/jsonblob/jsonblob.go | 171 +++++++++++++++----- libvuln/jsonblob/jsonblob_test.go | 69 ++++++++ 6 files changed, 300 insertions(+), 64 deletions(-) diff --git a/datastore/enrichment.go b/datastore/enrichment.go index a11f376ea..c28491a3f 100644 --- a/datastore/enrichment.go +++ b/datastore/enrichment.go @@ -8,6 +8,12 @@ import ( "github.com/quay/claircore/libvuln/driver" ) +// EnrichmentIter is a function for iterating on enrichment records. +// +// It accepts a callback function 'yield' to handle each enrichment record. If the callback +// returns an error, the iteration is halted, and the error is propagated to indicate cancellation. +type EnrichmentIter func(yield func(enricher *driver.EnrichmentRecord) error) error + // EnrichmentUpdater is an interface exporting the necessary methods // for storing and querying Enrichments. type EnrichmentUpdater interface { @@ -15,6 +21,9 @@ type EnrichmentUpdater interface { // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queries by clients. UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error) + // UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but + // accepting an iterator function. + UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error) } // Enrichment is an interface for querying enrichments from the store. 
diff --git a/datastore/postgres/enrichment.go b/datastore/postgres/enrichment.go index ba3ea8ecc..c1ec1811f 100644 --- a/datastore/postgres/enrichment.go +++ b/datastore/postgres/enrichment.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/quay/claircore/datastore" "github.com/quay/zlog" "github.com/quay/claircore/libvuln/driver" @@ -60,10 +61,28 @@ var ( ) ) +func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter") + return s.updateEnrichments(ctx, updater, fp, it) +} + // UpdateEnrichments creates a new UpdateOperation, inserts the provided // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queried by clients. -func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { +func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments") + enIter := func(yield func(record *driver.EnrichmentRecord) error) error { + for i := range es { + if err := yield(&es[i]); err != nil { + return err + } + } + return nil + } + return s.updateEnrichments(ctx, updater, fp, enIter) +} + +func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { const ( create = ` INSERT @@ -134,17 +153,23 @@ DO batch := microbatch.NewInsert(tx, 2000, time.Minute) start = time.Now() - for i := range es { - hashKind, hash := hashEnrichment(&es[i]) + enCt := 0 + err = it(func(en 
*driver.EnrichmentRecord) error { + enCt++ + hashKind, hash := hashEnrichment(en) err := batch.Queue(ctx, insert, - hashKind, hash, name, es[i].Tags, es[i].Enrichment, + hashKind, hash, name, en.Tags, en.Enrichment, ) if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err) + return fmt.Errorf("failed to queue enrichment: %w", err) } if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) + return fmt.Errorf("failed to queue association: %w", err) } + return nil + }) + if err != nil { + return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err) } if err := batch.Done(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err) @@ -160,7 +185,7 @@ DO } zlog.Debug(ctx). Stringer("ref", ref). - Int("inserted", len(es)). + Int("inserted", enCt). Msg("update_operation committed") return ref, nil } diff --git a/datastore/postgres/updatevulnerabilities.go b/datastore/postgres/updatevulnerabilities.go index be02a8991..8a4e4b20f 100644 --- a/datastore/postgres/updatevulnerabilities.go +++ b/datastore/postgres/updatevulnerabilities.go @@ -12,11 +12,11 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/quay/zlog" - "github.com/quay/claircore" + "github.com/quay/claircore/datastore" "github.com/quay/claircore/libvuln/driver" "github.com/quay/claircore/pkg/microbatch" + "github.com/quay/zlog" ) var ( @@ -45,14 +45,33 @@ var ( ) ) +type deltaOpts struct { + deletedVulns []string + vulns []*claircore.Vulnerability +} + +// UpdateVulnerabilitiesIter implements vulnstore.Updater. 
+func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnIter) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter") + return s.updateVulnerabilities(ctx, updater, fp, it, nil) +} + // UpdateVulnerabilities implements vulnstore.Updater. // // It creates a new UpdateOperation for this update call, inserts the // provided vulnerabilities and computes a diff comprising the removed // and added vulnerabilities for this UpdateOperation. -func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { +func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities") - return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false) + iterVulns := func(yield func(*claircore.Vulnerability) error) error { + for i := range vulns { + if err := yield(vulns[i]); err != nil { + return err + } + } + return nil + } + return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil) } // DeltaUpdateVulnerabilities implements vulnstore.Updater. 
@@ -68,10 +87,23 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string // - Associate new vulnerabilities with new updateOperation func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) { ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities") - return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true) + deltaOpts := deltaOpts{ + deletedVulns: deletedVulns, + vulns: vulns, + } + iterVulns := func(yield func(*claircore.Vulnerability) error) error { + for i := range vulns { + deltaOpts.vulns = append(deltaOpts.vulns) + if err := yield(vulns[i]); err != nil { + return err + } + } + return nil + } + return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, &deltaOpts) } -func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) { +func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter func(yield func(*claircore.Vulnerability) error) error, deltaOpts *deltaOpts) (uuid.UUID, error) { const ( // Create makes a new update operation and returns the reference and ID. 
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;` @@ -139,6 +171,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err) } + delta := deltaOpts != nil updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1) updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) @@ -181,13 +214,13 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } if len(oldVulns) > 0 { - for _, v := range vulns { + for _, v := range deltaOpts.vulns { // If we have an existing vuln in the new batch // delete it from the oldVulns map so it doesn't // get associated with the new update_operation. delete(oldVulns, v.Name) } - for _, delName := range deletedVulns { + for _, delName := range deltaOpts.deletedVulns { // If we have an existing vuln that has been signaled // as deleted by the updater then delete it so it doesn't // get associated with the new update_operation. 
@@ -211,14 +244,15 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string // batch insert vulnerabilities skipCt := 0 - + vulnCt := 0 start = time.Now() mBatcher := microbatch.NewInsert(tx, 2000, time.Minute) - for _, vuln := range vulns { + err = vulnIter(func(vuln *claircore.Vulnerability) error { + vulnCt++ if vuln.Package == nil || vuln.Package.Name == "" { skipCt++ - continue + return nil } pkg := vuln.Package @@ -242,12 +276,17 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper, ) if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err) + return fmt.Errorf("failed to queue vulnerability: %w", err) } if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) + return fmt.Errorf("failed to queue association: %w", err) } + + return nil + }) + if err != nil { + return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err) } if err := mBatcher.Done(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err) @@ -266,7 +305,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string zlog.Debug(ctx). Str("ref", ref.String()). Int("skipped", skipCt). - Int("inserted", len(vulns)-skipCt). + Int("inserted", vulnCt-skipCt). Msg("update_operation committed") return ref, nil } diff --git a/datastore/updater.go b/datastore/updater.go index 705286427..ab41a4375 100644 --- a/datastore/updater.go +++ b/datastore/updater.go @@ -10,6 +10,12 @@ import ( "github.com/quay/claircore/libvuln/driver" ) +// VulnIter is a function for iterating on vulnerabilities. +// +// It accepts a callback function 'yield' to handle each vulnerability. If the callback +// returns an error, the iteration is halted, and the error is propagated to indicate cancellation. 
+type VulnIter func(yield func(*claircore.Vulnerability) error) error + // Updater is an interface exporting the necessary methods // for updating a vulnerability database. type Updater interface { @@ -19,6 +25,9 @@ type Updater interface { // vulnerabilities, and ensures vulnerabilities from previous updates are // not queried by clients. UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) + // UpdateVulnerabilitiesIter performs the same operation as + // UpdateVulnerabilities, but accepting an iterator function. + UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnIter) (uuid.UUID, error) // DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing // vulnerabilities and new vulnerabilities. It also takes an array of deleted // vulnerability names which should no longer be available to query. diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index a4e5a2213..6352421a5 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -45,70 +45,97 @@ type Store struct { } // Load reads in all the records serialized in the provided [io.Reader]. -func Load(ctx context.Context, r io.Reader) (*Loader, error) { +func Load(_ context.Context, r io.Reader) (*Loader, error) { l := Loader{ dec: json.NewDecoder(r), - cur: uuid.Nil, } + l.de.Ref = uuid.Nil return &l, nil } -// Loader is an iterator that returns a series of [Entry]. +// Loader is an iterator that returns objects grouped by update operations from +// the [Store]. // -// Users should call [*Loader.Next] until it reports false, then check for -// errors via [*Loader.Err]. +// Users should either call [*Loader.Next] or [*Loader.NextIter], and if they +// report false, check for errors via [*Loader.Err]. +// +// 1. Use [*Loader.Next] to read all objects for a single update operation into +// an [Entry]. 
Use [*Loader.Entry] to read it. +// +// 2. Use [*Loader.NextIter] to iterate over each object for a single update +// operation. Use [*Loader.Iter] to get the iterator (see [iter] for more). type Loader struct { err error e *Entry - dec *json.Decoder - next *Entry - de diskEntry - cur uuid.UUID + dec *json.Decoder + de diskEntry + cur uuid.UUID } -// Next reports whether there's an [Entry] to be processed. -func (l *Loader) Next() bool { +// iter wraps a disk entry with methods to read the next object for the same +// update operation. +// +// Users are expected to call [*iter.Next] to read and parse the next object, +// which can be retrieved using [iter.Vulnerability] or [iter.Enrichment] based +// on the update kind. Errors are reported via [*Loader.Err]. +type iter struct { + *diskEntry + + // Copied when disk entry is first assigned. + Updater string + Fingerprint driver.Fingerprint + Kind driver.UpdateKind + + Vulnerability *claircore.Vulnerability + Enrichment *driver.EnrichmentRecord + + loader *Loader +} + +// NextIter reports if there is a new set of update operations to iterate on. If +// called in the middle of an iteration, all objects for the current operation +// are discarded. +func (l *Loader) NextIter() bool { if l.err != nil { return false } - - for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) { - id := l.de.Ref - // If we just hit a new Entry, promote the current one. - if id != l.cur { - l.e = l.next - l.next = &Entry{} - l.next.Updater = l.de.Updater - l.next.Fingerprint = l.de.Fingerprint - l.next.Date = l.de.Date + if l.cur == l.de.Ref { + // Read until we are not on the same update operation. + for l.readNext() { } - switch l.de.Kind { + } + if l.err != nil { + return false + } + // Mark we are in a new update operation. + l.cur = l.de.Ref + return true +} + +// Next reports whether there's an [Entry] to be processed. 
+func (l *Loader) Next() bool { + if !l.NextIter() { + return false + } + e := Entry{} + it := l.Iter() + for it.Next() { + e.CommonEntry = it.CommonEntry + switch it.Kind { case driver.VulnerabilityKind: - vuln := claircore.Vulnerability{} - if err := json.Unmarshal(l.de.Vuln.buf, &vuln); err != nil { - l.err = err - return false - } - l.next.Vuln = append(l.next.Vuln, &vuln) + e.Vuln = append(e.Vuln, it.Vulnerability) case driver.EnrichmentKind: - en := driver.EnrichmentRecord{} - if err := json.Unmarshal(l.de.Enrichment.buf, &en); err != nil { - l.err = err - return false - } - l.next.Enrichment = append(l.next.Enrichment, en) - } - // If this was an initial diskEntry, promote the ref. - if id != l.cur { - l.cur = id - // If we have an Entry ready, report that. - if l.e != nil { - return true - } + e.Enrichment = append(e.Enrichment, *it.Enrichment) + default: + l.err = fmt.Errorf("unknown entry type: %s", it.Kind) + break } } - l.e = l.next + if l.Err() != nil { + return false + } + l.e = &e return true } @@ -117,6 +144,54 @@ func (l *Loader) Entry() *Entry { return l.e } +// Iter returns an iterator to read all objects for the current update operation. +func (l *Loader) Iter() *iter { + return &iter{ + Updater: l.de.Updater, + Fingerprint: l.de.Fingerprint, + Kind: l.de.Kind, + loader: l, + } +} + +// Next reports if there is a new object available in this iterator. +func (i *iter) Next() bool { + if i.loader.err != nil { + return false + } + if i.diskEntry != nil { + // Only need the next disk entry if not the first iteration. 
+ if !i.loader.readNext() { + return false + } + } + i.diskEntry = &i.loader.de + switch i.Kind { + case driver.VulnerabilityKind: + vuln := claircore.Vulnerability{} + if err := json.Unmarshal(i.diskEntry.Vuln.buf, &vuln); err != nil { + i.loader.err = err + return false + } + i.Vulnerability = &vuln + case driver.EnrichmentKind: + en := driver.EnrichmentRecord{} + if err := json.Unmarshal(i.diskEntry.Enrichment.buf, &en); err != nil { + i.loader.err = err + return false + } + i.Enrichment = &en + } + return true +} + +// readNext will decode the next [diskEntry] from the store, and return true +// only if there were no errors and the entry belongs to the current update operation. +func (l *Loader) readNext() bool { + l.err = l.dec.Decode(&l.de) + return l.err == nil && l.cur == l.de.Ref +} + // Err is the latest encountered error. func (l *Loader) Err() error { // Don't report EOF as an error. @@ -455,6 +530,16 @@ func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, return uuid.Nil, nil } +// UpdateEnrichmentsIter is unimplemented. +func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) { + return uuid.Nil, nil +} + +// UpdateVulnerabilitiesIter is unimplemented. 
+func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnIter) (uuid.UUID, error) { + return uuid.Nil, nil +} + var bufPool sync.Pool func getBuf() []byte { diff --git a/libvuln/jsonblob/jsonblob_test.go b/libvuln/jsonblob/jsonblob_test.go index 141c6a7db..5bb385093 100644 --- a/libvuln/jsonblob/jsonblob_test.go +++ b/libvuln/jsonblob/jsonblob_test.go @@ -118,3 +118,72 @@ func TestEnrichments(t *testing.T) { } t.Logf("wrote:\n%s", buf.String()) } + +func TestNextIter(t *testing.T) { + ctx := context.Background() + a, err := New() + if err != nil { + t.Fatal(err) + } + + var want, got struct { + V map[string][]*claircore.Vulnerability + } + + want.V = make(map[string][]*claircore.Vulnerability) + got.V = make(map[string][]*claircore.Vulnerability) + + foo := test.GenUniqueVulnerabilities(10, "foo") + bar := test.GenUniqueVulnerabilities(10, "bar") + + ref, err := a.UpdateVulnerabilities(ctx, "foo", "", foo) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + ref, err = a.UpdateVulnerabilities(ctx, "bar", "", bar) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + // We will break the foo vulns at the 5th element. 
+ const fooBreak = 5 + want.V["foo"] = foo[:fooBreak] + want.V["bar"] = bar + + var buf bytes.Buffer + defer func() { + t.Logf("wrote:\n%s", buf.String()) + }() + r, w := io.Pipe() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { defer w.Close(); return a.Store(w) }) + eg.Go(func() error { + l, err := Load(ctx, io.TeeReader(r, &buf)) + if err != nil { + return err + } + for l.NextIter() { + it := l.Iter() + for i := 0; it.Next(); i++ { + if it.Updater == "foo" && i == fooBreak { + t.Logf("breaking foo at %d:\n%s", i, it.Vulnerability.Name) + break + } + got.V[it.Updater] = append(got.V[it.Updater], it.Vulnerability) + } + } + if err := l.Err(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + t.Error(err) + } + if !cmp.Equal(got, want) { + t.Error(cmp.Diff(got, want)) + } +}