Skip to content

Commit

Permalink
Feat: Support DFIQ objects (#987)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomchop authored Feb 2, 2024
1 parent f54597d commit e2ade76
Show file tree
Hide file tree
Showing 16 changed files with 1,384 additions and 302 deletions.
30 changes: 23 additions & 7 deletions core/database_arango.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,18 @@ def connect(
self.graph("threat_graph"),
{
"edge_collection": "links",
"from_vertex_collections": ["observables", "entities", "indicators"],
"to_vertex_collections": ["observables", "entities", "indicators"],
"from_vertex_collections": [
"observables",
"entities",
"indicators",
"dfiq",
],
"to_vertex_collections": [
"observables",
"entities",
"indicators",
"dfiq",
],
},
)
self.db.collection("observables").add_persistent_index(
Expand All @@ -99,6 +109,9 @@ def connect(
self.db.collection("indicators").add_persistent_index(
fields=["name", "type"], unique=True
)
self.db.collection("dfiq").add_persistent_index(
fields=["name", "type"], unique=True
)

def clear(self, truncate=True):
if not self.db:
Expand Down Expand Up @@ -143,7 +156,7 @@ def create_edge_definition(self, graph, definition):
if not self.db.has_collection(definition["edge_collection"]):
collection = graph.create_edge_definition(**definition)
else:
collection = graph.edge_collection(definition["edge_collection"])
collection = graph.replace_edge_definition(**definition)

self.collections[definition["edge_collection"]] = collection
return collection
Expand Down Expand Up @@ -274,7 +287,7 @@ def find(cls: Type[TYetiObject], **kwargs) -> TYetiObject | None:
"""Fetches a single object by value.
Args:
value: The value to search for.
**kwargs: Keyword arguments that will be matched to the document.
Returns:
A Yeti object.
Expand Down Expand Up @@ -538,8 +551,10 @@ def neighbors(
raw: Whether to return a raw dictionary or a Yeti object.
Returns:
A tuple of two lists: the first one contains the neighbors (vertices),
the second one contains the relationships (edges)
Tuple[dict, list, int]:
- the neighbors (vertices),
- the relationships (edges),
- total neighbor (vertices) count
"""
query_filter = ""
args = {
Expand Down Expand Up @@ -622,14 +637,15 @@ def _build_edges(self, arango_edges) -> List["RelationshipTypes"]:

def _build_vertices(self, vertices, arango_vertices):
# Import happens here to avoid circular dependency
from core.schemas import entity, indicator, observable, tag
from core.schemas import dfiq, entity, indicator, observable, tag

type_mapping = {
"tag": tag.Tag,
}
type_mapping.update(observable.TYPE_MAPPING)
type_mapping.update(entity.TYPE_MAPPING)
type_mapping.update(indicator.TYPE_MAPPING)
type_mapping.update(dfiq.TYPE_MAPPING)

for vertex in arango_vertices:
if vertex["_key"] in vertices:
Expand Down
253 changes: 253 additions & 0 deletions core/schemas/dfiq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import datetime
from enum import Enum
from typing import ClassVar, Literal, Type

import yaml
from pydantic import BaseModel, Field, computed_field

from core import database_arango
from core.helpers import now
from core.schemas.model import YetiModel


class DFIQType(str, Enum):
scenario = "scenario"
facet = "facet"
question = "question"
approach = "approach"


class DFIQBase(YetiModel, database_arango.ArangoYetiConnector):
_collection_name: ClassVar[str] = "dfiq"
_type_filter: ClassVar[str] = ""
_root_type: Literal["dfiq"] = "dfiq"

name: str
dfiq_id: str
dfiq_version: str
dfiq_tags: list[str] | None = None
contributors: list[str] | None = None
dfiq_yaml: str

created: datetime.datetime = Field(default_factory=now)
modified: datetime.datetime = Field(default_factory=now)

@computed_field(return_type=Literal["root_type"])
@property
def root_type(self):
return self._root_type

@classmethod
def load(cls, object: dict):
if object["type"] in TYPE_MAPPING:
return TYPE_MAPPING[object["type"]](**object)
return cls(**object)

def to_yaml(self):
dump = self.model_dump(
exclude={"created", "modified", "id", "root_type", "dfiq_yaml"}
)
dump["type"] = dump["type"].removeprefix("DFIQType.")
dump["display_name"] = dump.pop("name")
dump["tags"] = dump.pop("dfiq_tags")
dump["id"] = dump.pop("dfiq_id")
if dump["contributors"] is None:
dump.pop("contributors")
return yaml.dump(dump)

def update_parents(self):
intended_parent_ids = None
if hasattr(self, "parent_ids"):
intended_parent_ids = self.parent_ids
elif self.type == DFIQType.approach:
intended_parent_ids = [self.dfiq_id.split(".")[0]]
else:
return

intended_parents = [
DFIQBase.find(dfiq_id=parent_id) for parent_id in intended_parent_ids
]
if not all(intended_parents):
raise ValueError(
f"Missing parent(s) {intended_parent_ids} for {self.dfiq_id}"
)

# remove all links:
vertices, relationships, total = self.neighbors()
for edge in relationships:
for rel in edge:
if rel.type not in {t.value for t in DFIQType}:
continue
if rel.target != self.extended_id:
continue
if vertices[rel.source].dfiq_id not in intended_parent_ids:
rel.delete()

for parent in intended_parents:
parent.link_to(self, self.type, f"Uses DFIQ {self.type}")


class DFIQScenario(DFIQBase):
description: str

type: Literal[DFIQType.scenario] = DFIQType.scenario

@classmethod
def from_yaml(cls: Type["DFIQScenario"], yaml_string: str) -> "DFIQScenario":
try:
yaml_data = yaml.safe_load(yaml_string)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML: {e}")
if yaml_data["type"] != "scenario":
raise ValueError(f"Invalid type for DFIQ scenario: {yaml_data['type']}")

return cls(
name=yaml_data["display_name"],
description=yaml_data["description"],
dfiq_id=yaml_data["id"],
dfiq_version=yaml_data["dfiq_version"],
dfiq_tags=yaml_data.get("tags"),
contributors=yaml_data.get("contributors"),
dfiq_yaml=yaml_string,
)


class DFIQFacet(DFIQBase):
description: str | None

parent_ids: list[str]

type: Literal[DFIQType.facet] = DFIQType.facet

@classmethod
def from_yaml(cls: Type["DFIQFacet"], yaml_string: str) -> "DFIQFacet":
try:
yaml_data = yaml.safe_load(yaml_string)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML: {e}")
if yaml_data["type"] != "facet":
raise ValueError(f"Invalid type for DFIQ facet: {yaml_data['type']}")

return cls(
name=yaml_data["display_name"],
description=yaml_data.get("description"),
dfiq_id=yaml_data["id"],
dfiq_version=yaml_data["dfiq_version"],
dfiq_tags=yaml_data.get("tags"),
contributors=yaml_data.get("contributors"),
parent_ids=yaml_data["parent_ids"],
dfiq_yaml=yaml_string,
)


class DFIQQuestion(DFIQBase):
description: str | None
parent_ids: list[str]

type: Literal[DFIQType.question] = DFIQType.question

@classmethod
def from_yaml(cls: Type["DFIQQuestion"], yaml_string: str) -> "DFIQQuestion":
try:
yaml_data = yaml.safe_load(yaml_string)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML: {e}")
if yaml_data["type"] != "question":
raise ValueError(f"Invalid type for DFIQ question: {yaml_data['type']}")

return cls(
name=yaml_data["display_name"],
description=yaml_data.get("description"),
dfiq_id=yaml_data["id"],
dfiq_version=yaml_data["dfiq_version"],
dfiq_tags=yaml_data.get("tags"),
contributors=yaml_data.get("contributors"),
parent_ids=yaml_data["parent_ids"],
dfiq_yaml=yaml_string,
)


class DFIQData(BaseModel):
type: str
value: str


class DFIQProcessorOption(BaseModel):
type: str
value: str


class DFIQAnalysisStep(BaseModel):
description: str
type: str
value: str


class DFIQAnalysis(BaseModel):
name: str
steps: list[DFIQAnalysisStep] = []


class DFIQProcessors(BaseModel):
name: str
options: list[DFIQProcessorOption] = []
analysis: list[DFIQAnalysis] = []


class DFIQApproachDescription(BaseModel):
summary: str
details: str
references: list[str] = []
references_internal: list[str] | None = None


class DFIQApproachNotes(BaseModel):
covered: list[str] = []
not_covered: list[str] = []


class DFIQApproachView(BaseModel):
data: list[DFIQData] = []
notes: DFIQApproachNotes
processors: list[DFIQProcessors] = []


class DFIQApproach(DFIQBase):
description: DFIQApproachDescription
view: DFIQApproachView

type: Literal[DFIQType.approach] = DFIQType.approach

@classmethod
def from_yaml(cls: Type["DFIQApproach"], yaml_string: str) -> "DFIQApproach":
try:
yaml_data = yaml.safe_load(yaml_string)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML: {e}")
if yaml_data["type"] != "approach":
raise ValueError(f"Invalid type for DFIQ approach: {yaml_data['type']}")
return cls(
name=yaml_data["display_name"],
description=DFIQApproachDescription(**yaml_data["description"]),
view=DFIQApproachView(**yaml_data["view"]),
dfiq_id=yaml_data["id"],
dfiq_version=yaml_data["dfiq_version"],
dfiq_tags=yaml_data.get("tags"),
contributors=yaml_data.get("contributors"),
dfiq_yaml=yaml_string,
)


TYPE_MAPPING = {
"scenario": DFIQScenario,
"facet": DFIQFacet,
"question": DFIQQuestion,
"approach": DFIQApproach,
"dfiq": DFIQBase,
}


DFIQTypes = DFIQScenario | DFIQFacet | DFIQQuestion | DFIQApproach
DFIQClasses = (
Type[DFIQScenario] | Type[DFIQFacet] | Type[DFIQQuestion] | Type[DFIQApproach]
)
Loading

0 comments on commit e2ade76

Please sign in to comment.