Skip to content

Commit

Permalink
vex: fetcher add changes before archived data
Browse files Browse the repository at this point in the history
But switching how the VEX data is appended to the output spool we allow
the fetcher to ignore the files in the archive that have been changed.

Signed-off-by: crozzy <[email protected]>
  • Loading branch information
crozzy committed Sep 18, 2024
1 parent f96bfb4 commit 02b5747
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 58 deletions.
146 changes: 102 additions & 44 deletions rhel/vex/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"regexp"
"strconv"
Expand All @@ -31,6 +32,17 @@ var (
cvePathRegex = regexp.MustCompile(`^\d{4}/(cve-\d{4}-\d{4,}).json$`)
)

// Fetch pulls data down from the Red Hat VEX endpoints. The order of operations is:
// 1. Check if we need to process the entire archive of data. If yes:
// - Make a request to discover the latest archive endpoint.
// - Make a HEAD request to archive endpoint to get the last-modified header.
// - Save the last-modified time in the fingerprint's requestTime.
// 2. Process the changes.csv file, requesting and appending the entries that changed since the finderprint's requestTime.
// 3. Process the deletions.csv file, processing the entries that changed since the finderprint's requestTime.
// 4. If we need to process entire archive, request the archive data and append the entries that have not been changed or deleted.
//
// This helps to ensure that we only persist one copy of an advisory in the worst possible case. In most cases,
// after the initial load, the number of processed files should be very small.
func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCloser, driver.Fingerprint, error) {
ctx = zlog.ContextWithValues(ctx, "component", "rhel/vex/Updater.Fetch")
fp, err := parseFingerprint(hint)
Expand Down Expand Up @@ -62,48 +74,46 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl
}
}()

