Skip to content

Commit

Permalink
Merge pull request #96 from splunk/bugfix-new-detection
Browse files Browse the repository at this point in the history
Allow the creation of new detections and new stories through use of contentctl new command!
  • Loading branch information
pyth0n1c authored Jan 30, 2024
2 parents 497a6e4 + fcc6cb4 commit 9e3eade
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 69 deletions.
4 changes: 2 additions & 2 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class NewContentInputDto:
class NewContent:

def execute(self, input_dto: NewContentInputDto) -> None:
new_content_generator_output_dto = NewContentGeneratorOutputDto({})
new_content_generator_output_dto = NewContentGeneratorOutputDto({},{})
new_content_generator = NewContentGenerator(new_content_generator_output_dto)
new_content_generator.execute(input_dto.new_content_generator_input_dto)

new_content_yml_output = NewContentYmlOutput(input_dto.output_path)
new_content_yml_output.writeObjectNewContent(new_content_generator_output_dto.obj, input_dto.new_content_generator_input_dto.type)
new_content_yml_output.writeObjectNewContent(new_content_generator_output_dto.obj, new_content_generator_output_dto.answers.get("detection_kind",None), input_dto.new_content_generator_input_dto.type)
32 changes: 23 additions & 9 deletions contentctl/input/new_content_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class NewContentGeneratorInputDto:
@dataclass(frozen=True)
class NewContentGeneratorOutputDto:
obj: dict
answers: dict


class NewContentGenerator():
Expand All @@ -29,15 +30,16 @@ def execute(self, input_dto: NewContentGeneratorInputDto) -> None:
if input_dto.type == SecurityContentType.detections:
questions = NewContentQuestions.get_questions_detection()
answers = questionary.prompt(questions)
self.output_dto.answers.update(answers)
self.output_dto.obj['name'] = answers['detection_name']
self.output_dto.obj['id'] = str(uuid.uuid4())
self.output_dto.obj['version'] = 1
self.output_dto.obj['date'] = datetime.today().strftime('%Y-%m-%d')
self.output_dto.obj['author'] = answers['detection_author']
self.output_dto.obj['data_source'] = answers['data_source']
self.output_dto.obj['type'] = answers['detection_type']
self.output_dto.obj['datamodel'] = answers['datamodels']
self.output_dto.obj['datamodel'] = answers['datamodels']
self.output_dto.obj['description'] = 'UPDATE_DESCRIPTION'
self.output_dto.obj['description'] = "production" #start everything as production since that's what we INTEND the content to become
file_name = self.output_dto.obj['name'].replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower()
self.output_dto.obj['search'] = answers['detection_search'] + ' | `' + file_name + '_filter`'
self.output_dto.obj['how_to_implement'] = 'UPDATE_HOW_TO_IMPLEMENT'
Expand All @@ -46,25 +48,36 @@ def execute(self, input_dto: NewContentGeneratorInputDto) -> None:
self.output_dto.obj['tags'] = dict()
self.output_dto.obj['tags']['analytic_story'] = ['UPDATE_STORY_NAME']
self.output_dto.obj['tags']['asset_type'] = 'UPDATE asset_type'
self.output_dto.obj['tags']['cis20'] = ['CIS 3', 'CIS 5', 'CIS 16']
self.output_dto.obj['tags']['confidence'] = 'UPDATE value between 1-100'
self.output_dto.obj['tags']['context'] = ['Update context']
self.output_dto.obj['tags']['dataset'] = ['UPDATE_DATASET_URL']
self.output_dto.obj['tags']['impact'] = 'UPDATE value between 1-100'
self.output_dto.obj['tags']['kill_chain_phases'] = answers['kill_chain_phases']
self.output_dto.obj['tags']['message'] = 'UPDATE message'
self.output_dto.obj['tags']['mitre_attack_id'] = [x.strip() for x in answers['mitre_attack_ids'].split(',')]
self.output_dto.obj['tags']['nist'] = ['DE.CM']
self.output_dto.obj['tags']['observable'] = [{'name': 'UPDATE', 'type': 'UPDATE', 'role': ['UPDATE']}]
self.output_dto.obj['tags']['product'] = ['Splunk Enterprise','Splunk Enterprise Security','Splunk Cloud']
self.output_dto.obj['tags']['required_fields'] = ['UPDATE']
self.output_dto.obj['tags']['risk_score'] = 'UPDATE (impact * confidence)/100'
self.output_dto.obj['tags']['security_domain'] = answers['security_domain']
#self.output_dto.obj['source'] = answers['detection_kind']
self.output_dto.obj['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']

#generate the tests section
self.output_dto.obj['tests'] = [
{
'name': "True Positive Test",
'attack_data': [
{
'data': "Enter URL for Dataset Here. This may also be a relative or absolute path on your local system for testing.",
"sourcetype": "UPDATE SOURCETYPE",
"source": "UPDATE SOURCE"
}
]
}
]



