-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
datastore: Add vuln & enrich stream updates #1315
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
"github.com/quay/zlog" | ||
|
||
"github.com/quay/claircore" | ||
"github.com/quay/claircore/datastore" | ||
"github.com/quay/claircore/libvuln/driver" | ||
"github.com/quay/claircore/pkg/microbatch" | ||
) | ||
|
@@ -45,14 +46,27 @@ | |
) | ||
) | ||
|
||
// UpdateVulnerabilitiesIter implements vulnstore.Updater. | ||
func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnerabilityIter) (uuid.UUID, error) { | ||
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter") | ||
return s.updateVulnerabilities(ctx, updater, fp, it, nil) | ||
} | ||
|
||
// UpdateVulnerabilities implements vulnstore.Updater. | ||
// | ||
// It creates a new UpdateOperation for this update call, inserts the | ||
// provided vulnerabilities and computes a diff comprising the removed | ||
// and added vulnerabilities for this UpdateOperation. | ||
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { | ||
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { | ||
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities") | ||
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false) | ||
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) { | ||
for i := range vulns { | ||
if !yield(vulns[i], nil) { | ||
break | ||
} | ||
} | ||
} | ||
return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil) | ||
} | ||
|
||
// DeltaUpdateVulnerabilities implements vulnstore.Updater. | ||
|
@@ -68,10 +82,24 @@ | |
// - Associate new vulnerabilities with new updateOperation | ||
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) { | ||
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities") | ||
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true) | ||
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) { | ||
for i := range vulns { | ||
if !yield(vulns[i], nil) { | ||
break | ||
} | ||
} | ||
} | ||
delVulns := func(yield func(string, error) bool) { | ||
for _, s := range deletedVulns { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. other implementations above used the index |
||
if !yield(s, nil) { | ||
break | ||
} | ||
} | ||
} | ||
return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, delVulns) | ||
} | ||
|
||
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) { | ||
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter datastore.VulnerabilityIter, delIter datastore.Iter[string]) (uuid.UUID, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
const ( | ||
// Create makes a new update operation and returns the reference and ID. | ||
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;` | ||
|
@@ -139,6 +167,7 @@ | |
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err) | ||
} | ||
|
||
delta := delIter != nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used? |
||
updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1) | ||
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) | ||
|
||
|
@@ -181,18 +210,20 @@ | |
} | ||
|
||
if len(oldVulns) > 0 { | ||
for _, v := range vulns { | ||
vulnIter(func(v *claircore.Vulnerability, _ error) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this and |
||
// If we have an existing vuln in the new batch | ||
// delete it from the oldVulns map so it doesn't | ||
// get associated with the new update_operation. | ||
delete(oldVulns, v.Name) | ||
} | ||
for _, delName := range deletedVulns { | ||
return true | ||
}) | ||
delIter(func(delName string, _ error) bool { | ||
// If we have an existing vuln that has been signaled | ||
// as deleted by the updater then delete it so it doesn't | ||
// get associated with the new update_operation. | ||
delete(oldVulns, delName) | ||
} | ||
return true | ||
}) | ||
} | ||
start = time.Now() | ||
// Associate already existing vulnerabilities with new update_operation. | ||
|
@@ -211,14 +242,20 @@ | |
|
||
// batch insert vulnerabilities | ||
skipCt := 0 | ||
|
||
vulnCt := 0 | ||
start = time.Now() | ||
|
||
mBatcher := microbatch.NewInsert(tx, 2000, time.Minute) | ||
for _, vuln := range vulns { | ||
|
||
vulnIter(func(vuln *claircore.Vulnerability, iterErr error) bool { | ||
if iterErr != nil { | ||
err = iterErr | ||
return false | ||
} | ||
vulnCt++ | ||
if vuln.Package == nil || vuln.Package.Name == "" { | ||
skipCt++ | ||
continue | ||
return true | ||
} | ||
|
||
pkg := vuln.Package | ||
|
@@ -233,7 +270,7 @@ | |
hashKind, hash := md5Vuln(vuln) | ||
vKind, vrLower, vrUpper := rangefmt(vuln.Range) | ||
|
||
err := mBatcher.Queue(ctx, insert, | ||
err = mBatcher.Queue(ctx, insert, | ||
hashKind, hash, | ||
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity, | ||
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind, | ||
|
@@ -242,12 +279,20 @@ | |
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper, | ||
) | ||
if err != nil { | ||
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err) | ||
err = fmt.Errorf("failed to queue vulnerability: %w", err) | ||
return false | ||
} | ||
|
||
if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil { | ||
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) | ||
err = mBatcher.Queue(ctx, assoc, hashKind, hash, uoID) | ||
if err != nil { | ||
err = fmt.Errorf("failed to queue association: %w", err) | ||
return false | ||
} | ||
|
||
return true | ||
}) | ||
if err != nil { | ||
return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err) | ||
} | ||
if err := mBatcher.Done(ctx); err != nil { | ||
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err) | ||
|
@@ -266,7 +311,7 @@ | |
zlog.Debug(ctx). | ||
Str("ref", ref.String()). | ||
Int("skipped", skipCt). | ||
Int("inserted", len(vulns)-skipCt). | ||
Int("inserted", vulnCt-skipCt). | ||
Msg("update_operation committed") | ||
return ref, nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -10,6 +10,9 @@ import ( | |||||||||||||||||
"github.com/quay/claircore/libvuln/driver" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
// VulnerabilityIter is an [Iter] of vulnerabilities. | ||||||||||||||||||
type VulnerabilityIter Iter[*claircore.Vulnerability] | ||||||||||||||||||
|
||||||||||||||||||
// Updater is an interface exporting the necessary methods | ||||||||||||||||||
// for updating a vulnerability database. | ||||||||||||||||||
type Updater interface { | ||||||||||||||||||
|
@@ -19,6 +22,9 @@ type Updater interface { | |||||||||||||||||
// vulnerabilities, and ensures vulnerabilities from previous updates are | ||||||||||||||||||
// not queried by clients. | ||||||||||||||||||
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) | ||||||||||||||||||
// UpdateVulnerabilitiesIter performs the same operation as | ||||||||||||||||||
// UpdateVulnerabilities, but accepting an iterator function. | ||||||||||||||||||
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error) | ||||||||||||||||||
Comment on lines
+25
to
+27
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's not true for |
||||||||||||||||||
// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing | ||||||||||||||||||
// vulnerabilities and new vulnerabilities. It also takes an array of deleted | ||||||||||||||||||
// vulnerability names which should no longer be available to query. | ||||||||||||||||||
|
jvdm marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