From 0d7a978113a1d7a940464f0d04a705e14ca4dc3e Mon Sep 17 00:00:00 2001 From: Janosch <99879757+jkppr@users.noreply.github.com> Date: Mon, 7 Oct 2024 17:50:01 +0200 Subject: [PATCH] DFIQ Analyzer Implementation (#3178) * DFIQ Analyzer implementation * Dynamic import of DFIQ analyzers * Integration into the analyzer framework * Trigger via DFIQ Approaches being added to a sketch * Linked Analysis with Approach objects * Trigger check for analysis from the API Endpoint * DFIQ analyzer trigger via uploaded timeline * Adding a function to deregister analyzers to the manager * Ensuring the index is ready before analyzers are executed * Linking Analysis and InvestigativeQuestionConclusion objects. * Adding unit tests for the scenarios API `check_and_run_dfiq_analysis_steps` function. --- timesketch/api/v1/resources/analysis.py | 30 +- timesketch/api/v1/resources/scenarios.py | 70 ++++ timesketch/api/v1/resources_test.py | 129 ++++++++ timesketch/lib/analyzers/__init__.py | 1 + .../lib/analyzers/dfiq_plugins/__init__.py | 5 + .../lib/analyzers/dfiq_plugins/manager.py | 308 ++++++++++++++++++ timesketch/lib/analyzers/interface.py | 1 + timesketch/lib/analyzers/manager.py | 29 +- timesketch/lib/tasks.py | 124 +++++-- .../87d24c7252fc_linking_analysis_and_.py | 42 +++ timesketch/models/sketch.py | 10 + 11 files changed, 720 insertions(+), 29 deletions(-) create mode 100644 timesketch/lib/analyzers/dfiq_plugins/__init__.py create mode 100644 timesketch/lib/analyzers/dfiq_plugins/manager.py create mode 100644 timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py diff --git a/timesketch/api/v1/resources/analysis.py b/timesketch/api/v1/resources/analysis.py index 7bb7c77c2d..9e68386d7e 100644 --- a/timesketch/api/v1/resources/analysis.py +++ b/timesketch/api/v1/resources/analysis.py @@ -51,6 +51,7 @@ logger = logging.getLogger("timesketch.analysis_api") +# TODO: Filter DFIQ analyzer results from this! 
class AnalysisResource(resources.ResourceMixin, Resource): """Resource to get analyzer session.""" @@ -180,6 +181,7 @@ def get(self, sketch_id): * display_name: Display name of the analyzer for the UI * description: Description of the analyzer provided in the class * is_multi: Boolean indicating if the analyzer is a multi analyzer + * is_dfiq: Boolean indicating if the analyzer is a DFIQ analyzer """ sketch = Sketch.get_with_acl(sketch_id) if not sketch: @@ -188,22 +190,26 @@ def get(self, sketch_id): abort( HTTP_STATUS_CODE_FORBIDDEN, "User does not have read access to sketch" ) - analyzers = [x for x, y in analyzer_manager.AnalysisManager.get_analyzers()] - analyzers = analyzer_manager.AnalysisManager.get_analyzers() + include_dfiq = ( + request.args.get("include_dfiq", default="false").lower() == "true" + ) + + analyzers = analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=include_dfiq + ) analyzers_detail = [] for analyzer_name, analyzer_class in analyzers: # TODO: update the multi_analyzer detection logic for edgecases # where analyzers are using custom parameters (e.g. 
misp) - multi = False - if len(analyzer_class.get_kwargs()) > 0: - multi = True analyzers_detail.append( { "name": analyzer_name, "display_name": analyzer_class.DISPLAY_NAME, "description": analyzer_class.DESCRIPTION, - "is_multi": multi, + "is_multi": len(analyzer_class.get_kwargs()) > 0, + "is_dfiq": hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER, } ) @@ -266,8 +272,17 @@ def post(self, sketch_id): if form.get("analyzer_force_run"): analyzer_force_run = True + include_dfiq = False + if form.get("include_dfiq"): + include_dfiq = True + analyzers = [] - all_analyzers = [x for x, _ in analyzer_manager.AnalysisManager.get_analyzers()] + all_analyzers = [ + x + for x, _ in analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=include_dfiq + ) + ] for analyzer in analyzer_names: for correct_name in all_analyzers: if fnmatch.fnmatch(correct_name, analyzer): @@ -301,6 +316,7 @@ def post(self, sketch_id): analyzer_kwargs=analyzer_kwargs, timeline_id=timeline_id, analyzer_force_run=analyzer_force_run, + include_dfiq=include_dfiq, ) except KeyError as e: logger.warning( diff --git a/timesketch/api/v1/resources/scenarios.py b/timesketch/api/v1/resources/scenarios.py index d825c4bc61..04846a593c 100644 --- a/timesketch/api/v1/resources/scenarios.py +++ b/timesketch/api/v1/resources/scenarios.py @@ -37,6 +37,7 @@ from timesketch.models.sketch import InvestigativeQuestion from timesketch.models.sketch import InvestigativeQuestionApproach from timesketch.models.sketch import InvestigativeQuestionConclusion +from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager logger = logging.getLogger("timesketch.scenario_api") @@ -58,6 +59,59 @@ def load_dfiq_from_config(): return DFIQ(dfiq_path) +def check_and_run_dfiq_analysis_steps(dfiq_obj, sketch, analyzer_manager=None): + """Checks if any DFIQ analyzers need to be executed for the given DFIQ object. 
+ + Args: + dfiq_obj: The DFIQ object (Scenario, Question, or Approach). + sketch: The sketch object associated with the DFIQ object. + analyzer_manager: Optional. An existing instance of DFIQAnalyzerManager. + + Returns: + List of analyzer_session objects (can be empty) or False. + """ + # Initialize the analyzer manager only once. + if not analyzer_manager: + analyzer_manager = DFIQAnalyzerManager(sketch=sketch) + + analyzer_sessions = [] + if isinstance(dfiq_obj, InvestigativeQuestionApproach): + session = analyzer_manager.trigger_analyzers_for_approach(approach=dfiq_obj) + if session: + analyzer_sessions.extend(session) + elif isinstance(dfiq_obj, InvestigativeQuestion): + for approach in dfiq_obj.approaches: + session = analyzer_manager.trigger_analyzers_for_approach(approach=approach) + if session: + analyzer_sessions.extend(session) + elif isinstance(dfiq_obj, Facet): + for question in dfiq_obj.questions: + result = check_and_run_dfiq_analysis_steps( + question, sketch, analyzer_manager + ) + if result: + analyzer_sessions.extend(result) + elif isinstance(dfiq_obj, Scenario): + if dfiq_obj.facets: + for facet in dfiq_obj.facets: + result = check_and_run_dfiq_analysis_steps( + facet, sketch, analyzer_manager + ) + if result: + analyzer_sessions.extend(result) + if dfiq_obj.questions: + for question in dfiq_obj.questions: + result = check_and_run_dfiq_analysis_steps( + question, sketch, analyzer_manager + ) + if result: + analyzer_sessions.extend(result) + else: + return False # Invalid DFIQ object type + + return analyzer_sessions if analyzer_sessions else False + + class ScenarioTemplateListResource(resources.ResourceMixin, Resource): """List all scenarios available.""" @@ -241,9 +295,23 @@ def post(self, sketch_id): question_sql.approaches.append(approach_sql) + db_session.add(question_sql) + + # TODO: Remove commit and check function here when questions are + # linked to Scenarios again! 
+ # Needs a tmp commit here so we can run the analyzer on the question. + db_session.commit() + # Check if any of the questions contains analyzer approaches + check_and_run_dfiq_analysis_steps(question_sql, sketch) + db_session.add(scenario_sql) db_session.commit() + # This does not work, since we don't have Scenarios linked down to + # Approaches anymore! We intentionally broke the link to facets to show + # Questions in the frontend. + # check_and_run_dfiq_analysis_steps(scenario_sql, sketch) + return self.to_json(scenario_sql) @@ -594,6 +662,8 @@ def post(self, sketch_id): db_session.add(new_question) db_session.commit() + check_and_run_dfiq_analysis_steps(new_question, sketch) + return self.to_json(new_question) diff --git a/timesketch/api/v1/resources_test.py b/timesketch/api/v1/resources_test.py index 791c4b2f7e..236b8e9bb1 100644 --- a/timesketch/api/v1/resources_test.py +++ b/timesketch/api/v1/resources_test.py @@ -26,6 +26,12 @@ from timesketch.lib.definitions import HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR from timesketch.lib.testlib import BaseTest from timesketch.lib.testlib import MockDataStore +from timesketch.lib.dfiq import DFIQ +from timesketch.api.v1.resources import scenarios +from timesketch.models.sketch import Scenario +from timesketch.models.sketch import InvestigativeQuestion +from timesketch.models.sketch import InvestigativeQuestionApproach +from timesketch.models.sketch import Facet from timesketch.api.v1.resources import ResourceMixin @@ -1403,3 +1409,126 @@ def test_system_settings_resource(self): response = self.client.get(self.resource_url) expected_response = {"DFIQ_ENABLED": False, "LLM_PROVIDER": "test"} self.assertEqual(response.json, expected_response) + + +class ScenariosResourceTest(BaseTest): + """Tests the scenarios resource.""" + + @mock.patch("timesketch.lib.analyzers.dfiq_plugins.manager.DFIQAnalyzerManager") + def test_check_and_run_dfiq_analysis_steps(self, mock_analyzer_manager): + """Test triggering analyzers for 
different DFIQ objects.""" + test_sketch = self.sketch1 + test_user = self.user1 + self.sketch1.set_status("ready") + self._commit_to_database(test_sketch) + + # Load DFIQ objects + dfiq_obj = DFIQ("./test_data/dfiq/") + + scenario = dfiq_obj.scenarios[0] + scenario_sql = Scenario( + dfiq_identifier=scenario.id, + uuid=scenario.uuid, + name=scenario.name, + display_name=scenario.name, + description=scenario.description, + spec_json=scenario.to_json(), + sketch=test_sketch, + user=test_user, + ) + + facet = dfiq_obj.facets[0] + facet_sql = Facet( + dfiq_identifier=facet.id, + uuid=facet.uuid, + name=facet.name, + display_name=facet.name, + description=facet.description, + spec_json=facet.to_json(), + sketch=test_sketch, + user=test_user, + ) + scenario_sql.facets = [facet_sql] + + question = dfiq_obj.questions[0] + question_sql = InvestigativeQuestion( + dfiq_identifier=question.id, + uuid=question.uuid, + name=question.name, + display_name=question.name, + description=question.description, + spec_json=question.to_json(), + sketch=test_sketch, + scenario=scenario_sql, + user=test_user, + ) + facet_sql.questions = [question_sql] + + approach = question.approaches[0] + approach_sql = InvestigativeQuestionApproach( + name=approach.name, + display_name=approach.name, + description=approach.description, + spec_json=approach.to_json(), + user=test_user, + ) + question_sql.approaches = [approach_sql] + + self._commit_to_database(approach_sql) + self._commit_to_database(question_sql) + self._commit_to_database(facet_sql) + self._commit_to_database(scenario_sql) + + # Test without analysis step + result = scenarios.check_and_run_dfiq_analysis_steps(scenario_sql, test_sketch) + self.assertFalse(result) + + result = scenarios.check_and_run_dfiq_analysis_steps(facet_sql, test_sketch) + self.assertFalse(result) + + result = scenarios.check_and_run_dfiq_analysis_steps(approach_sql, test_sketch) + self.assertFalse(result) + + # Add analysis step to approach + 
approach.steps.append( + { + "stage": "analysis", + "type": "timesketch-analyzer", + "value": "test_analyzer", + } + ) + approach_sql.spec_json = approach.to_json() + + # Mocking analyzer manager response. + mock_analyzer_manager.trigger_analyzers_for_approach.return_value = [ + mock.MagicMock() + ] + + # Test with analysis step + result = scenarios.check_and_run_dfiq_analysis_steps( + scenario_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY, mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + result = scenarios.check_and_run_dfiq_analysis_steps( + facet_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + result = scenarios.check_and_run_dfiq_analysis_steps( + question_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + # Test with invalid object + result = scenarios.check_and_run_dfiq_analysis_steps("invalid", test_sketch) + self.assertFalse(result) diff --git a/timesketch/lib/analyzers/__init__.py b/timesketch/lib/analyzers/__init__.py index dd63913dbd..1e0ab72e75 100644 --- a/timesketch/lib/analyzers/__init__.py +++ b/timesketch/lib/analyzers/__init__.py @@ -40,3 +40,4 @@ import timesketch.lib.analyzers.authentication import timesketch.lib.analyzers.contrib +import timesketch.lib.analyzers.dfiq_plugins diff --git a/timesketch/lib/analyzers/dfiq_plugins/__init__.py b/timesketch/lib/analyzers/dfiq_plugins/__init__.py new file mode 100644 index 0000000000..c107e0c6cb --- /dev/null +++ b/timesketch/lib/analyzers/dfiq_plugins/__init__.py @@ -0,0 +1,5 @@ +"""DFIQ Analyzer module.""" + +from timesketch.lib.analyzers.dfiq_plugins import manager as dfiq_analyzer_manager + 
+dfiq_analyzer_manager.load_dfiq_analyzers() diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py new file mode 100644 index 0000000000..de8fabbb76 --- /dev/null +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -0,0 +1,308 @@ +# Copyright 2024 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This file contains a class for managing DFIQ analyzers.""" + +import importlib +import inspect +import json +import logging +import os + +from timesketch.lib.aggregators import manager as aggregator_manager +from timesketch.lib.analyzers import interface +from timesketch.lib.analyzers import manager as analyzer_manager +from timesketch.models.sketch import Timeline + + +logger = logging.getLogger("timesketch.analyzers.dfiq_plugins.manager") + + +def load_dfiq_analyzers(): + """Loads DFIQ analyzer classes.""" + + DFIQ_ANALYZER_PATH = os.path.dirname(os.path.abspath(__file__)) + + # Clear existing registrations before reloading + for name, analyzer_class in analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=True + ): + if ( + hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER + ): + try: + analyzer_manager.AnalysisManager.deregister_analyzer(name) + logger.info("Deregistered DFIQ analyzer: %s", name) + except KeyError as e: + logger.error(str(e)) + + # Dynamically load DFIQ Analyzers + for filename in os.listdir(DFIQ_ANALYZER_PATH): + if 
filename.endswith(".py") and not filename.startswith("__"): + module_name = os.path.splitext(filename)[0] # Remove .py extension + if module_name == "manager" or module_name.endswith("_test"): + continue + module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" + try: + module = importlib.import_module(module_path) + for name, obj in inspect.getmembers(module): + if name not in [ + "interface", + "logger", + "logging", + "manager", + ] and not name.startswith("__"): + if ( + inspect.isclass(obj) + and issubclass(obj, interface.BaseAnalyzer) + and hasattr(obj, "IS_DFIQ_ANALYZER") + and obj.IS_DFIQ_ANALYZER + ): + analyzer_manager.AnalysisManager.register_analyzer(obj) + logger.info("Registered DFIQ analyzer: %s", obj.NAME) + else: + logger.error( + 'Skipped loading "%s" as analyzer, since it did' + " not meet the requirements.", + str(module_path), + ) + except ImportError as error: + logger.error( + "Failed to import dfiq analyzer module: %s, %s", + str(module_path), + str(error), + ) + + +class DFIQAnalyzerManager: + """Manager for executing DFIQ analyzers.""" + + def __init__(self, sketch): + """Initializes the manager. + + Args: + sketch: The sketch object. + """ + self.sketch = sketch + self.aggregator_manager = aggregator_manager + self.aggregation_max_tries = 3 + + def trigger_analyzers_for_approach(self, approach): + """Triggers DFIQ analyzers for a newly added approach. + + Args: + approach (InvestigativeQuestionApproach): An approach object to link + with the analyssis + + Returns: + analyzer_sessions or None + """ + dfiq_analyzers = self._get_dfiq_analyzer(approach) + + analyzer_sessions = [] + if dfiq_analyzers: + analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers, approach) + + return analyzer_sessions if analyzer_sessions else False + + def trigger_analyzers_for_timelines(self, timelines): + """Triggers DFIQ analyzers for a newly added timeline. + + Args: + timelines []: List of timeline + objects. 
+ + Returns: + analyzer_sessions or None + """ + if isinstance(timelines, Timeline): + timelines = [timelines] + analyzer_sessions = [] + for approach in self._find_analyzer_approaches(): + dfiq_analyzers = self._get_dfiq_analyzer(approach) + if dfiq_analyzers: + session = self._run_dfiq_analyzers( + dfiq_analyzers=dfiq_analyzers, + approach=approach, + timelines=timelines, + ) + if session: + analyzer_sessions.extend(session) + + return analyzer_sessions if analyzer_sessions else False + + def _find_analyzer_approaches(self): + """Finds approaches with a defined analyzer step. + + Returns: + A list of InvestigativeQuestionApproach objects that have a defined + analyzer step in their specification. + """ + approaches = [] + for question in self.sketch.questions: + for approach in question.approaches: + approach_spec = json.loads(approach.spec_json) + if any( + step.get("stage") == "analysis" + and step.get("type") == "timesketch-analyzer" + for step in approach_spec.get("steps", []) + ): + approaches.append(approach) + return approaches + + def _get_dfiq_analyzer(self, approach): + """Checks if the approach has analyzer steps to execute.""" + dfiq_analyzers = set() + approach_spec = json.loads(approach.spec_json) + if approach_spec.get("steps"): + for step in approach_spec.get("steps"): + if ( + step.get("stage") == "analysis" + and step.get("type") == "timesketch-analyzer" + ): + dfiq_analyzers.add(step.get("value")) + + return dfiq_analyzers + + def _get_analyzers_by_data_type(self, dfiq_analyzers): + """Groups DFIQ analyzers by their required data types. + + Args: + dfiq_analyzers (set): A set of DFIQ analyzer names. + + Returns: + dict: A dictionary mapping data types to lists of analyzer names. + The special key "ALL" will be used for classical analyzers + and DFIQ analyzers that don't have a REQUIRED_DATA_TYPES + attribute (i.e., an empty list). It will trigger the analyzer + to run on all timelines in the sketch. 
+ """ + analyzer_by_datatypes = {} + for ( + analyzer_name, + analyzer_class, + ) in analyzer_manager.AnalysisManager.get_analyzers(include_dfiq=True): + if analyzer_name not in dfiq_analyzers: + continue + + required_data_types = getattr(analyzer_class, "REQUIRED_DATA_TYPES", []) + if not required_data_types: + # Classical or DFIQ analyzer without REQUIRED_DATA_TYPES + analyzer_by_datatypes.setdefault("ALL", []).append(analyzer_class.NAME) + else: + for data_type in required_data_types: + analyzer_by_datatypes.setdefault(data_type, []).append( + analyzer_class.NAME + ) + return analyzer_by_datatypes + + def _get_data_types_per_timeline(self, timelines=None): + """Retrieves data types present in each eligible timeline. + + Args: + timelines: (optional) A list of timeline objects. + + Returns: + dict: A dictionary mapping timeline IDs to lists of data types. + """ + if not timelines: + timelines = self.sketch.timelines + + datatype_per_timeline = {} + for timeline in timelines: + if timeline.get_status.status.lower() != "ready": + continue + + aggregation = self.aggregator_manager.AggregatorManager.get_aggregator( + "field_bucket" + )(sketch_id=self.sketch.id, timeline_ids=[timeline.id]) + agg_result = aggregation.run(field="data_type", limit="1000") + datatype_per_timeline[timeline.id] = [ + entry["data_type"] for entry in agg_result.values + ] + return datatype_per_timeline + + def _run_dfiq_analyzers(self, dfiq_analyzers, approach, timelines=None): + """Executes DFIQ analyzers on matching timelines. + + Args: + dfiq_analyzers (set): A set of DFIQ analyzer names. + approach (InvestigativeQuestionApproach): An approach object to link + with the analyssis + timelines ([]): Optional list of timelines to limit the + analysis on. + + Returns: + list: A list of analyzer sessions (potentially empty). 
+ """ + analyzer_by_datatypes = self._get_analyzers_by_data_type(dfiq_analyzers) + if not analyzer_by_datatypes: + logger.error( + "None of the requested DFIQ analyzers exist on this Timesketch " + "instance Requested: %s", + str(dfiq_analyzers), + ) + return [] + + datatype_per_timeline = self._get_data_types_per_timeline(timelines) + analyzer_by_timeline = {} + for timeline_id, timeline_datatypes in datatype_per_timeline.items(): + analyzer_by_timeline[timeline_id] = [] + for data_type, analyzer_names in analyzer_by_datatypes.items(): + # Handle classical analyzers by always including them. + if data_type == "ALL": + analyzer_by_timeline[timeline_id].extend(analyzer_names) + elif data_type in timeline_datatypes: + analyzer_by_timeline[timeline_id].extend(analyzer_names) + + # Import here to avoid circular imports. + # pylint: disable=import-outside-toplevel + from timesketch.lib import tasks + + sessions = [] + for timeline_id, analyzer_names in analyzer_by_timeline.items(): + if not analyzer_names: + continue + timeline = Timeline.get_by_id(timeline_id) + if not timeline or timeline.status[0].status != "ready": + continue + try: + analyzer_group, session = tasks.build_sketch_analysis_pipeline( + sketch_id=self.sketch.id, + searchindex_id=timeline.searchindex.id, + user_id=approach.user.id, + analyzer_names=analyzer_names, + analyzer_kwargs=None, + timeline_id=timeline_id, + analyzer_force_run=False, + include_dfiq=True, + approach_id=approach.id, + ) + except KeyError as e: + logger.warning( + "Unable to build analyzer pipeline, analyzer does not exist: %s", + str(e), + ) + continue + if analyzer_group: + pipeline = ( + tasks.run_sketch_init.s([timeline.searchindex.index_name]) + | analyzer_group + ) + pipeline.apply_async() + + if session: + sessions.append(session) + + return sessions diff --git a/timesketch/lib/analyzers/interface.py b/timesketch/lib/analyzers/interface.py index 7da05681d2..54cd90c5c1 100644 --- a/timesketch/lib/analyzers/interface.py +++ 
b/timesketch/lib/analyzers/interface.py @@ -904,6 +904,7 @@ class BaseAnalyzer: NAME = "name" DISPLAY_NAME = None DESCRIPTION = None + IS_DFIQ_ANALYZER = False # If this analyzer depends on another analyzer # it needs to be included in this frozenset by using diff --git a/timesketch/lib/analyzers/manager.py b/timesketch/lib/analyzers/manager.py index 7785d723cd..45400f9c09 100644 --- a/timesketch/lib/analyzers/manager.py +++ b/timesketch/lib/analyzers/manager.py @@ -84,11 +84,13 @@ def clear_registration(cls): cls._class_registry = {} @classmethod - def get_analyzers(cls, analyzer_names=None): + def get_analyzers(cls, analyzer_names=None, include_dfiq=False): """Retrieves the registered analyzers. Args: analyzer_names (list): List of analyzer names. + include_dfiq (bool): Optional. Whether to include DFIQ analyzers + in the results. Defaults to False. Yields: tuple: containing: @@ -105,6 +107,14 @@ def get_analyzers(cls, analyzer_names=None): if analyzer_name in completed_analyzers: continue analyzer_class = cls.get_analyzer(analyzer_name) + # Apply DFIQ filtering + if ( + not include_dfiq + and hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER + ): + continue + yield analyzer_name, analyzer_class completed_analyzers.add(analyzer_name) @@ -138,3 +148,20 @@ def register_analyzer(cls, analyzer_class): "Class already set for name: {0:s}.".format(analyzer_class.NAME) ) cls._class_registry[analyzer_name] = analyzer_class + + @classmethod + def deregister_analyzer(cls, analyzer_name): + """Deregister an analyzer class. + + The analyzer classes are identified by their lower case name. + + Args: + analyzer_name (string): the analyzer name to deregister. + + Raises: + KeyError: If class is not registered for the corresponding name. + """ + if analyzer_name not in cls._class_registry: + # Do we really need a KeyError here? Isn't logging enough? 
+ raise KeyError("Class not set for name: {0:s}.".format(analyzer_name)) + _ = cls._class_registry.pop(analyzer_name, None) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 64b23723a5..0581a66f4d 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -15,39 +15,30 @@ from __future__ import unicode_literals -import os -import logging -import subprocess -import traceback - import codecs +from hashlib import sha1 import io import json -from hashlib import sha1 +import logging +import os +import subprocess +import traceback import six import yaml -from opensearchpy.exceptions import NotFoundError -from opensearchpy.exceptions import RequestError -from flask import current_app - from celery import chain from celery import group from celery import signals +from flask import current_app +from opensearchpy.exceptions import NotFoundError +from opensearchpy.exceptions import RequestError from sqlalchemy import create_engine - -# To be able to determine plaso's version. -try: - import plaso - from plaso.cli import pinfo_tool -except ImportError: - plaso = None - from timesketch.app import configure_logger from timesketch.app import create_celery_app from timesketch.lib import datafinder from timesketch.lib import errors from timesketch.lib.analyzers import manager +from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager from timesketch.lib.datastores.opensearch import OpenSearchDataStore from timesketch.lib.utils import read_and_validate_csv from timesketch.lib.utils import read_and_validate_jsonl @@ -58,9 +49,19 @@ from timesketch.models.sketch import SearchIndex from timesketch.models.sketch import Sketch from timesketch.models.sketch import Timeline +from timesketch.models.sketch import InvestigativeQuestionApproach +from timesketch.models.sketch import InvestigativeQuestionConclusion from timesketch.models.user import User +# To be able to determine plaso's version. 
+try: + import plaso + from plaso.cli import pinfo_tool +except ImportError: + plaso = None + + logger = logging.getLogger("timesketch.tasks") celery = create_celery_app() @@ -167,9 +168,11 @@ def _close_index(index_name, data_store, timeline_id): def _set_timeline_status(timeline_id, status, error_msg=None): """Helper function to set status for searchindex and all related timelines. + Args: timeline_id: Timeline ID. """ + # TODO: Clean-up function, since neither status nor error_msg are used! timeline = Timeline.get_by_id(timeline_id) if not timeline: logger.warning("Cannot set status: No such timeline") @@ -194,6 +197,32 @@ def _set_timeline_status(timeline_id, status, error_msg=None): db_session.add(timeline) db_session.commit() + # Refresh the index so it is searchable for the analyzers right away. + datastore = OpenSearchDataStore( + host=current_app.config["OPENSEARCH_HOST"], + port=current_app.config["OPENSEARCH_PORT"], + ) + try: + datastore.client.indices.refresh(index=timeline.searchindex.index_name) + except NotFoundError: + logger.error( + "Unable to refresh index: {0:s}, not found, " + "removing from list.".format(timeline.searchindex.index_name) + ) + + # If status is set to ready, check for analyzers to execute. + if timeline.get_status.status == "ready": + analyzer_manager = DFIQAnalyzerManager(sketch=timeline.sketch) + sessions = analyzer_manager.trigger_analyzers_for_timelines( + timelines=[timeline] + ) + if sessions: + logger.info( + "Executed %d analyzers on the new timeline: '%s'", + len(sessions), + timeline.name, + ) + def _set_datasource_status(timeline_id, file_path, status, error_message=None): timeline = Timeline.get_by_id(timeline_id) @@ -324,6 +353,43 @@ def build_index_pipeline( return chain(index_task) +def _create_question_conclusion(user_id, approach_id, analysis_results, analysis): + """Creates a QuestionConclusion for a user and approach. + + Args: + user_id (int): The user ID. + approach_id (int): The approach ID. 
+ conclusion (str): The actual conclusion of the analysis. + + Returns: + InvestigativeQuestionConclusion: A QuestionConclusion object or None. + """ + approach = InvestigativeQuestionApproach.get_by_id(approach_id) + if not approach: + logging.error("No approach with ID '%d' found.", approach_id) + return None + + if not analysis_results: + logging.error( + "Can't create an InvestigativeQuestionConclusion without any " + "conclusion or analysis_results provided." + ) + return None + + # TODO: (jkppr) Parse the analysis_results and extract added stories, + # searches, graphs, aggregations and add to the object! + question_conclusion = InvestigativeQuestionConclusion( + conclusion=analysis_results, + investigativequestion_id=approach.investigativequestion_id, + automated=True, + ) + question_conclusion.analysis.append(analysis) + db_session.add(question_conclusion) + db_session.commit() + + return question_conclusion if question_conclusion else None + + def build_sketch_analysis_pipeline( sketch_id, searchindex_id, @@ -332,6 +398,8 @@ def build_sketch_analysis_pipeline( analyzer_kwargs=None, analyzer_force_run=False, timeline_id=None, + include_dfiq=False, + approach_id=None, ): """Build a pipeline for sketch analysis. @@ -349,13 +417,14 @@ def build_sketch_analysis_pipeline( analyzer_kwargs (dict): Arguments to the analyzers. analyzer_force_run (bool): If true then force the analyzer to run. timeline_id (int): Optional int of the timeline to run the analyzer on. + include_dfiq (bool): If true then include dfiq analyzers in the task. + approach_id (int): Optional ID of the approach triggering the analyzer. Returns: A tuple with a Celery group with analysis tasks or None if no analyzers are enabled and an analyzer session ID. 
""" tasks = [] - if not analyzer_names: analyzer_names = current_app.config.get("AUTO_SKETCH_ANALYZERS", []) if not analyzer_kwargs: @@ -377,7 +446,7 @@ def build_sketch_analysis_pipeline( analysis_session = AnalysisSession(user=user, sketch=sketch) db_session.add(analysis_session) - analyzers = manager.AnalysisManager.get_analyzers(analyzer_names) + analyzers = manager.AnalysisManager.get_analyzers(analyzer_names, include_dfiq) for analyzer_name, analyzer_class in analyzers: base_kwargs = analyzer_kwargs.get(analyzer_name, {}) searchindex = SearchIndex.get_by_id(searchindex_id) @@ -436,6 +505,7 @@ def build_sketch_analysis_pipeline( user=user, sketch=sketch, timeline=timeline, + approach_id=approach_id, ) analysis.add_attribute(name="kwargs_hash", value=kwargs_list_hash) analysis.set_status("PENDING") @@ -452,7 +522,6 @@ def build_sketch_analysis_pipeline( **kwargs, ) ) - # Commit the analysis session to the database. if len(analysis_session.analyses) > 0: db_session.add(analysis_session) @@ -568,6 +637,19 @@ def run_sketch_analyzer( result = analyzer.run_wrapper(analysis_id) logger.info("[{0:s}] result: {1:s}".format(analyzer_name, result)) + if hasattr(analyzer_class, "IS_DFIQ_ANALYZER") and analyzer_class.IS_DFIQ_ANALYZER: + analysis = Analysis.get_by_id(analysis_id) + user_id = analysis.user.id + approach_id = analysis.approach_id + question_conclusion = _create_question_conclusion( + user_id, approach_id, result, analysis + ) + if question_conclusion: + logger.info( + '[{0:s}] added a conclusion to dfiq: "{1:s}"'.format( + analyzer_name, question_conclusion.investigativequestion.name + ) + ) return index_name diff --git a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py new file mode 100644 index 0000000000..1ccff2499b --- /dev/null +++ b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py @@ -0,0 +1,42 @@ +"""Linking Analysis and InvestigativeQuestion 
models. + +Revision ID: 87d24c7252fc +Revises: c5560d97a2c8 +Create Date: 2024-10-02 16:17:42.576745 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "87d24c7252fc" +down_revision = "c5560d97a2c8" + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("analysis", schema=None) as batch_op: + batch_op.add_column(sa.Column("approach_id", sa.Integer(), nullable=True)) + batch_op.add_column( + sa.Column("question_conclusion_id", sa.Integer(), nullable=True) + ) + batch_op.create_foreign_key( + None, "investigativequestionconclusion", ["question_conclusion_id"], ["id"] + ) + batch_op.create_foreign_key( + None, "investigativequestionapproach", ["approach_id"], ["id"] + ) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("analysis", schema=None) as batch_op: + batch_op.drop_constraint(None, type_="foreignkey") + batch_op.drop_constraint(None, type_="foreignkey") + batch_op.drop_column("question_conclusion_id") + batch_op.drop_column("approach_id") + + # ### end Alembic commands ### diff --git a/timesketch/models/sketch.py b/timesketch/models/sketch.py index 4e1c76d5f7..cda99d7bf0 100644 --- a/timesketch/models/sketch.py +++ b/timesketch/models/sketch.py @@ -351,6 +351,10 @@ class Analysis(GenericAttributeMixin, LabelMixin, StatusMixin, CommentMixin, Bas sketch_id = Column(Integer, ForeignKey("sketch.id")) timeline_id = Column(Integer, ForeignKey("timeline.id")) searchindex_id = Column(Integer, ForeignKey("searchindex.id")) + approach_id = Column(Integer, ForeignKey("investigativequestionapproach.id")) + question_conclusion_id = Column( + Integer, ForeignKey("investigativequestionconclusion.id") + ) class AnalysisSession(LabelMixin, StatusMixin, CommentMixin, BaseModel): @@ -724,6 +728,9 @@ class InvestigativeQuestionConclusion(LabelMixin, StatusMixin, 
CommentMixin, Bas saved_aggregations = relationship( "Aggregation", secondary=questionconclusion_aggregation_association_table ) + analysis = relationship( + "Analysis", backref="investigativequestionconclusion", lazy="select" + ) class InvestigativeQuestion( @@ -807,3 +814,6 @@ class InvestigativeQuestionApproach( search_histories = relationship( "SearchHistory", backref="investigativequestionapproach", lazy="select" ) + analysis = relationship( + "Analysis", backref="investigativequestionapproach", lazy="dynamic" + )