Skip to content

Commit

Permalink
fix transformers, split packaging, better logs
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Goodman <[email protected]>
  • Loading branch information
wagoodman committed Dec 17, 2024
1 parent 5973c76 commit 06d7fe3
Show file tree
Hide file tree
Showing 14 changed files with 400 additions and 270 deletions.
10 changes: 3 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ module github.com/anchore/grype-db

go 1.23.2

toolchain go1.23.4

require (
github.com/Masterminds/semver/v3 v3.3.1
github.com/OneOfOne/xxhash v1.2.8
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/adrg/xdg v0.5.3
github.com/anchore/go-logger v0.0.0-20230725134548-c21dafa1ec5a
github.com/anchore/grype v0.86.1
github.com/anchore/syft v1.18.1
github.com/anchore/grype v0.86.2-0.20241216230527-69330e5f3d62
github.com/anchore/syft v1.18.2-0.20241216153735-397eb9c10acd
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/dave/jennifer v1.7.1
github.com/dustin/go-humanize v1.0.1
github.com/glebarez/sqlite v1.11.0
github.com/go-test/deep v1.1.1
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -108,7 +107,6 @@ require (
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/elliotchance/phpserialize v1.4.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
Expand Down Expand Up @@ -246,7 +244,5 @@ require (
modernc.org/sqlite v1.34.2 // indirect
)

replace github.com/mholt/archiver/v3 v3.5.1 => github.com/anchore/archiver/v3 v3.5.2

// this is a breaking change, so we need to pin the version until glebarez/go-sqlite is updated to use internal/libc
replace modernc.org/sqlite v1.33.0 => modernc.org/sqlite v1.32.0
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,14 @@ github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0v
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ=
github.com/anchore/go-version v1.2.2-0.20210903204242-51efa5b487c4 h1:rmZG77uXgE+o2gozGEBoUMpX27lsku+xrMwlmBZJtbg=
github.com/anchore/go-version v1.2.2-0.20210903204242-51efa5b487c4/go.mod h1:Bkc+JYWjMCF8OyZ340IMSIi2Ebf3uwByOk6ho4wne1E=
github.com/anchore/grype v0.86.1 h1:HWpzCOCwjKkwkIEEC5lcKI4yl6GhTF3+Z12tXWYtMoI=
github.com/anchore/grype v0.86.1/go.mod h1:k3VnXfi+e/OGx1mTUL733gy3fyB4W/AdHP8fSyQML9w=
github.com/anchore/grype v0.86.2-0.20241216230527-69330e5f3d62 h1:eNZG5LS8tadVkk10YtuVxTKn1jU5nXnD1yH6eoaQaM8=
github.com/anchore/grype v0.86.2-0.20241216230527-69330e5f3d62/go.mod h1:k3VnXfi+e/OGx1mTUL733gy3fyB4W/AdHP8fSyQML9w=
github.com/anchore/packageurl-go v0.1.1-0.20241018175412-5c22e6360c4f h1:dAQPIrQ3a5PBqZeZ+B9NGZsGmodk4NO9OjDIsQmQyQM=
github.com/anchore/packageurl-go v0.1.1-0.20241018175412-5c22e6360c4f/go.mod h1:KoYIv7tdP5+CC9VGkeZV4/vGCKsY55VvoG+5dadg4YI=
github.com/anchore/stereoscope v0.0.11 h1:d+dePyWyQzoQehnWOnx/aISW5HW1zLAQKzvaFIpydsU=
github.com/anchore/stereoscope v0.0.11/go.mod h1:dxQyMHSdvgOCscQd/lInPHeP5xCJsZYxpzvzy8Y804Y=
github.com/anchore/syft v1.18.1 h1:JZ7CLbeWrWolCZa4f6SJBLJ9qGBLFCzHrFd8c4bsm94=
github.com/anchore/syft v1.18.1/go.mod h1:ufXPZcjmoTjERaC0HTEW2+chF+fQdryhaQ9arcUO2WQ=
github.com/anchore/syft v1.18.2-0.20241216153735-397eb9c10acd h1:11d0Pzp4Ysw1XxloRS6cHNDBWwqB3MSMzffgMYwFDUw=
github.com/anchore/syft v1.18.2-0.20241216153735-397eb9c10acd/go.mod h1:A8LH+VE33zk5efyBdo45/X9BdXEFrMvetwjMvPV+OFw=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
Expand Down
2 changes: 2 additions & 0 deletions internal/tarutil/reader_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (t ReaderEntry) writeEntry(tw lowLevelWriter) error {
}

func writeEntry(tw lowLevelWriter, filename string, fileInfo os.FileInfo, opener func() (io.Reader, error)) error {
log.WithFields("path", filename).Trace("adding file to archive")

header, err := tar.FileInfoHeader(fileInfo, "")
if err != nil {
return err
Expand Down
80 changes: 74 additions & 6 deletions pkg/process/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/dustin/go-humanize"

"github.com/anchore/grype-db/internal/log"
"github.com/anchore/grype-db/pkg/data"
v3 "github.com/anchore/grype-db/pkg/process/v3"
Expand Down Expand Up @@ -104,17 +106,39 @@ func getWriter(schemaVersion int, dataAge time.Time, directory string, states pr

func build(results []providerResults, writer data.Writer, processors ...data.Processor) error {
lastUpdate := time.Now()
var totalRecords int
for _, result := range results {
totalRecords += int(result.count)
}
log.WithFields("total", humanize.Comma(int64(totalRecords))).Info("processing all records")

var recordsProcessed int

// for exponential moving average, choose an alpha between 0 and 1, where 1 biases towards the most recent sample
// and 0 biases towards the average of all samples.
rateWindow := newEMA(0.4)

for _, result := range results {
log.WithFields("provider", result.provider.Provider, "count", result.count).Info("processing provider records")
idx := 0
log.WithFields("provider", result.provider.Provider, "total", humanize.Comma(result.count)).Info("processing provider records")
providerRecordsProcessed := 0
recordsProcessedInStatusCycle := 0
for opener := range result.openers {
idx++
log.WithFields("entry", opener.String()).Tracef("processing")
providerRecordsProcessed++
recordsProcessed++
recordsProcessedInStatusCycle++
var processor data.Processor

if time.Since(lastUpdate) > 3*time.Second {
log.WithFields("provider", result.provider.Provider, "count", result.count, "processed", idx).Debug("processing provider records")
r := recordsPerSecond(recordsProcessedInStatusCycle, lastUpdate)
rateWindow.Add(r)

log.WithFields(
"provider", fmt.Sprintf("%q %1.0f/s (%1.2f%%)", result.provider.Provider, r, percent(providerRecordsProcessed, int(result.count))),
"overall", fmt.Sprintf("%1.2f%%", percent(recordsProcessed, totalRecords)),
"eta", eta(recordsProcessed, totalRecords, rateWindow.Average()).String(),
).Debug("status")
lastUpdate = time.Now()
recordsProcessedInStatusCycle = 0
}

f, err := opener.Open()
Expand All @@ -129,7 +153,6 @@ func build(results []providerResults, writer data.Writer, processors ...data.Pro
for _, candidate := range processors {
if candidate.IsSupported(envelope.Schema) {
processor = candidate
log.WithFields("schema", envelope.Schema).Trace("matched with processor")
break
}
}
Expand All @@ -153,3 +176,48 @@ func build(results []providerResults, writer data.Writer, processors ...data.Pro

return nil
}

type expMovingAverage struct {
alpha float64
value float64
count int
}

func newEMA(alpha float64) *expMovingAverage {
return &expMovingAverage{alpha: alpha}
}

func (e *expMovingAverage) Add(sample float64) {
if e.count == 0 {
e.value = sample // initialize with the first sample
} else {
e.value = e.alpha*sample + (1-e.alpha)*e.value
}
e.count++
}

func (e *expMovingAverage) Average() float64 {
return e.value
}

func recordsPerSecond(idx int, lastUpdate time.Time) float64 {
sec := time.Since(lastUpdate).Seconds()
if sec == 0 {
return 0
}
return float64(idx) / sec
}

func percent(idx, total int) float64 {
if total == 0 {
return 0
}
return float64(idx) / float64(total) * 100
}

func eta(idx, total int, rate float64) time.Duration {
if rate == 0 {
return 0
}
return time.Duration(float64(total-idx)/rate) * time.Second
}
153 changes: 66 additions & 87 deletions pkg/process/package.go
Original file line number Diff line number Diff line change
@@ -1,125 +1,126 @@
package process

import (
"errors"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/scylladb/go-set/strset"
"github.com/spf13/afero"

"github.com/anchore/grype-db/internal/log"
"github.com/anchore/grype-db/internal/tarutil"
"github.com/anchore/grype-db/pkg/provider"
v6process "github.com/anchore/grype-db/pkg/process/v6"
grypeDBLegacyDistribution "github.com/anchore/grype/grype/db/legacy/distribution"
v6 "github.com/anchore/grype/grype/db/v6"
v6Distribution "github.com/anchore/grype/grype/db/v6/distribution"
grypeDBLegacy "github.com/anchore/grype/grype/db/v5"
grypeDBLegacyStore "github.com/anchore/grype/grype/db/v5/store"
)

// listingFiles is a set of files that should not be included in the archive
var listingFiles = strset.New("listing.json", "latest.json", "history.json")

func Package(dbDir, publishBaseURL, overrideArchiveExtension string) error {
// check if metadata file exists, if so, then this
if _, err := os.Stat(filepath.Join(dbDir, grypeDBLegacyDistribution.MetadataFileName)); os.IsNotExist(err) {
return packageDB(dbDir, overrideArchiveExtension)
// TODO: detect from disk which version of the DB is present
return v6process.CreateArchive(dbDir, overrideArchiveExtension)
}
return packageLegacyDB(dbDir, publishBaseURL, overrideArchiveExtension)
}

func packageDB(dbDir, overrideArchiveExtension string) error {
extension, err := resolveExtension(overrideArchiveExtension)
func packageLegacyDB(dbDir, publishBaseURL, overrideArchiveExtension string) error { //nolint:funlen
log.WithFields("from", dbDir, "url", publishBaseURL, "extension-override", overrideArchiveExtension).Info("packaging database")

fs := afero.NewOsFs()
metadata, err := grypeDBLegacyDistribution.NewMetadataFromDir(fs, dbDir)
if err != nil {
return err
}
log.WithFields("from", dbDir, "extension", extension).Info("packaging database")

s, err := v6.NewReader(v6.Config{DBDirPath: dbDir})
if metadata == nil {
return fmt.Errorf("no metadata found in %q", dbDir)
}

s, err := grypeDBLegacyStore.New(filepath.Join(dbDir, grypeDBLegacy.VulnerabilityStoreFileName), false)
if err != nil {
return fmt.Errorf("unable to open vulnerability store: %w", err)
}

metadata, err := s.GetDBMetadata()
if err != nil || metadata == nil {
return fmt.Errorf("unable to get vulnerability store metadata: %w", err)
id, err := s.GetID()
if err != nil {
return fmt.Errorf("unable to get vulnerability store ID: %w", err)
}

if metadata.Model != v6.ModelVersion {
return fmt.Errorf("metadata model %d does not match vulnerability store model %d", v6.ModelVersion, metadata.Model)
if id.SchemaVersion != metadata.Version {
return fmt.Errorf("metadata version %d does not match vulnerability store version %d", metadata.Version, id.SchemaVersion)
}

providerModels, err := s.AllProviders()
u, err := url.Parse(publishBaseURL)
if err != nil {
return fmt.Errorf("unable to get all providers: %w", err)
return err
}

if len(providerModels) == 0 {
return fmt.Errorf("no providers found in the vulnerability store")
// we need a well-ordered string to append to the archive name to ensure uniqueness (to avoid overwriting
// existing archives in the CDN) as well as to ensure that multiple archives created in the same day are
// put in the correct order in the listing file. The DB timestamp represents the age of the data in the DB
// not when the DB was created. The trailer represents the time the DB was packaged.
trailer := fmt.Sprintf("%d", secondsSinceEpoch())

var extension = "tar.gz"
if overrideArchiveExtension != "" {
extension = strings.TrimLeft(overrideArchiveExtension, ".")
}

eldest, err := toProviders(providerModels).EarliestTimestamp()
if err != nil {
return err
var found bool
for _, valid := range []string{"tar.zst", "tar.gz"} {
if valid == extension {
found = true
break
}
}

if !found {
return fmt.Errorf("invalid archive extension %q", extension)
}

// output archive vulnerability-db_VERSION_OLDESTDATADATE_BUILTEPOCH.tar.gz, where:
// - VERSION: schema version in the form of v#.#.#
// - OLDESTDATADATE: RFC3338 formatted value of the oldest date capture date found for all contained providers
// - BUILTEPOCH: linux epoch formatted value of the database metadata built field
// we attach a random value at the end of the file name to prevent from overwriting DBs in S3 that are already
// cached in the CDN. Ideally this would be based off of the archive checksum but a random string is simpler.
tarName := fmt.Sprintf(
"vulnerability-db_v%s_%s_%d.%s",
fmt.Sprintf("%d.%d.%d", metadata.Model, metadata.Revision, metadata.Addition),
eldest.UTC().Format(time.RFC3339),
metadata.BuildTimestamp.Unix(),
"vulnerability-db_v%d_%s_%s.%s",
metadata.Version,
metadata.Built.Format(time.RFC3339),
trailer,
extension,
)
tarPath := path.Join(dbDir, tarName)

tarPath := filepath.Join(dbDir, tarName)

if err := populateTar(tarPath); err != nil {
if err := populateLegacyTar(tarPath); err != nil {
return err
}

log.WithFields("path", tarPath).Info("created database archive")

return writeLatestDocument(tarPath, *metadata)
}

func toProviders(states []v6.Provider) provider.States {
var result provider.States
for _, state := range states {
result = append(result, provider.State{
Provider: state.ID,
Timestamp: *state.DateCaptured,
})
entry, err := grypeDBLegacyDistribution.NewListingEntryFromArchive(fs, *metadata, tarPath, u)
if err != nil {
return fmt.Errorf("unable to create listing entry from archive: %w", err)
}
return result
}

func resolveExtension(overrideArchiveExtension string) (string, error) {
var extension = "tar.zst"

if overrideArchiveExtension != "" {
extension = strings.TrimLeft(overrideArchiveExtension, ".")
listing := grypeDBLegacyDistribution.NewListing(entry)
listingPath := path.Join(dbDir, grypeDBLegacyDistribution.ListingFileName)
if err = listing.Write(listingPath); err != nil {
return err
}

var found bool
for _, valid := range []string{"tar.zst", "tar.xz", "tar.gz"} {
if valid == extension {
found = true
break
}
}
log.WithFields("path", listingPath).Debug("created initial listing file")

if !found {
return "", fmt.Errorf("unsupported archive extension %q", extension)
}
return extension, nil
return nil
}

var listingFiles = strset.New("listing.json", "latest.json", "history.json")

func populateTar(tarPath string) error {
func populateLegacyTar(tarPath string) error {
originalDir, err := os.Getwd()
if err != nil {
return fmt.Errorf("unable to get CWD: %w", err)
Expand Down Expand Up @@ -158,28 +159,6 @@ func populateTar(tarPath string) error {
return nil
}

func writeLatestDocument(tarPath string, metadata v6.DBMetadata) error {
archive, err := v6Distribution.NewArchive(tarPath, *metadata.BuildTimestamp, metadata.Model, metadata.Revision, metadata.Addition)
if err != nil || archive == nil {
return fmt.Errorf("unable to create archive: %w", err)
}

doc := v6Distribution.NewLatestDocument(*archive)
if doc == nil {
return errors.New("unable to create latest document")
}

dbDir := filepath.Dir(tarPath)

latestPath := path.Join(dbDir, v6Distribution.LatestFileName)

fh, err := os.OpenFile(latestPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("unable to create latest file: %w", err)
}

if err = doc.Write(fh); err != nil {
return fmt.Errorf("unable to write latest document: %w", err)
}
return nil
func secondsSinceEpoch() int64 {
return time.Now().UTC().Unix()
}
Loading

0 comments on commit 06d7fe3

Please sign in to comment.