From 2f7783efe8d8130aed960e7757576d777b1f21de Mon Sep 17 00:00:00 2001 From: yaioqin <495529040@qq.com> Date: Tue, 18 Jul 2023 22:28:14 +0800 Subject: [PATCH] Add support for passing a psort event filter when importing Plaso files --- .../commands/importer.py | 7 ++- .../timesketch_import_client/importer.py | 15 ++++-- timesketch/api/v1/resources/upload.py | 14 ++--- timesketch/lib/tasks.py | 51 +++++++++++-------- 4 files changed, 54 insertions(+), 33 deletions(-) diff --git a/cli_client/python/timesketch_cli_client/commands/importer.py b/cli_client/python/timesketch_cli_client/commands/importer.py index 2eca7e340a..41e8fa2ab7 100644 --- a/cli_client/python/timesketch_cli_client/commands/importer.py +++ b/cli_client/python/timesketch_cli_client/commands/importer.py @@ -21,15 +21,19 @@ @click.command("import") +@click.option( + '--event-filter', default=None, + help='Optional event filter to pass to psort.') @click.option("--name", help="Name of the timeline.") @click.option("--timeout", type=int, default=600, help="Seconds to wait for indexing.") @click.argument("file_path", type=click.Path(exists=True)) @click.pass_context -def importer(ctx, name, timeout, file_path): +def importer(ctx, event_filter, name, timeout, file_path): """Import timeline. Args: ctx: Click CLI context object. + event_filter: Event filter to pass to psort. name: Name of the timeline to create. timeout: Seconds to wait for indexing. file_path: File path to the file to import. @@ -47,6 +51,7 @@ def importer(ctx, name, timeout, file_path): # TODO: Consider using the whole command as upload context instead # of the file path. 
streamer.set_upload_context(file_path) + streamer.set_event_filter(event_filter) streamer.add_file(file_path) timeline = streamer.timeline if not timeline: diff --git a/importer_client/python/timesketch_import_client/importer.py b/importer_client/python/timesketch_import_client/importer.py index da9cfe1978..0dc666a6f7 100644 --- a/importer_client/python/timesketch_import_client/importer.py +++ b/importer_client/python/timesketch_import_client/importer.py @@ -62,6 +62,7 @@ def __init__(self): self._data_lines = [] self._data_type = None self._datetime_field = None + self._event_filter = None self._format_string = None self._index = "" self._last_response = None @@ -334,6 +335,7 @@ def _upload_data_frame(self, data_frame, end_stream, retry_count=0): "provider": self._provider, "events": data_frame.to_json(orient="records", lines=True), } + if self._index: data["index_name"] = self._index @@ -397,6 +399,9 @@ def _upload_binary_file(self, file_path): "provider": self._provider, "data_label": self._data_label, } + if self._event_filter: + data['event_filter'] = self._event_filter + if self._index: data["index_name"] = self._index @@ -618,7 +623,7 @@ def add_excel_file(self, filepath, **kwargs): self.add_data_frame(data_frame) def add_file(self, filepath, delimiter=","): - """Add a CSV, JSONL or a PLASO file to the buffer. + """Add a CSV, JSONL or a Plaso storage file to the buffer. Args: filepath: the path to the file to add. 
@@ -775,6 +780,10 @@ def set_entry_threshold(self, threshold): """Set the threshold for number of entries per chunk.""" self._threshold_entry = threshold + def set_event_filter(self, event_filter): + """Set the event filter to pass to psort.""" + self._event_filter = event_filter + def set_filesize_threshold(self, threshold): """Set the threshold for file size per chunk.""" self._threshold_filesize = threshold @@ -848,14 +857,14 @@ def timeline(self): logger.warning("No timeline ID has been stored as of yet.") return None - timeline_obj = timeline.Timeline( + return timeline.Timeline( timeline_id=self._timeline_id, sketch_id=self._sketch.id, api=self._sketch.api, name=self._timeline_name, searchindex=self._index, ) - return timeline_obj + def __enter__(self): """Make it possible to use "with" statement.""" diff --git a/timesketch/api/v1/resources/upload.py b/timesketch/api/v1/resources/upload.py index 25ba23e511..e90b3f4d67 100644 --- a/timesketch/api/v1/resources/upload.py +++ b/timesketch/api/v1/resources/upload.py @@ -260,6 +260,8 @@ def _upload_and_index( db_session.add(timeline) db_session.commit() + event_filter = form.get('event_filter', None) + sketch_id = sketch.id # Start Celery pipeline for indexing and analysis. # Import here to avoid circular imports. 
@@ -267,17 +269,17 @@ def _upload_and_index( from timesketch.lib import tasks pipeline = tasks.build_index_pipeline( - file_path=file_path, + event_filter=event_filter, events=events, - timeline_name=timeline_name, - index_name=searchindex.index_name, file_extension=file_extension, - sketch_id=sketch_id, + file_path=file_path, + index_name=searchindex.index_name, only_index=enable_stream, + sketch_id=sketch_id, timeline_id=timeline.id, - headers_mapping=headers_mapping, - delimiter=delimiter, - ) + timeline_name=timeline_name, + headers_mapping=headers_mapping, + delimiter=delimiter) task_id = uuid.uuid4().hex pipeline.apply_async(task_id=task_id) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 129805018c..91e6140c51 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -240,33 +240,35 @@ def _get_index_task_class(file_extension): def build_index_pipeline( - file_path="", - events="", - timeline_name="", - index_name="", - file_extension="", - sketch_id=None, + event_filter=None, + events='', + file_extension='', + file_path='', + index_name='', only_index=False, + sketch_id=None, timeline_id=None, + timeline_name='', headers_mapping=None, delimiter=",", ): """Build a pipeline for index and analysis. Args: - file_path: The full path to a file to upload, either a file_path or - or events need to be defined. + event_filter: Event filter to pass to psort. events: String with the event data, either file_path or events needs to be defined. - timeline_name: Name of the timeline to create. - index_name: Name of the index to index to. file_extension: The file extension of the file. - sketch_id: The ID of the sketch to analyze. + file_path: The full path to a file to upload, either a file_path or + or events need to be defined. + index_name: Name of the index to index to. only_index: If set to true then only indexing tasks are run, not analyzers. This is to be used when uploading data in chunks, we don't want to run the analyzers until all chunks have been uploaded. 
+ sketch_id: The ID of the sketch to analyze. timeline_id: Optional ID of the timeline object this data belongs to. + timeline_name: Name of the timeline to create. headers_mapping: list of dicts containing: (i) target header we want to replace [key=target], (ii) source header we want to insert [key=source], and @@ -275,10 +277,16 @@ Returns: Celery chain with indexing task (or single indexing task) and analyzer task group. + + Raises: + RuntimeError: if no file path or events were specified. """ if not (file_path or events): raise RuntimeError("Unable to upload data, missing either a file or events.") - index_task_class = _get_index_task_class(file_extension) + + if file_extension not in ('csv', 'jsonl', 'plaso'): + raise KeyError('No task that supports {0:s}'.format(file_extension)) + index_task_class = _get_index_task_class(file_extension) sketch_analyzer_chain = None searchindex = SearchIndex.query.filter_by(index_name=index_name).first() @@ -542,20 +550,19 @@ def run_sketch_analyzer( @celery.task(track_started=True, base=SqlAlchemyTask) -def run_plaso(file_path, events, timeline_name, index_name, source_type, timeline_id): +def run_plaso(file_path, event_filter, timeline_name, index_name, source_type, timeline_id): """Create a Celery task for processing Plaso storage file. Args: file_path: Path to the plaso file on disk. - events: String with event data, invalid for plaso files. + event_filter: Event filter to pass to psort. timeline_name: Name of the Timesketch timeline. index_name: Name of the datastore index. source_type: Type of file, csv or jsonl. timeline_id: ID of the timeline object this data belongs to. - Raises: - RuntimeError: If the function is called using events, plaso - is not installed or is of unsupported version. + Raises: + RuntimeError: If Plaso is not installed or is of unsupported version. Returns: Name (str) of the index. 
""" @@ -564,7 +571,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin ("Plaso isn't installed, " "unable to continue processing plaso files.") ) - plaso_version = int(plaso.__version__) + plaso_version = int(plaso.__version__, 10) if plaso_version <= PLASO_MINIMUM_VERSION: raise RuntimeError( "Plaso version is out of date (version {0:d}, please upgrade to a " @@ -573,9 +580,6 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin ) ) - if events: - raise RuntimeError("Plaso uploads needs a file, not events.") - mappings = None mappings_file_path = current_app.config.get("PLASO_MAPPING_FILE", "") if os.path.isfile(mappings_file_path): @@ -676,7 +680,6 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin psort_path, "-o", "opensearch_ts", - file_path, "--server", opensearch_server, "--port", @@ -714,6 +717,11 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin ) if opensearch_flush_interval: cmd.extend(["--flush_interval", str(opensearch_flush_interval)]) + + cmd.append(file_path) + + if event_filter: + cmd.append(event_filter) # Run psort.py try: