Skip to content

Commit

Permalink
db: Add stream update support for enrichments
Browse files Browse the repository at this point in the history
Signed-off-by: J. Victor Martins <[email protected]>
  • Loading branch information
jvdm committed Apr 26, 2024
1 parent 222e8d1 commit 5b8fd8a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
6 changes: 6 additions & 0 deletions datastore/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// EnrichmentIter is an [Iter] of enrichment records.
type EnrichmentIter Iter[*driver.EnrichmentRecord]

// EnrichmentUpdater is an interface exporting the necessary methods
// for storing and querying Enrichments.
type EnrichmentUpdater interface {
// UpdateEnrichments creates a new EnrichmentUpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queries by clients.
UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error)
// UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but
// accepting an iterator function.
UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error)
}

// Enrichment is an interface for querying enrichments from the store.
Expand Down
46 changes: 38 additions & 8 deletions datastore/postgres/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/quay/zlog"

"github.com/quay/claircore/datastore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
)
Expand Down Expand Up @@ -60,10 +61,27 @@ var (
)
)

func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter")
return s.updateEnrichments(ctx, updater, fp, it)
}

// UpdateEnrichments creates a new UpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queried by clients.
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments")
enIter := func(yield func(record *driver.EnrichmentRecord, err error) bool) {
for i := range es {
if !yield(&es[i], nil) {
break
}
}
}
return s.updateEnrichments(ctx, updater, fp, enIter)
}

func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
const (
create = `
INSERT
Expand Down Expand Up @@ -134,17 +152,29 @@ DO

batch := microbatch.NewInsert(tx, 2000, time.Minute)
start = time.Now()
for i := range es {
hashKind, hash := hashEnrichment(&es[i])
err := batch.Queue(ctx, insert,
hashKind, hash, name, es[i].Tags, es[i].Enrichment,
enCt := 0
it(func(en *driver.EnrichmentRecord, iterErr error) bool {
if iterErr != nil {
err = iterErr
return false
}
enCt++
hashKind, hash := hashEnrichment(en)
err = batch.Queue(ctx, insert,
hashKind, hash, name, en.Tags, en.Enrichment,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err)
err = fmt.Errorf("failed to queue enrichment: %w", err)
return false
}
if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
err = fmt.Errorf("failed to queue association: %w", err)
return false
}
return true
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err)
}
if err := batch.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err)
Expand All @@ -160,7 +190,7 @@ DO
}
zlog.Debug(ctx).
Stringer("ref", ref).
Int("inserted", len(es)).
Int("inserted", enCt).
Msg("update_operation committed")
return ref, nil
}
Expand Down

0 comments on commit 5b8fd8a

Please sign in to comment.