Skip to content

Commit

Permalink
fix: optimize import of product related entities during csaf ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanb committed Dec 18, 2024
1 parent a98d468 commit 678284e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
3 changes: 2 additions & 1 deletion modules/ingestor/src/graph/product/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl Graph {
cpe_key: organization_cpe_key,
website: None,
};
let org = self.ingest_organization(vendor, org, connection).await?;
let org: OrganizationContext<'_> =
self.ingest_organization(vendor, org, connection).await?;

product::ActiveModel {
id: Default::default(),
Expand Down
99 changes: 79 additions & 20 deletions modules/ingestor/src/service/advisory/csaf/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::{
graph::{
advisory::advisory_vulnerability::{Version, VersionInfo, VersionSpec},
cpe::CpeCreator,
product::ProductInformation,
organization::{OrganizationContext, OrganizationInformation},
purl::creator::PurlCreator,
Graph,
},
service::{
advisory::csaf::product_status::ProductStatus, advisory::csaf::util::ResolveProductIdCache,
advisory::csaf::{product_status::ProductStatus, util::ResolveProductIdCache},
Error,
},
};
Expand All @@ -18,7 +18,8 @@ use std::collections::{hash_map::Entry, HashMap, HashSet};
use tracing::instrument;
use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl};
use trustify_entity::{
product_status, purl_status, status, version_range, version_scheme::VersionScheme,
organization, product, product_status, product_version_range, purl_status, status,
version_range, version_scheme::VersionScheme,
};
use uuid::Uuid;

Expand Down Expand Up @@ -108,6 +109,11 @@ impl<'a> StatusCreator<'a> {
let mut purls = PurlCreator::new();
let mut cpes = CpeCreator::new();

let mut org_cache: HashMap<&String, organization::Model> = HashMap::new();
let mut products = Vec::new();
let mut version_ranges = Vec::new();
let mut product_version_ranges = Vec::new();

for product in &self.products {
// ensure a correct status, and get id
if let Entry::Vacant(entry) = checked.entry(product.status) {
Expand All @@ -120,24 +126,61 @@ impl<'a> StatusCreator<'a> {
))
})?;

// Ingest product
let pr = graph
.ingest_product(
&product.product,
ProductInformation {
vendor: product.vendor.clone(),
cpe: product.cpe.clone(),
},
connection,
)
.await?;
// There should be only a few organizations per document,
// so simple caching should work here.
// If we find examples where this is not a case, we can switch to
// batch ingesting of organizations as well.
let org_id = match &product.vendor {
Some(vendor) => match org_cache.get(vendor) {
Some(entry) => Some(entry.id),
None => {
let organization_cpe_key = product
.cpe
.clone()
.map(|cpe| cpe.vendor().as_ref().to_string());
let org = OrganizationInformation {
cpe_key: organization_cpe_key,
website: None,
};
let org: OrganizationContext<'_> =
graph.ingest_organization(vendor, org, connection).await?;
org_cache.entry(vendor).or_insert(org.organization.clone());
Some(org.organization.id)
}
},
None => None,
};

// Create all product entities for batch ingesting
let product_cpe_key = product
.cpe
.clone()
.map(|cpe| cpe.product().as_ref().to_string());

let product_entity = product::ActiveModel {
id: Set(Uuid::now_v7()),
name: Set(product.product.clone()),
vendor_id: Set(org_id),
cpe_key: Set(product_cpe_key),
};
products.push(product_entity.clone());

// Ingest product range
// Create all product ranges for batch ingesting
let product_version_range = match product.version {
Some(ref ver) => Some(
pr.ingest_product_version_range(ver.clone(), None, connection)
.await?,
),
Some(ref ver) => {
let mut version_range_entity = ver.clone().into_active_model();
version_range_entity.id = Set(Uuid::now_v7());
version_ranges.push(version_range_entity.clone());

let product_version_range_entity = product_version_range::ActiveModel {
id: Set(Uuid::now_v7()),
product_id: product_entity.id,
version_range_id: version_range_entity.id,
cpe_key: Set(None),
};
product_version_ranges.push(product_version_range_entity.clone());
Some(product_version_range_entity)
}
None => None,
};

Expand All @@ -156,7 +199,7 @@ impl<'a> StatusCreator<'a> {
for package in packages {
let base_product = product_status::ActiveModel {
id: Default::default(),
product_version_range_id: Set(range.id),
product_version_range_id: range.clone().id,
advisory_id: Set(self.advisory_id),
vulnerability_id: Set(self.vulnerability_id.clone()),
package: Set(package),
Expand Down Expand Up @@ -213,6 +256,22 @@ impl<'a> StatusCreator<'a> {

self.create_status(connection, checked).await?;

for batch in &products.chunked() {
product::Entity::insert_many(batch).exec(connection).await?;
}

for batch in &version_ranges.chunked() {
version_range::Entity::insert_many(batch)
.exec(connection)
.await?;
}

for batch in &product_version_ranges.chunked() {
product_version_range::Entity::insert_many(batch)
.exec(connection)
.await?;
}

for batch in &product_statuses.chunked() {
product_status::Entity::insert_many(batch)
.exec(connection)
Expand Down

0 comments on commit 678284e

Please sign in to comment.