Skip to content

Commit

Permalink
Enable passing on auto-run analyzers parameter when using importer li…
Browse files Browse the repository at this point in the history
…brary (#3143)

* Add a run_analyzer function to the importer
* Add an analyzer trigger to the importer class
* Adding a flag to the timesketch_importer CLI script
* Adding unit tests for the run_analyzer function

---------

Co-authored-by: Janosch <[email protected]>
  • Loading branch information
YiChiCanCode and jkppr authored Oct 2, 2024
1 parent fa03fc9 commit d34afff
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 9 deletions.
50 changes: 42 additions & 8 deletions importer_client/python/timesketch_import_client/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,41 @@

from timesketch_api_client import timeline
from timesketch_api_client import definitions
from timesketch_api_client.error import UnableToRunAnalyzer
from timesketch_import_client import utils

logger = logging.getLogger("timesketch_importer.importer")


def run_analyzers(analyzer_names=None, timeline_obj=None):
"""Run the analyzers on the uploaded timeline."""

if not timeline_obj:
logger.error("Unable to run analyzers: Timeline object not found.")
raise ValueError("Timeline object not found.")

if timeline_obj.status not in ("ready", "success"):
logger.error("The provided timeline '%s' is not ready yet!", timeline_obj.name)
return None

if not analyzer_names:
logger.info("No analyzer names provided, skipping analysis.")
return None

try:
analyzer_results = timeline_obj.run_analyzers(analyzer_names)
except UnableToRunAnalyzer as e:
logger.error(
"Failed to run requested analyzers '%s'! Error: %s",
str(analyzer_names),
str(e),
)
return None

logger.debug("Analyzer results: %s", analyzer_results)
return analyzer_results


class ImportStreamer(object):
"""Upload object used to stream results to Timesketch."""

Expand Down Expand Up @@ -708,8 +738,18 @@ def celery_task_id(self):
"""Return the celery task identification for the upload."""
return self._celery_task_id

def _trigger_analyzers(self, analyzer_names=None):
"""Run the analyzers on the uploaded timeline."""

self._ready()

if self._data_lines:
self.flush(end_stream=True)

return run_analyzers(analyzer_names=analyzer_names, timeline_obj=self.timeline)

def close(self):
"""Close the streamer."""
"""Close the streamer"""
try:
self._ready()
except ValueError:
Expand All @@ -718,13 +758,6 @@ def close(self):
if self._data_lines:
self.flush(end_stream=True)

# Trigger auto analyzer pipeline to kick in.
pipe_resource = "{0:s}/sketches/{1:d}/analyzer/".format(
self._sketch.api.api_root, self._sketch.id
)
data = {"index_name": self._index}
_ = self._sketch.api.session.post(pipe_resource, json=data)

def flush(self, end_stream=True):
"""Flushes the buffer and uploads to timesketch.
Expand All @@ -736,6 +769,7 @@ def flush(self, end_stream=True):
ValueError: if the stream object is not fully configured.
RuntimeError: if the stream was not uploaded.
"""

if not self._data_lines:
return

Expand Down
67 changes: 66 additions & 1 deletion importer_client/python/timesketch_import_client/importer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import json
import unittest
import mock

import pandas

from timesketch_api_client.error import UnableToRunAnalyzer

from . import importer


Expand Down Expand Up @@ -270,3 +271,67 @@ def _run_all_tests(self, columns, lines):
]
)
self.assertSetEqual(set(messages), message_correct)


class RunAnalyzersTest(unittest.TestCase):
"""Tests for the run_analyzers function."""

@mock.patch("timesketch_import_client.importer.logger")
def test_run_analyzers_without_timeline(self, mock_logger):
"""Test calling run_analyzers without a timeline object."""
with self.assertRaises(ValueError):
importer.run_analyzers(analyzer_names=["test_analyzer"])
mock_logger.error.assert_called_with(
"Unable to run analyzers: Timeline object not found."
)

