From 5b8fd8abe2cc7ba5eab6301999b047d46a8932ef Mon Sep 17 00:00:00 2001 From: "J. Victor Martins" Date: Fri, 19 Apr 2024 16:15:49 -0700 Subject: [PATCH] db: Add stream update support for enrichments Signed-off-by: J. Victor Martins --- datastore/enrichment.go | 6 +++++ datastore/postgres/enrichment.go | 46 ++++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/datastore/enrichment.go b/datastore/enrichment.go index a11f376ea..b55164ac5 100644 --- a/datastore/enrichment.go +++ b/datastore/enrichment.go @@ -8,6 +8,9 @@ import ( "github.com/quay/claircore/libvuln/driver" ) +// EnrichmentIter is an [Iter] of enrichment records. +type EnrichmentIter Iter[*driver.EnrichmentRecord] + // EnrichmentUpdater is an interface exporting the necessary methods // for storing and querying Enrichments. type EnrichmentUpdater interface { @@ -15,6 +18,9 @@ type EnrichmentUpdater interface { // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queries by clients. UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error) + // UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but + // accepting an iterator function. + UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error) } // Enrichment is an interface for querying enrichments from the store. diff --git a/datastore/postgres/enrichment.go b/datastore/postgres/enrichment.go index ba3ea8ecc..6fe5ecfcb 100644 --- a/datastore/postgres/enrichment.go +++ b/datastore/postgres/enrichment.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/quay/zlog" + "github.com/quay/claircore/datastore" "github.com/quay/claircore/libvuln/driver" "github.com/quay/claircore/pkg/microbatch" ) @@ -60,10 +61,27 @@ var ( ) ) +func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter") + return s.updateEnrichments(ctx, updater, fp, it) +} + // UpdateEnrichments creates a new UpdateOperation, inserts the provided // EnrichmentRecord(s), and ensures enrichments from previous updates are not // queried by clients. -func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { +func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments") + enIter := func(yield func(record *driver.EnrichmentRecord, err error) bool) { + for i := range es { + if !yield(&es[i], nil) { + break + } + } + } + return s.updateEnrichments(ctx, updater, fp, enIter) +} + +func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) { const ( create = ` INSERT @@ -134,17 +152,29 @@ DO batch := microbatch.NewInsert(tx, 2000, time.Minute) start = time.Now() - for i := range es { - hashKind, hash := hashEnrichment(&es[i]) - err := batch.Queue(ctx, insert, - hashKind, hash, name, es[i].Tags, es[i].Enrichment, + enCt := 0 + it(func(en *driver.EnrichmentRecord, iterErr error) bool { + if iterErr != nil { + err = iterErr + return false + } + enCt++ + hashKind, hash := hashEnrichment(en) + err = batch.Queue(ctx, insert, + hashKind, hash, name, en.Tags, en.Enrichment, ) if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err) + err = fmt.Errorf("failed to queue enrichment: %w", err) + return false } if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) + err = fmt.Errorf("failed to queue association: %w", err) + return false } + return true + }) + if err != nil { + return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err) } if err := batch.Done(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err) @@ -160,7 +190,7 @@ DO } zlog.Debug(ctx). Stringer("ref", ref). - Int("inserted", len(es)). + Int("inserted", enCt). Msg("update_operation committed") return ref, nil }