elif input_dto.type == SecurityContentType.stories:
questions = NewContentQuestions.get_questions_story()
self.output_dto.answers=answers
answers = questionary.prompt(questions)
self.output_dto.obj['name'] = answers['story_name']
self.output_dto.obj['id'] = str(uuid.uuid4())
Expand All @@ -78,4 +91,5 @@ def execute(self, input_dto: NewContentGeneratorInputDto) -> None:
self.output_dto.obj['tags']['analytic_story'] = self.output_dto.obj['name']
self.output_dto.obj['tags']['category'] = answers['category']
self.output_dto.obj['tags']['product'] = ['Splunk Enterprise','Splunk Enterprise Security','Splunk Cloud']
self.output_dto.obj['tags']['usecase'] = answers['usecase']
self.output_dto.obj['tags']['usecase'] = answers['usecase']
self.output_dto.obj['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']
85 changes: 54 additions & 31 deletions contentctl/input/new_content_questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ def get_questions_detection(self) -> list:
'name': 'detection_name',
'default': 'Powershell Encoded Command',
},
{
'type': 'select',
'message': 'what kind of detection is this',
'name': 'detection_kind',
'choices': [
'endpoint',
'cloud',
'application',
'network',
'web'
],
'default': 'endpoint'
},
{
'type': 'text',
'message': 'enter author name',
Expand All @@ -32,22 +45,48 @@ def get_questions_detection(self) -> list:
},
{
'type': 'checkbox',
'message': 'select the datamodels used in the detection',
'name': 'datamodels',
'message': 'Your data source',
'name': 'data_source',
'choices': [
'Endpoint',
'Authentication',
'Change',
'Email',
'Network_Resolution',
'Network_Traffic',
'Network_Sessions',
'Updates',
'Vulnerabilities',
'Web',
'Risk'
],
'default': 'Endpoint'
"OSQuery ES Process Events",
"Powershell 4104",
"Sysmon Event ID 1",
"Sysmon Event ID 3",
"Sysmon Event ID 5",
"Sysmon Event ID 6",
"Sysmon Event ID 7",
"Sysmon Event ID 8",
"Sysmon Event ID 9",
"Sysmon Event ID 10",
"Sysmon Event ID 11",
"Sysmon Event ID 13",
"Sysmon Event ID 15",
"Sysmon Event ID 20",
"Sysmon Event ID 21",
"Sysmon Event ID 22",
"Sysmon Event ID 23",
"Windows Security 4624",
"Windows Security 4625",
"Windows Security 4648",
"Windows Security 4663",
"Windows Security 4688",
"Windows Security 4698",
"Windows Security 4703",
"Windows Security 4720",
"Windows Security 4732",
"Windows Security 4738",
"Windows Security 4741",
"Windows Security 4742",
"Windows Security 4768",
"Windows Security 4769",
"Windows Security 4771",
"Windows Security 4776",
"Windows Security 4781",
"Windows Security 4798",
"Windows Security 5136",
"Windows Security 5145",
"Windows System 7045"
]
},
{
'type': 'text',
Expand All @@ -61,22 +100,6 @@ def get_questions_detection(self) -> list:
'name': 'mitre_attack_ids',
'default': 'T1003.002'
},
{
'type': 'checkbox',
'message': 'select kill chain phases related to the detection',
'name': 'kill_chain_phases',
'choices': [
'Reconnaissance',
'Weaponization',
'Delivery',
'Exploitation',
'Installation',
'Command & Control',
'Actions on Objectives',
'Denial of Service'
],
'default': 'Exploitation'
},
{
'type': 'select',
'message': 'security_domain for detection',
Expand Down
33 changes: 7 additions & 26 deletions contentctl/output/new_content_yml_output.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os

import pathlib
from contentctl.objects.enums import SecurityContentType
from contentctl.output.yml_writer import YmlWriter

Expand All @@ -11,32 +11,13 @@ def __init__(self, output_path:str):
self.output_path = output_path


def writeObjectNewContent(self, object: dict, type: SecurityContentType) -> None:
def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: SecurityContentType) -> None:
if type == SecurityContentType.detections:
file_path = os.path.join(self.output_path, 'detections', self.convertNameToFileName(object['name'], object['tags']['product']))
test_obj = {}
test_obj['name'] = object['name'] + ' Unit Test'
test_obj['tests'] = [
{
'name': object['name'],
'file': self.convertNameToFileName(object['name'],object['tags']['product']),
'pass_condition': '| stats count | where count > 0',
'earliest_time': '-24h',
'latest_time': 'now',
'attack_data': [
{
'file_name': 'UPDATE',
'data': 'UPDATE',
'source': 'UPDATE',
'sourcetype': 'UPDATE',
'update_timestamp': True
}
]
}
]
file_path_test = os.path.join(self.output_path, 'tests', self.convertNameToTestFileName(object['name'], object['tags']['product']))
YmlWriter.writeYmlFile(file_path_test, test_obj)
#object.pop('source')
file_path = os.path.join(self.output_path, 'detections', subdirectory_name, self.convertNameToFileName(object['name'], object['tags']['product']))
output_folder = pathlib.Path(self.output_path)/'detections'/subdirectory_name
#make sure the output folder exists for this detection
output_folder.mkdir(exist_ok=True)

YmlWriter.writeYmlFile(file_path, object)
print("Successfully created detection " + file_path)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "contentctl"
version = "3.0.0"
version = "3.0.1"
description = "Splunk Content Control Tool"
authors = ["STRT <[email protected]>"]
license = "Apache 2.0"
Expand Down

0 comments on commit 9e3eade

Please sign in to comment.