From e5d5c8d6f25e7228607fbff30ce5c184e3ec960e Mon Sep 17 00:00:00 2001 From: "J. Victor Martins" Date: Fri, 19 Apr 2024 16:11:47 -0700 Subject: [PATCH 1/2] db: Add stream update support for vulns Signed-off-by: J. Victor Martins --- datastore/matcher_store.go | 6 ++ datastore/postgres/updatevulnerabilities.go | 77 ++++++++++++++++----- datastore/updater.go | 6 ++ 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/datastore/matcher_store.go b/datastore/matcher_store.go index 80ffdc35f..2f161b2a1 100644 --- a/datastore/matcher_store.go +++ b/datastore/matcher_store.go @@ -1,5 +1,11 @@ package datastore +// Iter is an iterator function that accepts a callback 'yield' to handle each +// iterator item. The consumer can signal the iterator to break or retry by +// returning an error. The iterator itself returns an error if the iteration +// cannot continue or was interrupted unexpectedly. +type Iter[T any] func(yield func(T, error) bool) + // MatcherStore aggregates all interface types type MatcherStore interface { Updater diff --git a/datastore/postgres/updatevulnerabilities.go b/datastore/postgres/updatevulnerabilities.go index be02a8991..cb60fc13d 100644 --- a/datastore/postgres/updatevulnerabilities.go +++ b/datastore/postgres/updatevulnerabilities.go @@ -15,6 +15,7 @@ import ( "github.com/quay/zlog" "github.com/quay/claircore" + "github.com/quay/claircore/datastore" "github.com/quay/claircore/libvuln/driver" "github.com/quay/claircore/pkg/microbatch" ) @@ -45,14 +46,27 @@ var ( ) ) +// UpdateVulnerabilitiesIter implements vulnstore.Updater. +func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnerabilityIter) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter") + return s.updateVulnerabilities(ctx, updater, fp, it, nil) +} + // UpdateVulnerabilities implements vulnstore.Updater. // // It creates a new UpdateOperation for this update call, inserts the // provided vulnerabilities and computes a diff comprising the removed // and added vulnerabilities for this UpdateOperation. -func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { +func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities") - return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false) + iterVulns := func(yield func(*claircore.Vulnerability, error) bool) { + for i := range vulns { + if !yield(vulns[i], nil) { + break + } + } + } + return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil) } // DeltaUpdateVulnerabilities implements vulnstore.Updater. @@ -68,10 +82,24 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string // - Associate new vulnerabilities with new updateOperation func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) { ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities") - return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true) + iterVulns := func(yield func(*claircore.Vulnerability, error) bool) { + for i := range vulns { + if !yield(vulns[i], nil) { + break + } + } + } + delVulns := func(yield func(string, error) bool) { + for _, s := range deletedVulns { + if !yield(s, nil) { + break + } + } + } + return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, delVulns) } -func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) { +func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter datastore.VulnerabilityIter, delIter datastore.Iter[string]) (uuid.UUID, error) { const ( // Create makes a new update operation and returns the reference and ID. create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;` @@ -139,6 +167,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err) } + delta := delIter != nil updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1) updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) @@ -181,18 +210,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } if len(oldVulns) > 0 { - for _, v := range vulns { + vulnIter(func(v *claircore.Vulnerability, _ error) bool { // If we have an existing vuln in the new batch // delete it from the oldVulns map so it doesn't // get associated with the new update_operation. delete(oldVulns, v.Name) - } - for _, delName := range deletedVulns { + return true + }) + delIter(func(delName string, _ error) bool { // If we have an existing vuln that has been signaled // as deleted by the updater then delete it so it doesn't // get associated with the new update_operation. delete(oldVulns, delName) - } + return true + }) } start = time.Now() // Associate already existing vulnerabilities with new update_operation. @@ -211,14 +242,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string // batch insert vulnerabilities skipCt := 0 - + vulnCt := 0 start = time.Now() mBatcher := microbatch.NewInsert(tx, 2000, time.Minute) - for _, vuln := range vulns { + + vulnIter(func(vuln *claircore.Vulnerability, iterErr error) bool { + if iterErr != nil { + err = iterErr + return false + } + vulnCt++ if vuln.Package == nil || vuln.Package.Name == "" { skipCt++ - continue + return true } pkg := vuln.Package @@ -233,7 +270,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string hashKind, hash := md5Vuln(vuln) vKind, vrLower, vrUpper := rangefmt(vuln.Range) - err := mBatcher.Queue(ctx, insert, + err = mBatcher.Queue(ctx, insert, hashKind, hash, vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity, pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind, @@ -242,12 +279,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper, ) if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err) + err = fmt.Errorf("failed to queue vulnerability: %w", err) + return false } - if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) + err = mBatcher.Queue(ctx, assoc, hashKind, hash, uoID) + if err != nil { + err = fmt.Errorf("failed to queue association: %w", err) + return false } + + return true + }) + if err != nil { + return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err) } if err := mBatcher.Done(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err) @@ -266,7 +311,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string zlog.Debug(ctx). Str("ref", ref.String()). Int("skipped", skipCt). - Int("inserted", len(vulns)-skipCt). + Int("inserted", vulnCt-skipCt). Msg("update_operation committed") return ref, nil } diff --git a/datastore/updater.go b/datastore/updater.go index 705286427..864b6c722 100644 --- a/datastore/updater.go +++ b/datastore/updater.go @@ -10,6 +10,9 @@ import ( "github.com/quay/claircore/libvuln/driver" ) +// VulnerabilityIter is an [Iter] of vulnerabilities. +type VulnerabilityIter Iter[*claircore.Vulnerability] + // Updater is an interface exporting the necessary methods // for updating a vulnerability database. type Updater interface { @@ -19,6 +22,9 @@ type Updater interface { // vulnerabilities, and ensures vulnerabilities from previous updates are // not queried by clients. UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) + // UpdateVulnerabilitiesIter performs the same operation as + // UpdateVulnerabilities, but accepting an iterator function. + UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error) // DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing // vulnerabilities and new vulnerabilities. It also takes an array of deleted // vulnerability names which should no longer be available to query. From 0831b93675e4ae09d6da0e60d9a167fb77bd3062 Mon Sep 17 00:00:00 2001 From: "J. Victor Martins" Date: Fri, 19 Apr 2024 16:15:49 -0700 Subject: [PATCH 2/2] db: Add stream update support for enrichments Signed-off-by: J. Victor Martins --- datastore/enrichment.go | 6 +++++ datastore/postgres/enrichment.go | 46 ++++++++++++++++++++++++++------ libvuln/jsonblob/jsonblob.go | 10 +++++++ 3 files changed, 54 insertions(+), 8 deletions(-) diff --git a/datastore/enrichment.go b/datastore/enrichment.go index a11f376ea..b55164ac5 100644 --- a/datastore/enrichment.go +++ b/datastore/enrichment.go @@ -8,6 +8,9 @@ import ( "github.com/quay/claircore/libvuln/driver" ) +// EnrichmentIter is an [Iter] of enrichment records. +type EnrichmentIter Iter[*driver.EnrichmentRecord] + // EnrichmentUpdater is an interface exporting the necessary methods // for storing and querying Enrichments. type EnrichmentUpdater interface { @@ -15,6 +18,9 @@ type EnrichmentUpdater interface { // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queries by clients. UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error) + // UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but + // accepting an iterator function. + UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error) } // Enrichment is an interface for querying enrichments from the store. diff --git a/datastore/postgres/enrichment.go b/datastore/postgres/enrichment.go index ba3ea8ecc..6fe5ecfcb 100644 --- a/datastore/postgres/enrichment.go +++ b/datastore/postgres/enrichment.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/quay/zlog" + "github.com/quay/claircore/datastore" "github.com/quay/claircore/libvuln/driver" "github.com/quay/claircore/pkg/microbatch" ) @@ -60,10 +61,27 @@ var ( ) ) +func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter") + return s.updateEnrichments(ctx, updater, fp, it) +} + // UpdateEnrichments creates a new UpdateOperation, inserts the provided // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queried by clients. -func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { +func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments") + enIter := func(yield func(record *driver.EnrichmentRecord, err error) bool) { + for i := range es { + if !yield(&es[i], nil) { + break + } + } + } + return s.updateEnrichments(ctx, updater, fp, enIter) +} + +func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { const ( create = ` INSERT @@ -134,17 +152,29 @@ DO batch := microbatch.NewInsert(tx, 2000, time.Minute) start = time.Now() - for i := range es { - hashKind, hash := hashEnrichment(&es[i]) - err := batch.Queue(ctx, insert, - hashKind, hash, name, es[i].Tags, es[i].Enrichment, + enCt := 0 + it(func(en *driver.EnrichmentRecord, iterErr error) bool { + if iterErr != nil { + err = iterErr + return false + } + enCt++ + hashKind, hash := hashEnrichment(en) + err = batch.Queue(ctx, insert, + hashKind, hash, name, en.Tags, en.Enrichment, ) if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err) + err = fmt.Errorf("failed to queue enrichment: %w", err) + return false } if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) + err = fmt.Errorf("failed to queue association: %w", err) + return false } + return true + }) + if err != nil { + return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err) } if err := batch.Done(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err) @@ -160,7 +190,7 @@ DO } zlog.Debug(ctx). Stringer("ref", ref). - Int("inserted", len(es)). + Int("inserted", enCt). Msg("update_operation committed") return ref, nil } diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index a4e5a2213..5d80aff73 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -455,6 +455,16 @@ func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, return uuid.Nil, nil } +// UpdateEnrichmentsIter is unimplemented. +func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) { + return uuid.Nil, errors.ErrUnsupported +} + +// UpdateVulnerabilitiesIter is unimplemented. +func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnerabilityIter) (uuid.UUID, error) { + return uuid.Nil, errors.ErrUnsupported +} + var bufPool sync.Pool func getBuf() []byte {