diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index a4e5a2213..446f19936 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -45,70 +45,97 @@ type Store struct { } // Load reads in all the records serialized in the provided [io.Reader]. -func Load(ctx context.Context, r io.Reader) (*Loader, error) { +func Load(_ context.Context, r io.Reader) (*Loader, error) { l := Loader{ dec: json.NewDecoder(r), - cur: uuid.Nil, } + l.de.Ref = uuid.Nil return &l, nil } -// Loader is an iterator that returns a series of [Entry]. +// Loader is an iterator that returns objects grouped by update operations from +// the [Store]. // -// Users should call [*Loader.Next] until it reports false, then check for -// errors via [*Loader.Err]. +// Users should either call [*Loader.Next] or [*Loader.NextIter], and if they +// report false, check for errors via [*Loader.Err]. +// +// 1. Use [*Loader.Next] to read all objects for a single update operation into +// an [Entry]. Use [*Loader.Entry] to read it. +// +// 2. Use [*Loader.NextIter] to iterate over each object for a single update +// operation. Use [*Loader.Iter] get the iterator (see [iter] for more). type Loader struct { err error e *Entry - dec *json.Decoder - next *Entry - de diskEntry - cur uuid.UUID + dec *json.Decoder + de diskEntry + cur uuid.UUID } -// Next reports whether there's an [Entry] to be processed. -func (l *Loader) Next() bool { +// iter wraps a disk entry with methods to read the next object for the same +// update operation. +// +// Users are expected to call [*iter.Next()] to read and parse the next object, +// which can be retrieved using [iter.Vulnerability] or [iter.Enrichment] based +// on the update kind. Errors are reported in the [Loader.Err]. +type iter struct { + *diskEntry + + // Copied when disk entry is first assigned. + Updater string + Fingerprint driver.Fingerprint + Kind driver.UpdateKind + + Vulnerability *claircore.Vulnerability + Enrichment *driver.EnrichmentRecord + + loader *Loader +} + +// NextIter reports if there is a new set of update operations to iterate on. If +// called in the middle of an iteration, all objects for the current operation +// are discarded. +func (l *Loader) NextIter() bool { if l.err != nil { return false } - - for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) { - id := l.de.Ref - // If we just hit a new Entry, promote the current one. - if id != l.cur { - l.e = l.next - l.next = &Entry{} - l.next.Updater = l.de.Updater - l.next.Fingerprint = l.de.Fingerprint - l.next.Date = l.de.Date + if l.cur == l.de.Ref { + // Read until we are not on the same update operation. + for l.readNext() { } - switch l.de.Kind { + } + if l.err != nil { + return false + } + // Mark we are in a new update operation. + l.cur = l.de.Ref + return true +} + +// Next reports whether there's an [Entry] to be processed. +func (l *Loader) Next() bool { + if !l.NextIter() { + return false + } + e := Entry{} + it := l.Iter() + for it.Next() { + e.CommonEntry = it.CommonEntry + switch it.Kind { case driver.VulnerabilityKind: - vuln := claircore.Vulnerability{} - if err := json.Unmarshal(l.de.Vuln.buf, &vuln); err != nil { - l.err = err - return false - } - l.next.Vuln = append(l.next.Vuln, &vuln) + e.Vuln = append(e.Vuln, it.Vulnerability) case driver.EnrichmentKind: - en := driver.EnrichmentRecord{} - if err := json.Unmarshal(l.de.Enrichment.buf, &en); err != nil { - l.err = err - return false - } - l.next.Enrichment = append(l.next.Enrichment, en) - } - // If this was an initial diskEntry, promote the ref. - if id != l.cur { - l.cur = id - // If we have an Entry ready, report that. - if l.e != nil { - return true - } + e.Enrichment = append(e.Enrichment, *it.Enrichment) + default: + l.err = fmt.Errorf("unknown entry type: %s", it.Kind) + break } } - l.e = l.next + if l.Err() != nil { + return false + } + l.e = &e return true } @@ -117,6 +144,54 @@ func (l *Loader) Entry() *Entry { return l.e } +// Iter returns an iterator for read all objects for the current update operation. +func (l *Loader) Iter() *iter { + return &iter{ + Updater: l.de.Updater, + Fingerprint: l.de.Fingerprint, + Kind: l.de.Kind, + loader: l, + } +} + +// Next reports if there is a new object available in this iterator. +func (i *iter) Next() bool { + if i.loader.err != nil { + return false + } + if i.diskEntry != nil { + // Only need the next disk entry if not the first iteration. + if !i.loader.readNext() { + return false + } + } + i.diskEntry = &i.loader.de + switch i.Kind { + case driver.VulnerabilityKind: + vuln := claircore.Vulnerability{} + if err := json.Unmarshal(i.diskEntry.Vuln.buf, &vuln); err != nil { + i.loader.err = err + return false + } + i.Vulnerability = &vuln + case driver.EnrichmentKind: + en := driver.EnrichmentRecord{} + if err := json.Unmarshal(i.diskEntry.Enrichment.buf, &en); err != nil { + i.loader.err = err + return false + } + i.Enrichment = &en + } + return true +} + +// readNext will decode the next [diskEntry] from the store, and return true +// only if there were no errors and the entry belongs to the current update operation. +func (l *Loader) readNext() bool { + l.err = l.dec.Decode(&l.de) + return l.err == nil && l.cur == l.de.Ref +} + // Err is the latest encountered error. func (l *Loader) Err() error { // Don't report EOF as an error. @@ -441,18 +516,28 @@ func (s *Store) UpdateEnrichments(ctx context.Context, kind string, fp driver.Fi } // RecordUpdaterStatus is unimplemented. -func (s *Store) RecordUpdaterStatus(ctx context.Context, updaterName string, updateTime time.Time, fingerprint driver.Fingerprint, updaterError error) error { - return nil +func (s *Store) RecordUpdaterStatus(_ context.Context, _ string, _ time.Time, _ driver.Fingerprint, _ error) error { + return errors.ErrUnsupported } // RecordUpdaterSetStatus is unimplemented. -func (s *Store) RecordUpdaterSetStatus(ctx context.Context, updaterSet string, updateTime time.Time) error { - return nil +func (s *Store) RecordUpdaterSetStatus(_ context.Context, _ string, _ time.Time) error { + return errors.ErrUnsupported } // DeltaUpdateVulnerabilities is a noop -func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deleted []string) (uuid.UUID, error) { - return uuid.Nil, nil +func (s *Store) DeltaUpdateVulnerabilities(_ context.Context, _ string, _ driver.Fingerprint, _ []*claircore.Vulnerability, _ []string) (uuid.UUID, error) { + return uuid.Nil, errors.ErrUnsupported +} + +// UpdateEnrichmentsIter is unimplemented. +func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) { + return uuid.Nil, errors.ErrUnsupported +} + +// UpdateVulnerabilitiesIter is unimplemented. +func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnerabilityIter) (uuid.UUID, error) { + return uuid.Nil, errors.ErrUnsupported } var bufPool sync.Pool diff --git a/libvuln/jsonblob/jsonblob_test.go b/libvuln/jsonblob/jsonblob_test.go index 141c6a7db..5bb385093 100644 --- a/libvuln/jsonblob/jsonblob_test.go +++ b/libvuln/jsonblob/jsonblob_test.go @@ -118,3 +118,72 @@ func TestEnrichments(t *testing.T) { } t.Logf("wrote:\n%s", buf.String()) } + +func TestNextIter(t *testing.T) { + ctx := context.Background() + a, err := New() + if err != nil { + t.Fatal(err) + } + + var want, got struct { + V map[string][]*claircore.Vulnerability + } + + want.V = make(map[string][]*claircore.Vulnerability) + got.V = make(map[string][]*claircore.Vulnerability) + + foo := test.GenUniqueVulnerabilities(10, "foo") + bar := test.GenUniqueVulnerabilities(10, "bar") + + ref, err := a.UpdateVulnerabilities(ctx, "foo", "", foo) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + ref, err = a.UpdateVulnerabilities(ctx, "bar", "", bar) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + // We will break the foo vulns at the 5th element. + const fooBreak = 5 + want.V["foo"] = foo[:fooBreak] + want.V["bar"] = bar + + var buf bytes.Buffer + defer func() { + t.Logf("wrote:\n%s", buf.String()) + }() + r, w := io.Pipe() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { defer w.Close(); return a.Store(w) }) + eg.Go(func() error { + l, err := Load(ctx, io.TeeReader(r, &buf)) + if err != nil { + return err + } + for l.NextIter() { + it := l.Iter() + for i := 0; it.Next(); i++ { + if it.Updater == "foo" && i == fooBreak { + t.Logf("breaking foo at %d:\n%s", i, it.Vulnerability.Name) + break + } + got.V[it.Updater] = append(got.V[it.Updater], it.Vulnerability) + } + } + if err := l.Err(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + t.Error(err) + } + if !cmp.Equal(got, want) { + t.Error(cmp.Diff(got, want)) + } +}