Skip to content

Commit

Permalink
db: Add stream update support for vulns
Browse files Browse the repository at this point in the history
Signed-off-by: J. Victor Martins <[email protected]>
  • Loading branch information
jvdm committed Apr 26, 2024
1 parent e7a1909 commit 222e8d1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 16 deletions.
6 changes: 6 additions & 0 deletions datastore/matcher_store.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package datastore

// Iter is an iterator function that accepts a callback 'yield' to handle each
// iterator item. The consumer can signal the iterator to break or retry by
// returning an error. The iterator itself returns an error if the iteration
// cannot continue or was interrupted unexpectedly.
type Iter[T any] func(yield func(T, error) bool)

// MatcherStore aggregates all interface types
type MatcherStore interface {
Updater
Expand Down
77 changes: 61 additions & 16 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/datastore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
)
Expand Down Expand Up @@ -45,14 +46,27 @@ var (
)
)

// UpdateVulnerabilitiesIter implements vulnstore.Updater.
func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnerabilityIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter")
return s.updateVulnerabilities(ctx, updater, fp, it, nil)
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
// provided vulnerabilities and computes a diff comprising the removed
// and added vulnerabilities for this UpdateOperation.
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false)
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
for i := range vulns {
if !yield(vulns[i], nil) {
break
}
}
}
return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil)
}

// DeltaUpdateVulnerabilities implements vulnstore.Updater.
Expand All @@ -68,10 +82,24 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
// - Associate new vulnerabilities with new updateOperation
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true)
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
for i := range vulns {
if !yield(vulns[i], nil) {
break
}
}
}
delVulns := func(yield func(string, error) bool) {
for _, s := range deletedVulns {
if !yield(s, nil) {
break
}
}
}
return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, delVulns)
}

func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) {
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter datastore.VulnerabilityIter, delIter datastore.Iter[string]) (uuid.UUID, error) {
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
Expand Down Expand Up @@ -139,6 +167,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
}

delta := delIter != nil
updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1)
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -181,18 +210,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
}

if len(oldVulns) > 0 {
for _, v := range vulns {
vulnIter(func(v *claircore.Vulnerability, _ error) bool {
// If we have an existing vuln in the new batch
// delete it from the oldVulns map so it doesn't
// get associated with the new update_operation.
delete(oldVulns, v.Name)
}
for _, delName := range deletedVulns {
return true
})
delIter(func(delName string, _ error) bool {
// If we have an existing vuln that has been signaled
// as deleted by the updater then delete it so it doesn't
// get associated with the new update_operation.
delete(oldVulns, delName)
}
return true
})
}
start = time.Now()
// Associate already existing vulnerabilities with new update_operation.
Expand All @@ -211,14 +242,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string

// batch insert vulnerabilities
skipCt := 0

vulnCt := 0
start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {

vulnIter(func(vuln *claircore.Vulnerability, iterErr error) bool {
if iterErr != nil {
err = iterErr
return false
}
vulnCt++
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
return true
}

pkg := vuln.Package
Expand All @@ -233,7 +270,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)

err := mBatcher.Queue(ctx, insert,
err = mBatcher.Queue(ctx, insert,
hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
Expand All @@ -242,12 +279,20 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
err = fmt.Errorf("failed to queue vulnerability: %w", err)
return false
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
err = mBatcher.Queue(ctx, assoc, hashKind, hash, uoID)
if err != nil {
err = fmt.Errorf("failed to queue association: %w", err)
return false
}

return true
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err)
}
if err := mBatcher.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err)
Expand All @@ -266,7 +311,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
zlog.Debug(ctx).
Str("ref", ref.String()).
Int("skipped", skipCt).
Int("inserted", len(vulns)-skipCt).
Int("inserted", vulnCt-skipCt).
Msg("update_operation committed")
return ref, nil
}
Expand Down
6 changes: 6 additions & 0 deletions datastore/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// VulnerabilityIter is an [Iter] of vulnerabilities.
type VulnerabilityIter Iter[*claircore.Vulnerability]

// Updater is an interface exporting the necessary methods
// for updating a vulnerability database.
type Updater interface {
Expand All @@ -19,6 +22,9 @@ type Updater interface {
// vulnerabilities, and ensures vulnerabilities from previous updates are
// not queried by clients.
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error)
// UpdateVulnerabilitiesIter performs the same operation as
// UpdateVulnerabilities, but accepting an iterator function.
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error)
// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing
// vulnerabilities and new vulnerabilities. It also takes an array of deleted
// vulnerability names which should no longer be available to query.
Expand Down

0 comments on commit 222e8d1

Please sign in to comment.