@mock.patch("timesketch_import_client.importer.logger")
def test_run_analyzers_timeline_not_ready(self, mock_logger):
"""Test calling run_analyzers with a timeline that is not ready."""
timeline_obj = mock.Mock(status="pending", name="test")
importer.run_analyzers(
analyzer_names=["test_analyzer"], timeline_obj=timeline_obj
)
mock_logger.error.assert_called_with(
"The provided timeline '%s' is not ready yet!", timeline_obj.name
)

@mock.patch("timesketch_import_client.importer.logger")
def test_run_analyzers_without_analyzers(self, mock_logger):
"""Test calling run_analyzers without analyzers."""
timeline_obj = mock.Mock(status="ready")
importer.run_analyzers(timeline_obj=timeline_obj)
mock_logger.info.assert_called_with(
"No analyzer names provided, skipping analysis."
)

@mock.patch("timesketch_import_client.importer.logger")
def test_run_analyzers_success(self, mock_logger):
"""Test calling run_analyzers successfully."""
timeline_obj = mock.Mock(
status="ready", run_analyzers=mock.Mock(return_value="Success")
)
return_value = importer.run_analyzers(
analyzer_names=["test_analyzer"], timeline_obj=timeline_obj
)
self.assertEqual(return_value, "Success")
mock_logger.debug.assert_called_with("Analyzer results: %s", "Success")

@mock.patch("timesketch_import_client.importer.logger")
def test_run_analyzers_failed(self, mock_logger):
"""Test calling run_analyzers with an exception."""
timeline_obj = mock.Mock(
status="ready",
run_analyzers=mock.Mock(
side_effect=UnableToRunAnalyzer("Analyzer failed.")
),
)
return_value = importer.run_analyzers(
analyzer_names=["test_analyzer"], timeline_obj=timeline_obj
)
self.assertIsNone(return_value)
mock_logger.error.assert_called_with(
"Failed to run requested analyzers '%s'! Error: %s",
"['test_analyzer']",
"Analyzer failed.",
)
32 changes: 32 additions & 0 deletions importer_client/python/tools/timesketch_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def upload_file(
timeline = streamer.timeline
task_id = streamer.celery_task_id

streamer.close()

logger.info("File upload completed.")
return timeline, task_id

Expand Down Expand Up @@ -462,6 +464,20 @@ def main(args=None):
help=("Path to the file that is to be imported."),
)

config_group.add_argument(
"--analyzer_names",
"--analyzer-names",
nargs="*",
action="store",
dest="analyzer_names",
default=[],
help=(
"Set of analyzers that we will automatically run right after the "
"timelines are uploaded. The input needs to be the analyzers names."
"Provided as strings separated by space"
),
)

options = argument_parser.parse_args(args)

if options.show_version:
Expand Down Expand Up @@ -616,6 +632,7 @@ def main(args=None):
"log_config_file": options.log_config_file,
"data_label": options.data_label,
"context": options.context,
"analyzer_names": options.analyzer_names,
}

logger.info("Uploading file.")
Expand All @@ -627,6 +644,11 @@ def main(args=None):
logger.info(
"File got successfully uploaded to sketch: {0:d}".format(my_sketch.id)
)
if options.analyzer_names:
logger.warning(
"Argument 'analyzer_names' only works with 'wait_timeline = "
"True'! Skipping execution of analyzers: {analyzer_names}"
)
return

if not timeline:
Expand Down Expand Up @@ -664,6 +686,16 @@ def main(args=None):
print(f"Status of the index is: {task_state}")
break

if options.analyzer_names:
logger.info(
"Trigger analyzers: %s on Timeline '%s'",
str(options.analyzer_names),
str(timeline.name),
)
_ = importer.run_analyzers(
analyzer_names=options.analyzer_names, timeline_obj=timeline
)


if __name__ == "__main__":
main()

0 comments on commit d34afff

Please sign in to comment.