var compressedURL *url.URL
// Is this the first run or has the updater changed since the last run?
if fp.changesEtag == "" || fp.version != updaterVersion {
processArchive := fp.changesEtag == "" || fp.version != updaterVersion
if processArchive {
// We need to go after the full corpus of vulnerabilities
// First we target the archive_latest.txt file
latestURI, err := u.url.Parse(latestFile)
// First we target the archive_latest.txt file.
var err error
compressedURL, err = u.getCompressedFileURL(ctx)
if err != nil {
return nil, hint, err
}
latestReq, err := http.NewRequestWithContext(ctx, http.MethodGet, latestURI.String(), nil)
if err != nil {
return nil, hint, err
}
latestRes, err := u.client.Do(latestReq)
if err != nil {
return nil, hint, err
}
defer latestRes.Body.Close()

err = httputil.CheckResponse(latestRes, http.StatusOK)
if err != nil {
return nil, hint, fmt.Errorf("unexpected response from archive_latest.txt: %w", err)
return nil, hint, fmt.Errorf("could not get compressed file URL: %w", err)

Check warning on line 86 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L86

Added line #L86 was not covered by tests
}
zlog.Debug(ctx).
Str("url", compressedURL.String()).
Msg("got compressed URL")

body, err := io.ReadAll(latestRes.Body) // Fine to use as expecting small number of bytes.
fp.requestTime, err = u.getLastModified(ctx, compressedURL)
if err != nil {
return nil, hint, err
return nil, hint, fmt.Errorf("could not get last-modified header: %w", err)

Check warning on line 94 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L94

Added line #L94 was not covered by tests
}
}

compressedFilename := string(body)
zlog.Debug(ctx).
Str("filename", compressedFilename).
Msg("requesting latest compressed file")
changed := map[string]bool{}
err = u.processChanges(ctx, cw, fp, changed)
if err != nil {
return nil, hint, err

Check warning on line 101 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L101

Added line #L101 was not covered by tests
}

uri, err := u.url.Parse(compressedFilename)
if err != nil {
return nil, hint, err
}
err = u.processDeletions(ctx, cw, fp, changed)
if err != nil {
return nil, hint, err

Check warning on line 106 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L106

Added line #L106 was not covered by tests
}

if processArchive {
rctx, cancel := context.WithTimeout(ctx, compressedFileTimeout)
defer cancel()

req, err := http.NewRequestWithContext(rctx, http.MethodGet, uri.String(), nil)
if compressedURL == nil {
return nil, hint, fmt.Errorf("compressed file URL needs to be populated")

Check warning on line 114 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L114

Added line #L114 was not covered by tests
}
req, err := http.NewRequestWithContext(rctx, http.MethodGet, compressedURL.String(), nil)
if err != nil {
return nil, hint, err
}
Expand All @@ -119,11 +129,6 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl
return nil, hint, fmt.Errorf("unexpected response from latest compressed file: %w", err)
}

lm := res.Header.Get("last-modified")
fp.requestTime, err = time.Parse(http.TimeFormat, lm)
if err != nil {
return nil, hint, fmt.Errorf("could not parse last-modified header %s: %w", lm, err)
}
z, err := zreader.Reader(res.Body)
if err != nil {
return nil, hint, err
Expand All @@ -149,6 +154,10 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl
if year < lookBackToYear {
continue
}
if changed[path.Base(h.Name)] {
// We've already processed this file don't bother appending it to the output
continue
}
buf.Grow(int(h.Size))
if _, err := buf.ReadFrom(r); err != nil {
return nil, hint, err
Expand Down Expand Up @@ -176,26 +185,71 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl
Msg("finished writing compressed data to spool")
}

err = u.processChanges(ctx, cw, fp)
fp.version = updaterVersion
fp.requestTime = time.Now()
success = true
return f, driver.Fingerprint(fp.String()), nil
}

func (u *Updater) getCompressedFileURL(ctx context.Context) (*url.URL, error) {
latestURI, err := u.url.Parse(latestFile)
if err != nil {
return nil, err

Check warning on line 197 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L197

Added line #L197 was not covered by tests
}
latestReq, err := http.NewRequestWithContext(ctx, http.MethodGet, latestURI.String(), nil)
if err != nil {
return nil, err

Check warning on line 201 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L201

Added line #L201 was not covered by tests
}
latestRes, err := u.client.Do(latestReq)
if err != nil {
return nil, err

Check warning on line 205 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L205

Added line #L205 was not covered by tests
}
defer latestRes.Body.Close()

err = httputil.CheckResponse(latestRes, http.StatusOK)
if err != nil {
return nil, hint, err
return nil, fmt.Errorf("unexpected response from archive_latest.txt: %w", err)

Check warning on line 211 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L211

Added line #L211 was not covered by tests
}

err = u.processDeletions(ctx, cw, fp)
body, err := io.ReadAll(latestRes.Body) // Fine to use as expecting small number of bytes.
if err != nil {
return nil, hint, err
return nil, err

Check warning on line 216 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L216

Added line #L216 was not covered by tests
}

fp.version = updaterVersion
fp.requestTime = time.Now()
success = true
return f, driver.Fingerprint(fp.String()), nil
compressedFilename := string(body)
compressedURL, err := u.url.Parse(compressedFilename)
if err != nil {
return nil, err

Check warning on line 222 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L222

Added line #L222 was not covered by tests
}
return compressedURL, nil
}

func (u *Updater) getLastModified(ctx context.Context, cu *url.URL) (time.Time, error) {
var empty time.Time
req, err := http.NewRequestWithContext(ctx, http.MethodHead, cu.String(), nil)
if err != nil {
return empty, err

Check warning on line 231 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L231

Added line #L231 was not covered by tests
}

res, err := u.client.Do(req)
if err != nil {
return empty, err

Check warning on line 236 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L236

Added line #L236 was not covered by tests
}
defer res.Body.Close()

err = httputil.CheckResponse(res, http.StatusOK)
if err != nil {
return empty, fmt.Errorf("unexpected HEAD response from latest compressed file: %w", err)

Check warning on line 242 in rhel/vex/fetcher.go

View check run for this annotation

Codecov / codecov/patch

rhel/vex/fetcher.go#L242

Added line #L242 was not covered by tests
}

lm := res.Header.Get("last-modified")
return time.Parse(http.TimeFormat, lm)
}

// ProcessChanges deals with the published changes.csv, adding records
// to w means they are deemed to have changed since the compressed
// file was last processed. w and fp can be modified.
func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerprint) error {
func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerprint, changed map[string]bool) error {
tf, err := tmp.NewFile("", "rhel-vex-changes.")
if err != nil {
return err
Expand Down Expand Up @@ -271,6 +325,8 @@ func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerpri
continue
}

changed[path.Base(cvePath)] = true

advisoryURI, err := u.url.Parse(cvePath)
if err != nil {
return err
Expand Down Expand Up @@ -323,7 +379,7 @@ func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerpri
// ProcessDeletions deals with the published deletions.csv, adding records
// to w mean they are deemed to have been deleted since the last compressed
// file was last processed. w and fp can be modified.
func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerprint) error {
func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerprint, changed map[string]bool) error {
deletionURI, err := u.url.Parse(deletionsFile)
if err != nil {
return err
Expand Down Expand Up @@ -375,6 +431,8 @@ func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerp
if updatedTime.Before(fp.requestTime) {
continue
}
changed[path.Base(cvePath)] = true

deletedJSON, err := createDeletedJSON(cvePath)
if err != nil {
zlog.Warn(ctx).Err(err).Msg("error creating JSON object denoting deletion")
Expand Down
35 changes: 21 additions & 14 deletions rhel/vex/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/quay/zlog"
"golang.org/x/tools/txtar"

"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/toolkit/types/csaf"
)

Expand Down Expand Up @@ -43,17 +44,20 @@ func serveSecDB(t *testing.T, txtarFile string) (string, *http.Client) {
t.Fatal(err)
}
filename := filepath.Base(relFilepath)
mux.HandleFunc("/"+filename, func(w http.ResponseWriter, _ *http.Request) {
mux.HandleFunc("/"+filename, func(w http.ResponseWriter, r *http.Request) {
for k, v := range headers {
w.Header().Set(k, v[0])
}

f, err := os.Open("testdata/" + relFilepath)
if err != nil {
t.Fatal(err)
}
if _, err := io.Copy(w, f); err != nil {
t.Fatal(err)
switch r.Method {
case http.MethodHead:
case http.MethodGet:
f, err := os.Open("testdata/" + relFilepath)
if err != nil {
t.Fatal(err)
}
if _, err := io.Copy(w, f); err != nil {
t.Fatal(err)
}
}
})
for _, f := range archive.Files {
Expand Down Expand Up @@ -111,9 +115,12 @@ func TestFactory(t *testing.T) {
if f.changesEtag != "something" {
t.Errorf("bad etag for the changes.csv endpoint: %s", f.changesEtag)
}
if f.deletionsEtag != "somethingelse" {
t.Errorf("bad etag for the deletions.csv endpoint: %s", f.deletionsEtag)
}

// Check saved vulns
expectedLnCt := 8
expectedLnCt := 7
lnCt := 0
r := bufio.NewReader(snappy.NewReader(data))
for b, err := r.ReadBytes('\n'); err == nil; b, err = r.ReadBytes('\n') {
Expand All @@ -127,7 +134,7 @@ func TestFactory(t *testing.T) {
t.Errorf("got %d entries but expected %d", lnCt, expectedLnCt)
}

newData, newFP, err := s.Updaters()[0].Fetch(ctx, "")
newData, newFP, err := s.Updaters()[0].Fetch(ctx, driver.Fingerprint(f.String()))
if err != nil {
t.Fatalf("error re-Fetching, cannot continue: %v", err)
}
Expand All @@ -143,9 +150,9 @@ func TestFactory(t *testing.T) {
if f.deletionsEtag != "somethingelse" {
t.Errorf("bad etag for the deletions.csv endpoint: %s", f.deletionsEtag)
}
buf := &bytes.Buffer{}
sz, _ := newData.Read(buf.Bytes())
if sz != 0 {
t.Errorf("got too much data: %s", buf.String())

r = bufio.NewReader(snappy.NewReader(newData))
for _, err := r.ReadBytes('\n'); err == nil; _, err = r.ReadBytes('\n') {
t.Fatal("should not have anymore data")
}
}

0 comments on commit 02b5747

Please sign in to comment.